Lightweight composition of async pipelines where each stage receives the previous stage's output.

Project description

async-pipeline

A small library for composing async pipelines: stages run in order, and each Stage receives the previous stage’s output.

Requirements

  • Python 3.14 or newer

Install with uv

In your project:

uv add async-pipeline

To work on this library:

git clone <repo-url>
cd async-pipeline
uv sync

Basic usage

from async_pipeline import Pipeline, Stage

async def add_one(value: int) -> int:
    return value + 1

async def multiply_by_two(value: int) -> int:
    return value * 2

pipeline = Pipeline([
    Stage("add_one", add_one),
    Stage("multiply_by_two", multiply_by_two),
])

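# Top-level await needs a running event loop: call this from a coroutine or the `python -m asyncio` REPL.
result = await pipeline.run(10)
assert result == 22  # (10 + 1) * 2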
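
To make the data flow concrete, here is a self-contained sketch that runs as a script. MiniStage and MiniPipeline are hypothetical stand-ins written for illustration, not the library's implementation; they only mimic the documented behaviour of handing each stage's output to the next stage.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any


@dataclass
class MiniStage:
    """Hypothetical stand-in for Stage: a name plus an async handler."""

    name: str
    handler: Callable[[Any], Awaitable[Any]]


class MiniPipeline:
    """Hypothetical stand-in for Pipeline: runs stages strictly in order."""

    def __init__(self, stages: list[MiniStage]) -> None:
        self.stages = stages

    async def run(self, value: Any) -> Any:
        # Feed each stage the value produced by the one before it.
        for stage in self.stages:
            value = await stage.handler(value)
        return value


async def add_one(value: int) -> int:
    return value + 1


async def multiply_by_two(value: int) -> int:
    return value * 2


async def main() -> int:
    pipeline = MiniPipeline([
        MiniStage("add_one", add_one),
        MiniStage("multiply_by_two", multiply_by_two),
    ])
    return await pipeline.run(10)


# asyncio.run supplies the event loop that a bare top-level await lacks.
result = asyncio.run(main())
print(result)  # 22, i.e. (10 + 1) * 2
```

With the package installed, the same main() shape should work with Pipeline and Stage imported from async_pipeline.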

Synchronous handlers are supported as well. The stage’s run method remains async, so the pipeline is still awaited:

def add_one(value: int) -> int:
    return value + 1

pipeline = Pipeline([
    Stage("add_one", add_one),
])

result = await pipeline.run(1)
assert result == 2
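
One common way to accept both kinds of handler is to call it and await the result only when it is awaitable. The sketch below shows that technique with the standard library; call_handler is a hypothetical helper, and whether the library does it this way is an assumption.

```python
import asyncio
import inspect
from collections.abc import Callable
from typing import Any


async def call_handler(handler: Callable[[Any], Any], value: Any) -> Any:
    """Call a sync or async handler and always hand back a plain value."""
    outcome = handler(value)
    # An async handler returns an awaitable; a sync one returns the value itself.
    if inspect.isawaitable(outcome):
        outcome = await outcome
    return outcome


def add_one(value: int) -> int:
    return value + 1


async def multiply_by_two(value: int) -> int:
    return value * 2


async def main() -> tuple[int, int]:
    from_sync = await call_handler(add_one, 1)
    from_async = await call_handler(multiply_by_two, 2)
    return from_sync, from_async


sync_result, async_result = asyncio.run(main())
```

A synchronous handler called like this runs on the event loop thread, so a slow or blocking one stalls every other task until it returns.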

Errors

An exception raised inside a handler is surfaced as StageExecutionError, which carries the stage name (stage_name) and the original exception (original_error):

from async_pipeline import Pipeline, Stage, StageExecutionError

async def broken(value: int) -> int:
    raise RuntimeError("boom")

pipeline = Pipeline([
    Stage("broken", broken),
])

try:
    await pipeline.run(1)
except StageExecutionError as exc:
    assert exc.stage_name == "broken"
    assert isinstance(exc.original_error, RuntimeError)

Constructing a Pipeline with no stages raises ValueError immediately, before anything is run.

Development commands

uv sync
uv run pytest
uv run ruff check .
uv run mypy src

Roadmap

  • Retry — retry policies per stage or for the whole pipeline
  • Timeout — cap how long a stage may run
  • Hooks — before/after each stage or the full pipeline
  • Concurrent map — a stage that processes collections with bounded concurrency
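
None of the roadmap items exist in the library yet. Until they do, similar behaviour can be approximated by wrapping a handler with plain asyncio before handing it to a stage. The sketch below covers a timeout and a bounded concurrent map; with_timeout and bounded_map are hypothetical helpers, not part of async-pipeline, and the eventual API may look quite different.

```python
import asyncio
from collections.abc import Awaitable, Callable, Iterable
from typing import Any

Handler = Callable[[Any], Awaitable[Any]]


def with_timeout(handler: Handler, seconds: float) -> Handler:
    """Return a handler that fails if the wrapped one exceeds `seconds`."""

    async def wrapped(value: Any) -> Any:
        return await asyncio.wait_for(handler(value), timeout=seconds)

    return wrapped


def bounded_map(handler: Handler, limit: int) -> Callable[[Iterable[Any]], Awaitable[list[Any]]]:
    """Return a handler that applies `handler` to each item, at most `limit` at a time."""

    async def wrapped(items: Iterable[Any]) -> list[Any]:
        semaphore = asyncio.Semaphore(limit)

        async def guarded(item: Any) -> Any:
            async with semaphore:
                return await handler(item)

        # gather preserves input order regardless of completion order.
        return list(await asyncio.gather(*(guarded(item) for item in items)))

    return wrapped


async def double(value: int) -> int:
    await asyncio.sleep(0)
    return value * 2


async def slow(value: int) -> int:
    await asyncio.sleep(1)
    return value


async def main() -> tuple[list[int], bool]:
    doubled = await bounded_map(double, limit=2)([1, 2, 3])
    try:
        await with_timeout(slow, 0.01)(1)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return doubled, timed_out


doubled, timed_out = asyncio.run(main())
```

Because each wrapper returns an ordinary async callable taking one value, it should fit wherever a stage handler is expected, though that has not been checked against the library.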

License

See the LICENSE file.

Download files

Source Distribution

async_pipeline-0.1.3.tar.gz (5.8 kB view details)

Uploaded Source

Built Distribution

async_pipeline-0.1.3-py3-none-any.whl (5.3 kB view details)

Uploaded Python 3

File details

Details for the file async_pipeline-0.1.3.tar.gz.

File metadata

  • Download URL: async_pipeline-0.1.3.tar.gz
  • Upload date:
  • Size: 5.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for async_pipeline-0.1.3.tar.gz
Algorithm Hash digest
SHA256 63f22c9e4e180ca7454783ca622089b5eef763448d52dd9e104a4382f72364ba
MD5 df87cdabe8beb2de38de81f8952c6ee1
BLAKE2b-256 419920134fc9e2466c9005dc5c6616b5e36967be1b148c534b60b26f27a8b400

Provenance

The following attestation bundles were made for async_pipeline-0.1.3.tar.gz:

Publisher: publish.yml on HenriqueKoga/async-pipeline

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file async_pipeline-0.1.3-py3-none-any.whl.

File metadata

  • Download URL: async_pipeline-0.1.3-py3-none-any.whl
  • Upload date:
  • Size: 5.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for async_pipeline-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 0cb12a041bc5c93ebc2a7f3f019c6911e24a4b9f6c3b6ee9c0f77815d86732c7
MD5 64ecdb7b0fa47f67a3ca7cc2617aa272
BLAKE2b-256 e44537ba18548e89c1a67ef2c5c675602b413590ba75a4c41e2531bd51a94f4c

Provenance

The following attestation bundles were made for async_pipeline-0.1.3-py3-none-any.whl:

Publisher: publish.yml on HenriqueKoga/async-pipeline
