Lightweight composition of async pipelines where each stage receives the previous stage's output.

Project description

async-pipeline

A small library for composing async pipelines. Stages run in order, and each Stage receives the previous stage’s output.

Requirements

  • Python 3.14 or newer

Install with uv

In your project:

uv add async-pipeline

To work on this library:

git clone <repo-url>
cd async-pipeline
uv sync

Basic usage

import asyncio

from async_pipeline import Pipeline, Stage

async def add_one(value: int) -> int:
    return value + 1

async def multiply_by_two(value: int) -> int:
    return value * 2

pipeline = Pipeline([
    Stage("add_one", add_one),
    Stage("multiply_by_two", multiply_by_two),
])

async def main() -> None:
    # (10 + 1) * 2 == 22
    result = await pipeline.run(10)
    assert result == 22

asyncio.run(main())
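To make the "each stage receives the previous stage's output" idea concrete, here is a minimal standard-library sketch of sequential composition. It is not the library's implementation: run_in_order and the Handler alias are hypothetical names used only for illustration, and the sketch handles async handlers only.

```python
import asyncio
from collections.abc import Awaitable, Callable, Sequence
from typing import Any

# Hypothetical stand-in for a stage handler (async-only in this sketch).
Handler = Callable[[Any], Awaitable[Any]]


async def run_in_order(handlers: Sequence[Handler], initial: Any) -> Any:
    """Feed `initial` through each handler, passing every output to the next."""
    value = initial
    for handler in handlers:
        value = await handler(value)
    return value


async def add_one(value: int) -> int:
    return value + 1


async def multiply_by_two(value: int) -> int:
    return value * 2


# (10 + 1) * 2 == 22
result = asyncio.run(run_in_order([add_one, multiply_by_two], 10))
print(result)
```

Order matters: swapping the two handlers gives 10 * 2 + 1 == 21 instead.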

Synchronous handlers are supported as well (the stage’s run method remains async):

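import asyncio

from async_pipeline import Pipeline, Stage

def add_one(value: int) -> int:
    return value + 1

pipeline = Pipeline([
    Stage("add_one", add_one),
])

async def main() -> None:
    result = await pipeline.run(1)
    assert result == 2

asyncio.run(main())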
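One common way to accept both kinds of handler is to call the handler and await the result only if it is awaitable. The sketch below shows that pattern with the standard library. call_handler is a hypothetical helper, and whether async-pipeline does exactly this (or, for example, moves sync handlers to a thread) is an assumption not confirmed by this README.

```python
import asyncio
import inspect
from collections.abc import Callable
from typing import Any


async def call_handler(handler: Callable[[Any], Any], value: Any) -> Any:
    """Call a sync or async handler and always hand back a plain result."""
    outcome = handler(value)
    # An async handler returns an awaitable; a sync handler returns the value.
    if inspect.isawaitable(outcome):
        return await outcome
    return outcome


def sync_add_one(value: int) -> int:
    return value + 1


async def async_add_one(value: int) -> int:
    return value + 1


print(asyncio.run(call_handler(sync_add_one, 1)))
print(asyncio.run(call_handler(async_add_one, 1)))
```

In this sketch a sync handler runs inline on the event loop, so a slow one would block other tasks while it executes.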

Errors

Failures inside a handler are surfaced as StageExecutionError, including the stage name and the original exception:

import asyncio

from async_pipeline import Pipeline, Stage, StageExecutionError

async def broken(value: int) -> int:
    raise RuntimeError("boom")

pipeline = Pipeline([
    Stage("broken", broken),
])

async def main() -> None:
    try:
        await pipeline.run(1)
    except StageExecutionError as exc:
        # The wrapper records which stage failed and keeps the original error.
        assert exc.stage_name == "broken"
        assert isinstance(exc.original_error, RuntimeError)

asyncio.run(main())

A Pipeline with no stages raises ValueError at construction time.

Batch processing

Run the same pipeline over many inputs concurrently, with a fixed concurrency limit. Results come back in the same order as the inputs:

results = await pipeline.map([1, 2, 3], concurrency=5)

Implementation notes:

  • Uses asyncio.TaskGroup (rather than asyncio.gather) to run one async worker per item.
  • Uses asyncio.Semaphore to cap the number of pipeline runs in flight at concurrency. Every worker is still created as a task up front, but only concurrency of them hold the semaphore at any moment.
  • Each worker calls run() for its item and writes into a pre-sized list by index, so results stay in input order even when tasks finish out of order.

Errors (default): if any item fails, the TaskGroup cancels the remaining workers and raises an ExceptionGroup. A StageExecutionError raised by a stage propagates as it does in run(), wrapped inside that group where applicable.

Errors (return_exceptions=True): each failure is stored as the exception object at the matching position in the result list, and the TaskGroup completes without raising. Every entry is therefore either a normal output or an Exception (often StageExecutionError).

Development commands

uv sync
uv run pytest
uv run ruff check .
uv run mypy src

Roadmap

  • Retry — retry policies per stage or for the whole pipeline
  • Timeout — cap how long a stage may run
  • Hooks — before/after each stage or the full pipeline

License

See the LICENSE file.


Download files

Source Distribution

async_pipeline-0.2.0.tar.gz (7.3 kB view details)

Uploaded Source

Built Distribution

async_pipeline-0.2.0-py3-none-any.whl (6.3 kB view details)

Uploaded Python 3

File details

Details for the file async_pipeline-0.2.0.tar.gz.

File metadata

  • Download URL: async_pipeline-0.2.0.tar.gz
  • Size: 7.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for async_pipeline-0.2.0.tar.gz
Algorithm Hash digest
SHA256 7df0ae3b46ddd3beced02ef843139ff2726ec52a40ac63513af75ffce8be1760
MD5 08caf146ac1c446379c5bd55161172f3
BLAKE2b-256 6a64305ced0eaba5a6bfac59e43d7bce0a3700d4383cfa4428939c92bedba7cb

Provenance

The following attestation bundles were made for async_pipeline-0.2.0.tar.gz:

Publisher: publish.yml on HenriqueKoga/async-pipeline

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file async_pipeline-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: async_pipeline-0.2.0-py3-none-any.whl
  • Size: 6.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for async_pipeline-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6e835be683de0f8dcb589778fa6a38fa61175b4f9b0f120eed6fa61f32bb7b60
MD5 5f8c4e74a4b54ca449f19718d3c7f2de
BLAKE2b-256 79c3987138ed9cf214b04153618776110f93c505e44e5cb506fa9f440037af02

Provenance

The following attestation bundles were made for async_pipeline-0.2.0-py3-none-any.whl:

Publisher: publish.yml on HenriqueKoga/async-pipeline

