threaded-buffered-pipeline

Parallelise pipelines of Python iterables

Installation

pip install threaded-buffered-pipeline

Usage / What problem does this solve?

If you have a chain of generators, only one runs at any given time. For example, the code below runs in (just over) 30 seconds: each of the 10 values passes through three steps that each take 1 second.

import time

def gen_1():
    for value in range(0, 10):
        time.sleep(1)  # Could be a slow HTTP request
        yield value

def gen_2(it):
    for value in it:
        time.sleep(1)  # Could be a slow HTTP request
        yield value * 2

def gen_3(it):
    for value in it:
        time.sleep(1)  # Could be a slow HTTP request
        yield value + 3

def main():
    it_1 = gen_1()
    it_2 = gen_2(it_1)
    it_3 = gen_3(it_2)

    for val in it_3:
        print(val)

main()

The buffered_pipeline function allows you to make a small change, passing each generator through its return value, to parallelise the generators and reduce this to (just over) 12 seconds.

import time
from threaded_buffered_pipeline import buffered_pipeline

def gen_1():
    for value in range(0, 10):
        time.sleep(1)  # Could be a slow HTTP request
        yield value

def gen_2(it):
    for value in it:
        time.sleep(1)  # Could be a slow HTTP request
        yield value * 2

def gen_3(it):
    for value in it:
        time.sleep(1)  # Could be a slow HTTP request
        yield value + 3

def main():
    buffer_iterable = buffered_pipeline()
    it_1 = buffer_iterable(gen_1())
    it_2 = buffer_iterable(gen_2(it_1))
    it_3 = buffer_iterable(gen_3(it_2))

    for val in it_3:
        print(val)

main()
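The 30 and 12 second figures can be checked with a little arithmetic. The sketch below is an idealised model, not part of the library: it assumes every step takes the same time and ignores thread and scheduling overhead. The function names are made up for illustration.

```python
def sequential_seconds(items, stages, step_seconds=1.0):
    # Without buffering, every item waits for every stage in turn.
    return items * stages * step_seconds


def pipelined_seconds(items, stages, step_seconds=1.0):
    # With each stage in its own thread, the first item still crosses all
    # stages, and then one further item completes per step.
    return (items + stages - 1) * step_seconds


print(sequential_seconds(10, 3))  # 30.0
print(pipelined_seconds(10, 3))   # 12.0
```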

buffered_pipeline ensures that its internal threads are stopped on any exception: each thread stops the next time it attempts to pull from its iterator.

Buffer size

The default buffer size is 1. This is suitable if each iteration takes approximately the same amount of time. If this is not the case, you may wish to change it using the buffer_size parameter of buffer_iterable.

it = buffer_iterable(gen(), buffer_size=2)
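To see why uneven iteration times can benefit from a larger buffer, here is a simplified timing model of one producer feeding one consumer through a bounded buffer. It is an assumption-laden sketch, not the library's implementation: pipeline_finish_time and its hand-off rules are hypothetical, and it expects buffer_size to be at least 1.

```python
def pipeline_finish_time(produce_times, consume_times, buffer_size=1):
    """Return when the consumer finishes the last item (simplified model)."""
    get = []            # time at which the consumer takes item i out
    producer_free = 0   # time at which the producer can start the next item
    consumer_free = 0   # time at which the consumer can take the next item
    for i, (p, c) in enumerate(zip(produce_times, consume_times)):
        produced = producer_free + p
        # The buffer has room for item i once item i - buffer_size was taken.
        slot_free = get[i - buffer_size] if i >= buffer_size else 0
        put = max(produced, slot_free)
        producer_free = put
        get.append(max(put, consumer_free))
        consumer_free = get[i] + c
    return consumer_free


# Even timings: a larger buffer makes no difference in this model.
print(pipeline_finish_time([1] * 10, [1] * 10, buffer_size=1))  # 11
print(pipeline_finish_time([1] * 10, [1] * 10, buffer_size=2))  # 11

# A fast burst followed by one slow item: the larger buffer lets the
# producer get further ahead before the slow item arrives.
print(pipeline_finish_time([1, 1, 1, 1, 6], [2] * 5, buffer_size=1))  # 13
print(pipeline_finish_time([1, 1, 1, 1, 6], [2] * 5, buffer_size=2))  # 12
```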

Features

  • One thread is created for each buffer_iterable, in which the iterable is iterated over, with its values stored in an internal buffer.

  • All the threads of the pipeline are stopped if any of the generators raises an exception.

  • If a generator raises an exception, the exception is propagated to calling code.

  • The buffer size of each step in the pipeline is configurable.

  • The "chaining" is not abstracted away. You still have full control over the arguments passed to each step, and you don't need to buffer each iterable in the pipeline if you don't want to: just don't pass those through buffer_iterable.
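The general idea behind the features above (one thread per buffered iterable, a bounded internal buffer, exceptions propagated to the caller) can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the library's code: buffer_in_thread is a hypothetical name, and the sketch does not stop the other threads of a pipeline when one step fails or when the consumer stops early.

```python
import queue
import threading
import time

_DONE = object()  # sentinel marking normal exhaustion of the iterable


def buffer_in_thread(iterable, buffer_size=1):
    """Hypothetical helper: iterate `iterable` in a worker thread and hand
    its values to the caller through a bounded queue."""
    q = queue.Queue(maxsize=buffer_size)

    def worker():
        try:
            for value in iterable:
                q.put((False, value))  # blocks while the buffer is full
            q.put((False, _DONE))
        except Exception as exc:
            q.put((True, exc))  # pass the failure on to the consumer

    threading.Thread(target=worker, daemon=True).start()

    while True:
        failed, item = q.get()
        if failed:
            raise item  # re-raise the generator's exception in the caller
        if item is _DONE:
            return
        yield item


def slow_numbers(count, delay):
    for value in range(count):
        time.sleep(delay)  # stands in for a slow HTTP request
        yield value


def slow_double(it, delay):
    for value in it:
        time.sleep(delay)  # stands in for a slow HTTP request
        yield value * 2


def run_demo(count=5, delay=0.01):
    stage_1 = buffer_in_thread(slow_numbers(count, delay))
    stage_2 = buffer_in_thread(slow_double(stage_1, delay))
    return list(stage_2)


print(run_demo())  # [0, 2, 4, 6, 8]
```

As in the library's own usage, the chaining stays explicit: a step that should not run in its own thread is simply not wrapped.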

Asyncio

A version for async iterables is available at https://github.com/michalc/asyncio-buffered-pipeline
