glasswall.multiprocessing.task_watcher
import time
from multiprocessing import Process, Queue
from typing import Optional

from glasswall.multiprocessing.memory_usage import get_total_memory_usage_in_gib
from glasswall.multiprocessing.tasks import Task, TaskResult, execute_task_and_put_in_queue


class TaskWatcher:
    """Run a Task in a child process and supervise it until it ends.

    The task is executed through ``execute_task_and_put_in_queue`` in a
    separate ``multiprocessing.Process``. While that process is alive the
    watcher drains its private ``watcher_queue`` and enforces the optional
    timeout and memory limit. ``update_queue`` then puts one ``TaskResult``
    on ``task_results_queue``.

    With ``auto_start=True`` (the default) all of this happens inside the
    constructor, which therefore blocks until the task has ended.
    """

    process: Process
    start_time: float
    end_time: float
    elapsed_time: float

    # Longest time, in seconds, that terminate_task() waits for the child to
    # exit after asking it to terminate.
    terminate_join_timeout: float = 5.0

    def __init__(
        self,
        task: Task,
        task_results_queue: "Queue[TaskResult]",
        timeout_seconds: Optional[float] = None,
        memory_limit_in_gib: Optional[float] = None,
        sleep_time: float = 0.001,
        memory_limit_polling_rate: float = 0.1,
        auto_start: bool = True,
    ):
        """Store the configuration and, if ``auto_start`` is set, run the task.

        Args:
            task: The task handed to ``execute_task_and_put_in_queue``.
            task_results_queue: Queue that receives the final ``TaskResult``.
            timeout_seconds: Seconds after which the task is terminated.
                Falsy values (``None`` or ``0``) disable the timeout.
            memory_limit_in_gib: Memory limit above which the task is
                terminated. Falsy values disable memory monitoring.
            sleep_time: Seconds slept between iterations of the watch loop.
                A falsy value disables sleeping.
            memory_limit_polling_rate: Minimum number of seconds between two
                memory usage checks.
            auto_start: When true, start, watch and report the task
                immediately; the constructor blocks until it has ended.
        """
        self.task = task
        self.task_results_queue = task_results_queue
        self.timeout_seconds = timeout_seconds
        self.memory_limit_in_gib = memory_limit_in_gib
        self.sleep_time = sleep_time
        self.memory_limit_polling_rate = memory_limit_polling_rate
        self.auto_start = auto_start

        # Private queue the child process writes to; drained into
        # watcher_results by clean_watcher_queue().
        self.watcher_queue: "Queue[TaskResult]" = Queue()
        self.watcher_results = []

        self.exception = None
        self.timed_out: bool = False
        self.out_of_memory: bool = False
        self.max_memory_used_in_gib: float = 0

        if self.auto_start:
            self.start_task()
            self.watch_task()
            self.update_queue()

    def start_task(self) -> None:
        """Start the child process and record the start time."""
        self.process = Process(
            target=execute_task_and_put_in_queue,
            args=(self.task, self.watcher_queue,)
        )
        self.process.start()
        self.start_time = time.time()

    def terminate_task(self) -> None:
        """Terminate the child process and wait briefly for it to exit."""
        self.process.terminate()
        # terminate() returns without waiting for the child to die, and
        # Process.exitcode stays None until it has. Joining here lets
        # update_queue() report a real exit code and reaps the child. The wait
        # is bounded so a child that does not exit cannot block the watcher.
        self.process.join(timeout=self.terminate_join_timeout)

    def terminate_task_with_timeout(self) -> None:
        """Terminate the task and record that it timed out."""
        self.terminate_task()
        self.timed_out = True
        self.exception = TimeoutError()

    def terminate_task_with_out_of_memory(self) -> None:
        """Terminate the task and record that it exceeded the memory limit."""
        self.terminate_task()
        self.out_of_memory = True
        self.exception = MemoryError()

    def clean_watcher_queue(self):
        """Move everything currently on ``watcher_queue`` to ``watcher_results``."""
        while not self.watcher_queue.empty():
            self.watcher_results.append(self.watcher_queue.get())

    def watch_task(self) -> None:
        """Poll the child process until it exits or a limit is exceeded.

        Each iteration drains the watcher queue, checks the timeout (if
        set) and, at most once per ``memory_limit_polling_rate`` seconds,
        the memory usage (if a limit is set). Afterwards the queue is drained
        once more and ``end_time`` / ``elapsed_time`` are recorded.
        """
        last_memory_limit_check = time.time()
        while self.process.is_alive():
            self.clean_watcher_queue()

            now = time.time()

            # Monitor for timeout exceeded
            if self.timeout_seconds:
                if now - self.start_time > self.timeout_seconds:
                    self.terminate_task_with_timeout()
                    break

            # Monitor for memory limit exceeded
            if self.memory_limit_in_gib:
                if now - last_memory_limit_check > self.memory_limit_polling_rate:
                    last_memory_limit_check = now
                    memory_usage_in_gib = get_total_memory_usage_in_gib(self.process.pid)
                    if memory_usage_in_gib > self.max_memory_used_in_gib:
                        self.max_memory_used_in_gib = memory_usage_in_gib
                    if memory_usage_in_gib > self.memory_limit_in_gib:
                        self.terminate_task_with_out_of_memory()
                        break

            if self.sleep_time:
                time.sleep(self.sleep_time)

        # Pick up anything the child queued just before it exited.
        self.clean_watcher_queue()
        self.end_time = time.time()
        self.elapsed_time = self.end_time - self.start_time

    def update_queue(self) -> None:
        """Build the final ``TaskResult`` and put it on ``task_results_queue``.

        A failure result is created when an exception was recorded or the
        child produced no result; otherwise the first result received from
        the child is used. The result is then annotated with the exit code,
        the limits and the timing before being queued.
        """
        if self.exception or not self.watcher_results:
            # TimeoutError, MemoryError, or process was killed (SIGABRT etc)
            task_result = TaskResult(
                self.task,
                success=False,
                exception=self.exception,
            )
        else:
            task_result = self.watcher_results[0]

        # May still be None if the child has not exited yet.
        task_result.exit_code = self.process.exitcode
        task_result.task = self.task
        task_result.timeout_seconds = self.timeout_seconds
        task_result.memory_limit_in_gib = self.memory_limit_in_gib

        task_result.start_time = self.start_time
        task_result.end_time = self.end_time
        task_result.elapsed_time = self.elapsed_time

        if self.timeout_seconds:
            task_result.timed_out = self.timed_out

        if self.memory_limit_in_gib:
            task_result.max_memory_used_in_gib = self.max_memory_used_in_gib
            task_result.out_of_memory = self.out_of_memory

        self.task_results_queue.put(task_result)
