glasswall.multiprocessing.manager

 1import os
 2import time
 3from collections import deque
 4from multiprocessing import Process, Queue
 5from typing import List, Generator, Optional
 6
 7from glasswall.multiprocessing.task_watcher import TaskWatcher
 8from glasswall.multiprocessing.tasks import Task, TaskResult
 9
10
class GlasswallProcessManager:
    """Run queued tasks in separate processes, a bounded number at a time.

    Every queued task is wrapped in a ``multiprocessing.Process`` whose target
    is ``TaskWatcher``. Results are read back from ``task_results_queue`` and
    are either streamed by ``as_completed`` or collected into ``task_results``
    by ``start_tasks``. When used as a context manager, ``start_tasks`` is
    called on exit from the ``with`` block.
    """

    def __init__(
        self,
        max_workers: Optional[int] = None,
        worker_timeout_seconds: Optional[float] = None,
        memory_limit_in_gib: Optional[float] = None,
    ):
        """Store the configuration and create empty bookkeeping containers.

        Args:
            max_workers: Upper bound on the number of processes tracked as
                active at once. A falsy value (``None`` or ``0``) falls back
                to ``os.cpu_count()``, and to ``1`` if that is unavailable.
            worker_timeout_seconds: Passed to ``TaskWatcher`` as
                ``timeout_seconds``.
            memory_limit_in_gib: Passed to ``TaskWatcher`` as
                ``memory_limit_in_gib``.
        """
        self.max_workers = max_workers or os.cpu_count() or 1
        self.worker_timeout_seconds = worker_timeout_seconds
        self.memory_limit_in_gib = memory_limit_in_gib
        self._sleep_time: float = 0  # Time to sleep for while waiting for processes to complete
        self._task_watcher_sleep_time: float = 0.001  # Time the TaskWatcher sleeps for while waiting for completed processes and monitoring timeout/memory
        self._task_watcher_memory_limit_polling_rate: float = 0.1  # Polling rate for TaskWatcher to check the memory usage of a process

        self.pending_processes: deque[Process] = deque()  # Created but not yet started
        self.active_processes: list[Process] = []  # Started and not yet seen as finished
        self.task_results_queue: "Queue[TaskResult]" = Queue()  # Handed to every TaskWatcher
        self.task_results: List[TaskResult] = []  # Results read from the queue, not yet yielded

    def __enter__(self):
        """Return the manager itself so tasks can be queued inside ``with``."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Run all queued tasks; returns ``None`` so exceptions propagate."""
        self.start_tasks()

    def queue_task(self, task: Task):
        """Create a process for ``task`` and append it to the pending queue.

        The process is not started here; ``as_completed`` starts it once a
        worker slot is free.
        """
        # Create and queue the process without starting it
        process = Process(
            target=TaskWatcher,
            kwargs=dict(
                task=task,
                task_results_queue=self.task_results_queue,
                timeout_seconds=self.worker_timeout_seconds,
                memory_limit_in_gib=self.memory_limit_in_gib,
                sleep_time=self._task_watcher_sleep_time,
                memory_limit_polling_rate=self._task_watcher_memory_limit_polling_rate,
            ),
        )
        self.pending_processes.append(process)

    def as_completed(self) -> Generator[TaskResult, None, None]:
        """Start pending processes as slots free up and yield results.

        Loops until there are neither pending nor active processes. Results
        are yielded in the order they were read from ``task_results_queue``,
        and each one is removed from ``task_results`` just before it is
        yielded.
        """
        while self.pending_processes or self.active_processes:
            if self.active_processes:
                self.wait_for_completed_process()

            while len(self.active_processes) < self.max_workers and self.pending_processes:
                process = self.pending_processes.popleft()
                self.active_processes.append(process)
                process.start()

            # Drain from the front instead of removing items while iterating:
            # removing inside a ``for`` over the same list skips every second
            # result, and anything skipped on the final pass was never yielded.
            while self.task_results:
                yield self.task_results.pop(0)

    def start_tasks(self):
        """Run every queued task to completion and store all results in ``task_results``."""
        self.task_results = list(self.as_completed())

    def wait_for_completed_process(self):
        """Block until fewer than ``max_workers`` processes are tracked as active.

        Finished processes are pruned (which also collects queued results).
        While the active list is still full, the check repeats, sleeping
        ``_sleep_time`` seconds between checks when that value is non-zero.
        """
        self.remove_completed_active_processes()
        while len(self.active_processes) >= self.max_workers:
            if self._sleep_time:
                time.sleep(self._sleep_time)
            self.remove_completed_active_processes()

    def remove_completed_active_processes(self):
        """Keep only processes that are still alive, then collect queued results."""
        self.active_processes = [process for process in self.active_processes if process.is_alive()]
        self.clean_task_results_queue()

    def clean_task_results_queue(self):
        """Move every result currently readable from the queue into ``task_results``."""
        while not self.task_results_queue.empty():
            self.task_results.append(self.task_results_queue.get())
