glasswall.multiprocessing.task_watcher

  1import time
  2from multiprocessing import Process, Queue
  3from typing import Optional
  4
  5from glasswall.multiprocessing.memory_usage import get_total_memory_usage_in_gib
  6from glasswall.multiprocessing.tasks import Task, TaskResult, execute_task_and_put_in_queue
  7
  8
  9class TaskWatcher:
 10    process: Process
 11    start_time: float
 12    end_time: float
 13    elapsed_time: float
 14
 15    def __init__(
 16        self,
 17        task: Task,
 18        task_results_queue: "Queue[TaskResult]",
 19        timeout_seconds: Optional[float] = None,
 20        memory_limit_in_gib: Optional[float] = None,
 21        sleep_time: float = 0.001,
 22        memory_limit_polling_rate: float = 0.1,
 23        auto_start: bool = True,
 24    ):
 25        self.task = task
 26        self.task_results_queue = task_results_queue
 27        self.timeout_seconds = timeout_seconds
 28        self.memory_limit_in_gib = memory_limit_in_gib
 29        self.sleep_time = sleep_time
 30        self.memory_limit_polling_rate = memory_limit_polling_rate
 31        self.auto_start = auto_start
 32
 33        self.watcher_queue: "Queue[TaskResult]" = Queue()
 34        self.watcher_results = []
 35
 36        self.exception = None
 37        self.timed_out: bool = False
 38        self.out_of_memory: bool = False
 39        self.max_memory_used_in_gib: float = 0
 40
 41        if self.auto_start:
 42            self.start_task()
 43            self.watch_task()
 44            self.update_queue()
 45
 46    def start_task(self) -> None:
 47        self.process = Process(
 48            target=execute_task_and_put_in_queue,
 49            args=(self.task, self.watcher_queue,)
 50        )
 51        self.process.start()
 52        self.start_time = time.time()
 53
 54    def terminate_task(self) -> None:
 55        self.process.terminate()
 56
 57    def terminate_task_with_timeout(self) -> None:
 58        self.terminate_task()
 59        self.timed_out = True
 60        self.exception = TimeoutError()
 61
 62    def terminate_task_with_out_of_memory(self) -> None:
 63        self.terminate_task()
 64        self.out_of_memory = True
 65        self.exception = MemoryError()
 66
 67    def clean_watcher_queue(self):
 68        while not self.watcher_queue.empty():
 69            self.watcher_results.append(self.watcher_queue.get())
 70
 71    def watch_task(self) -> None:
 72        last_memory_limit_check = time.time()
 73        while self.process.is_alive():
 74            self.clean_watcher_queue()
 75
 76            now = time.time()
 77
 78            # Monitor for timeout exceeded
 79            if self.timeout_seconds:
 80                if now - self.start_time > self.timeout_seconds:
 81                    self.terminate_task_with_timeout()
 82                    break
 83
 84            # Monitor for memory limit exceeded
 85            if self.memory_limit_in_gib:
 86                if now - last_memory_limit_check > self.memory_limit_polling_rate:
 87                    last_memory_limit_check = now
 88                    memory_usage_in_gib = get_total_memory_usage_in_gib(self.process.pid)
 89                    if memory_usage_in_gib > self.max_memory_used_in_gib:
 90                        self.max_memory_used_in_gib = memory_usage_in_gib
 91                    if memory_usage_in_gib > self.memory_limit_in_gib:
 92                        self.terminate_task_with_out_of_memory()
 93                        break
 94
 95            if self.sleep_time:
 96                time.sleep(self.sleep_time)
 97
 98        self.clean_watcher_queue()
 99        self.end_time = time.time()