class
TaskWatcher:
class TaskWatcher:
    """Run a Task in a child process and supervise it until it ends.

    The task is executed through ``execute_task_and_put_in_queue`` in a
    separate ``multiprocessing.Process``. While that process is alive the
    watcher drains its private ``watcher_queue`` and enforces the optional
    timeout and memory limit. ``update_queue`` then puts one ``TaskResult``
    on ``task_results_queue``.

    With ``auto_start=True`` (the default) all of this happens inside the
    constructor, which therefore blocks until the task has ended.
    """

    process: Process
    start_time: float
    end_time: float
    elapsed_time: float

    # Longest time, in seconds, that terminate_task() waits for the child to
    # exit after asking it to terminate.
    terminate_join_timeout: float = 5.0

    def __init__(
        self,
        task: Task,
        task_results_queue: "Queue[TaskResult]",
        timeout_seconds: Optional[float] = None,
        memory_limit_in_gib: Optional[float] = None,
        sleep_time: float = 0.001,
        memory_limit_polling_rate: float = 0.1,
        auto_start: bool = True,
    ):
        """Store the configuration and, if ``auto_start`` is set, run the task.

        Args:
            task: The task handed to ``execute_task_and_put_in_queue``.
            task_results_queue: Queue that receives the final ``TaskResult``.
            timeout_seconds: Seconds after which the task is terminated.
                Falsy values (``None`` or ``0``) disable the timeout.
            memory_limit_in_gib: Memory limit above which the task is
                terminated. Falsy values disable memory monitoring.
            sleep_time: Seconds slept between iterations of the watch loop.
                A falsy value disables sleeping.
            memory_limit_polling_rate: Minimum number of seconds between two
                memory usage checks.
            auto_start: When true, start, watch and report the task
                immediately; the constructor blocks until it has ended.
        """
        self.task = task
        self.task_results_queue = task_results_queue
        self.timeout_seconds = timeout_seconds
        self.memory_limit_in_gib = memory_limit_in_gib
        self.sleep_time = sleep_time
        self.memory_limit_polling_rate = memory_limit_polling_rate
        self.auto_start = auto_start

        # Private queue the child process writes to; drained into
        # watcher_results by clean_watcher_queue().
        self.watcher_queue: "Queue[TaskResult]" = Queue()
        self.watcher_results = []

        self.exception = None
        self.timed_out: bool = False
        self.out_of_memory: bool = False
        self.max_memory_used_in_gib: float = 0

        if self.auto_start:
            self.start_task()
            self.watch_task()
            self.update_queue()

    def start_task(self) -> None:
        """Start the child process and record the start time."""
        self.process = Process(
            target=execute_task_and_put_in_queue,
            args=(self.task, self.watcher_queue,)
        )
        self.process.start()
        self.start_time = time.time()

    def terminate_task(self) -> None:
        """Terminate the child process and wait briefly for it to exit."""
        self.process.terminate()
        # terminate() returns without waiting for the child to die, and
        # Process.exitcode stays None until it has. Joining here lets
        # update_queue() report a real exit code and reaps the child. The wait
        # is bounded so a child that does not exit cannot block the watcher.
        self.process.join(timeout=self.terminate_join_timeout)

    def terminate_task_with_timeout(self) -> None:
        """Terminate the task and record that it timed out."""
        self.terminate_task()
        self.timed_out = True
        self.exception = TimeoutError()

    def terminate_task_with_out_of_memory(self) -> None:
        """Terminate the task and record that it exceeded the memory limit."""
        self.terminate_task()
        self.out_of_memory = True
        self.exception = MemoryError()

    def clean_watcher_queue(self):
        """Move everything currently on ``watcher_queue`` to ``watcher_results``."""
        while not self.watcher_queue.empty():
            self.watcher_results.append(self.watcher_queue.get())

    def watch_task(self) -> None:
        """Poll the child process until it exits or a limit is exceeded.

        Each iteration drains the watcher queue, checks the timeout (if
        set) and, at most once per ``memory_limit_polling_rate`` seconds,
        the memory usage (if a limit is set). Afterwards the queue is drained
        once more and ``end_time`` / ``elapsed_time`` are recorded.
        """
        last_memory_limit_check = time.time()
        while self.process.is_alive():
            self.clean_watcher_queue()

            now = time.time()

            # Monitor for timeout exceeded
            if self.timeout_seconds:
                if now - self.start_time > self.timeout_seconds:
                    self.terminate_task_with_timeout()
                    break

            # Monitor for memory limit exceeded
            if self.memory_limit_in_gib:
                if now - last_memory_limit_check > self.memory_limit_polling_rate:
                    last_memory_limit_check = now
                    memory_usage_in_gib = get_total_memory_usage_in_gib(self.process.pid)
                    if memory_usage_in_gib > self.max_memory_used_in_gib:
                        self.max_memory_used_in_gib = memory_usage_in_gib
                    if memory_usage_in_gib > self.memory_limit_in_gib:
                        self.terminate_task_with_out_of_memory()
                        break

            if self.sleep_time:
                time.sleep(self.sleep_time)

        # Pick up anything the child queued just before it exited.
        self.clean_watcher_queue()
        self.end_time = time.time()
        self.elapsed_time = self.end_time - self.start_time

    def update_queue(self) -> None:
        """Build the final ``TaskResult`` and put it on ``task_results_queue``.

        A failure result is created when an exception was recorded or the
        child produced no result; otherwise the first result received from
        the child is used. The result is then annotated with the exit code,
        the limits and the timing before being queued.
        """
        if self.exception or not self.watcher_results:
            # TimeoutError, MemoryError, or process was killed (SIGABRT etc)
            task_result = TaskResult(
                self.task,
                success=False,
                exception=self.exception,
            )
        else:
            task_result = self.watcher_results[0]

        # May still be None if the child has not exited yet.
        task_result.exit_code = self.process.exitcode
        task_result.task = self.task
        task_result.timeout_seconds = self.timeout_seconds
        task_result.memory_limit_in_gib = self.memory_limit_in_gib

        task_result.start_time = self.start_time
        task_result.end_time = self.end_time
        task_result.elapsed_time = self.elapsed_time

        if self.timeout_seconds:
            task_result.timed_out = self.timed_out

        if self.memory_limit_in_gib:
            task_result.max_memory_used_in_gib = self.max_memory_used_in_gib
            task_result.out_of_memory = self.out_of_memory

        self.task_results_queue.put(task_result)
