Composable async pipelines with structured concurrency — built on anyio, inspired by aiostream and Rust's Result type.

anyiostream

Composable async pipelines with structured concurrency

anyiostream provides lazy, composable async pipelines with true inter-stage concurrency, backpressure, and Rust-inspired error handling — all built on anyio for seamless asyncio + trio support.

Why anyiostream?

Python has excellent async primitives, but a gap exists between raw concurrency tools and declarative pipeline APIs:

  • aiostream pioneered composable | pipe syntax — but uses nested async generators in a single task. Stages execute sequentially via __anext__() pull chains, not as concurrent tasks. asyncio-only.
  • anyio provides the right primitives (TaskGroup, MemoryObjectStream, .clone()) — but no pipeline abstraction. Wiring a 3-stage concurrent pipeline requires ~30 lines of boilerplate.

anyiostream bridges this gap with the CSP (Communicating Sequential Processes) pattern: each stage runs as an independent task, connected by bounded channels — the same model as Go channels.
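The CSP idea can be sketched with the standard library alone. This is a minimal illustration, not anyiostream's implementation: it assumes `asyncio.Queue` as a stand-in for anyio's memory object streams, and the names `source`, `stage`, `sink`, `run_pipeline`, and the `_DONE` sentinel are hypothetical helpers invented for the example.

```python
import asyncio

_DONE = object()  # hypothetical end-of-stream marker, not part of any library


async def source(items, out_q):
    for item in items:
        await out_q.put(item)  # suspends while the bounded queue is full
    await out_q.put(_DONE)


async def stage(fn, in_q, out_q):
    # Each stage is its own task: pull from one channel, push to the next.
    while True:
        item = await in_q.get()
        if item is _DONE:
            await out_q.put(_DONE)  # forward the end-of-stream marker
            return
        await out_q.put(fn(item))


async def sink(in_q):
    results = []
    while True:
        item = await in_q.get()
        if item is _DONE:
            return results
        results.append(item)


async def run_pipeline(items):
    # Three bounded channels connect four concurrently running tasks.
    q0, q1, q2 = (asyncio.Queue(maxsize=2) for _ in range(3))
    *_, results = await asyncio.gather(
        source(items, q0),
        stage(lambda x: x + 1, q0, q1),
        stage(lambda x: x * 10, q1, q2),
        sink(q2),
    )
    return results


if __name__ == "__main__":
    print(asyncio.run(run_pipeline([1, 2, 3])))  # [20, 30, 40]
```

With one worker per stage, items leave in the order they entered, because each queue is first-in, first-out.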

Feature                  aiostream                         anyio (raw)                      anyiostream
Inter-stage concurrency  No (single-task generator pull)   Manual (~30 LOC per pipeline)    Yes (task-per-stage in TaskGroup)
Fan-out workers=N        No (task_limit within one stage)  Manual (clone streams yourself)  Yes (load-balanced via clone())
Backpressure             No (pull-based)                   Yes (manual wiring)              Yes (buffer_size per stage)
Result Ok/Err types      No                                No                               Yes (railway-oriented error handling)
Backend                  asyncio only                      asyncio + trio                   asyncio + trio
Pipe | syntax            Yes                               No                               Yes
Structured concurrency   Partial                           Yes                              Yes (automatic cleanup)

30 lines of raw anyio → 4 lines of anyiostream

# Raw anyio: manual channel wiring
import anyio

async def manual_pipeline():
    # One bounded channel (send end, receive end) per hop between stages
    s0, r0 = anyio.create_memory_object_stream(10)
    s1, r1 = anyio.create_memory_object_stream(10)
    s2, r2 = anyio.create_memory_object_stream(10)

    async def source(send):
        # Closing the send end on exit signals end-of-stream downstream
        async with send:
            for url in urls:
                await send.send(url)

    async def worker(recv, send):
        async with recv, send:
            async for url in recv:
                await send.send(await fetch(url))

    async with anyio.create_task_group() as tg:
        tg.start_soon(source, s0)
        for _ in range(3):
            # Each worker gets its own clones of the shared channel ends
            tg.start_soon(worker, r0.clone(), s1.clone())
        # Close the originals so each channel ends once every clone is closed
        r0.close()
        s1.close()
        # ... repeat for stage 2 ...

# anyiostream — same behavior
result = await (
    Stream.from_iterable(urls)
    .map(fetch, workers=3, buffer_size=10)
    .map(parse)
    .collect()
)

Features

  • Lazy pipelines — nothing runs until a terminal operation (collect, count, reduce, first, take)
  • True inter-stage concurrency — each stage runs in its own task, items flow between stages via bounded channels
  • Backpressure — bounded memory object streams prevent fast producers from overwhelming slow consumers
  • Fan-out workers — scale any stage horizontally with workers=N
  • Two composition styles — method chaining or aiostream-inspired | pipe syntax
  • Rust-inspired Ok/Err: try_map, try_filter, recover, collect_split for railway-oriented error handling
  • Backend-portable — runs on both asyncio and trio via anyio
  • Structured concurrency — automatic cleanup via TaskGroup, no leaked tasks
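To make the fan-out bullet concrete, here is a hedged sketch of several workers draining one shared bounded queue. It uses plain `asyncio` rather than anyio stream clones, and `fan_out_map` and `slow_double` are hypothetical names for illustration only; how anyiostream orders results with `workers=N` is not covered by this sketch.

```python
import asyncio


async def fan_out_map(fn, items, workers=3, buffer_size=2):
    in_q = asyncio.Queue(maxsize=buffer_size)
    done = object()  # hypothetical stop marker
    results = []

    async def producer():
        for item in items:
            await in_q.put(item)
        for _ in range(workers):
            await in_q.put(done)  # one stop marker per worker

    async def worker():
        # Whichever worker is idle picks up the next item.
        while True:
            item = await in_q.get()
            if item is done:
                return
            results.append(await fn(item))

    await asyncio.gather(producer(), *(worker() for _ in range(workers)))
    return results


async def slow_double(x):
    await asyncio.sleep(0.01)  # simulate I/O latency
    return x * 2


if __name__ == "__main__":
    out = asyncio.run(fan_out_map(slow_double, range(10)))
    print(sorted(out))
```

Results are appended in completion order, so the sketch sorts them before printing.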

Installation

pip install anyiostream

Or with uv:

uv add anyiostream

Quick Start

Method Chaining

from anyiostream import Stream

result = await (
    Stream.from_iterable(range(100))
    .map(lambda x: x * 2, workers=4)
    .filter(lambda x: x > 50)
    .collect()
)
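For reference, the values this pipeline should produce can be computed sequentially in plain Python. This is only a cross-check of the map and filter logic; whether the anyiostream result keeps this order when `workers=4` is not stated here, so comparing `sorted(result)` against `expected` is the safer check.

```python
# Plain, sequential Python computing the same values as the pipeline above.
doubled = (x * 2 for x in range(100))      # mirrors .map(lambda x: x * 2)
expected = [x for x in doubled if x > 50]  # mirrors .filter(lambda x: x > 50)

print(len(expected))  # 74
print(expected[:3])   # [52, 54, 56]
```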

Pipe Operator (aiostream-style)

from anyiostream import Stream, pipe

result = await (
    Stream.from_iterable(urls)
    | pipe.map(fetch, workers=10)
    | pipe.flat_map(extract_links, workers=5)
    | pipe.filter(is_valid)
    | pipe.map(normalize)
    | pipe.collect()
)

Error Handling (Rust-style Ok/Err)

from anyiostream import Stream, pipe, Ok, Err

# Exceptions become Err(PipelineError(...)) instead of crashing
oks, errs = await (
    Stream.from_iterable(urls)
    | pipe.try_map(fetch, workers=5)        # Ok(response) or Err(PipelineError)
    | pipe.try_map(parse)                    # chains on Ok, passes Err through
    | pipe.collect_split()                   # partition into (successes, failures)
)

# Custom error handler — transform Err items instead of passing through
results = await (
    Stream.from_iterable(urls)
    | pipe.try_map(fetch, workers=5, err=lambda e: log_and_rewrap(e))
    | pipe.try_map(parse, err=lambda e: e)   # pass Err unchanged explicitly
    | pipe.collect()
)

# Or recover from errors
results = await (
    Stream.from_iterable(urls)
    | pipe.try_map(fetch, workers=5)
    | pipe.recover(lambda err: fallback(err.item))  # Err → fallback value
    | pipe.collect()
)
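The railway-oriented flow above can be illustrated without any async machinery. The sketch below is a synchronous toy model, not anyiostream's code: the `Ok` and `Err` classes, their field names (`value`, `error`, `item`), and the list-based `try_map`, `collect_split`, and `recover` functions are all assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Exception
    item: Any  # the input that caused the failure (assumed field name)


def try_map(fn, items):
    out = []
    for item in items:
        if isinstance(item, Err):
            out.append(item)  # failures ride along untouched
            continue
        value = item.value if isinstance(item, Ok) else item
        try:
            out.append(Ok(fn(value)))
        except Exception as exc:
            out.append(Err(exc, value))  # exception becomes a value
    return out


def collect_split(results):
    oks = [r.value for r in results if isinstance(r, Ok)]
    errs = [r for r in results if isinstance(r, Err)]
    return oks, errs


def recover(fn, results):
    # Unwrap Ok values; replace each Err with a fallback computed from it.
    return [r.value if isinstance(r, Ok) else fn(r) for r in results]


parsed = try_map(lambda n: 100 // n, try_map(int, ["4", "x", "0"]))
oks, errs = collect_split(parsed)
print(oks)                             # [25]
print([e.item for e in errs])          # ['x', 0]
print(recover(lambda e: -1, parsed))   # [25, -1, -1]
```

The second stage never sees `"x"`: its `Err` from the first stage passes straight through, while `0` fails only at the division step.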

Context Manager (manual iteration)

from anyiostream import Stream

pipeline = (
    Stream.from_iterable(events)
    .map(process, workers=4)
    .filter(is_important)
)

async with pipeline.open() as items:
    async for item in items:
        await handle(item)
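The reason for a context manager here is cleanup: if the consumer stops early, background work still has to be shut down. The following sketch shows that pattern with the standard library only. `open_pipeline` and `first_two` are hypothetical names, and the single background producer is a simplification, not a description of what `pipeline.open()` does internally.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def open_pipeline(items, fn, buffer_size=2):
    queue = asyncio.Queue(maxsize=buffer_size)
    done = object()  # hypothetical end-of-stream marker

    async def produce():
        for item in items:
            await queue.put(fn(item))
        await queue.put(done)

    async def iterate():
        while True:
            item = await queue.get()
            if item is done:
                return
            yield item

    task = asyncio.create_task(produce())
    agen = iterate()
    try:
        yield agen
    finally:
        # Runs on normal exit, early break, or exception in the body.
        await agen.aclose()
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)


async def first_two():
    seen = []
    async with open_pipeline(range(100), lambda x: x * x) as items:
        async for item in items:
            seen.append(item)
            if len(seen) == 2:
                break  # leave early; the producer is cancelled on exit
    return seen


if __name__ == "__main__":
    print(asyncio.run(first_two()))  # [0, 1]
```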

API Overview

Stream Constructors

Method                         Description
Stream.from_iterable(items)    Create from sync or async iterable
Stream.from_callable(factory)  Lazy; factory is called at execution time

Transform Stages

Method         Pipe Syntax          Description
.map(fn)       | pipe.map(fn)       1:1 transform (sync or async)
.flat_map(fn)  | pipe.flat_map(fn)  1:N transform
.filter(pred)  | pipe.filter(pred)  Keep items where pred is truthy
.foreach(fn)   | pipe.foreach(fn)   Side-effect, passes items through

Result-Aware Stages

Method                          Pipe Syntax                           Description
.try_map(fn, err=handler)       | pipe.try_map(fn, err=handler)       Map with Ok/Err wrapping
.try_flat_map(fn, err=handler)  | pipe.try_flat_map(fn, err=handler)  Flat map with Ok/Err wrapping
.try_filter(pred)               | pipe.try_filter(pred)               Filter Ok values, Err passes through
.try_foreach(fn, err=handler)   | pipe.try_foreach(fn, err=handler)   Side-effect on Ok values
.recover(fn)                    | pipe.recover(fn)                    Convert Err → value, unwrap Ok
.ok_only()                      | pipe.ok_only()                      Keep Ok values, drop Err
.errors_only()                  | pipe.errors_only()                  Keep Err values, drop Ok

err=handler (optional): When provided, Err items are transformed by handler(error) instead of passing through unchanged. Omit to let errors flow downstream as-is.
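One way to read the `err=handler` rule is sketched below in synchronous form. It assumes the handler receives the error payload of an incoming `Err` and returns a replacement payload; the real signature and return convention are not specified in this section, so treat `Ok`, `Err`, and this `try_map` as hypothetical stand-ins.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Ok:
    value: Any


@dataclass
class Err:
    error: Any


def try_map(fn, results, err=None):
    out = []
    for r in results:
        if isinstance(r, Err):
            # With a handler the incoming Err is transformed;
            # without one it passes through unchanged.
            out.append(Err(err(r.error)) if err is not None else r)
        else:
            try:
                out.append(Ok(fn(r.value)))
            except Exception as exc:
                out.append(Err(exc))
    return out


incoming = [Ok(2), Err("timeout")]
passthrough = try_map(lambda x: x + 1, incoming)
tagged = try_map(lambda x: x + 1, incoming, err=lambda e: f"stage-2 saw: {e}")

print(passthrough)  # [Ok(value=3), Err(error='timeout')]
print(tagged)       # [Ok(value=3), Err(error='stage-2 saw: timeout')]
```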

Terminal Operations

Method             Pipe Syntax             Description
.collect()         | pipe.collect()        Collect all items into a list
.count()           | pipe.count()          Consume all, return count
.collect_split()   | pipe.collect_split()  Partition into (oks, errs)
.reduce(fn, init)                          Fold into single value
.first()                                   Return first item or None
.take(n)                                   Collect at most n items
.open()                                    Context manager for manual iteration
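The semantics in the table for `reduce`, `first`, and `take` can be sketched over an ordinary async generator. These free functions are illustrative stand-ins written for this example, not the library's methods; `numbers` is a hypothetical source.

```python
import asyncio


async def numbers(n):
    for i in range(n):
        await asyncio.sleep(0)  # yield control, as a real async source would
        yield i


async def reduce(fn, init, stream):
    acc = init
    async for item in stream:
        acc = fn(acc, item)
    return acc


async def first(stream):
    async for item in stream:
        return item
    return None  # empty stream


async def take(n, stream):
    out = []
    if n <= 0:
        return out
    async for item in stream:
        out.append(item)
        if len(out) == n:
            break  # stop consuming once n items are collected
    return out


async def demo():
    total = await reduce(lambda a, b: a + b, 0, numbers(5))
    head = await first(numbers(5))
    empty = await first(numbers(0))
    some = await take(3, numbers(10))
    return total, head, empty, some


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (10, 0, None, [0, 1, 2])
```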

Stage Options

Every stage accepts these keyword arguments:

Option       Default  Description
workers      1        Number of concurrent workers
buffer_size  0        Backpressure buffer (0 = rendezvous, math.inf = unbounded)
name         None     Human-readable label for debugging
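The effect of a bounded buffer can be observed with a small experiment. This sketch uses `asyncio.Queue` as an approximation; note that `asyncio.Queue(maxsize=0)` means unbounded, so it cannot model the rendezvous behaviour described for `buffer_size=0`, and only positive sizes are meaningful here. `measure_backlog` is a hypothetical helper.

```python
import asyncio


async def measure_backlog(buffer_size, n_items=20):
    queue = asyncio.Queue(maxsize=buffer_size)
    max_backlog = 0

    async def fast_producer():
        nonlocal max_backlog
        for i in range(n_items):
            await queue.put(i)  # suspends once buffer_size items are waiting
            max_backlog = max(max_backlog, queue.qsize())
        await queue.put(None)  # end-of-stream marker

    async def slow_consumer():
        count = 0
        while await queue.get() is not None:
            await asyncio.sleep(0.001)  # simulate slow processing
            count += 1
        return count

    _, consumed = await asyncio.gather(fast_producer(), slow_consumer())
    return consumed, max_backlog


if __name__ == "__main__":
    consumed, backlog = asyncio.run(measure_backlog(3))
    print(consumed, backlog)
```

However fast the producer is, the backlog never exceeds the buffer size, which is the property the `buffer_size` option controls.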

How It Works

Source → [channel] → Stage 1 → [channel] → Stage 2 → [channel] → Terminal
           ↑ bounded      workers=N            workers=M
           backpressure    (fan-out)            (fan-out)

  1. Lazy recipe: Stream holds a list of Process descriptors. Nothing runs yet.
  2. Terminal triggers execution: collect(), count(), etc. materialize the pipeline.
  3. Channel chain: anyio MemoryObjectStream pairs connect each stage with bounded backpressure.
  4. Structured concurrency: all tasks run inside a single TaskGroup. Cleanup is automatic.
  5. Fan-out: workers=N clones the receive stream so N workers pull from the same channel (first-available-wins).
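Steps 1 to 3 can be sketched as a toy class that only records stages until a terminal call builds the channels and tasks. `MiniStream` is a hypothetical, heavily simplified model using `asyncio.Queue` and `asyncio.gather`; it is not the library's `Stream` class and omits workers, error handling, and cancellation.

```python
import asyncio


class MiniStream:
    """Toy lazy pipeline: stores stage descriptors, runs only on collect()."""

    def __init__(self, items, stages=()):
        self._items = items
        self._stages = tuple(stages)  # the recipe: (kind, fn) pairs

    def map(self, fn):
        return MiniStream(self._items, self._stages + (("map", fn),))

    def filter(self, pred):
        return MiniStream(self._items, self._stages + (("filter", pred),))

    async def collect(self, buffer_size=2):
        done = object()
        # One bounded queue per hop: source -> stage 1 -> ... -> sink.
        queues = [asyncio.Queue(maxsize=buffer_size)
                  for _ in range(len(self._stages) + 1)]

        async def source(out_q):
            for item in self._items:
                await out_q.put(item)
            await out_q.put(done)

        async def stage(kind, fn, in_q, out_q):
            while True:
                item = await in_q.get()
                if item is done:
                    await out_q.put(done)
                    return
                if kind == "map":
                    await out_q.put(fn(item))
                elif fn(item):  # filter: forward only truthy matches
                    await out_q.put(item)

        async def sink(in_q):
            out = []
            while True:
                item = await in_q.get()
                if item is done:
                    return out
                out.append(item)

        tasks = [source(queues[0])]
        for i, (kind, fn) in enumerate(self._stages):
            tasks.append(stage(kind, fn, queues[i], queues[i + 1]))
        *_, results = await asyncio.gather(*tasks, sink(queues[-1]))
        return results


if __name__ == "__main__":
    recipe = MiniStream(range(5)).map(lambda x: x * 2).filter(lambda x: x > 2)
    print(asyncio.run(recipe.collect()))  # [4, 6, 8]
```

Building `recipe` calls none of the stage functions; they run only inside `collect()`.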

Development

# Install dependencies
uv sync --all-extras

# Run tests (both asyncio and trio backends)
uv run pytest

# Lint & format
uv run ruff check .
uv run ruff format .

Inspiration & Acknowledgements

anyiostream stands on the shoulders of excellent projects:

  • aiostream by Vincent Michel — pioneered composable async stream operators with pipe syntax for asyncio. anyiostream's | pipe.map(fn) API is directly inspired by aiostream's elegant design.

  • anyio by Alex Grönholm — the structured concurrency foundation that makes anyiostream backend-portable. Memory object streams and task groups from anyio are the core execution primitives.

  • trio by Nathaniel J. Smith — pioneered structured concurrency in Python and inspired anyio's design. Trio's philosophy of "make concurrency correct by default" deeply influences anyiostream's automatic cleanup guarantees.

  • Rust's Result<T, E>: the Ok/Err pattern for railway-oriented error handling. anyiostream's try_map, recover, and collect_split bring this pattern to async Python pipelines, letting errors flow as values instead of crashing the pipeline.

License

Apache License 2.0