class GlasswallProcessManager:
class GlasswallProcessManager:
    """Run queued tasks in separate processes, a bounded number at a time.

    Every queued task is wrapped in a ``multiprocessing.Process`` whose target
    is ``TaskWatcher``. Results are read back from ``task_results_queue`` and
    are either streamed by ``as_completed`` or collected into ``task_results``
    by ``start_tasks``. When used as a context manager, ``start_tasks`` is
    called on exit from the ``with`` block.
    """

    def __init__(
        self,
        max_workers: Optional[int] = None,
        worker_timeout_seconds: Optional[float] = None,
        memory_limit_in_gib: Optional[float] = None,
    ):
        """Store the configuration and create empty bookkeeping containers.

        Args:
            max_workers: Upper bound on the number of processes tracked as
                active at once. A falsy value (``None`` or ``0``) falls back
                to ``os.cpu_count()``, and to ``1`` if that is unavailable.
            worker_timeout_seconds: Passed to ``TaskWatcher`` as
                ``timeout_seconds``.
            memory_limit_in_gib: Passed to ``TaskWatcher`` as
                ``memory_limit_in_gib``.
        """
        self.max_workers = max_workers or os.cpu_count() or 1
        self.worker_timeout_seconds = worker_timeout_seconds
        self.memory_limit_in_gib = memory_limit_in_gib
        self._sleep_time: float = 0  # Time to sleep for while waiting for processes to complete
        self._task_watcher_sleep_time: float = 0.001  # Time the TaskWatcher sleeps for while waiting for completed processes and monitoring timeout/memory
        self._task_watcher_memory_limit_polling_rate: float = 0.1  # Polling rate for TaskWatcher to check the memory usage of a process

        self.pending_processes: deque[Process] = deque()  # Created but not yet started
        self.active_processes: list[Process] = []  # Started and not yet seen as finished
        self.task_results_queue: "Queue[TaskResult]" = Queue()  # Handed to every TaskWatcher
        self.task_results: List[TaskResult] = []  # Results read from the queue, not yet yielded

    def __enter__(self):
        """Return the manager itself so tasks can be queued inside ``with``."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Run all queued tasks; returns ``None`` so exceptions propagate."""
        self.start_tasks()

    def queue_task(self, task: Task):
        """Create a process for ``task`` and append it to the pending queue.

        The process is not started here; ``as_completed`` starts it once a
        worker slot is free.
        """
        # Create and queue the process without starting it
        process = Process(
            target=TaskWatcher,
            kwargs=dict(
                task=task,
                task_results_queue=self.task_results_queue,
                timeout_seconds=self.worker_timeout_seconds,
                memory_limit_in_gib=self.memory_limit_in_gib,
                sleep_time=self._task_watcher_sleep_time,
                memory_limit_polling_rate=self._task_watcher_memory_limit_polling_rate,
            ),
        )
        self.pending_processes.append(process)

    def as_completed(self) -> Generator[TaskResult, None, None]:
        """Start pending processes as slots free up and yield results.

        Loops until there are neither pending nor active processes. Results
        are yielded in the order they were read from ``task_results_queue``,
        and each one is removed from ``task_results`` just before it is
        yielded.
        """
        while self.pending_processes or self.active_processes:
            if self.active_processes:
                self.wait_for_completed_process()

            while len(self.active_processes) < self.max_workers and self.pending_processes:
                process = self.pending_processes.popleft()
                self.active_processes.append(process)
                process.start()

            # Drain from the front instead of removing items while iterating:
            # removing inside a ``for`` over the same list skips every second
            # result, and anything skipped on the final pass was never yielded.
            while self.task_results:
                yield self.task_results.pop(0)

    def start_tasks(self):
        """Run every queued task to completion and store all results in ``task_results``."""
        self.task_results = list(self.as_completed())

    def wait_for_completed_process(self):
        """Block until fewer than ``max_workers`` processes are tracked as active.

        Finished processes are pruned (which also collects queued results).
        While the active list is still full, the check repeats, sleeping
        ``_sleep_time`` seconds between checks when that value is non-zero.
        """
        self.remove_completed_active_processes()
        while len(self.active_processes) >= self.max_workers:
            if self._sleep_time:
                time.sleep(self._sleep_time)
            self.remove_completed_active_processes()

    def remove_completed_active_processes(self):
        """Keep only processes that are still alive, then collect queued results."""
        self.active_processes = [process for process in self.active_processes if process.is_alive()]
        self.clean_task_results_queue()

    def clean_task_results_queue(self):
        """Move every result currently readable from the queue into ``task_results``."""
        while not self.task_results_queue.empty():
            self.task_results.append(self.task_results_queue.get())
