glasswall.multiprocessing.manager
import os
import time
from collections import deque
from multiprocessing import Process, Queue
from typing import List, Generator, Optional

from glasswall.multiprocessing.task_watcher import TaskWatcher
from glasswall.multiprocessing.tasks import Task, TaskResult


class GlasswallProcessManager:
    """Run queued tasks in separate processes, at most ``max_workers`` at once.

    Every queued task is wrapped in a ``Process`` whose target is
    ``TaskWatcher``. Processes are created by ``queue_task`` but only started
    by ``as_completed`` / ``start_tasks``. Leaving a ``with`` block calls
    ``start_tasks``, after which the results are available in ``task_results``.
    """

    def __init__(
        self,
        max_workers: Optional[int] = None,
        worker_timeout_seconds: Optional[float] = None,
        memory_limit_in_gib: Optional[float] = None,
    ):
        """Store the configuration and create the empty bookkeeping containers.

        Args:
            max_workers: Maximum number of processes running at the same time.
                A falsy value (``None`` or ``0``) falls back to
                ``os.cpu_count()``, and to ``1`` if that is unavailable.
            worker_timeout_seconds: Passed to ``TaskWatcher`` as
                ``timeout_seconds``.
            memory_limit_in_gib: Passed to ``TaskWatcher`` as
                ``memory_limit_in_gib``.
        """
        self.max_workers = max_workers or os.cpu_count() or 1
        self.worker_timeout_seconds = worker_timeout_seconds
        self.memory_limit_in_gib = memory_limit_in_gib
        self._sleep_time: float = 0  # Time to sleep for while waiting for processes to complete
        self._task_watcher_sleep_time: float = 0.001  # Time the TaskWatcher sleeps for while waiting for completed processes and monitoring timeout/memory
        self._task_watcher_memory_limit_polling_rate: float = 0.1  # Polling rate for TaskWatcher to check the memory usage of a process

        self.pending_processes: deque[Process] = deque()  # Created but not yet started
        self.active_processes: list[Process] = []  # Started and not yet seen as finished
        self.task_results_queue: "Queue[TaskResult]" = Queue()  # Shared with every TaskWatcher
        self.task_results: List[TaskResult] = []  # Results read from the queue, not yet yielded

    def __enter__(self):
        """Return the manager itself so tasks can be queued inside the block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Run every queued task when the ``with`` block is left.

        Returns ``None``, so an exception raised inside the block is not
        suppressed.
        """
        self.start_tasks()

    def queue_task(self, task: Task):
        """Create the process for ``task`` and queue it without starting it."""
        process = Process(
            target=TaskWatcher,
            kwargs=dict(
                task=task,
                task_results_queue=self.task_results_queue,
                timeout_seconds=self.worker_timeout_seconds,
                memory_limit_in_gib=self.memory_limit_in_gib,
                sleep_time=self._task_watcher_sleep_time,
                memory_limit_polling_rate=self._task_watcher_memory_limit_polling_rate,
            ),
        )
        self.pending_processes.append(process)

    def as_completed(self) -> Generator[TaskResult, None, None]:
        """Start the queued processes and yield results as they are collected.

        Keeps up to ``max_workers`` processes running until nothing is pending
        or active. Results are yielded in the order they were read from the
        results queue and are removed from ``task_results`` as they are yielded.
        """
        while self.pending_processes or self.active_processes:
            if self.active_processes:
                self.wait_for_completed_process()

            # Fill every free worker slot from the pending queue.
            while len(self.active_processes) < self.max_workers and self.pending_processes:
                process = self.pending_processes.popleft()
                self.active_processes.append(process)
                process.start()

            # Pop from the front rather than removing inside a ``for`` loop:
            # mutating the list while iterating it skips every other result,
            # and results skipped in the final batch were never yielded.
            while self.task_results:
                yield self.task_results.pop(0)

    def start_tasks(self):
        """Run all queued tasks to completion and store their results in ``task_results``."""
        self.task_results = list(self.as_completed())

    def wait_for_completed_process(self):
        """Block until fewer than ``max_workers`` processes are active.

        Polls the active processes, sleeping ``_sleep_time`` seconds between
        polls when that value is non-zero.
        """
        self.remove_completed_active_processes()
        while len(self.active_processes) >= self.max_workers:
            if self._sleep_time:
                time.sleep(self._sleep_time)
            self.remove_completed_active_processes()

    def remove_completed_active_processes(self):
        """Drop processes that are no longer alive, then collect queued results."""
        self.active_processes = [process for process in self.active_processes if process.is_alive()]
        self.clean_task_results_queue()

    def clean_task_results_queue(self):
        """Move every result currently in the results queue into ``task_results``."""
        while not self.task_results_queue.empty():
            self.task_results.append(self.task_results_queue.get())
