Composable async pipelines: sequential stages, hooks, middleware, bounded map, retries, and timeouts.
async-pipeline
Composable async pipelines for Python 3.14+: run stages in sequence, share context, add hooks and middleware, batch with Pipeline.map, and tune retry / timeout per stage.
Overview
- Pipeline: ordered list of Stage handlers; each output feeds the next input.
- Execution context: optional mutable dict shared by stages and hooks for one run (or a shallow copy per item in map).
- Hooks: lightweight before_stage / after_stage observers (errors inside hooks are ignored by design).
- Middleware: data-path wrappers around each Stage.run (logging, timing, retries, timeouts, OpenTelemetry, …).
- Typing: PEP 695 generics on Pipeline / Stage, mypy --strict clean, py.typed included.
Public API: async_pipeline, async_pipeline.middlewares, async_pipeline.telemetry (optional OTEL extra).
Installation
uv
uv add async-pipeline
pip
pip install async-pipeline
OpenTelemetry extra
Tracing middleware is optional:
uv add "async-pipeline[otel]"
# or
pip install "async-pipeline[otel]"
Quickstart
import asyncio

from async_pipeline import Pipeline, Stage

async def add_one(value: int) -> int:
    return value + 1

async def multiply_by_two(value: int) -> int:
    return value * 2

async def main() -> None:
    pipeline = Pipeline(
        [
            Stage("add_one", add_one),
            Stage("multiply_by_two", multiply_by_two),
        ],
    )
    # (10 + 1) * 2 == 22
    assert await pipeline.run(10) == 22

asyncio.run(main())
Synchronous handlers are allowed (Stage.run remains async):
def add_one(value: int) -> int:
    return value + 1

pipeline = Pipeline([Stage("add_one", add_one)])
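As a rough illustration of how a runner can accept both kinds of handler, the sketch below feeds a value through a list of callables and awaits a result only when it is awaitable. It uses only the standard library; `run_stages` is a hypothetical helper, not part of async-pipeline, and the library's own implementation may differ.

```python
import asyncio
import inspect
from collections.abc import Callable, Sequence
from typing import Any


async def run_stages(handlers: Sequence[Callable[[Any], Any]], value: Any) -> Any:
    """Feed value through each handler in order; await results only when needed."""
    for handler in handlers:
        result = handler(value)
        # A sync handler returns a plain value; an async one returns an awaitable.
        value = await result if inspect.isawaitable(result) else result
    return value


def add_one(value: int) -> int:  # synchronous handler
    return value + 1


async def multiply_by_two(value: int) -> int:  # asynchronous handler
    return value * 2


result = asyncio.run(run_stages([add_one, multiply_by_two], 10))
print(result)  # 22
```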
Core concepts
Pipeline
Pipeline(stages=[...], *, before_stage=..., after_stage=..., middlewares=...) runs stages in order. The first middleware in the list is the outermost wrapper (see Middleware order).
Stage
Stage(name, handler, *, timeout=..., retries=..., retry_delay=..., backoff=...). Failures surface as StageExecutionError after handler retries are exhausted. Invalid constructor arguments raise ValueError.
Execution context
Pass context= to Pipeline.run or Pipeline.map. If omitted, run uses a new empty dict for that execution. Handlers may use (value) or (value, context) when the callable accepts at least two positional parameters (detected at Stage construction). Hooks may take an extra context argument using the same arity rules.
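One plausible way to detect whether a callable accepts `(value, context)` is to count its positional parameters with `inspect.signature`. The helper below is a sketch under that assumption; `accepts_context` is a hypothetical name, and the library's actual rules (for example around defaults or bound methods) may differ.

```python
import inspect
from collections.abc import Callable
from typing import Any


def accepts_context(func: Callable[..., Any]) -> bool:
    """Return True if func can take two positional arguments (value, context)."""
    positional = 0
    for param in inspect.signature(func).parameters.values():
        if param.kind is inspect.Parameter.VAR_POSITIONAL:
            return True  # *args accepts any number of positional arguments
        if param.kind in (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        ):
            positional += 1
    return positional >= 2


def value_only(value: int) -> int:
    return value + 1


def value_and_context(value: int, context: dict[str, Any]) -> int:
    return value + 1


print(accepts_context(value_only))         # False
print(accepts_context(value_and_context))  # True
```

Doing this check once at construction time, as the text describes, avoids inspecting the signature on every call.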
Typed context
The context can be any mutable object, not only dict. Common choices:
- dict[str, Any]
- TypedDict
- dataclass or simple mutable class with attributes
from dataclasses import dataclass

from async_pipeline import Pipeline, Stage

@dataclass
class Ctx:
    request_id: str
    count: int = 0

async def handler(value: int, context: Ctx) -> int:
    context.count += 1  # mutate the context shared by this run
    return value + 1

pipeline = Pipeline([Stage("handler", handler)])
result = await pipeline.run(1, context=Ctx(request_id="abc"))
Pipeline.map(..., context=...) creates one shallow copy of the context per item, so workers do not share the same mutable instance.
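The consequence of a shallow copy is worth spelling out: top-level attributes become independent per item, but nested mutable objects are still shared. The sketch below demonstrates this with `copy.copy` from the standard library; that the library uses `copy.copy` specifically is an assumption, and the `tags` field is added only for illustration.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class Ctx:
    request_id: str
    count: int = 0
    tags: list[str] = field(default_factory=list)  # nested mutable state


base = Ctx(request_id="abc")
per_item = [copy.copy(base) for _ in range(3)]  # one shallow copy per item

per_item[0].count += 1        # rebinding an attribute affects only this copy
per_item[1].tags.append("x")  # mutating a nested object is visible to every copy

print(base.count, per_item[0].count)  # 0 1
print(base.tags)                      # ['x']
```

If workers need isolated nested state, give each item its own nested objects or avoid mutating them in place.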
Batch processing with Pipeline.map
results = await pipeline.map([1, 2, 3], concurrency=5)
- Uses asyncio.TaskGroup and a semaphore for bounded concurrency.
- Output list matches input order.
- Optional context= is shallow-copied per item for safe parallelism.
- return_exceptions=True stores failures in the result list instead of raising ExceptionGroup.
Error handling
Handler failures become StageExecutionError with stage_name and original_error. An empty stages sequence raises ValueError at pipeline construction.
from async_pipeline import Pipeline, Stage, StageExecutionError

async def broken(value: int) -> int:
    raise RuntimeError("boom")

pipeline = Pipeline([Stage("broken", broken)])
try:
    await pipeline.run(1)
except StageExecutionError as exc:
    # The wrapper records which stage failed and keeps the original exception.
    assert exc.stage_name == "broken"
    assert isinstance(exc.original_error, RuntimeError)
Timeout and retry on Stage
- timeout: applied with asyncio.timeout around awaitable handlers only.
- retries, retry_delay, backoff ("fixed" | "exponential"): total attempts = 1 + retries.
- KeyboardInterrupt and asyncio.CancelledError are not retried.
See examples/retry_timeout.py.
Hooks
before_stage runs immediately before the middleware chain + Stage.run; after_stage runs after completion (success or failure). Hook callables may be sync or async. Hook failures do not stop the pipeline or mask stage errors.
Middlewares
Each middleware is an async callable:
async def mw(next, stage_name, value, context) -> Any:
    return await next(value)
Or a class with async def __call__(self, next, stage_name, value, context).
Unlike hooks, middleware participates in the data path (transform values, enforce policies).
Built-in middlewares
from async_pipeline.middlewares import (
    LoggingMiddleware,
    RetryMiddleware,
    TimeoutMiddleware,
    TimingMiddleware,
)
LoggingMiddleware
import logging
pipeline = Pipeline(
    [...],
    middlewares=[LoggingMiddleware(logger=logging.getLogger("app"))],
)
TimingMiddleware
context = {}
pipeline = Pipeline(
    [...],
    middlewares=[TimingMiddleware()],
)
result = await pipeline.run(1, context=context)
print(context["timings"])
RetryMiddleware
Middleware-level retries in addition to Stage(retries=...).
pipeline = Pipeline(
    [...],
    middlewares=[
        RetryMiddleware(
            retries=3,
            delay=0.5,
            backoff="exponential",
        ),
    ],
)
TimeoutMiddleware
pipeline = Pipeline(
    [...],
    middlewares=[TimeoutMiddleware(timeout=5.0)],
)
Middleware order
The list is ordered from outermost to innermost, matching execution: the first middleware wraps the rest of the chain, so it is entered first and exits last.
middlewares=[
    LoggingMiddleware(),
    TimingMiddleware(),
    RetryMiddleware(retries=3),
    TimeoutMiddleware(timeout=5.0),
]
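The ordering rule can be checked with a small stand-alone chain builder. The sketch below is an assumption about how such wrapping is commonly done, not the library's code: `build_chain` and `tracer` are hypothetical, and the middleware signature `(next, stage_name, value, context)` is taken from the Middlewares section.

```python
import asyncio
from collections.abc import Awaitable, Callable, Sequence
from typing import Any

Next = Callable[[Any], Awaitable[Any]]
Middleware = Callable[[Next, str, Any, Any], Awaitable[Any]]


def build_chain(
    middlewares: Sequence[Middleware], stage_name: str, handler: Next, context: Any
) -> Next:
    """Wrap handler so that middlewares[0] ends up outermost."""
    call = handler
    for middleware in reversed(middlewares):
        # Default arguments bind the current middleware and inner callable.
        def wrapped(value: Any, _mw: Middleware = middleware, _next: Next = call) -> Awaitable[Any]:
            return _mw(_next, stage_name, value, context)

        call = wrapped
    return call


def tracer(label: str, trace: list[str]) -> Middleware:
    """Build a middleware that records when it is entered and exited."""

    async def middleware(next: Next, stage_name: str, value: Any, context: Any) -> Any:
        trace.append(f"{label} enter")
        result = await next(value)
        trace.append(f"{label} exit")
        return result

    return middleware


async def add_one(value: int) -> int:
    return value + 1


trace: list[str] = []
chain = build_chain([tracer("first", trace), tracer("second", trace)], "add_one", add_one, {})
result = asyncio.run(chain(1))
print(trace)  # ['first enter', 'second enter', 'second exit', 'first exit']
```

Read against the list above, this suggests that a retry middleware placed before a timeout middleware would apply the timeout to each attempt rather than to all attempts together.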
OpenTelemetry
Optional tracing (install async-pipeline[otel]):
from async_pipeline import Pipeline, Stage
from async_pipeline.telemetry import OpenTelemetryMiddleware
pipeline = Pipeline(
    [...],
    middlewares=[OpenTelemetryMiddleware()],
)
Custom span attributes from context:
await pipeline.run(
    1,
    context={
        "trace_attributes": {
            "request_id": "abc-123",
        }
    },
)
Examples
Runnable scripts live under examples/ (see examples/README.md):
uv run python examples/basic_pipeline.py
uv run python examples/batch_processing.py
uv run python examples/typed_context.py
Development
Clone the repo, then:
uv sync --extra otel
uv run pytest
uv run ruff check .
uv run mypy src
uv build
uv run twine check dist/*
See CONTRIBUTING.md for branch conventions, PR flow, and maintainer release steps.
Release process
On each push to master, .github/workflows/release.yml:
- Reads version from pyproject.toml.
- Skips publish if git tag v<version> already exists.
- Runs tests, lint, typecheck, uv build, twine check.
- Publishes to PyPI with Trusted Publishing (OIDC).
- Creates and pushes the v<version> tag.
Configure the PyPI project to trust this GitHub repository (see PyPI trusted publishing documentation).
Versioning policy
Semantic Versioning. Public exports from async_pipeline, async_pipeline.middlewares, and async_pipeline.telemetry follow semver for 1.x unless called out in the changelog.
Changelog
Release history: CHANGELOG.md.
License
This project is licensed under the MIT License — see LICENSE.
File details
Details for the file async_pipeline-1.1.0.tar.gz.
File metadata
- Download URL: async_pipeline-1.1.0.tar.gz
- Upload date:
- Size: 28.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8a5ae9862c539d83c4a8ee42e453f2351b0e756a3db3f6659d6110bf55931ee8 |
| MD5 | 3e39c3faf34dd18f6ba5030dedec4423 |
| BLAKE2b-256 | 6f66f03024cb7f12546ea83270fff9700b589fba87e206eaf9a71745bcbe03ae |
Provenance
The following attestation bundles were made for async_pipeline-1.1.0.tar.gz:
Publisher: release.yml on HenriqueKoga/async-pipeline
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: async_pipeline-1.1.0.tar.gz
- Subject digest: 8a5ae9862c539d83c4a8ee42e453f2351b0e756a3db3f6659d6110bf55931ee8
- Sigstore transparency entry: 1441565075
- Sigstore integration time:
- Permalink: HenriqueKoga/async-pipeline@2beb9256fc0172cf5eaab12a11b035b6f7ad1cca
- Branch / Tag: refs/heads/master
- Owner: https://github.com/HenriqueKoga
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@2beb9256fc0172cf5eaab12a11b035b6f7ad1cca
- Trigger Event: push
File details
Details for the file async_pipeline-1.1.0-py3-none-any.whl.
File metadata
- Download URL: async_pipeline-1.1.0-py3-none-any.whl
- Upload date:
- Size: 20.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | bad8612e27b42579798a6b737248cad1281d0f43aafdcf0e0a5a6e278f57eaf5 |
| MD5 | 156823735c5f6a8e0352751e020892d8 |
| BLAKE2b-256 | b8b2eedb87d33a7fbf260ce825ae3685356e2135b824fb94c5819dde5c71e96a |
Provenance
The following attestation bundles were made for async_pipeline-1.1.0-py3-none-any.whl:
Publisher: release.yml on HenriqueKoga/async-pipeline
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: async_pipeline-1.1.0-py3-none-any.whl
- Subject digest: bad8612e27b42579798a6b737248cad1281d0f43aafdcf0e0a5a6e278f57eaf5
- Sigstore transparency entry: 1441565211
- Sigstore integration time:
- Permalink: HenriqueKoga/async-pipeline@2beb9256fc0172cf5eaab12a11b035b6f7ad1cca
- Branch / Tag: refs/heads/master
- Owner: https://github.com/HenriqueKoga
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@2beb9256fc0172cf5eaab12a11b035b6f7ad1cca
- Trigger Event: push