glasswall.multiprocessing.tasks
1from multiprocessing import Queue 2from typing import Any, Callable, Optional, Union 3 4import glasswall 5 6 7class Task: 8 def __init__( 9 self, 10 func: Callable, 11 args: Optional[tuple] = None, 12 kwargs: Optional[dict] = None, 13 ): 14 self.func = func 15 self.args = args or tuple() 16 self.kwargs = kwargs or dict() 17 18 # Convert Policy objects to text (has attributes that are modules, and modules cannot be pickled) 19 # args 20 processed_args = [] 21 for arg in self.args: 22 if isinstance(arg, glasswall.content_management.policies.Policy): 23 arg = arg.text 24 processed_args.append(arg) 25 self.args = tuple(processed_args) 26 # kwargs 27 for key, value in self.kwargs.items(): 28 if isinstance(value, glasswall.content_management.policies.Policy): 29 self.kwargs[key] = value.text 30 31 def __eq__(self, other): 32 if isinstance(other, Task): 33 return (self.func, self.args, self.kwargs) == (other.func, other.args, other.kwargs) 34 return False 35 36 def __hash__(self): 37 kwargs_tuple = tuple(sorted(self.kwargs.items())) 38 return hash((self.func, self.args, kwargs_tuple)) 39 40 def __repr__(self): 41 max_length = 100 42 args_str = ", ".join(repr(arg)[:max_length] + ('...' if len(repr(arg)) > max_length else '') for arg in self.args) 43 kwargs_str = ", ".join(f"{key}={repr(value)[:max_length] + ('...' if len(repr(value)) > max_length else '')}" for key, value in self.kwargs.items()) 44 return f"{self.__class__.__name__}(func={self.func.__name__}, args=({args_str}), kwargs=({kwargs_str}))" 45 46 47class TaskResult: 48 timeout_seconds: Optional[float] 49 memory_limit_in_gib: Optional[float] 50 start_time: float 51 end_time: float 52 elapsed_time: float 53 out_of_memory: bool 54 timed_out: bool 55 max_memory_used_in_gib: float 56 exit_code: Union[int, None] 57 58 def __init__( 59 self, 60 task: Task, 61 success: bool, 62 result: Any = None, 63 exception: Union[Exception, None] = None, 64 ): 65 self.task = task 66 self.success = success 67 self.result = result 68 self.exception = exception 69 70 def __eq__(self, other): 71 if isinstance(other, TaskResult): 72 return (self.task, self.success, self.result, self.exception) == \ 73 (other.task, other.success, other.result, other.exception) 74 return False 75 76 def __hash__(self): 77 return hash((self.task, self.success, self.result, self.exception)) 78 79 def __repr__(self): 80 max_length = 100 81 attributes_str = ', '.join(f"{k}={v!r}"[:max_length] + ('...' if len(repr(v)) > max_length else '') for k, v in self.__dict__.items()) 82 return f"{self.__class__.__name__}({attributes_str})" 83 84 85def execute_task_and_put_in_queue(task: Task, queue: "Queue[TaskResult]") -> None: 86 try: 87 func_result = task.func(*task.args, **task.kwargs) 88 task_result = TaskResult(task=task, success=True, result=func_result) 89 except Exception as e: 90 task_result = TaskResult(task=task, success=False, exception=e) 91 92 queue.put(task_result)
class
Task:
10class Task: 11 def __init__( 12 self, 13 func: Callable, 14 args: Optional[tuple] = None, 15 kwargs: Optional[dict] = None, 16 ): 17 self.func = func 18 self.args = args or tuple() 19 self.kwargs = kwargs or dict() 20 21 # Convert Policy objects to text (has attributes that are modules, and modules cannot be pickled) 22 # args 23 processed_args = [] 24 for arg in self.args: 25 if isinstance(arg, glasswall.content_management.policies.Policy): 26 arg = arg.text 27 processed_args.append(arg) 28 self.args = tuple(processed_args) 29 # kwargs 30 for key, value in self.kwargs.items(): 31 if isinstance(value, glasswall.content_management.policies.Policy): 32 self.kwargs[key] = value.text 33 34 def __eq__(self, other): 35 if isinstance(other, Task): 36 return (self.func, self.args, self.kwargs) == (other.func, other.args, other.kwargs) 37 return False 38 39 def __hash__(self): 40 kwargs_tuple = tuple(sorted(self.kwargs.items())) 41 return hash((self.func, self.args, kwargs_tuple)) 42 43 def __repr__(self): 44 max_length = 100 45 args_str = ", ".join(repr(arg)[:max_length] + ('...' if len(repr(arg)) > max_length else '') for arg in self.args) 46 kwargs_str = ", ".join(f"{key}={repr(value)[:max_length] + ('...' if len(repr(value)) > max_length else '')}" for key, value in self.kwargs.items()) 47 return f"{self.__class__.__name__}(func={self.func.__name__}, args=({args_str}), kwargs=({kwargs_str}))"
Task( func: Callable, args: Optional[tuple] = None, kwargs: Optional[dict] = None)
11 def __init__( 12 self, 13 func: Callable, 14 args: Optional[tuple] = None, 15 kwargs: Optional[dict] = None, 16 ): 17 self.func = func 18 self.args = args or tuple() 19 self.kwargs = kwargs or dict() 20 21 # Convert Policy objects to text (has attributes that are modules, and modules cannot be pickled) 22 # args 23 processed_args = [] 24 for arg in self.args: 25 if isinstance(arg, glasswall.content_management.policies.Policy): 26 arg = arg.text 27 processed_args.append(arg) 28 self.args = tuple(processed_args) 29 # kwargs 30 for key, value in self.kwargs.items(): 31 if isinstance(value, glasswall.content_management.policies.Policy): 32 self.kwargs[key] = value.text
class
TaskResult:
50class TaskResult: 51 timeout_seconds: Optional[float] 52 memory_limit_in_gib: Optional[float] 53 start_time: float 54 end_time: float 55 elapsed_time: float 56 out_of_memory: bool 57 timed_out: bool 58 max_memory_used_in_gib: float 59 exit_code: Union[int, None] 60 61 def __init__( 62 self, 63 task: Task, 64 success: bool, 65 result: Any = None, 66 exception: Union[Exception, None] = None, 67 ): 68 self.task = task 69 self.success = success 70 self.result = result 71 self.exception = exception 72 73 def __eq__(self, other): 74 if isinstance(other, TaskResult): 75 return (self.task, self.success, self.result, self.exception) == \ 76 (other.task, other.success, other.result, other.exception) 77 return False 78 79 def __hash__(self): 80 return hash((self.task, self.success, self.result, self.exception)) 81 82 def __repr__(self): 83 max_length = 100 84 attributes_str = ', '.join(f"{k}={v!r}"[:max_length] + ('...' if len(repr(v)) > max_length else '') for k, v in self.__dict__.items()) 85 return f"{self.__class__.__name__}({attributes_str})"
TaskResult( task: Task, success: bool, result: Any = None, exception: Optional[Exception] = None)
88def execute_task_and_put_in_queue(task: Task, queue: "Queue[TaskResult]") -> None: 89 try: 90 func_result = task.func(*task.args, **task.kwargs) 91 task_result = TaskResult(task=task, success=True, result=func_result) 92 except Exception as e: 93 task_result = TaskResult(task=task, success=False, exception=e) 94 95 queue.put(task_result)