100        self.elapsed_time = self.end_time - self.start_time
101
102    def update_queue(self) -> None:
103        if self.exception or not self.watcher_results:
104            # TimeoutError, MemoryError, or process was killed (SIGABRT etc)
105            task_result = TaskResult(
106                self.task,
107                success=False,
108                exception=self.exception,
109            )
110        else:
111            task_result = self.watcher_results[0]
112
113        task_result.exit_code = self.process.exitcode
114        task_result.task = self.task
115        task_result.timeout_seconds = self.timeout_seconds
116        task_result.memory_limit_in_gib = self.memory_limit_in_gib
117
118        task_result.start_time = self.start_time
119        task_result.end_time = self.end_time
120        task_result.elapsed_time = self.elapsed_time
121
122        if self.timeout_seconds:
123            task_result.timed_out = self.timed_out
124
125        if self.memory_limit_in_gib:
126            task_result.max_memory_used_in_gib = self.max_memory_used_in_gib
127            task_result.out_of_memory = self.out_of_memory
128
129        self.task_results_queue.put(task_result)
class TaskWatcher:
 12class TaskWatcher:
 13    process: Process
 14    start_time: float
 15    end_time: float
 16    elapsed_time: float
 17
 18    def __init__(
 19        self,
 20        task: Task,
 21        task_results_queue: "Queue[TaskResult]",
 22        timeout_seconds: Optional[float] = None,
 23        memory_limit_in_gib: Optional[float] = None,
 24        sleep_time: float = 0.001,
 25        memory_limit_polling_rate: float = 0.1,
 26        auto_start: bool = True,
 27    ):
 28        self.task = task
 29        self.task_results_queue = task_results_queue
 30        self.timeout_seconds = timeout_seconds
 31        self.memory_limit_in_gib = memory_limit_in_gib
 32        self.sleep_time = sleep_time
 33        self.memory_limit_polling_rate = memory_limit_polling_rate
 34        self.auto_start = auto_start
 35
 36        self.watcher_queue: "Queue[TaskResult]" = Queue()
 37        self.watcher_results = []
 38
 39        self.exception = None
 40        self.timed_out: bool = False
 41        self.out_of_memory: bool = False
 42        self.max_memory_used_in_gib: float = 0
 43
 44        if self.auto_start:
 45            self.start_task()
 46            self.watch_task()
 47            self.update_queue()
 48
 49    def start_task(self) -> None:
 50        self.process = Process(
 51            target=execute_task_and_put_in_queue,
 52            args=(self.task, self.watcher_queue,)
 53        )
 54        self.process.start()
 55        self.start_time = time.time()
 56
 57    def terminate_task(self) -> None:
 58        self.process.terminate()
 59
 60    def terminate_task_with_timeout(self) -> None:
 61        self.terminate_task()
 62        self.timed_out = True
 63        self.exception = TimeoutError()
 64
 65    def terminate_task_with_out_of_memory(self) -> None:
 66        self.terminate_task()
 67        self.out_of_memory = True
 68        self.exception = MemoryError()
 69
 70    def clean_watcher_queue(self):
 71        while not self.watcher_queue.empty():
 72            self.watcher_results.append(self.watcher_queue.get())
 73
 74    def watch_task(self) -> None:
 75        last_memory_limit_check = time.time()
 76        while self.process.is_alive():
 77            self.clean_watcher_queue()
 78
 79            now = time.time()
 80
 81            # Monitor for timeout exceeded
 82            if self.timeout_seconds:
 83                if now - self.start_time > self.timeout_seconds:
 84                    self.terminate_task_with_timeout()
 85                    break
 86
 87            # Monitor for memory limit exceeded
 88            if self.memory_limit_in_gib:
 89                if now - last_memory_limit_check > self.memory_limit_polling_rate:
 90                    last_memory_limit_check = now
 91                    memory_usage_in_gib = get_total_memory_usage_in_gib(self.process.pid)
 92                    if memory_usage_in_gib > self.max_memory_used_in_gib:
 93                        self.max_memory_used_in_gib = memory_usage_in_gib
 94                    if memory_usage_in_gib > self.memory_limit_in_gib:
 95                        self.terminate_task_with_out_of_memory()
 96                        break
 97
 98            if self.sleep_time:
 99                time.sleep(self.sleep_time)
