Lightweight composition of async pipelines where each stage receives the previous stage's output.
async-pipeline
A small library for composing async pipelines: stages run in order, and each Stage receives the previous stage’s output.
Requirements
- Python 3.14 or newer
Install with uv
In your project:
```shell
uv add async-pipeline
```
To work on this library:
```shell
git clone <repo-url>
cd async-pipeline
uv sync
```
Basic usage
```python
import asyncio
from async_pipeline import Pipeline, Stage

async def add_one(value: int) -> int:
    return value + 1

async def multiply_by_two(value: int) -> int:
    return value * 2

pipeline = Pipeline([
    Stage("add_one", add_one),
    Stage("multiply_by_two", multiply_by_two),
])

async def main() -> None:
    # Stages run in list order, so the result is (10 + 1) * 2.
    result = await pipeline.run(10)
    assert result == 22

asyncio.run(main())
```
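To make the ordering concrete, here is a minimal stdlib-only sketch of what sequential chaining amounts to, assuming run() simply awaits each handler in list order and passes the result along. The function run_in_order is hypothetical and is not part of async-pipeline.

```python
import asyncio
from collections.abc import Awaitable, Callable

# Hypothetical handler type for this sketch; not the library's Stage class.
AsyncHandler = Callable[[int], Awaitable[int]]


async def run_in_order(handlers: list[AsyncHandler], value: int) -> int:
    """Feed each handler the previous handler's output, one after another."""
    for handler in handlers:
        value = await handler(value)
    return value


async def add_one(value: int) -> int:
    return value + 1


async def multiply_by_two(value: int) -> int:
    return value * 2


result = asyncio.run(run_in_order([add_one, multiply_by_two], 10))
print(result)  # 22
```

Swapping the list order gives 10 * 2 + 1 = 21 instead, which is why stage order matters.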
Synchronous handlers are supported as well (the stage’s run method remains async):
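```python
import asyncio
from async_pipeline import Pipeline, Stage

def add_one(value: int) -> int:  # plain (non-async) function
    return value + 1

pipeline = Pipeline([
    Stage("add_one", add_one),
])

result = asyncio.run(pipeline.run(1))  # run() is still a coroutine
assert result == 2
```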
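One way a stage can accept both kinds of handler is to call the handler and await the result only when it is awaitable. This is a sketch under that assumption (the helper call_handler is hypothetical, not the library's API); the Stage timeout rule, which applies only when the handler returns an awaitable, hinges on the same check.

```python
import asyncio
import inspect
from collections.abc import Callable
from typing import Any


async def call_handler(handler: Callable[[Any], Any], value: Any) -> Any:
    """Hypothetical helper: call a sync or async handler from async code."""
    result = handler(value)  # an async def handler returns a coroutine here
    if inspect.isawaitable(result):
        result = await result
    return result


def add_one_sync(value: int) -> int:
    return value + 1


async def add_one_async(value: int) -> int:
    return value + 1


async def main() -> tuple[int, int]:
    return await call_handler(add_one_sync, 1), await call_handler(add_one_async, 1)


print(asyncio.run(main()))  # (2, 2)
```

In this sketch a sync handler runs directly on the event loop thread, so a slow blocking function would stall every other task until it returns.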
Errors
Failures inside a handler are surfaced as StageExecutionError, including the stage name and the original exception:
```python
import asyncio
from async_pipeline import Pipeline, Stage, StageExecutionError

async def broken(value: int) -> int:
    raise RuntimeError("boom")

pipeline = Pipeline([
    Stage("broken", broken),
])

async def main() -> None:
    try:
        await pipeline.run(1)
    except StageExecutionError as exc:
        assert exc.stage_name == "broken"
        assert isinstance(exc.original_error, RuntimeError)

asyncio.run(main())
```
A Pipeline with no stages raises ValueError at construction time.
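The wrapping behaviour can be sketched with stdlib code alone. StageError and MiniPipeline below are hypothetical stand-ins for StageExecutionError and Pipeline, assuming the library catches Exception around each handler call and re-raises with the stage name attached; the real classes may differ.

```python
import asyncio
from collections.abc import Awaitable, Callable

Handler = Callable[[int], Awaitable[int]]


class StageError(Exception):
    """Hypothetical stand-in for StageExecutionError: keeps the stage name and cause."""

    def __init__(self, stage_name: str, original_error: Exception) -> None:
        super().__init__(f"stage {stage_name!r} failed: {original_error!r}")
        self.stage_name = stage_name
        self.original_error = original_error


class MiniPipeline:
    """Hypothetical minimal pipeline used only to illustrate error wrapping."""

    def __init__(self, stages: list[tuple[str, Handler]]) -> None:
        if not stages:
            # Reject an empty pipeline up front, at construction time.
            raise ValueError("a pipeline needs at least one stage")
        self._stages = list(stages)

    async def run(self, value: int) -> int:
        for name, handler in self._stages:
            try:
                value = await handler(value)
            except Exception as exc:
                # "from exc" also records the original error as __cause__.
                raise StageError(name, exc) from exc
        return value


async def broken(value: int) -> int:
    raise RuntimeError("boom")


try:
    asyncio.run(MiniPipeline([("broken", broken)]).run(1))
except StageError as exc:
    print(exc.stage_name, type(exc.original_error).__name__)  # broken RuntimeError
```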
Stage timeout
Per-stage timeouts apply only when the handler returns an awaitable (async handler). Synchronous handlers are unchanged; the timeout argument is ignored for them.
Timeouts use asyncio.timeout (not wait_for). If the awaitable runs longer than the limit, the stage raises StageExecutionError with original_error set to TimeoutError.
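```python
import asyncio
from async_pipeline import Pipeline, Stage, StageExecutionError

async def fetch_data(value: int) -> int:
    await asyncio.sleep(2)  # slower than the 1-second limit below
    return value

pipeline = Pipeline([
    Stage("fetch_data", fetch_data, timeout=1.0),
])

try:
    asyncio.run(pipeline.run(1))
except StageExecutionError as exc:
    assert isinstance(exc.original_error, TimeoutError)
```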
Invalid values (timeout <= 0) raise ValueError with the message "timeout must be greater than 0".
Retry
Configure automatic re-runs when a handler raises an Exception (including TimeoutError from asyncio.timeout). On success, the stage returns immediately. If every attempt fails, the library raises StageExecutionError with the last exception as original_error.
- retries: extra attempts after the first (retries=0 is the default: no retries). Total tries are 1 + retries.
- retry_delay: base seconds to wait after a failed attempt before the next one. If 0, no asyncio.sleep is used between attempts.
- backoff: "fixed" (same delay after each failure) or "exponential" (the delay is the base retry_delay multiplied by 2 ** (failure_number - 1), e.g. 0.5s, 1.0s, 2.0s, …).
asyncio.CancelledError and KeyboardInterrupt derive from BaseException rather than Exception, so they are not retried and propagate as usual.
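The delay schedule can be checked in isolation. This sketch assumes the sleep happens only between attempts (never after the last failure) and that failure_number starts at 1; retry_delays and its error messages are hypothetical, not the library's API.

```python
from typing import Literal


def retry_delays(
    retries: int, retry_delay: float, backoff: Literal["fixed", "exponential"] = "fixed"
) -> list[float]:
    """Hypothetical helper: the sleep before each retry, one entry per retry."""
    # Validation messages are illustrative; the library's wording may differ.
    if retries < 0:
        raise ValueError("retries must be >= 0")
    if retry_delay < 0:
        raise ValueError("retry_delay must be >= 0")
    if backoff not in ("fixed", "exponential"):
        raise ValueError("backoff must be 'fixed' or 'exponential'")
    delays: list[float] = []
    for failure_number in range(1, retries + 1):
        if backoff == "exponential":
            delays.append(retry_delay * 2 ** (failure_number - 1))
        else:
            delays.append(retry_delay)
    return delays


print(retry_delays(3, 0.5, "exponential"))  # [0.5, 1.0, 2.0]
print(retry_delays(3, 0.5, "fixed"))  # [0.5, 0.5, 0.5]
```

With retries=3 there are four attempts but only three sleeps, because nothing follows the final failure.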
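```python
from async_pipeline import Stage

async def api_call(value: int) -> int:
    # Placeholder handler for illustration; replace with a real network call.
    return value

stage = Stage(
    "api_call",
    api_call,
    retries=3,              # up to 4 attempts in total
    retry_delay=0.5,        # waits 0.5s, 1.0s, 2.0s between attempts
    backoff="exponential",
    timeout=5.0,            # limit for each attempt
)
```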
When timeout is also set, each attempt is wrapped in its own asyncio.timeout, so a slow await can fail with TimeoutError, trigger a retry (after the backoff sleep), and eventually surface as StageExecutionError if every attempt times out or fails.
Invalid retries, retry_delay, or backoff values raise ValueError with a clear message.
Batch processing
Run the same pipeline for many inputs in parallel, with a fixed concurrency limit and stable output order (aligned with the input sequence):
```python
results = await pipeline.map([1, 2, 3], concurrency=5)  # inside an async function
```
Implementation notes:
- Uses asyncio.TaskGroup to run one async worker per item (not gather).
- Uses asyncio.Semaphore so at most concurrency pipelines run at once; workers still start as tasks, but only concurrency of them proceed past the semaphore at a time.
- Each worker calls run() for its item and writes into a pre-sized list by index, so results stay in input order even when tasks finish out of order.
Errors (default): if any item fails, TaskGroup surfaces an ExceptionGroup (and cancels the other workers). A StageExecutionError from a stage is the same error run() would raise, but it arrives wrapped inside that group, so it can be caught with except* StageExecutionError.
Errors (return_exceptions=True): failures are stored in the result list in the matching position as the exception object; the TaskGroup completes without raising, so you get list entries that are either normal outputs or an Exception (often StageExecutionError).
Development commands
```shell
uv sync
uv run pytest
uv run ruff check .
uv run mypy src
```
Roadmap
- Hooks — before/after each stage or for the full pipeline
License
See the LICENSE file.
File details
Details for the file async_pipeline-0.4.0.tar.gz.
File metadata
- Download URL: async_pipeline-0.4.0.tar.gz
- Upload date:
- Size: 9.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | 118488f3e16411dfd0569286b988a1a2a92985e9341940bb5d0835ab726df8a8 |
| MD5 | 13a83ba4f70e7af7ca3d67e7de1a4be8 |
| BLAKE2b-256 | a03d9545accd997bacd127400ff3ae246a66e0760ea59c76f463da71e6043785 |
Provenance
The following attestation bundles were made for async_pipeline-0.4.0.tar.gz:
- Publisher: publish.yml on HenriqueKoga/async-pipeline
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: async_pipeline-0.4.0.tar.gz
- Subject digest: 118488f3e16411dfd0569286b988a1a2a92985e9341940bb5d0835ab726df8a8
- Sigstore transparency entry: 1436076907
- Sigstore integration time:
- Permalink: HenriqueKoga/async-pipeline@3abe6a0425624231a08c60315a84b20c76264d16
- Branch / Tag: refs/tags/v0.4.0
- Owner: https://github.com/HenriqueKoga
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@3abe6a0425624231a08c60315a84b20c76264d16
- Trigger Event: push
File details
Details for the file async_pipeline-0.4.0-py3-none-any.whl.
File metadata
- Download URL: async_pipeline-0.4.0-py3-none-any.whl
- Upload date:
- Size: 7.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | fdfbf1ee571a5961f41845ac1a08cde2083f17119a92df76a6fdeb6fb9ae5da7 |
| MD5 | 260bf7dc12082bf895ccfee3ec7f07a0 |
| BLAKE2b-256 | 130c47868520af6ca76eeaf93146c4b828abc1f1731993e1ba0a16b9a2830dd4 |
Provenance
The following attestation bundles were made for async_pipeline-0.4.0-py3-none-any.whl:
- Publisher: publish.yml on HenriqueKoga/async-pipeline
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: async_pipeline-0.4.0-py3-none-any.whl
- Subject digest: fdfbf1ee571a5961f41845ac1a08cde2083f17119a92df76a6fdeb6fb9ae5da7
- Sigstore transparency entry: 1436076908
- Sigstore integration time:
- Permalink: HenriqueKoga/async-pipeline@3abe6a0425624231a08c60315a84b20c76264d16
- Branch / Tag: refs/tags/v0.4.0
- Owner: https://github.com/HenriqueKoga
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@3abe6a0425624231a08c60315a84b20c76264d16
- Trigger Event: push