glasswall.multiprocessing.tasks

 1from multiprocessing import Queue
 2from typing import Any, Callable, Optional, Union
 3
 4import glasswall
 5
 6
 7class Task:
 8    def __init__(
 9        self,
10        func: Callable,
11        args: Optional[tuple] = None,
12        kwargs: Optional[dict] = None,
13    ):
14        self.func = func
15        self.args = args or tuple()
16        self.kwargs = kwargs or dict()
17
18        # Convert Policy objects to text (has attributes that are modules, and modules cannot be pickled)
19        # args
20        processed_args = []
21        for arg in self.args:
22            if isinstance(arg, glasswall.content_management.policies.Policy):
23                arg = arg.text
24            processed_args.append(arg)
25        self.args = tuple(processed_args)
26        # kwargs
27        for key, value in self.kwargs.items():
28            if isinstance(value, glasswall.content_management.policies.Policy):
29                self.kwargs[key] = value.text
30
31    def __eq__(self, other):
32        if isinstance(other, Task):
33            return (self.func, self.args, self.kwargs) == (other.func, other.args, other.kwargs)
34        return False
35
36    def __hash__(self):
37        kwargs_tuple = tuple(sorted(self.kwargs.items()))
38        return hash((self.func, self.args, kwargs_tuple))
39
40    def __repr__(self):
41        max_length = 100
42        args_str = ", ".join(repr(arg)[:max_length] + ('...' if len(repr(arg)) > max_length else '') for arg in self.args)
43        kwargs_str = ", ".join(f"{key}={repr(value)[:max_length] + ('...' if len(repr(value)) > max_length else '')}" for key, value in self.kwargs.items())
44        return f"{self.__class__.__name__}(func={self.func.__name__}, args=({args_str}), kwargs=({kwargs_str}))"
45
46
class TaskResult:
    """Outcome of running a Task: the task, a success flag, and its result or exception.

    The class-level annotations below declare attributes that ``__init__``
    does not assign. NOTE(review): presumably they are filled in by the code
    that supervises the task's execution -- confirm against the caller.
    """

    timeout_seconds: Optional[float]
    memory_limit_in_gib: Optional[float]
    start_time: float
    end_time: float
    elapsed_time: float
    out_of_memory: bool
    timed_out: bool
    max_memory_used_in_gib: float
    exit_code: Union[int, None]

    def __init__(
        self,
        task: "Task",
        success: bool,
        result: Any = None,
        exception: Union[Exception, None] = None,
    ):
        """Record the task that ran and how it finished.

        Args:
            task: The Task this result belongs to.
            success: Whether the task's function returned without raising.
            result: The function's return value, if any.
            exception: The exception raised by the function, if any.
        """
        self.task = task
        self.success = success
        self.result = result
        self.exception = exception

    def __eq__(self, other):
        """Results are equal when task, success, result and exception all compare equal."""
        if isinstance(other, TaskResult):
            return (self.task, self.success, self.result, self.exception) == \
                (other.task, other.success, other.result, other.exception)
        return False

    def __hash__(self):
        """Hash on task, success, result and exception (all must be hashable)."""
        return hash((self.task, self.success, self.result, self.exception))

    def __repr__(self):
        """Return ``ClassName(attr=value, ...)`` for every instance attribute.

        Each value's repr is capped at 100 characters, with ``...`` appended
        whenever something was cut off. The cap applies to the value only, so
        the attribute name never eats into the budget and no truncation goes
        unmarked.
        """
        max_length = 100
        parts = []
        for name, value in self.__dict__.items():
            text = repr(value)
            if len(text) > max_length:
                text = text[:max_length] + "..."
            parts.append(f"{name}={text}")
        return f"{self.__class__.__name__}({', '.join(parts)})"
83
84
def execute_task_and_put_in_queue(task: Task, queue: "Queue[TaskResult]") -> None:
    """Run ``task`` and put a TaskResult describing the outcome on ``queue``.

    A successful call is reported with ``success=True`` and the function's
    return value; any ``Exception`` raised is captured and reported with
    ``success=False`` instead of propagating.

    Args:
        task: The Task whose ``func`` is called with its ``args`` and ``kwargs``.
        queue: The queue that receives exactly one TaskResult.
    """
    try:
        outcome = TaskResult(
            task=task,
            success=True,
            result=task.func(*task.args, **task.kwargs),
        )
    except Exception as error:
        outcome = TaskResult(task=task, success=False, exception=error)

    queue.put(outcome)
