asyncio-buffered-pipeline
Parallelise pipelines of Python async iterables/generators.
Installation
pip install asyncio-buffered-pipeline
Usage / What problem does this solve?
If you have a pipeline of async generators, even though each is async, only one runs at any given time. For example, the pipeline below takes (just over) 30 seconds: each of the 10 values waits 1 second in each of the 3 generators in turn.
import asyncio

async def gen_1():
    for value in range(0, 10):
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value

async def gen_2(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value * 2

async def gen_3(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value + 3

async def main():
    it_1 = gen_1()
    it_2 = gen_2(it_1)
    it_3 = gen_3(it_2)
    async for val in it_3:
        print(val)

asyncio.run(main())
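To check the sequential behaviour without waiting 30 seconds, the same pipeline can be timed with shorter sleeps. This is a minimal sketch: the names DELAY, COUNT and time_unbuffered are illustrative assumptions and are not part of this package.

```python
import asyncio
import time

DELAY = 0.05  # scaled-down stand-in for the 1-second sleeps
COUNT = 4     # number of values pushed through the pipeline


async def gen_1():
    for value in range(COUNT):
        await asyncio.sleep(DELAY)
        yield value


async def gen_2(it):
    async for value in it:
        await asyncio.sleep(DELAY)
        yield value * 2


async def gen_3(it):
    async for value in it:
        await asyncio.sleep(DELAY)
        yield value + 3


async def time_unbuffered():
    """Run the plain pipeline and return (results, elapsed seconds)."""
    start = time.perf_counter()
    results = [val async for val in gen_3(gen_2(gen_1()))]
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(time_unbuffered())
    # Each value waits in all three generators in turn, so the elapsed
    # time is roughly 3 * COUNT * DELAY seconds.
    print(results, round(elapsed, 2))
```

The elapsed time scales with the number of generators multiplied by the number of values, which is the cost the rest of this section addresses.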
The buffered_pipeline function allows you to make a small change, passing each generator through its return value, to parallelise the generators and reduce this to (just over) 12 seconds.
import asyncio
from asyncio_buffered_pipeline import buffered_pipeline
async def gen_1():
    for value in range(0, 10):
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value

async def gen_2(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value * 2

async def gen_3(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value + 3

async def main():
    buffer_iterable = buffered_pipeline()
    it_1 = buffer_iterable(gen_1())
    it_2 = buffer_iterable(gen_2(it_1))
    it_3 = buffer_iterable(gen_3(it_2))
    async for val in it_3:
        print(val)

asyncio.run(main())
The buffered_pipeline ensures internal tasks are cancelled on any exception.
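To give a feel for how this kind of buffering can work, here is a minimal sketch built on asyncio.Queue and a background task per generator. It is not this package's implementation: the helper name buffered, its buffer_size parameter and the run_buffered function are assumptions made for illustration, and the error handling is deliberately simpler than the cancellation guarantee described above (each wrapper only cancels its own task when it exits).

```python
import asyncio
import time

_END = object()  # sentinel: the source iterable is exhausted


async def buffered(source, buffer_size=1):
    """Hypothetical helper: iterate `source` in a background task."""
    queue = asyncio.Queue(maxsize=buffer_size)

    async def produce():
        try:
            async for value in source:
                await queue.put((value, None))
            await queue.put((_END, None))
        except Exception as exc:
            # Forward failures so the consumer does not wait forever.
            await queue.put((_END, exc))

    task = asyncio.create_task(produce())
    try:
        while True:
            value, exc = await queue.get()
            if exc is not None:
                raise exc
            if value is _END:
                return
            yield value
    finally:
        # Stop this stage's background task when the wrapper exits or is closed.
        task.cancel()


async def gen_1(count, delay):
    for value in range(count):
        await asyncio.sleep(delay)
        yield value


async def gen_2(it, delay):
    async for value in it:
        await asyncio.sleep(delay)
        yield value * 2


async def gen_3(it, delay):
    async for value in it:
        await asyncio.sleep(delay)
        yield value + 3


async def run_buffered(count=5, delay=0.1):
    """Return (results, elapsed seconds) for the buffered pipeline."""
    start = time.perf_counter()
    it_1 = buffered(gen_1(count, delay))
    it_2 = buffered(gen_2(it_1, delay))
    it_3 = buffered(gen_3(it_2, delay))
    results = [val async for val in it_3]
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_buffered())
    print(results, round(elapsed, 2))
```

Because each generator runs in its own task, a later generator can process one value while an earlier one is already waiting on the next, so the elapsed time is roughly (count + 2) * delay instead of 3 * count * delay. The bounded queue keeps a fast generator from running arbitrarily far ahead of a slow consumer.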