Parallel Processing

Functions for parallel processing of agent workers.

Functions:

lazy_thread_pool_processor(stream, worker_func, n_workers, initializer=None, initargs=(), futures_queue_timeout=1.0, producer_thread_shutdown_timeout=10.0)

Processes a stream of items in a thread pool with pull semantics.

If n_workers is set to 0, the items are processed using a simple loop without threading (for debugging purposes).

Tasks from the input stream are submitted to the thread pool only when the main generator is asked for a value. This ensures tasks are executed on-demand, as their results are consumed.

Parameters:

  • stream (Iterable[InputType]) –

    An iterable that yields items to process.

  • worker_func (Callable[[InputType], OutputType]) –

    The function to process each item.

  • n_workers (int) –

    The number of worker threads to use.

  • initializer (Optional[Callable[..., None]], default: None ) –

    An optional initializer function to call for each worker thread.

  • initargs (tuple[Any, ...], default: () ) –

    Arguments for the initializer.

  • futures_queue_timeout (float, default: 1.0 ) –

    Timeout in seconds when putting a future into the bounded queue; used to periodically check whether the consumer has stopped.

  • producer_thread_shutdown_timeout (float, default: 10.0 ) –

    Timeout in seconds to wait for the producer thread to shut down after the consumer stops.

Yields:

  • OutputType | Exception –

    The processed result for each input item, or an exception raised during processing of the corresponding input item.

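A minimal usage sketch (slow_square below is a hypothetical worker, not part of the library); results arrive as OutputType | Exception, so callers should check for exceptions explicitly:

import time

from tapeagents.parallel_processing import lazy_thread_pool_processor

def slow_square(x: int) -> int:
    # Hypothetical I/O-bound worker standing in for a real agent task.
    time.sleep(0.1)
    return x * x

for result in lazy_thread_pool_processor(range(10), slow_square, n_workers=4):
    if isinstance(result, Exception):
        print(f"task failed: {result}")
    else:
        print(result)
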
Source code in tapeagents/parallel_processing.py
def lazy_thread_pool_processor(
    stream: Iterable[InputType],
    worker_func: Callable[[InputType], OutputType],
    n_workers: int,
    initializer: None | Callable[..., None] = None,
    initargs: tuple[Any, ...] = (),
    futures_queue_timeout: float = 1.0,
    producer_thread_shutdown_timeout: float = 10.0,
) -> Generator[OutputType | Exception, None, None]:
    """
    Processes a stream of items in a thread pool with pull semantics.

    If n_workers is set to 0, the items are processed using a simple loop
    without threading (for debugging purposes).

    Tasks from the input stream are submitted to the thread pool only when
    the main generator is asked for a value. This ensures tasks are executed
    on-demand, as their results are consumed.

    Args:
        stream (Iterable[InputType]): An iterable that yields items to process.
        worker_func (Callable[[InputType], OutputType]): The function to process each item.
        n_workers (int): The number of worker threads to use.
        initializer (Optional[Callable[..., None]]): An optional initializer function
            to call for each worker thread.
        initargs (tuple[Any, ...]): Arguments for the initializer.
        futures_queue_timeout (float): Timeout in seconds when putting a future into the
            bounded queue; used to periodically check whether the consumer has stopped.
        producer_thread_shutdown_timeout (float): Timeout in seconds to wait for the
            producer thread to shut down after the consumer stops.

    Yields:
        OutputType | Exception: The processed result for each input item, or an exception raised during processing
            of the corresponding input item.
    """
    if n_workers == 0:
        # Process items using a simple loop without threading
        yield from sequential_processor(stream, worker_func, 1, initializer, initargs)
    else:
        with ThreadPoolExecutor(
            max_workers=n_workers,
            initializer=initializer,
            initargs=initargs,
        ) as executor:
            # Bounded queue to store futures
            futures_queue: Queue[Future[OutputType]] = Queue(maxsize=n_workers)

            # Events for inter-thread communication
            producer_done_event = Event()
            consumer_stopped_event = Event()

            # Producer thread function
            def producer():
                for item in stream:
                    if consumer_stopped_event.is_set():
                        return
                    # Submit task and add to the queue
                    future = executor.submit(worker_func, item)
                    # Use put with a timeout to periodically check consumer_stopped_event
                    while True:
                        try:
                            futures_queue.put(future, timeout=futures_queue_timeout)
                            break
                        except Full:
                            if consumer_stopped_event.is_set():
                                return
                producer_done_event.set()

            producer_thread = Thread(target=producer)
            producer_thread.start()

            try:
                # Main (consumer) thread
                producer_was_done_before_get = None
                while True:
                    try:
                        # Get the next completed future
                        producer_was_done_before_get = producer_done_event.is_set()
                        future = futures_queue.get_nowait()
                        try:
                            yield future.result()
                        except Exception as e:
                            yield e
                    except Empty:
                        # If queue is empty and producer was done before we checked the queue,
                        # break out of loop
                        if producer_was_done_before_get:
                            break

            finally:
                # If consumer stops early, signal the producer to stop
                consumer_stopped_event.set()
                producer_thread.join(timeout=producer_thread_shutdown_timeout)

                if producer_thread.is_alive():
                    raise RuntimeError("Producer thread is still alive after timeout")
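
Because the generator's finally block signals consumer_stopped_event and joins the producer thread, stopping consumption early also stops submission of new tasks. A small sketch of that behavior, reusing the hypothetical slow_square from above:

gen = lazy_thread_pool_processor(range(1000), slow_square, n_workers=4)
first = next(gen)  # pulls one result; only about n_workers tasks are in flight
gen.close()        # runs the finally block: signals the producer and joins it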

process_pool_processor(stream, worker_func, n_workers, initializer=None, initargs=(), keep_order=False)

Processes a stream of items using a process pool with push semantics.

If n_workers is set to 0, the items are processed using a simple loop without multiprocessing (for debugging purposes).

This function submits all tasks from the input stream to a process pool for concurrent processing. Unlike a thread pool, a process pool uses multiple processes, which can provide true parallelism that is especially useful for CPU-bound tasks. The tasks are executed immediately and run to completion irrespective of whether their results are consumed.

Parameters:

  • stream (Iterable[InputType]) –

    An iterable that yields items to process.

  • worker_func (Callable[[InputType], OutputType]) –

    The function to process each item. This function should be picklable since it will be passed across process boundaries.

  • n_workers (int) –

    The number of worker processes to use.

  • initializer (None | Callable[..., None], default: None ) –

    An optional initializer function to call for each worker process. Useful for setting up process-specific resources.

  • initargs (tuple[Any, ...], default: () ) –

    Arguments to pass to the initializer.

  • keep_order (bool, default: False ) –

    If True, results are yielded in the order of the input items; otherwise they are yielded as tasks complete.

Yields:

  • OutputType | Exception –

    The processed result for each input item, or an exception raised during processing of the corresponding input item. Results are yielded as they become available unless keep_order is set.

Note

Since this uses multiple processes, the worker_func and the items in the stream should not rely on shared state with the main process unless that state is safe to share across processes (like using multiprocessing-safe constructs).

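A usage sketch (cpu_heavy is a hypothetical worker): with a process pool the worker must be defined at module level so it can be pickled, and on spawn-based platforms the call should be guarded by a __name__ == "__main__" check:

import math

from tapeagents.parallel_processing import process_pool_processor

def cpu_heavy(n: int) -> int:
    # Hypothetical CPU-bound worker; benefits from true parallelism.
    return len(str(math.factorial(n)))

if __name__ == "__main__":
    # keep_order=True yields results in input order instead of completion order.
    for result in process_pool_processor(range(2000, 2008), cpu_heavy, n_workers=4, keep_order=True):
        if isinstance(result, Exception):
            print(f"task failed: {result}")
        else:
            print(result)
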
Source code in tapeagents/parallel_processing.py
def process_pool_processor(
    stream: Iterable[InputType],
    worker_func: Callable[[InputType], OutputType],
    n_workers: int,
    initializer: None | Callable[..., None] = None,
    initargs: tuple[Any, ...] = (),
    keep_order: bool = False,
) -> Generator[OutputType | Exception, None, None]:
    """
    Processes a stream of items using a process pool with push semantics.

    If n_workers is set to 0, the items are processed using a simple loop
    without multiprocessing (for debugging purposes).

    This function submits all tasks from the input stream to a process pool
    for concurrent processing. Unlike a thread pool, a process pool uses
    multiple processes, which can provide true parallelism that is especially
    useful for CPU-bound tasks. The tasks are executed immediately and run to
    completion irrespective of whether their results are consumed.

    Args:
        stream (Iterable[InputType]): An iterable that yields items to process.
        worker_func (Callable[[InputType], OutputType]): The function to process each item. This
            function should be picklable since it will be passed across process boundaries.
        n_workers (int): The number of worker processes to use.
        initializer (None | Callable[..., None]): An optional initializer function
            to call for each worker process. Useful for setting up process-specific resources.
        initargs (tuple[Any, ...]): Arguments to pass to the initializer.
        keep_order (bool): If True, results are yielded in the order of the input items;
            otherwise they are yielded as tasks complete.

    Yields:
        OutputType | Exception: The processed result for each input item, or an exception raised during processing
            of the corresponding input item. Results are yielded as they become available unless keep_order is set.

    Note:
        Since this uses multiple processes, the `worker_func` and the items in the stream
        should not rely on shared state with the main process unless that state is safe to
        share across processes (like using multiprocessing-safe constructs).
    """
    if n_workers == 0:
        # Process items using a simple loop without multiprocessing
        yield from sequential_processor(stream, worker_func, 1, initializer, initargs)
    else:
        with ProcessPoolExecutor(
            max_workers=n_workers,
            initializer=initializer,
            initargs=initargs,
        ) as executor:
            futures: Iterable[Future[OutputType]] = [executor.submit(worker_func, a) for a in stream]
            if not keep_order:
                futures = as_completed(futures)
            for future in futures:
                try:
                    yield future.result()
                except Exception as e:
                    yield e
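
For debugging, both processors accept n_workers=0 and fall back to a plain sequential loop via sequential_processor, keeping the same OutputType | Exception contract with no threads or processes involved. A sketch, reusing the hypothetical cpu_heavy from above:

# Same call, no parallelism: handy for stepping through worker_func in a debugger.
for result in process_pool_processor(range(3), cpu_heavy, n_workers=0):
    print(result)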