class
GlasswallProcessManager:
class GlasswallProcessManager:
    """Run queued tasks in separate processes, at most ``max_workers`` at once.

    Every queued task is wrapped in a ``Process`` whose target is
    ``TaskWatcher``. Processes are created by ``queue_task`` but only started
    by ``as_completed`` / ``start_tasks``. Leaving a ``with`` block calls
    ``start_tasks``, after which the results are available in ``task_results``.
    """

    def __init__(
        self,
        max_workers: Optional[int] = None,
        worker_timeout_seconds: Optional[float] = None,
        memory_limit_in_gib: Optional[float] = None,
    ):
        """Store the configuration and create the empty bookkeeping containers.

        Args:
            max_workers: Maximum number of processes running at the same time.
                A falsy value (``None`` or ``0``) falls back to
                ``os.cpu_count()``, and to ``1`` if that is unavailable.
            worker_timeout_seconds: Passed to ``TaskWatcher`` as
                ``timeout_seconds``.
            memory_limit_in_gib: Passed to ``TaskWatcher`` as
                ``memory_limit_in_gib``.
        """
        self.max_workers = max_workers or os.cpu_count() or 1
        self.worker_timeout_seconds = worker_timeout_seconds
        self.memory_limit_in_gib = memory_limit_in_gib
        self._sleep_time: float = 0  # Time to sleep for while waiting for processes to complete
        self._task_watcher_sleep_time: float = 0.001  # Time the TaskWatcher sleeps for while waiting for completed processes and monitoring timeout/memory
        self._task_watcher_memory_limit_polling_rate: float = 0.1  # Polling rate for TaskWatcher to check the memory usage of a process

        self.pending_processes: deque[Process] = deque()  # Created but not yet started
        self.active_processes: list[Process] = []  # Started and not yet seen as finished
        self.task_results_queue: "Queue[TaskResult]" = Queue()  # Shared with every TaskWatcher
        self.task_results: List[TaskResult] = []  # Results read from the queue, not yet yielded

    def __enter__(self):
        """Return the manager itself so tasks can be queued inside the block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Run every queued task when the ``with`` block is left.

        Returns ``None``, so an exception raised inside the block is not
        suppressed.
        """
        self.start_tasks()

    def queue_task(self, task: Task):
        """Create the process for ``task`` and queue it without starting it."""
        process = Process(
            target=TaskWatcher,
            kwargs=dict(
                task=task,
                task_results_queue=self.task_results_queue,
                timeout_seconds=self.worker_timeout_seconds,
                memory_limit_in_gib=self.memory_limit_in_gib,
                sleep_time=self._task_watcher_sleep_time,
                memory_limit_polling_rate=self._task_watcher_memory_limit_polling_rate,
            ),
        )
        self.pending_processes.append(process)

    def as_completed(self) -> Generator[TaskResult, None, None]:
        """Start the queued processes and yield results as they are collected.

        Keeps up to ``max_workers`` processes running until nothing is pending
        or active. Results are yielded in the order they were read from the
        results queue and are removed from ``task_results`` as they are yielded.
        """
        while self.pending_processes or self.active_processes:
            if self.active_processes:
                self.wait_for_completed_process()

            # Fill every free worker slot from the pending queue.
            while len(self.active_processes) < self.max_workers and self.pending_processes:
                process = self.pending_processes.popleft()
                self.active_processes.append(process)
                process.start()

            # Pop from the front rather than removing inside a ``for`` loop:
            # mutating the list while iterating it skips every other result,
            # and results skipped in the final batch were never yielded.
            while self.task_results:
                yield self.task_results.pop(0)

    def start_tasks(self):
        """Run all queued tasks to completion and store their results in ``task_results``."""
        self.task_results = list(self.as_completed())

    def wait_for_completed_process(self):
        """Block until fewer than ``max_workers`` processes are active.

        Polls the active processes, sleeping ``_sleep_time`` seconds between
        polls when that value is non-zero.
        """
        self.remove_completed_active_processes()
        while len(self.active_processes) >= self.max_workers:
            if self._sleep_time:
                time.sleep(self._sleep_time)
            self.remove_completed_active_processes()

    def remove_completed_active_processes(self):
        """Drop processes that are no longer alive, then collect queued results."""
        self.active_processes = [process for process in self.active_processes if process.is_alive()]
        self.clean_task_results_queue()

    def clean_task_results_queue(self):
        """Move every result currently in the results queue into ``task_results``."""
        while not self.task_results_queue.empty():
            self.task_results.append(self.task_results_queue.get())
