asyncio-buffered-pipeline
Parallelise pipelines of Python async iterables/generators.
Installation
pip install asyncio-buffered-pipeline
Usage / What problem does this solve?
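If you have a pipeline of async generators, even though each is async, only one runs at any given time: each stage waits for the previous one to yield before doing its own work. For example, the code below takes (just over) 30 seconds to run, since each of the 10 values passes through three 1-second sleeps in turn.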
```python
import asyncio

async def gen_1():
    for value in range(0, 10):
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value

async def gen_2(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value * 2

async def gen_3(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value + 3

async def main():
    it_1 = gen_1()
    it_2 = gen_2(it_1)
    it_3 = gen_3(it_2)

    async for val in it_3:
        print(val)

asyncio.run(main())
```
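The sequential behaviour described above can be observed directly. The following is a minimal sketch, not part of the original example: it shortens the sleeps with a hypothetical `DELAY` constant and records the start and end of every simulated slow call through a hypothetical `slow_call` helper, so you can check that no two calls ever overlap.

```python
import asyncio
import time

DELAY = 0.01  # hypothetical scaled-down stand-in for the 1-second sleeps
intervals = []  # (start, end) of every simulated slow call, in completion order


async def slow_call():
    # Hypothetical helper: sleeps like the original stages and records when
    start = time.perf_counter()
    await asyncio.sleep(DELAY)
    intervals.append((start, time.perf_counter()))


async def gen_1():
    for value in range(0, 10):
        await slow_call()
        yield value


async def gen_2(it):
    async for value in it:
        await slow_call()
        yield value * 2


async def gen_3(it):
    async for value in it:
        await slow_call()
        yield value + 3


async def main():
    return [value async for value in gen_3(gen_2(gen_1()))]


results = asyncio.run(main())

# Count slow calls that started before the previous one had finished
overlaps = sum(
    1 for (_, end), (start, _) in zip(intervals, intervals[1:]) if start < end
)
print(len(intervals), "slow calls,", overlaps, "overlapping")
```

With 10 values and three stages there are 30 slow calls, and in the unbuffered pipeline each one starts only after the previous one has finished, which is where the 30-second total comes from.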
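The buffered_pipeline function allows you to make a small change, passing each generator through its return value, to parallelise the generators and reduce this to (just over) 12 seconds. The three stages now overlap, so the total is roughly 10 seconds for one stage to work through its values plus 1 second for each of the other two stages.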
```python
import asyncio
from asyncio_buffered_pipeline import buffered_pipeline

async def gen_1():
    for value in range(0, 10):
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value

async def gen_2(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value * 2

async def gen_3(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value + 3

async def main():
    # One buffered_pipeline() call per pipeline; wrap every stage with its return value
    buffer_iterable = buffered_pipeline()
    it_1 = buffer_iterable(gen_1())
    it_2 = buffer_iterable(gen_2(it_1))
    it_3 = buffer_iterable(gen_3(it_2))

    async for val in it_3:
        print(val)

asyncio.run(main())
```
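As a rough illustration of why buffering helps, and not the library's actual implementation, each stage can be run in its own task that feeds a small `asyncio.Queue`, so an upstream stage can start on its next value while the downstream stage is still busy. The `buffered` and `timed` helpers, the `_END` sentinel, and the `DELAY` constant below are hypothetical names introduced for this sketch, and error forwarding and cleanup are deliberately left out.

```python
import asyncio
import time

DELAY = 0.01  # hypothetical scaled-down stand-in for the 1-second sleeps
_END = object()  # sentinel marking the end of an iterable


async def buffered(it):
    # Hypothetical helper: consume `it` in a background task, one value ahead.
    # If `it` raises, this sketch does not forward the error to the consumer.
    queue = asyncio.Queue(maxsize=1)

    async def produce():
        async for value in it:
            await queue.put(value)
        await queue.put(_END)

    task = asyncio.create_task(produce())
    while True:
        value = await queue.get()
        if value is _END:
            break
        yield value
    await task


async def gen_1():
    for value in range(0, 10):
        await asyncio.sleep(DELAY)
        yield value


async def gen_2(it):
    async for value in it:
        await asyncio.sleep(DELAY)
        yield value * 2


async def gen_3(it):
    async for value in it:
        await asyncio.sleep(DELAY)
        yield value + 3


async def timed(wrap):
    # Build the three-stage pipeline, wrapping each stage with `wrap`
    start = time.perf_counter()
    it_3 = wrap(gen_3(wrap(gen_2(wrap(gen_1())))))
    results = [value async for value in it_3]
    return results, time.perf_counter() - start


async def main():
    plain = await timed(lambda it: it)  # no buffering: stages run one at a time
    parallel = await timed(buffered)  # each stage runs in its own task
    return plain, parallel


(plain_results, plain_time), (buffered_results, buffered_time) = asyncio.run(main())
print(plain_results == buffered_results, f"{plain_time:.2f}s vs {buffered_time:.2f}s")
```

Both runs should produce the same values; the buffered run should take roughly 12 delays rather than 30, mirroring the 30-second and 12-second figures above.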
The buffered_pipeline function ensures internal tasks are cleaned up on any exception.
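To show what this kind of cleanup can involve, here is a minimal sketch under stated assumptions; it is not the library's code. It assumes a linear pipeline whose stages are started lazily from the consumer end, keeps every producer task in a shared list, forwards a stage's exception to its consumer, and cancels the failing stage's task and everything upstream of it. `sketch_pipeline`, `_END`, and the failing `gen_2` are hypothetical names for illustration.

```python
import asyncio

_END = object()  # sentinel marking normal exhaustion


def sketch_pipeline():
    """Hypothetical stand-in for buffered_pipeline(); not the library's code."""
    tasks = []  # every producer task in this pipeline, downstream first

    async def buffer_iterable(it):
        queue = asyncio.Queue(maxsize=1)

        async def produce():
            try:
                async for value in it:
                    await queue.put((value, None))
                await queue.put((_END, None))
            except asyncio.CancelledError:
                raise  # let cancellation end the task
            except Exception as exc:
                await queue.put((_END, exc))  # forward the failure downstream

        task = asyncio.create_task(produce())
        tasks.append(task)
        try:
            while True:
                value, exc = await queue.get()
                if exc is not None:
                    raise exc
                if value is _END:
                    break
                yield value
        finally:
            # Stages start lazily from the consumer end, so this task and
            # every task appended after it sit upstream of this buffer.
            upstream = tasks[tasks.index(task):]
            for pending in upstream:
                pending.cancel()
            await asyncio.gather(*upstream, return_exceptions=True)

    return buffer_iterable, tasks


async def gen_1():
    for value in range(0, 10):
        await asyncio.sleep(0.01)
        yield value


async def gen_2(it):
    async for value in it:
        if value == 2:
            raise ValueError("failed on 2")  # simulated mid-pipeline failure
        yield value * 2


async def main():
    buffer_iterable, tasks = sketch_pipeline()
    seen, error = [], None
    it_2 = buffer_iterable(gen_2(buffer_iterable(gen_1())))
    try:
        async for value in it_2:
            seen.append(value)
    except ValueError as exc:
        error = exc
    return seen, error, [task.done() for task in tasks]


seen, error, done = asyncio.run(main())
print(seen, repr(error), done)
```

In this sketch the `ValueError` raised in the middle stage reaches the consumer, and by the time it does, the upstream task that was still producing values has been cancelled rather than left running.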
File details
Details for the file asyncio-buffered-pipeline-0.0.1.tar.gz.
File metadata
- Download URL: asyncio-buffered-pipeline-0.0.1.tar.gz
- Upload date:
- Size: 2.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/49.6.0.post20200814 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.8.5
File hashes
Algorithm | Hash digest
---|---
SHA256 | a1bdbfce00b0947e1ae0663194e1dff643434b44f86fb6e0b04401368ec61fcd
MD5 | edf37f5cf28a5ae1fd00aeaba00be0c3
BLAKE2b-256 | c4c881442cb114b1a6fa9d57f401b865350c3c9758e52762dcf0e48c0ee3ab99
File details
Details for the file asyncio_buffered_pipeline-0.0.1-py3-none-any.whl.
File metadata
- Download URL: asyncio_buffered_pipeline-0.0.1-py3-none-any.whl
- Upload date:
- Size: 3.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/49.6.0.post20200814 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.8.5
File hashes
Algorithm | Hash digest
---|---
SHA256 | 765f611965c26194f82c099b84c24e8fc7824fb15b1a5bb7bc9bbbbbf3254ee5
MD5 | c82382876c007c9496ee7e9b831a2fa8
BLAKE2b-256 | ce4cb2d5a89e69d7dd6845adb4a2758be79d5b915d2c93ac6cb83776e7c00f78