Asynchronous pipeline builder
Project description
Coroflow: Easy and Fast Pipelines
Coroflow makes it easy to run pipelines with coroutines and also support mixing in blocking functions and generators.
Coroflow does a lot of heavy-lifting for you:
- Manage all tasks in the pipelinen concurently in one thread using coroutines
- Pass data between tasks with queues
- Easily specify concurrency limits
- Connect stages of the pipeline with fan-out/fan-in patterns or load-balancer patterns
- Define tasks as coroutines, normal (blocking) functions, async generators or normal generators; coroflow will run it appropriately in either the event-loop, a thread pool, or optionally in a processes pool
- Provides an apache-ariflow-like api for connecting tasks
Getting Started
Coroflow makes it easy to run pipelines with coroutines and also support mixing in blocking functions and generators
from coroflow import Node, Pipeline
import asyncio
import time
class GenNode(Node):
async def execute():
"""
The execute method of the first/root Node has to be a generator,
either async or synchronous.
"""
for url in ['img_url_1', 'img_url_2', 'img_url_3']:
print(f"Yielding {url}")
await asyncio.sleep(1)
yield url
print("Generator is exhausted")
return
class DoSomething(Node):
async def execute(inpt, param=None):
"""
The execute method of all non-root Nodes should be a async
or synchronous method.
"""
# do your async pipelined work
await asyncio.sleep(1) # simulated IO delay
outp = inpt
print(f"func1: T1 sending {inpt}")
return outp
p = Pipeline()
t0 = GenNode('gen', p)
t1 = DoSomething('func1', p, kwargs={'param': 'param_t1'})
t2 = DoSomething('func2', p, kwargs={'param': 'param_t2'})
t0.set_downstream(t1)
t1.set_downstream(t2)
start_time = time.time()
p.run()
print(f"Asynchronous duration: {time.time() - start_time}s.")
Tests
Run like so:
$ pytest
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
coroflow-4.0.1.tar.gz
(8.9 kB
view details)
Built Distribution
File details
Details for the file coroflow-4.0.1.tar.gz
.
File metadata
- Download URL: coroflow-4.0.1.tar.gz
- Upload date:
- Size: 8.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.1.11 CPython/3.7.3 Linux/5.10.68+
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 5e550cf9779799b0af6fe5f83e3ae23b11bf18bd5dbed4762f59b46a7ab11912 |
|
MD5 | 8361b136b09111c7f34680f1a815c367 |
|
BLAKE2b-256 | 262d7de067dd493cdbb6e1d93a6f124aa9da1ac95235ecc673d42fd3d52dc56c |
File details
Details for the file coroflow-4.0.1-py3-none-any.whl
.
File metadata
- Download URL: coroflow-4.0.1-py3-none-any.whl
- Upload date:
- Size: 8.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.1.11 CPython/3.7.3 Linux/5.10.68+
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 577679d41efdc5d02716791da35f841dccc180086a9ec80db26b10021295f298 |
|
MD5 | 685d95fcfa3e15696c43a95f4814ca38 |
|
BLAKE2b-256 | 4bc869bd268da2b97456c81ecc23b1928d56f80b6519448f263b9319f5b08134 |