Skip to main content

Parallelize pipelines of Python async iterables/generators

Project description

asyncio-buffered-pipeline CircleCI Test Coverage

Parallelise pipelines of Python async iterables/generators.

Installation

pip install asyncio-buffered-pipeline

Usage / What problem does this solve?

If you have a pipeline of async generators, even though each is async, only one runs at any given time. For example, the below runs in (just over) 30 seconds.

import asyncio

async def gen_1():
    for value in range(0, 10):
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value

async def gen_2(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value * 2

async def gen_3(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value + 3

async def main():
    it_1 = gen_1()
    it_2 = gen_2(it_1)
    it_3 = gen_3(it_2)

    async for val in it_3:
        print(val)

asyncio.run(main())

The buffered_pipeline function allows you to make to a small change, passing each generator through its return value, to parallelise the generators to reduce this to (just over) 12 seconds.

import asyncio
from asyncio_buffer_iterable import buffered_pipeline

async def gen_1():
    for value in range(0, 10):
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value

async def gen_2(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value * 2

async def gen_3(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value + 3

async def main():
    buffer_iterable = buffered_pipeline()
    it_1 = buffer_iterable(gen_1())
    it_2 = buffer_iterable(gen_2(it_1))
    it_3 = buffer_iterable(gen_3(it_2))

    async for val in it_3:
        print(val)

asyncio.run(main())

The buffered_pipeline ensures internal tasks are cancelled on any exception.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asyncio-buffered-pipeline-0.0.3.tar.gz (2.4 kB view details)

Uploaded Source

Built Distribution

File details

Details for the file asyncio-buffered-pipeline-0.0.3.tar.gz.

File metadata

  • Download URL: asyncio-buffered-pipeline-0.0.3.tar.gz
  • Upload date:
  • Size: 2.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/49.6.0.post20200814 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.8.5

File hashes

Hashes for asyncio-buffered-pipeline-0.0.3.tar.gz
Algorithm Hash digest
SHA256 529dff060d2d9438ea2b2d6b0dc69d93a6654fe2a2c802b3381b7d9ecd254eff
MD5 e8152975d0d0f3fba3e415cdb1c88fd5
BLAKE2b-256 d41bc603cee05b40c4fac45a15fbdd1c85e966add663f365cbdb8af4160c82de

See more details on using hashes here.

File details

Details for the file asyncio_buffered_pipeline-0.0.3-py3-none-any.whl.

File metadata

  • Download URL: asyncio_buffered_pipeline-0.0.3-py3-none-any.whl
  • Upload date:
  • Size: 3.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/49.6.0.post20200814 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.8.5

File hashes

Hashes for asyncio_buffered_pipeline-0.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 1cc7f72217ab87980d99f415de52679179d053029f146a3d1cb8bd7f3fff89bc
MD5 e1930a5b77cc6a2bf9f89a3736221b50
BLAKE2b-256 77a0ddb5900ae32e89129d9b5e8e7010d32d3fdf8f0d1fe918a0a819efb17a07

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page