class Task:
class Task:
    """A callable together with the positional and keyword arguments to call it with.

    Any ``Policy`` object found in ``args`` or among the ``kwargs`` values is
    replaced by its ``text`` attribute: Policy objects have attributes that are
    modules, and modules cannot be pickled.
    """

    def __init__(
        self,
        func: Callable,
        args: Optional[tuple] = None,
        kwargs: Optional[dict] = None,
    ):
        """Store ``func`` and normalised copies of its arguments.

        Args:
            func: The callable to run.
            args: Positional arguments for ``func``; ``None`` means none.
            kwargs: Keyword arguments for ``func``; ``None`` means none.
        """
        self.func = func
        # Always build new containers: the caller's ``kwargs`` dict must not be
        # modified when a Policy value is swapped for its text.
        self.args = tuple(self._policy_to_text(arg) for arg in (args or ()))
        self.kwargs = {key: self._policy_to_text(value) for key, value in (kwargs or {}).items()}

    @staticmethod
    def _policy_to_text(value: Any) -> Any:
        """Return ``value.text`` for Policy objects and ``value`` itself otherwise."""
        if isinstance(value, glasswall.content_management.policies.Policy):
            return value.text
        return value

    @staticmethod
    def _truncated_repr(value: Any, max_length: int = 100) -> str:
        """Return ``repr(value)``, cut to ``max_length`` characters plus ``...`` when longer."""
        text = repr(value)
        if len(text) > max_length:
            return text[:max_length] + "..."
        return text

    def __eq__(self, other):
        """Tasks are equal when func, args and kwargs all compare equal."""
        if isinstance(other, Task):
            return (self.func, self.args, self.kwargs) == (other.func, other.args, other.kwargs)
        return False

    def __hash__(self):
        """Hash on func, args and the key-sorted kwargs items (all must be hashable)."""
        kwargs_tuple = tuple(sorted(self.kwargs.items()))
        return hash((self.func, self.args, kwargs_tuple))

    def __repr__(self):
        """Return a one-line summary with every argument repr capped at 100 characters."""
        args_str = ", ".join(self._truncated_repr(arg) for arg in self.args)
        kwargs_str = ", ".join(f"{key}={self._truncated_repr(value)}" for key, value in self.kwargs.items())
        # Not every callable has ``__name__`` (e.g. functools.partial objects,
        # instances defining ``__call__``); fall back to the callable's repr.
        func_name = getattr(self.func, "__name__", None)
        if func_name is None:
            func_name = repr(self.func)
        return f"{self.__class__.__name__}(func={func_name}, args=({args_str}), kwargs=({kwargs_str}))"
Task( func: Callable, args: Optional[tuple] = None, kwargs: Optional[dict] = None)
11    def __init__(
12        self,
13        func: Callable,
14        args: Optional[tuple] = None,
15        kwargs: Optional[dict] = None,
16    ):
17        self.func = func
18        self.args = args or tuple()
19        self.kwargs = kwargs or dict()
20
21        # Convert Policy objects to text (has attributes that are modules, and modules cannot be pickled)
22        # args
23        processed_args = []
24        for arg in self.args:
25            if isinstance(arg, glasswall.content_management.policies.Policy):
26                arg = arg.text
27            processed_args.append(arg)
28        self.args = tuple(processed_args)
29        # kwargs
30        for key, value in self.kwargs.items():
31            if isinstance(value, glasswall.content_management.policies.Policy):
32                self.kwargs[key] = value.text
func
args
kwargs
class TaskResult:
class TaskResult:
    """Outcome of running a Task: the task, a success flag, and its result or exception.

    The class-level annotations below declare attributes that ``__init__``
    does not assign. NOTE(review): presumably they are filled in by the code
    that supervises the task's execution -- confirm against the caller.
    """

    timeout_seconds: Optional[float]
    memory_limit_in_gib: Optional[float]
    start_time: float
    end_time: float
    elapsed_time: float
    out_of_memory: bool
    timed_out: bool
    max_memory_used_in_gib: float
    exit_code: Union[int, None]

    def __init__(
        self,
        task: "Task",
        success: bool,
        result: Any = None,
        exception: Union[Exception, None] = None,
    ):
        """Record the task that ran and how it finished.

        Args:
            task: The Task this result belongs to.
            success: Whether the task's function returned without raising.
            result: The function's return value, if any.
            exception: The exception raised by the function, if any.
        """
        self.task = task
        self.success = success
        self.result = result
        self.exception = exception

    def __eq__(self, other):
        """Results are equal when task, success, result and exception all compare equal."""
        if isinstance(other, TaskResult):
            return (self.task, self.success, self.result, self.exception) == \
                (other.task, other.success, other.result, other.exception)
        return False

    def __hash__(self):
        """Hash on task, success, result and exception (all must be hashable)."""
        return hash((self.task, self.success, self.result, self.exception))

    def __repr__(self):
        """Return ``ClassName(attr=value, ...)`` for every instance attribute.

        Each value's repr is capped at 100 characters, with ``...`` appended
        whenever something was cut off. The cap applies to the value only, so
        the attribute name never eats into the budget and no truncation goes
        unmarked.
        """
        max_length = 100
        parts = []
        for name, value in self.__dict__.items():
            text = repr(value)
            if len(text) > max_length:
                text = text[:max_length] + "..."
            parts.append(f"{name}={text}")
        return f"{self.__class__.__name__}({', '.join(parts)})"
TaskResult( task: Task, success: bool, result: Any = None, exception: Optional[Exception] = None)
61    def __init__(
62        self,
63        task: Task,
64        success: bool,
65        result: Any = None,
66        exception: Union[Exception, None] = None,
67    ):
68        self.task = task
69        self.success = success
70        self.result = result
71        self.exception = exception
timeout_seconds: Optional[float]
memory_limit_in_gib: Optional[float]
start_time: float
end_time: float
elapsed_time: float
out_of_memory: bool
timed_out: bool
max_memory_used_in_gib: float
exit_code: Optional[int]
task
success
result
exception
def execute_task_and_put_in_queue( task: Task, queue: 'Queue[TaskResult]') -> None:
def execute_task_and_put_in_queue(task: Task, queue: "Queue[TaskResult]") -> None:
    """Run ``task`` and put a TaskResult describing the outcome on ``queue``.

    A successful call is reported with ``success=True`` and the function's
    return value; any ``Exception`` raised is captured and reported with
    ``success=False`` instead of propagating.

    Args:
        task: The Task whose ``func`` is called with its ``args`` and ``kwargs``.
        queue: The queue that receives exactly one TaskResult.
    """
    try:
        outcome = TaskResult(
            task=task,
            success=True,
            result=task.func(*task.args, **task.kwargs),
        )
    except Exception as error:
        outcome = TaskResult(task=task, success=False, exception=error)

    queue.put(outcome)