Composable async pipelines: sequential stages, hooks, middleware, bounded map, retries, and timeouts.

async-pipeline

Composable async pipelines for Python 3.14+: run stages in sequence, share context, add hooks and middleware, batch with Pipeline.map, and tune retry / timeout per stage.

Overview

  • Pipeline — ordered list of Stage handlers; each output feeds the next input.
  • Execution context — optional mutable dict shared by stages and hooks for one run (or a shallow copy per item in map).
  • Hooks — lightweight before_stage / after_stage observers (errors inside hooks are ignored by design).
  • Middleware — data-path wrappers around each Stage.run (logging, timing, retries, timeouts, OpenTelemetry, …).
  • Typing — PEP 695 generics on Pipeline / Stage, mypy --strict clean, py.typed included.

Public API: async_pipeline, async_pipeline.middlewares, async_pipeline.telemetry (optional OTEL extra).

Installation

uv

uv add async-pipeline

pip

pip install async-pipeline

OpenTelemetry extra

Tracing middleware is optional:

uv add "async-pipeline[otel]"
# or
pip install "async-pipeline[otel]"

Quickstart

import asyncio

from async_pipeline import Pipeline, Stage


async def add_one(value: int) -> int:
    return value + 1


async def multiply_by_two(value: int) -> int:
    return value * 2


async def main() -> None:
    pipeline = Pipeline(
        [
            Stage("add_one", add_one),
            Stage("multiply_by_two", multiply_by_two),
        ],
    )
    assert await pipeline.run(10) == 22


asyncio.run(main())

Synchronous handlers are allowed (Stage.run remains async):

def add_one(value: int) -> int:
    return value + 1


pipeline = Pipeline([Stage("add_one", add_one)])
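
One common way to accept both kinds of handler is to call the handler and await the result only when it is awaitable. The sketch below illustrates that idea with the standard library only; `run_in_sequence` is a hypothetical helper, not part of async_pipeline, and the library's actual dispatch may differ.

```python
import asyncio
import inspect
from collections.abc import Callable, Sequence
from typing import Any


async def run_in_sequence(handlers: Sequence[Callable[[Any], Any]], value: Any) -> Any:
    """Feed each handler's output into the next, awaiting async results."""
    for handler in handlers:
        result = handler(value)
        # Async handlers return an awaitable; sync handlers return the value itself.
        if inspect.isawaitable(result):
            result = await result
        value = result
    return value


def add_one(value: int) -> int:  # synchronous handler
    return value + 1


async def multiply_by_two(value: int) -> int:  # asynchronous handler
    return value * 2
```

Calling `asyncio.run(run_in_sequence([add_one, multiply_by_two], 10))` mixes one sync and one async handler in the same chain.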

Core concepts

Pipeline

Pipeline(stages=[...], *, before_stage=..., after_stage=..., middlewares=...) runs stages in order. The first middleware in the list is the outermost wrapper (see Middleware order).

Stage

Stage(name, handler, *, timeout=..., retries=..., retry_delay=..., backoff=...). Failures surface as StageExecutionError after handler retries are exhausted. Invalid constructor arguments raise ValueError.

Execution context

Pass context= to Pipeline.run or Pipeline.map. If omitted, run uses a new empty dict for that execution. A handler is called as handler(value) by default, or as handler(value, context) when the callable accepts at least two positional parameters; the arity is detected once, at Stage construction. Hooks may take an extra context argument under the same arity rule.
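
Arity detection of this kind can be done with `inspect.signature`. The following is a minimal sketch under stated assumptions: `accepts_context` is a hypothetical name, and treating `*args` as "accepts context" is a choice made here, not something the README confirms for the library.

```python
import inspect
from collections.abc import Callable
from typing import Any


def accepts_context(fn: Callable[..., Any]) -> bool:
    """Return True if fn can take a second positional argument."""
    positional = 0
    for param in inspect.signature(fn).parameters.values():
        if param.kind is inspect.Parameter.VAR_POSITIONAL:
            return True  # assumption: *args counts as accepting a context
        if param.kind in (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        ):
            positional += 1
    return positional >= 2


def value_only(value: int) -> int:
    return value + 1


def with_context(value: int, context: dict[str, Any]) -> int:
    context["seen"] = value
    return value + 1
```

Running the check once at construction time, rather than on every call, keeps signature inspection off the hot path.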

Typed context

The context can be any mutable object, not only dict. Common choices:

  • dict[str, Any]
  • TypedDict
  • dataclass or simple mutable class with attributes

import asyncio
from dataclasses import dataclass

from async_pipeline import Pipeline, Stage


@dataclass
class Ctx:
    request_id: str
    count: int = 0


async def handler(value: int, context: Ctx) -> int:
    # The same Ctx instance is visible to every stage and hook in this run.
    context.count += 1
    return value + 1


async def main() -> None:
    pipeline = Pipeline([Stage("handler", handler)])
    ctx = Ctx(request_id="abc")
    result = await pipeline.run(1, context=ctx)
    assert result == 2
    assert ctx.count == 1


asyncio.run(main())

Pipeline.map(..., context=...) creates one shallow copy of the context per item, so workers do not share the same mutable instance.
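
A shallow copy isolates top-level attributes but not nested mutable objects. The sketch below shows the difference with `copy.copy`, assuming the library's per-item copy behaves the same way; the `tags` field is added here purely for illustration.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class Ctx:
    request_id: str
    count: int = 0
    tags: list[str] = field(default_factory=list)  # illustrative nested object


original = Ctx(request_id="abc")
per_item = copy.copy(original)  # shallow copy: new object, same nested objects

per_item.count += 1        # rebinding an attribute affects only the copy
per_item.tags.append("x")  # mutating a nested list is visible through both
```

If items must not see each other's nested state, one option is to build a fresh context per item, or to keep nested values immutable.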

Batch processing with Pipeline.map

results = await pipeline.map([1, 2, 3], concurrency=5)

  • Uses asyncio.TaskGroup and a semaphore for bounded concurrency.
  • Output list matches input order.
  • Optional context= is shallow-copied per item for safe parallelism.
  • return_exceptions=True stores failures in the result list instead of raising ExceptionGroup.

Error handling

Handler failures become StageExecutionError with stage_name and original_error. An empty stages sequence raises ValueError at pipeline construction.

import asyncio

from async_pipeline import Pipeline, Stage, StageExecutionError


async def broken(value: int) -> int:
    raise RuntimeError("boom")


async def main() -> None:
    pipeline = Pipeline([Stage("broken", broken)])
    try:
        await pipeline.run(1)
    except StageExecutionError as exc:
        # The wrapper records which stage failed and keeps the original exception.
        assert exc.stage_name == "broken"
        assert isinstance(exc.original_error, RuntimeError)


asyncio.run(main())

Timeout and retry on Stage

  • timeout: applied with asyncio.timeout, and only to awaitable handlers; synchronous handlers run without a timeout.
  • retries, retry_delay, backoff ("fixed" | "exponential"): total attempts = 1 + retries.
  • KeyboardInterrupt and asyncio.CancelledError are not retried.
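
The attempt counting and backoff described above can be sketched as a plain retry loop. The names `delay_for` and `run_with_retries` are hypothetical, and the doubling formula for "exponential" is an assumption; the README does not state the library's exact schedule.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


def delay_for(attempt: int, retry_delay: float, backoff: str) -> float:
    """Delay before retry number `attempt` (1-based); doubling is an assumption."""
    if backoff == "exponential":
        return retry_delay * 2 ** (attempt - 1)
    return retry_delay  # "fixed"


async def run_with_retries(
    handler: Callable[[], Awaitable[Any]],
    *,
    retries: int,
    retry_delay: float = 0.0,
    backoff: str = "fixed",
) -> Any:
    """Call handler up to 1 + retries times, sleeping between attempts."""
    if retries < 0:
        raise ValueError("retries must be >= 0")
    for attempt in range(retries + 1):
        try:
            return await handler()
        except Exception:
            # CancelledError and KeyboardInterrupt derive from BaseException,
            # so they are not caught here and are never retried.
            if attempt == retries:
                raise
            await asyncio.sleep(delay_for(attempt + 1, retry_delay, backoff))
    raise AssertionError("unreachable")
```

With retries=2, a handler that fails twice and then succeeds is called three times in total.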

See examples/retry_timeout.py.

Hooks

before_stage runs immediately before the middleware chain and Stage.run; after_stage runs once the stage completes, whether it succeeded or failed. Hook callables may be sync or async. A failing hook neither stops the pipeline nor masks a stage error.
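
The "hooks never break the run" rule amounts to calling each hook inside a guard that swallows its exceptions. The sketch below assumes hooks receive (stage_name, value); that argument list and the names `call_hook_safely`, `record`, and `broken_hook` are illustrative, not taken from the library.

```python
import asyncio
import inspect
from collections.abc import Callable
from typing import Any


async def call_hook_safely(hook: Callable[..., Any], *args: Any) -> None:
    """Run a sync or async hook and swallow any exception it raises."""
    try:
        result = hook(*args)
        if inspect.isawaitable(result):
            await result
    except Exception:
        # An observer must never break the pipeline, so the error is dropped.
        pass


seen: list[str] = []


def record(stage_name: str, value: int) -> None:  # sync hook
    seen.append(f"{stage_name}:{value}")


async def broken_hook(stage_name: str, value: int) -> None:  # async hook that fails
    raise RuntimeError("hook failed")
```

Because hook errors are dropped silently, logic that must be able to fail the run belongs in middleware instead.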

Middlewares

Each middleware is an async callable:

async def mw(next, stage_name, value, context) -> Any:
    return await next(value)

Or a class with async def __call__(self, next, stage_name, value, context).

Unlike hooks, middleware participates in the data path (transform values, enforce policies).
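
As an illustration of the signature above, here is a small custom middleware that counts calls per stage in the context and then passes the value through. The "calls" key and the `fake_stage` stand-in are assumptions made so the example can run without the library.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


async def count_calls(
    next: Callable[[Any], Awaitable[Any]],
    stage_name: str,
    value: Any,
    context: dict[str, Any],
) -> Any:
    """Count invocations per stage in the context, then pass the value on."""
    counts = context.setdefault("calls", {})  # hypothetical context key
    counts[stage_name] = counts.get(stage_name, 0) + 1
    return await next(value)


async def fake_stage(value: int) -> int:
    # Stand-in for the rest of the chain, used only to exercise the middleware.
    return value + 1
```

In a real pipeline, a callable like this would be passed in the middlewares list, and `next` would be supplied by the pipeline.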

Built-in middlewares

from async_pipeline.middlewares import (
    LoggingMiddleware,
    RetryMiddleware,
    TimeoutMiddleware,
    TimingMiddleware,
)

LoggingMiddleware

import logging

pipeline = Pipeline(
    [...],
    middlewares=[LoggingMiddleware(logger=logging.getLogger("app"))],
)

TimingMiddleware

context = {}
pipeline = Pipeline(
    [...],
    middlewares=[TimingMiddleware()],
)
result = await pipeline.run(1, context=context)
print(context["timings"])

RetryMiddleware

RetryMiddleware retries at the middleware level, in addition to any Stage(retries=...) configured on the stage itself.

pipeline = Pipeline(
    [...],
    middlewares=[
        RetryMiddleware(
            retries=3,
            delay=0.5,
            backoff="exponential",
        ),
    ],
)

TimeoutMiddleware

pipeline = Pipeline(
    [...],
    middlewares=[TimeoutMiddleware(timeout=5.0)],
)

Middleware order

List order runs from the outside in: the first middleware is the outermost wrapper, so it starts first and finishes last, and the last middleware sits closest to Stage.run.

middlewares=[
    LoggingMiddleware(),
    TimingMiddleware(),
    RetryMiddleware(retries=3),
    TimeoutMiddleware(timeout=5.0),
]
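
The outside-in ordering falls out naturally when the chain is built by wrapping from the last middleware backward. The sketch below demonstrates this with hypothetical helpers (`build_chain`, `tracer`); it assumes the middleware signature shown earlier and is not the library's own composition code.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

Next = Callable[[Any], Awaitable[Any]]
Middleware = Callable[[Next, str, Any, dict[str, Any]], Awaitable[Any]]


def build_chain(
    middlewares: list[Middleware],
    stage_name: str,
    stage_run: Next,
    context: dict[str, Any],
) -> Next:
    """Wrap stage_run so that middlewares[0] ends up outermost."""
    call = stage_run
    # Wrap from the innermost middleware outward.
    for middleware in reversed(middlewares):
        def wrapped(value: Any, mw: Middleware = middleware, inner: Next = call) -> Awaitable[Any]:
            return mw(inner, stage_name, value, context)
        call = wrapped
    return call


def tracer(label: str, log: list[str]) -> Middleware:
    """Build a middleware that records when it is entered and exited."""
    async def middleware(next: Next, stage_name: str, value: Any, context: dict[str, Any]) -> Any:
        log.append(f"{label}:enter")
        result = await next(value)
        log.append(f"{label}:exit")
        return result
    return middleware
```

With two tracers, the first one listed logs its enter before the second and its exit after it, which is why a retry middleware placed outside a timeout middleware gives each attempt its own timeout.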

OpenTelemetry

Optional tracing (install async-pipeline[otel]):

from async_pipeline import Pipeline, Stage
from async_pipeline.telemetry import OpenTelemetryMiddleware

pipeline = Pipeline(
    [...],
    middlewares=[OpenTelemetryMiddleware()],
)

Custom span attributes from context:

await pipeline.run(
    1,
    context={
        "trace_attributes": {
            "request_id": "abc-123",
        }
    },
)

Examples

Runnable scripts live under examples/ (see examples/README.md):

uv run python examples/basic_pipeline.py
uv run python examples/batch_processing.py
uv run python examples/typed_context.py

Development

Clone the repo, then:

uv sync --extra otel
uv run pytest
uv run ruff check .
uv run mypy src
uv build
uv run twine check dist/*

See CONTRIBUTING.md for branch conventions, PR flow, and maintainer release steps.

Release process

On each push to master, .github/workflows/release.yml:

  1. Reads version from pyproject.toml.
  2. Skips publish if git tag v<version> already exists.
  3. Runs tests, lint, typecheck, uv build, twine check.
  4. Publishes to PyPI with Trusted Publishing (OIDC).
  5. Creates and pushes the v<version> tag.

Configure the PyPI project to trust this GitHub repository (see PyPI trusted publishing documentation).

Versioning policy

Semantic Versioning. Public exports from async_pipeline, async_pipeline.middlewares, and async_pipeline.telemetry follow semver for 1.x unless called out in the changelog.

Changelog

Release history: CHANGELOG.md.

License

This project is licensed under the MIT License — see LICENSE.

Download files

Source Distribution

async_pipeline-1.1.0.tar.gz (28.7 kB)

Uploaded Source

Built Distribution

async_pipeline-1.1.0-py3-none-any.whl (20.4 kB)

Uploaded Python 3

File details

Details for the file async_pipeline-1.1.0.tar.gz.

File metadata

  • Download URL: async_pipeline-1.1.0.tar.gz
  • Size: 28.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for async_pipeline-1.1.0.tar.gz
Algorithm Hash digest
SHA256 8a5ae9862c539d83c4a8ee42e453f2351b0e756a3db3f6659d6110bf55931ee8
MD5 3e39c3faf34dd18f6ba5030dedec4423
BLAKE2b-256 6f66f03024cb7f12546ea83270fff9700b589fba87e206eaf9a71745bcbe03ae

Provenance

The following attestation bundles were made for async_pipeline-1.1.0.tar.gz:

Publisher: release.yml on HenriqueKoga/async-pipeline

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file async_pipeline-1.1.0-py3-none-any.whl.

File metadata

  • Download URL: async_pipeline-1.1.0-py3-none-any.whl
  • Size: 20.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for async_pipeline-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 bad8612e27b42579798a6b737248cad1281d0f43aafdcf0e0a5a6e278f57eaf5
MD5 156823735c5f6a8e0352751e020892d8
BLAKE2b-256 b8b2eedb87d33a7fbf260ce825ae3685356e2135b824fb94c5819dde5c71e96a

Provenance

The following attestation bundles were made for async_pipeline-1.1.0-py3-none-any.whl:

Publisher: release.yml on HenriqueKoga/async-pipeline