GlasswallProcessManager( max_workers: Optional[int] = None, worker_timeout_seconds: Optional[float] = None, memory_limit_in_gib: Optional[float] = None)
15    def __init__(
16        self,
17        max_workers: Optional[int] = None,
18        worker_timeout_seconds: Optional[float] = None,
19        memory_limit_in_gib: Optional[float] = None,
20    ):
21        self.max_workers = max_workers or os.cpu_count() or 1
22        self.worker_timeout_seconds = worker_timeout_seconds
23        self.memory_limit_in_gib = memory_limit_in_gib
24        self._sleep_time: float = 0  # Time to sleep for while waiting for processes to complete
25        self._task_watcher_sleep_time: float = 0.001  # Time the TaskWatcher sleeps for while waiting for completed processes and monitoring timeout/memory
26        self._task_watcher_memory_limit_polling_rate: float = 0.1  # Polling rate for TaskWatcher to check the memory usage of a process
27
28        self.pending_processes: deque[Process] = deque()
29        self.active_processes: list[Process] = []
30        self.task_results_queue: "Queue[TaskResult]" = Queue()
31        self.task_results: List[TaskResult] = []
max_workers
worker_timeout_seconds
memory_limit_in_gib
pending_processes: collections.deque[multiprocessing.context.Process]
active_processes: list[multiprocessing.context.Process]
task_results_queue: "'Queue[TaskResult]'"
def queue_task(self, task: glasswall.multiprocessing.tasks.Task):
39    def queue_task(self, task: Task):
40        # Create and queue the process without starting it
41        process = Process(
42            target=TaskWatcher,
43            kwargs=dict(
44                task=task,
45                task_results_queue=self.task_results_queue,
46                timeout_seconds=self.worker_timeout_seconds,
47                memory_limit_in_gib=self.memory_limit_in_gib,
48                sleep_time=self._task_watcher_sleep_time,
49                memory_limit_polling_rate=self._task_watcher_memory_limit_polling_rate,
50            ),
51        )
52        self.pending_processes.append(process)
def as_completed( self) -> Generator[glasswall.multiprocessing.tasks.TaskResult, NoneType, NoneType]:
54    def as_completed(self) -> Generator[TaskResult, None, None]:
55        while self.pending_processes or self.active_processes:
56            if self.active_processes:
57                self.wait_for_completed_process()
58
59            while len(self.active_processes) < self.max_workers and self.pending_processes:
60                process = self.pending_processes.popleft()
61                self.active_processes.append(process)
62                process.start()
63
64            for result in self.task_results:
65                self.task_results.remove(result)
66                yield result
def start_tasks(self):
68    def start_tasks(self):
69        self.task_results = list(self.as_completed())
def wait_for_completed_process(self):
71    def wait_for_completed_process(self):
72        self.remove_completed_active_processes()
73        while len(self.active_processes) >= self.max_workers:
74            if self._sleep_time:
75                time.sleep(self._sleep_time)
76            self.remove_completed_active_processes()
def remove_completed_active_processes(self):
78    def remove_completed_active_processes(self):
79        self.active_processes = [process for process in self.active_processes if process.is_alive()]
80        self.clean_task_results_queue()
def clean_task_results_queue(self):
82    def clean_task_results_queue(self):
83        while not self.task_results_queue.empty():
84            self.task_results.append(self.task_results_queue.get())