Parallel Processing
Functions for parallel processing of agent workers.
Functions:
-
lazy_thread_pool_processor
–Processes a stream of items in a thread pool with pull semantics.
-
process_pool_processor
–Processes a stream of items using a process pool with push semantics.
lazy_thread_pool_processor(stream, worker_func, n_workers, initializer=None, initargs=(), futures_queue_timeout=1.0, producer_thread_shutdown_timeout=10.0)
Processes a stream of items in a thread pool with pull semantics.
If n_workers is set to 0, the items are processed using a simple loop without threading (for debugging purposes).
Tasks from the input stream are submitted to the thread pool only when the main generator is asked for a value. This ensures tasks are executed on-demand, as their results are consumed.
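The pull behaviour described above can be illustrated with a short sketch built only on the standard library. It is not the code in tapeagents/parallel_processing.py: `lazy_pool_map` and `_outcome` are hypothetical names, and the in-order yielding and the cap of `n_workers` in-flight tasks are assumptions made to keep the example small.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def _outcome(future):
    # Block until the task finishes; hand back its exception instead of raising.
    exc = future.exception()
    return exc if exc is not None else future.result()


def lazy_pool_map(stream, worker_func, n_workers):
    """Hypothetical stand-in: submit work only while the consumer is pulling."""
    if n_workers == 0:
        # Debug path: plain loop, no threads.
        for item in stream:
            try:
                yield worker_func(item)
            except Exception as exc:
                yield exc
        return
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        pending = deque()
        for item in stream:
            # This loop only advances when the consumer asks for a value,
            # so the input stream is never read far ahead of consumption.
            pending.append(pool.submit(worker_func, item))
            if len(pending) >= n_workers:
                yield _outcome(pending.popleft())
        while pending:
            yield _outcome(pending.popleft())


pulled = []  # records which items were actually read from the input stream


def numbers():
    for i in range(100):
        pulled.append(i)
        yield i


results = lazy_pool_map(numbers(), lambda x: x * 2, n_workers=2)
first = next(results)  # reads just enough input to produce one result
results.close()        # stop early; the remaining 98 items are never read
```

After the single `next` call, `pulled` holds only the first two items, which is the practical difference from a push-style pool that would read and submit all 100 up front.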
Parameters:
-
stream
(Generator[InputType, None, None]
) –A generator that yields the items to process.
-
n_workers
(int
) –The number of worker threads to use.
-
worker_func
(Callable[[InputType], OutputType]
) –The function to process each item.
-
initializer
(Optional[Callable[..., None]]
, default:None
) –An optional initializer function to call for each worker thread.
-
initargs
(tuple[Any, ...]
, default:()
) –Arguments for the initializer.
-
futures_queue_timeout
(float
, default:1.0
) –The timeout for the futures queue.
-
producer_thread_shutdown_timeout
(float
, default:10.0
) –The shutdown timeout for the producer thread.
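As an illustration of the initializer and initargs parameters, the sketch below uses the same-named arguments of the standard library's `concurrent.futures.ThreadPoolExecutor`. That this function treats them the same way is an assumption, and `init_worker` and `tag` are hypothetical names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Per-thread storage, populated once per worker thread by the initializer.
_local = threading.local()


def init_worker(prefix):
    # Called once in each worker thread before it processes any item.
    _local.prefix = prefix


def tag(item):
    # Reads the state that init_worker prepared for the current thread.
    return f"{_local.prefix}:{item}"


with ThreadPoolExecutor(max_workers=2, initializer=init_worker, initargs=("w",)) as pool:
    tagged = list(pool.map(tag, ["a", "b", "c"]))
```

Typical uses are opening one client or connection per thread rather than one per item.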
Yields:
-
OutputType | Exception
–The processed result for each input item, or the exception raised while processing that item.
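Because failures are yielded rather than raised, the consumer has to separate them from successful results. The sketch below shows one way to do that. `run_in_loop` is a hypothetical stand-in so the example runs without tapeagents installed, and `split_results` and `parse` are illustrative names.

```python
def split_results(results):
    """Partition a stream of `OutputType | Exception` values."""
    successes, failures = [], []
    for result in results:
        if isinstance(result, Exception):
            failures.append(result)
        else:
            successes.append(result)
    return successes, failures


def parse(text):
    # Raises ValueError for non-numeric input.
    return int(text)


def run_in_loop(stream, worker_func):
    # Hypothetical stand-in for the processor: yields exceptions instead of raising.
    for item in stream:
        try:
            yield worker_func(item)
        except Exception as exc:
            yield exc


ok, failed = split_results(run_in_loop(iter(["1", "x", "3"]), parse))
```

With tapeagents available, the stand-in would be replaced by a call such as `lazy_thread_pool_processor(stream=..., worker_func=parse, n_workers=4)`, following the signature given above.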
Source code in tapeagents/parallel_processing.py
process_pool_processor(stream, worker_func, n_workers, initializer=None, initargs=(), keep_order=False)
Processes a stream of items using a process pool with push semantics.
If n_workers is set to 0, the items are processed using a simple loop without multiprocessing (for debugging purposes).
This function submits all tasks from the input stream to a process pool for concurrent processing. Unlike a thread pool, a process pool runs tasks in separate processes, which provides true parallelism and is especially useful for CPU-bound tasks. Tasks start executing immediately and run to completion regardless of whether their results are consumed.
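The push behaviour can be sketched with the standard library's `ProcessPoolExecutor`. This is an illustration under assumptions, not the tapeagents implementation: `eager_process_map` and `square` are hypothetical names, and the sketch only starts submitting once iteration begins.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def square(x):
    # Defined at module level so it can be pickled and sent to worker processes.
    return x * x


def eager_process_map(stream, worker_func, n_workers):
    """Hypothetical stand-in: submit everything up front, yield as tasks finish."""
    if n_workers == 0:
        # Debug path: plain loop, no processes.
        for item in stream:
            try:
                yield worker_func(item)
            except Exception as exc:
                yield exc
        return
    with ProcessPoolExecutor(max_workers=n_workers) as pool:
        # Push semantics: the whole input stream is drained and submitted here,
        # before the first result is yielded.
        futures = [pool.submit(worker_func, item) for item in stream]
        for future in as_completed(futures):
            exc = future.exception()
            yield exc if exc is not None else future.result()
        # Leaving the `with` block waits for every submitted task to finish,
        # even if the consumer stopped iterating early.


# Debug path shown here; pass n_workers > 0 from under an
# `if __name__ == "__main__":` guard to use real worker processes.
squares = sorted(eager_process_map(iter(range(4)), square, n_workers=0))
```

Results from the pooled path arrive in completion order, so the example sorts them before use.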
Parameters:
-
stream
(Generator[InputType, None, None]
) –A generator that yields items to process.
-
n_workers
(int
) –The number of worker processes to use.
-
worker_func
(Callable[[InputType], OutputType]
) –The function to process each item. This function should be picklable since it will be passed across process boundaries.
-
initializer
(None | Callable[..., None]
, default:None
) –An optional initializer function to call for each worker process. Useful for setting up process-specific resources.
-
initargs
(tuple[Any, ...]
, default:()
) –Arguments to pass to the initializer.
Yields:
-
OutputType | Exception
–The processed result for each input item, or the exception raised while processing that item. Results are yielded as they become available.
Note
Since this uses multiple processes, the worker_func and the items in the stream should not rely on shared state with the main process unless that state is safe to share across processes (for example, multiprocessing-safe constructs).
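The picklability requirement on worker_func can be checked before any work is submitted. The sketch below is a minimal check using the standard `pickle` module. `is_picklable` and `make_worker` are hypothetical helpers, not part of tapeagents.

```python
import math
import pickle


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def make_worker(factor):
    # The inner function is a local object, which pickle cannot look up by name.
    def worker(x):
        return x * factor
    return worker


# Functions importable by name (module-level functions) pickle by reference.
sqrt_ok = is_picklable(math.sqrt)
# Lambdas and locally defined closures do not.
lambda_ok = is_picklable(lambda x: 2 * x)
closure_ok = is_picklable(make_worker(2))
```

In practice this means defining worker_func with `def` at module level and passing any configuration through the stream items or through initargs.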
Source code in tapeagents/parallel_processing.py