TaskWatcher( task: glasswall.multiprocessing.tasks.Task, task_results_queue: 'Queue[TaskResult]', timeout_seconds: Optional[float] = None, memory_limit_in_gib: Optional[float] = None, sleep_time: float = 0.001, memory_limit_polling_rate: float = 0.1, auto_start: bool = True)
def __init__(
    self,
    task: Task,
    task_results_queue: "Queue[TaskResult]",
    timeout_seconds: Optional[float] = None,
    memory_limit_in_gib: Optional[float] = None,
    sleep_time: float = 0.001,
    memory_limit_polling_rate: float = 0.1,
    auto_start: bool = True,
):
    """Set up the watcher and optionally run the whole task lifecycle.

    When ``auto_start`` is true the task is started, watched and its result
    queued before the constructor returns.
    """
    # What to run and where the final result goes.
    self.task = task
    self.task_results_queue = task_results_queue

    # Limits and polling configuration.
    self.timeout_seconds = timeout_seconds
    self.memory_limit_in_gib = memory_limit_in_gib
    self.sleep_time = sleep_time
    self.memory_limit_polling_rate = memory_limit_polling_rate
    self.auto_start = auto_start

    # Results arriving from the child process are collected here.
    self.watcher_queue: "Queue[TaskResult]" = Queue()
    self.watcher_results = []

    # Outcome bookkeeping, filled in while the task is being watched.
    self.exception = None
    self.timed_out: bool = False
    self.out_of_memory: bool = False
    self.max_memory_used_in_gib: float = 0

    if not self.auto_start:
        return

    self.start_task()
    self.watch_task()
    self.update_queue()