GlasswallProcessManager( max_workers: Optional[int] = None, worker_timeout_seconds: Optional[float] = None, memory_limit_in_gib: Optional[float] = None)
15 def __init__( 16 self, 17 max_workers: Optional[int] = None, 18 worker_timeout_seconds: Optional[float] = None, 19 memory_limit_in_gib: Optional[float] = None, 20 ): 21 self.max_workers = max_workers or os.cpu_count() or 1 22 self.worker_timeout_seconds = worker_timeout_seconds 23 self.memory_limit_in_gib = memory_limit_in_gib 24 self._sleep_time: float = 0 # Time to sleep for while waiting for processes to complete 25 self._task_watcher_sleep_time: float = 0.001 # Time the TaskWatcher sleeps for while waiting for completed processes and monitoring timeout/memory 26 self._task_watcher_memory_limit_polling_rate: float = 0.1 # Polling rate for TaskWatcher to check the memory usage of a process 27 28 self.pending_processes: deque[Process] = deque() 29 self.active_processes: list[Process] = [] 30 self.task_results_queue: "Queue[TaskResult]" = Queue() 31 self.task_results: List[TaskResult] = []
task_results: List[glasswall.multiprocessing.tasks.TaskResult]
def queue_task(self, task: Task):
    """Wrap ``task`` in a not-yet-started process and add it to the pending queue.

    The process targets ``TaskWatcher`` and receives the task, the shared
    results queue and the manager's timeout, memory-limit and polling settings
    as keyword arguments.
    """
    watcher_kwargs = {
        "task": task,
        "task_results_queue": self.task_results_queue,
        "timeout_seconds": self.worker_timeout_seconds,
        "memory_limit_in_gib": self.memory_limit_in_gib,
        "sleep_time": self._task_watcher_sleep_time,
        "memory_limit_polling_rate": self._task_watcher_memory_limit_polling_rate,
    }
    # The process is only created here; it is started elsewhere.
    self.pending_processes.append(Process(target=TaskWatcher, kwargs=watcher_kwargs))
def
as_completed( self) -> Generator[glasswall.multiprocessing.tasks.TaskResult, NoneType, NoneType]:
def as_completed(self) -> Generator[TaskResult, None, None]:
    """Start the queued processes and yield results as they are collected.

    Keeps up to ``max_workers`` processes running until nothing is pending or
    active. Results are yielded in the order they appear in ``task_results``
    and are removed from that list as they are yielded.
    """
    while self.pending_processes or self.active_processes:
        if self.active_processes:
            self.wait_for_completed_process()

        # Fill every free worker slot from the pending queue.
        while len(self.active_processes) < self.max_workers and self.pending_processes:
            process = self.pending_processes.popleft()
            self.active_processes.append(process)
            process.start()

        # Pop from the front rather than removing inside a ``for`` loop:
        # mutating the list while iterating it skips every other result, and
        # results skipped in the final batch were never yielded.
        while self.task_results:
            yield self.task_results.pop(0)