100
101        self.clean_watcher_queue()
102        self.end_time = time.time()
103        self.elapsed_time = self.end_time - self.start_time
104
105    def update_queue(self) -> None:
106        if self.exception or not self.watcher_results:
107            # TimeoutError, MemoryError, or process was killed (SIGABRT etc)
108            task_result = TaskResult(
109                self.task,
110                success=False,
111                exception=self.exception,
112            )
113        else:
114            task_result = self.watcher_results[0]
115
116        task_result.exit_code = self.process.exitcode
117        task_result.task = self.task
118        task_result.timeout_seconds = self.timeout_seconds
119        task_result.memory_limit_in_gib = self.memory_limit_in_gib
120
121        task_result.start_time = self.start_time
122        task_result.end_time = self.end_time
123        task_result.elapsed_time = self.elapsed_time
124
125        if self.timeout_seconds:
126            task_result.timed_out = self.timed_out
127
128        if self.memory_limit_in_gib:
129            task_result.max_memory_used_in_gib = self.max_memory_used_in_gib
130            task_result.out_of_memory = self.out_of_memory
131
132        self.task_results_queue.put(task_result)
TaskWatcher( task: glasswall.multiprocessing.tasks.Task, task_results_queue: 'Queue[TaskResult]', timeout_seconds: Optional[float] = None, memory_limit_in_gib: Optional[float] = None, sleep_time: float = 0.001, memory_limit_polling_rate: float = 0.1, auto_start: bool = True)
18    def __init__(
19        self,
20        task: Task,
21        task_results_queue: "Queue[TaskResult]",
22        timeout_seconds: Optional[float] = None,
23        memory_limit_in_gib: Optional[float] = None,
24        sleep_time: float = 0.001,
25        memory_limit_polling_rate: float = 0.1,
26        auto_start: bool = True,
27    ):
28        self.task = task
29        self.task_results_queue = task_results_queue
30        self.timeout_seconds = timeout_seconds
31        self.memory_limit_in_gib = memory_limit_in_gib
32        self.sleep_time = sleep_time
33        self.memory_limit_polling_rate = memory_limit_polling_rate
34        self.auto_start = auto_start
35
36        self.watcher_queue: "Queue[TaskResult]" = Queue()
37        self.watcher_results = []
38
39        self.exception = None
40        self.timed_out: bool = False
41        self.out_of_memory: bool = False
42        self.max_memory_used_in_gib: float = 0
43
44        if self.auto_start:
45            self.start_task()
46            self.watch_task()
47            self.update_queue()
process: multiprocessing.context.Process
start_time: float
end_time: float
elapsed_time: float
task
task_results_queue
timeout_seconds
memory_limit_in_gib
sleep_time
memory_limit_polling_rate
auto_start
watcher_queue: "'Queue[TaskResult]'"
watcher_results
exception
timed_out: bool
out_of_memory: bool
max_memory_used_in_gib: float
def start_task(self) -> None:
49    def start_task(self) -> None:
50        self.process = Process(
51            target=execute_task_and_put_in_queue,
52            args=(self.task, self.watcher_queue,)
53        )
54        self.process.start()
55        self.start_time = time.time()
def terminate_task(self) -> None:
57    def terminate_task(self) -> None:
58        self.process.terminate()
def terminate_task_with_timeout(self) -> None:
60    def terminate_task_with_timeout(self) -> None:
61        self.terminate_task()
62        self.timed_out = True
63        self.exception = TimeoutError()
def terminate_task_with_out_of_memory(self) -> None:
65    def terminate_task_with_out_of_memory(self) -> None:
66        self.terminate_task()
67        self.out_of_memory = True
68        self.exception = MemoryError()
def clean_watcher_queue(self):
70    def clean_watcher_queue(self):
71        while not self.watcher_queue.empty():
72            self.watcher_results.append(self.watcher_queue.get())
def watch_task(self) -> None:
 74    def watch_task(self) -> None:
 75        last_memory_limit_check = time.time()
 76        while self.process.is_alive():
 77            self.clean_watcher_queue()
 78
 79            now = time.time()
 80
 81            # Monitor for timeout exceeded
 82            if self.timeout_seconds:
 83                if now - self.start_time > self.timeout_seconds:
 84                    self.terminate_task_with_timeout()
 85                    break
 86
 87            # Monitor for memory limit exceeded
 88            if self.memory_limit_in_gib:
 89                if now - last_memory_limit_check > self.memory_limit_polling_rate:
 90                    last_memory_limit_check = now
 91                    memory_usage_in_gib = get_total_memory_usage_in_gib(self.process.pid)
 92                    if memory_usage_in_gib > self.max_memory_used_in_gib:
 93                        self.max_memory_used_in_gib = memory_usage_in_gib
 94                    if memory_usage_in_gib > self.memory_limit_in_gib:
 95                        self.terminate_task_with_out_of_memory()
 96                        break
 97
 98            if self.sleep_time:
 99                time.sleep(self.sleep_time)
100
101        self.clean_watcher_queue()
102        self.end_time = time.time()
103        self.elapsed_time = self.end_time - self.start_time
def update_queue(self) -> None:
105    def update_queue(self) -> None:
106        if self.exception or not self.watcher_results:
107            # TimeoutError, MemoryError, or process was killed (SIGABRT etc)
108            task_result = TaskResult(
109                self.task,
110                success=False,
111                exception=self.exception,
112            )
113        else:
114            task_result = self.watcher_results[0]
115
116        task_result.exit_code = self.process.exitcode
117        task_result.task = self.task
118        task_result.timeout_seconds = self.timeout_seconds
119        task_result.memory_limit_in_gib = self.memory_limit_in_gib
120
121        task_result.start_time = self.start_time
122        task_result.end_time = self.end_time
123        task_result.elapsed_time = self.elapsed_time
124
125        if self.timeout_seconds:
126            task_result.timed_out = self.timed_out
127
128        if self.memory_limit_in_gib:
129            task_result.max_memory_used_in_gib = self.max_memory_used_in_gib
130            task_result.out_of_memory = self.out_of_memory
131
132        self.task_results_queue.put(task_result)