def
watch_task(self) -> None:
def watch_task(self) -> None:
    """Supervise the running process until it exits or breaks a limit.

    The loop drains the watcher queue, terminates the task once the
    timeout is exceeded, and samples memory usage no more often than
    ``memory_limit_polling_rate`` seconds, terminating the task when the
    memory limit is exceeded. The end time and elapsed time are recorded
    once the loop is left.
    """
    previous_memory_check = time.time()

    while self.process.is_alive():
        self.clean_watcher_queue()
        current_time = time.time()

        # Timeout: only enforced when a truthy timeout is configured.
        if self.timeout_seconds and current_time - self.start_time > self.timeout_seconds:
            self.terminate_task_with_timeout()
            break

        # Memory: only sampled when a limit is configured and the polling
        # interval has passed since the previous sample.
        memory_check_due = (
            self.memory_limit_in_gib
            and current_time - previous_memory_check > self.memory_limit_polling_rate
        )
        if memory_check_due:
            previous_memory_check = current_time
            used_gib = get_total_memory_usage_in_gib(self.process.pid)
            # Track the highest usage seen so far.
            self.max_memory_used_in_gib = max(self.max_memory_used_in_gib, used_gib)
            if used_gib > self.memory_limit_in_gib:
                self.terminate_task_with_out_of_memory()
                break

        if self.sleep_time:
            time.sleep(self.sleep_time)

    # Collect whatever was queued before the process ended.
    self.clean_watcher_queue()

    finished_at = time.time()
    self.end_time = finished_at
    self.elapsed_time = finished_at - self.start_time
def
update_queue(self) -> None:
def update_queue(self) -> None:
    """Publish the task's final ``TaskResult`` on ``task_results_queue``.

    The first result received from the child is used when there is one and
    no exception was recorded; otherwise a failed ``TaskResult`` carrying
    the recorded exception (possibly ``None``) is created. Exit code,
    limits and timing are attached before the result is queued.
    """
    if self.watcher_results and not self.exception:
        result = self.watcher_results[0]
    else:
        # TimeoutError, MemoryError, or process was killed (SIGABRT etc)
        result = TaskResult(
            self.task,
            success=False,
            exception=self.exception,
        )

    # Process and configuration details.
    result.exit_code = self.process.exitcode
    result.task = self.task
    result.timeout_seconds = self.timeout_seconds
    result.memory_limit_in_gib = self.memory_limit_in_gib

    # Timing recorded while starting and watching the task.
    result.start_time = self.start_time
    result.end_time = self.end_time
    result.elapsed_time = self.elapsed_time

    # Limit outcomes are only reported for limits that were configured.
    if self.timeout_seconds:
        result.timed_out = self.timed_out

    if self.memory_limit_in_gib:
        result.max_memory_used_in_gib = self.max_memory_used_in_gib
        result.out_of_memory = self.out_of_memory

    self.task_results_queue.put(result)