Project description

asyncio-buffered-pipeline

Parallelise pipelines of Python async iterables/generators.

Installation

pip install asyncio-buffered-pipeline

Usage / What problem does this solve?

If you have a chain of async generators, only one of them runs at any given time, even though each is async: each generator waits for the next value from the one upstream before doing its own work. For example, the below runs in (just over) 30 seconds.

import asyncio

async def gen_1():
    for value in range(0, 10):
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value

async def gen_2(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value * 2

async def gen_3(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value + 3

async def main():
    it_1 = gen_1()
    it_2 = gen_2(it_1)
    it_3 = gen_3(it_2)

    async for val in it_3:
        print(val)

asyncio.run(main())

The buffered_pipeline function allows you to make a small change, passing each generator through its return value, to parallelise the generators and reduce this to (just over) 12 seconds.

import asyncio
from asyncio_buffered_pipeline import buffered_pipeline

async def gen_1():
    for value in range(0, 10):
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value

async def gen_2(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value * 2

async def gen_3(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value + 3

async def main():
    buffer_iterable = buffered_pipeline()
    it_1 = buffer_iterable(gen_1())
    it_2 = buffer_iterable(gen_2(it_1))
    it_3 = buffer_iterable(gen_3(it_2))

    async for val in it_3:
        print(val)

asyncio.run(main())

The buffered_pipeline ensures internal tasks are cancelled on any exception.
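
As a sketch of what this means for calling code (the generator names and the deliberate failure below are made up for illustration), an exception raised inside any step surfaces at the async for in the calling code, and the internal tasks of the other steps are cancelled rather than left running.

import asyncio
from asyncio_buffered_pipeline import buffered_pipeline

async def gen_source():
    for value in range(0, 10):
        await asyncio.sleep(1)
        if value == 3:
            raise Exception('Simulated failure in an early step')
        yield value

async def gen_double(it):
    async for value in it:
        await asyncio.sleep(1)
        yield value * 2

async def main():
    buffer_iterable = buffered_pipeline()
    it_1 = buffer_iterable(gen_source())
    it_2 = buffer_iterable(gen_double(it_1))

    try:
        async for val in it_2:
            print(val)
    except Exception as exception:
        # The exception raised inside gen_source propagates here, and the
        # pipeline's internal tasks have been cancelled
        print('Pipeline failed:', exception)

asyncio.run(main())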

Buffer size

The default buffer size is 1. This is suitable if each iteration takes approximately the same amount of time. If this is not the case, you may wish to change it using the buffer_size parameter of buffer_iterable.

it = buffer_iterable(gen(), buffer_size=2)
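
For example, a larger buffer can help when an upstream step's iterations vary in duration while the downstream step's are steady: the fast upstream iterations run ahead into the buffer, so the downstream step is less likely to sit idle waiting for a slow one. A sketch with made-up generator names and timings:

import asyncio
from asyncio_buffered_pipeline import buffered_pipeline

async def gen_fetch():
    # Upstream step with uneven timings: some iterations fast, some slow
    for value in range(0, 10):
        await asyncio.sleep(0.2 if value % 2 else 1.0)
        yield value

async def gen_process(it):
    # Downstream step with a steady cost per value
    async for value in it:
        await asyncio.sleep(0.6)
        yield value * 2

async def main():
    buffer_iterable = buffered_pipeline()
    # buffer_size=2 lets the fast upstream iterations run ahead
    it_1 = buffer_iterable(gen_fetch(), buffer_size=2)
    it_2 = buffer_iterable(gen_process(it_1))

    async for val in it_2:
        print(val)

asyncio.run(main())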

Features

  • Only one task is created for each buffer_iterable, in which the iterable is iterated over, with its values stored in an internal buffer.

  • All the tasks of the pipeline are cancelled if any of the generators raise an exception.

  • If a generator raises an exception, the exception is propagated to calling code.

  • The buffer size of each step in the pipeline is configurable.

  • The "chaining" is not abstracted away. You still have full control over the arguments passed to each step, and you don't need to buffer each iterable in the pipeline if you don't want to: just don't pass those through buffer_iterable.

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asyncio-buffered-pipeline-0.0.5.tar.gz (3.3 kB)

Built Distribution

asyncio_buffered_pipeline-0.0.5-py3-none-any.whl (4.3 kB)

File details

Details for the file asyncio-buffered-pipeline-0.0.5.tar.gz.

File metadata

  • Download URL: asyncio-buffered-pipeline-0.0.5.tar.gz
  • Size: 3.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/49.6.0.post20200814 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.8.5

File hashes

Hashes for asyncio-buffered-pipeline-0.0.5.tar.gz
Algorithm Hash digest
SHA256 7ca02e6c0fcf4ec940bbc4c2963a8e91a91e20fbb22dae73c77a57a901d451dc
MD5 fdf5191fb635c02fe04f81e7932b1460
BLAKE2b-256 2e931413447ac0ba0cb25fb13028437b06f633c2ddfa3630425a2e63ee5b80f5


File details

Details for the file asyncio_buffered_pipeline-0.0.5-py3-none-any.whl.

File metadata

  • Download URL: asyncio_buffered_pipeline-0.0.5-py3-none-any.whl
  • Size: 4.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/49.6.0.post20200814 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.8.5

File hashes

Hashes for asyncio_buffered_pipeline-0.0.5-py3-none-any.whl
Algorithm Hash digest
SHA256 78dcf4c393fd948579b1311c2e6107f8dea676d2f7088d98e8dc3a2ec0a18de5
MD5 32e4f5d204ba84a2c3f2648622db9a81
BLAKE2b-256 f4781d8350c289748a6f886a270b35e3674d8a237a0ff70fd417e3b3f9467a63

