Skip to main content

Yield from multiple iterators as values become available

Project description

Project Status: Active — The project has reached a stable, usable state and is being actively developed. CI Status https://codecov.io/gh/jwodder/interleave/branch/master/graph/badge.svg https://img.shields.io/pypi/pyversions/interleave.svg MIT License

GitHub | PyPI | Issues | Changelog

The interleave package provides a function of the same name that takes a number of iterators, runs them in separate threads, and yields the values produced as soon as each one is available.

Installation

interleave requires Python 3.7 or higher. Just use pip for Python 3 (You have pip, right?) to install interleave and its dependencies:

python3 -m pip install interleave

Example

>>> from time import sleep
>>> from interleave import interleave
>>>
>>> def sleeper(idno, delays):
...     for i, d in enumerate(delays):
...         sleep(d)
...         yield (idno, i)
...
>>> with interleave(
...     [
...         sleeper(0, [0, 1, 2]),
...         sleeper(1, [2, 2, 2]),
...         sleeper(2, [5, 2, 1]),
...     ]
... ) as it:
...     for x in it:
...         print(x)
...
(0, 0)
(0, 1)
(1, 0)
(0, 2)
(1, 1)
(2, 0)
(1, 2)
(2, 1)
(2, 2)

API

interleave.interleave(
    iterators: Iterable[Iterator[T]],
    *,
    max_workers: Optional[int] = None,
    thread_name_prefix: str = "",
    queue_size: Optional[int] = None,
    onerror: interleave.OnError = interleave.STOP,
) -> interleave.Interleaver[T]

interleave() runs the given iterators in separate threads and returns an iterator that yields the values yielded by them as they become available. (See below for details on the Interleaver class.)

The max_workers and thread_name_prefix parameters are passed through to the underlying concurrent.futures.ThreadPoolExecutor (q.v.). max_workers determines the maximum number of iterators to run at one time.

The queue_size parameter sets the maximum size of the queue used internally to pipe values yielded by the iterators; when the queue is full, any iterator with a value to yield will block waiting for the next value to be dequeued by a call to the interleaver’s __next__. When queue_size is None (the default), interleave() uses a queue.SimpleQueue, which has no maximum size. When queue_size is non-None (including zero, signifying no maximum size), interleave() uses a queue.Queue, whose get() method is uninterruptible (including by KeyboardInterrupt) on Windows.

The onerror parameter is an enum that determines how interleave() should behave if one of the iterators raises an exception. The possible values are:

STOP

(default) Stop iterating over all iterators, cancel any outstanding iterators that haven’t been started yet, wait for all running threads to finish, and reraise the exception. Note that, due to the inability to stop an iterator between yields, the “waiting” step involves waiting for each currently-running iterator to yield its next value before stopping. This can deadlock if the queue fills up in the interim.

DRAIN

Like STOP, but any remaining values yielded by the iterators before they finish are yielded by the interleaver before raising the exception

FINISH_ALL

Continue running as normal and reraise the exception once all iterators have finished

FINISH_CURRENT

Like FINISH_ALL, except that only currently-running iterators are run to completion; any iterators whose threads haven’t yet been started when the exception is raised will have their jobs cancelled

Regardless of the value of onerror, any later exceptions raised by iterators after the initial exception are discarded.

class Interleaver(Generic[T])

An iterator and context manager. As an iterator, it yields the values generated by the iterators passed to the corresponding interleave() call as they become available. As a context manager, it returns itself on entry and, on exit, cleans up any unfinished threads by calling the shutdown(wait=True) method (see below).

An Interleaver will shut down its ThreadPoolExecutor and wait for the threads to finish after yielding its final value (specifically, when a call is made to __next__ that would result in StopIteration or another exception being raised). In the event that an Interleaver is abandoned before iteration completes, the associated resources may not be properly cleaned up, and threads may continue running indefinitely. For this reason, it is strongly recommended that you wrap any iteration over an Interleaver in the context manager in order to handle a premature end to iteration (including from a KeyboardInterrupt).

Besides the iterator and context manager APIs, an Interleaver has the following public method:

Interleaver.shutdown(wait: bool = True) -> None

Tell all running iterators to stop iterating, cancel any outstanding iterators that haven’t been started yet, and shut down the ThreadPoolExecutor. The wait parameter is passed through to the call to ThreadPoolExecutor.shutdown().

The Interleaver can continue to be iterated over after calling shutdown() and will yield any remaining values produced by the iterators before they stopped completely.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

interleave-0.1.1.tar.gz (13.8 kB view hashes)

Uploaded Source

Built Distribution

interleave-0.1.1-py3-none-any.whl (9.7 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page