
Asynchronous job processing framework with dynamic worker scaling


flowrhythm

License: MIT · Python 3.13+

Asynchronous, auto-scaling job pipeline for Python

flowrhythm is an asyncio-native framework for stream processing pipelines. Define a pipeline as a sequence of plain async functions, then tune scaling and queues per stage at runtime.



When to use flowrhythm

flowrhythm is the right fit when:

  • You need to process a stream of items through several async stages
  • The work is I/O- or external-process-bound (HTTP calls, subprocesses, DB writes)
  • You want per-stage auto-scaling so slow stages don't stall the pipeline
  • You need to stay in-process — no broker, no external queue infrastructure

It's the wrong fit when:

  • Items are independent fire-and-forget tasks with no chain — use Celery or arq
  • The work is CPU-bound in Python — use multiprocessing or hand off to subprocesses
  • You need distributed processing across machines — use Dask, Ray, or Beam
  • You need persistent / replayable streams — use Kafka + Faust or a real stream processor

Why not just asyncio.Queue + tasks?

You can build a pipeline with asyncio.Queue and a few create_task calls, and for a one-stage pipeline that's the right call. flowrhythm becomes worth it when you have multiple stages with different throughput and want per-stage auto-scaling, branching, per-worker resources (DB connections, subprocesses), and graceful drain. Wiring all of that by hand is tedious and easy to get wrong.
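For comparison, the sketch below is roughly what the hand-rolled version looks like for just two stages: a fixed pool of transform workers feeding a single sink worker, with sentinel objects to shut each pool down in order. It uses only the standard library; `run_two_stage`, the queue sizes, and the worker count are illustrative choices, not flowrhythm code. There is no scaling, no per-worker resource handling, and no error reporting yet.

```python
import asyncio

_DONE = object()  # sentinel marking end-of-stream


async def run_two_stage(items, transform, sink, n_transform=2):
    """Hand-wired pipeline: source -> transform (n workers) -> sink (1 worker)."""
    q1: asyncio.Queue = asyncio.Queue(maxsize=10)
    q2: asyncio.Queue = asyncio.Queue(maxsize=10)

    async def feed():
        for item in items:
            await q1.put(item)
        for _ in range(n_transform):  # one sentinel per transform worker
            await q1.put(_DONE)

    async def transform_worker():
        while (item := await q1.get()) is not _DONE:
            await q2.put(await transform(item))

    async def sink_worker():
        while (item := await q2.get()) is not _DONE:
            await sink(item)

    sink_task = asyncio.create_task(sink_worker())
    await asyncio.gather(feed(), *(transform_worker() for _ in range(n_transform)))
    await q2.put(_DONE)  # every transform worker has exited, so stop the sink
    await sink_task


async def main():
    stored = []

    async def double(x):
        return x * 2

    async def write(x):
        stored.append(x)

    await run_two_stage(range(5), double, write)
    print(sorted(stored))  # [0, 2, 4, 6, 8]


asyncio.run(main())
```

Each extra stage adds another queue, another sentinel hand-off, and another ordering rule for shutdown; per-stage scaling or an error path multiplies that bookkeeping.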

Why not Celery or arq?

Those are task queues — items don't flow through stages, they're independent jobs. flowrhythm is pipeline-shaped: each item goes through the same chain of transformations. If your work is "process this one job, return a result," use a task queue. If it's "stream items through filter → transform → enrich → store," use flowrhythm.

Why not Faust or Apache Beam?

Those are stream-processing frameworks for real Kafka-scale stream workloads, with persistent state, exactly-once semantics, and distributed execution. flowrhythm is in-process, asyncio-native, and explicitly does not provide persistence or exactly-once delivery (see Guarantees). It's the right choice when your stream lives entirely in one process and you want a lightweight async-native API.


Installation

pip install flowrhythm

Not yet published. Use pip install . locally from source.


Quick Start

A flow is a chain of async stages. The last stage consumes the items (the "sink"). Activate the chain by passing a source generator to run().

import asyncio
from flowrhythm import flow

async def double(x):
    return x * 2

async def write(x):
    print("stored:", x)

async def items():
    for i in range(10):
        yield i

async def main():
    await flow(double, write).run(items)   # pass items, not items()

asyncio.run(main())

That's it — three async functions, one chain, one call to drive it. Real-world pipelines add branching, scaling, error handling, and more — see Designing a flow and Driving a flow.


Stages

A flow is a sequence of stages. Items enter at the first stage's input queue, flow through each stage, and the last stage consumes them (the sink). Where items come from is decided when you activate the flow — see Driving a flow.

What's inside a stage

Each stage owns:

  • Input queue — items waiting to be processed (configurable: FIFO, LIFO, priority)
  • Worker pool — N async tasks pulling from the input queue, processing items, pushing results into the next stage's input queue
  • Scaling strategy — decides N based on live stats (queue length, worker utilization)

A queue lives between two stages, and the downstream stage owns it. Configuring a queue, e.g. chain.configure("normalize", queue=priority_queue, queue_size=10), configures that stage's input queue, which is also the upstream stage's destination.

                                    ┌─ stage owns input queue ─┐
                                    │                          │
                                    ▼                          ▼
   ┌──────────────┐   queue1   ┌──────────────┐   queue2   ┌──────────────┐   queue3   ┌──────┐
   │ item source  │ ─────────▶ │ transformer1 │ ─────────▶ │ transformer2 │ ─────────▶ │ sink │
   │ (external)   │            │   (N wkrs)   │            │   (M wkrs)   │            │(K wkr)│
   └──────────────┘            └──────────────┘            └──────────────┘            └──────┘
                                  │   │   │
                                  ▼   ▼   ▼
                              scaling strategy
  • 3 stages → 3 queues (one fronts each stage; the source feeds queue1)
  • queue1 is transformer1's input (and the item source's destination)
  • queue3 is sink's input (and transformer2's destination)
  • Sink has no output queue — items terminate there

The item source is external to the flow. It's whatever drives items into queue1 — see Driving a flow.

Async-only. All stages must be async. Sync functions and sync context managers are rejected at construction. For sync code, wrap with asyncio.to_thread or use the sync_stage() helper (see Public API).

Transformer

A middle stage. Multiple workers, auto-scaled (including scale-to-zero).

There are two per-item transformers (the framework invokes them once per item):

  • async def f(x) -> y: simple stateless processing (parse, transform, enrich)
  • Factory () -> AsyncContextManager whose __aenter__ returns an async def fn(x) -> y: per-worker resource lifecycle (subprocess, DB connection, HTTP session)

And two sub-pipelines (multi-stage units that the framework expands into the parent pipeline at construction):

  • A Flow (from flow(...)): the sub-flow's stages are stitched into the parent pipeline; each keeps its own queue, workers, and scaling. See Composing flows.
  • A Router (from router(...)): each arm becomes a sub-pipeline, and the classifier dispatches items to the chosen arm. See Routing.

Plain async function

async def normalize(x):
    return x.strip().lower()

CM factory — function form

Use when the stage needs setup/teardown per worker (a connection, a subprocess):

from contextlib import asynccontextmanager

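@asynccontextmanager
async def with_db():
    db = await connect_db()          # connect_db: your own async connect function
    try:
        async def fn(x):
            return await db.process(x)
        yield fn
    finally:
        await db.close()             # runs even if the worker exits on an error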

CM factory — class form

A class with no-arg constructor and __aenter__ / __aexit__ is also a factory:

class WithDB:
    async def __aenter__(self):
        self.db = await connect_db()
        async def fn(x):
            return await self.db.process(x)
        return fn
    async def __aexit__(self, *exc):
        await self.db.close()

Sub-flow

A Flow used as a stage. Inlined into the parent at construction:

preprocess = flow(decode, validate)        # see Composing flows

Router

A Router used as a stage. Dispatches items to one of several arms:

split = router(classify, fast=quick, slow=heavy)   # see Routing

Sink (implicit — last stage)

There is no separate "sink" type. The last stage in flow() plays the sink role when the flow is run directly (as the outermost flow): its return value is dropped, and it is where the items "land."

async def db_write(x):
    await db.insert(x)

chain = flow(normalize, db_write)    # db_write is the last stage → sink under run()
await chain.run(reader)              # reader's items → normalize → db_write (consumes)

The same chain composed inside another flow has no sink — its last stage's output flows into the parent's downstream queue:

inner = flow(normalize, enrich)      # last stage = enrich
outer = flow(inner, db_write)        # inner is a transformer; enrich's output → db_write
await outer.run(reader)

Same flow(normalize, enrich) definition; sink behavior is determined by context, not declaration.

How each shape behaves at runtime

Knowing how each shape executes helps you reason about resources, lifecycle, and side effects. This section describes what you should expect — not the framework's internals.

Plain async function — async def fn(x) -> y

Called once per item. No setup, no teardown.

The framework guarantees:

  • Your function receives one item, returns one result.
  • Multiple workers call your function concurrently — design it to be safe under concurrency, or use the CM factory shape for per-worker state.
  • Function-local state (counters, buffers) does NOT persist across calls. If you need per-worker state, use the CM factory shape.

CM factory — () -> AsyncContextManager[Callable]

Called once per worker to set up resources, then the yielded callable is invoked once per item until the worker stops.

The framework guarantees:

  • Your factory is called as factory() once at worker startup; __aenter__ runs; the yielded callable is reused for every item that worker processes.
  • __aexit__ runs once when the worker stops (whether on normal shutdown, scale-down, stop(), or even unhandled exceptions).
  • Anything you set up in the factory body lives for the worker's lifetime — that's where you bind per-worker state (a connection, a subprocess, a model).
  • Each worker has its own context — N workers means N independent setup/teardown cycles. No sharing.
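To make the per-worker guarantees concrete, the sketch below imitates with plain asyncio what a worker does with a CM factory: call the factory once, enter it, reuse the yielded callable for every item, and exit once. `simulate_worker` and `with_resource` are hypothetical stand-ins, not flowrhythm code; the `events` list only records the order of calls.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def with_resource():
    events.append("setup")             # once per worker
    try:
        async def fn(x):
            events.append(f"item {x}")
            return x + 1
        yield fn                       # the same callable serves every item
    finally:
        events.append("teardown")      # once per worker, even on errors


async def simulate_worker(factory, items):
    """Hypothetical model of one worker's lifetime."""
    results = []
    async with factory() as fn:        # factory() called once, then entered
        for item in items:
            results.append(await fn(item))
    return results


print(asyncio.run(simulate_worker(with_resource, [1, 2, 3])))  # [2, 3, 4]
print(events)  # ['setup', 'item 1', 'item 2', 'item 3', 'teardown']
```

Running two `simulate_worker` calls concurrently would record two independent setup/teardown pairs, which is the "no sharing" rule in miniature.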

You pass the factory itself, not a built CM:

flow(with_db, db_write)              # ✓ pass the factory
flow(with_db(), db_write)            # ✗ pass a built CM (only enterable once)

Both @asynccontextmanager-decorated functions and classes implementing __aenter__/__aexit__ (with a no-arg constructor) satisfy the factory shape.
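The ✗ case fails because the object returned by an @asynccontextmanager function wraps a single generator, so it can be entered only once; a second worker cannot reuse it. Calling the factory gives each worker a fresh CM. The sketch below shows both behaviors with plain contextlib; `reuse_one_cm` and `fresh_cm_per_worker` are illustrative helpers, not part of flowrhythm.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def with_db():
    async def fn(x):
        return x                      # placeholder for real per-item work
    yield fn


async def reuse_one_cm(cm) -> bool:
    """Try to enter the same CM object twice, as two workers would."""
    async with cm:
        pass
    try:
        async with cm:
            pass
    except Exception:                 # the exact error type varies by Python version
        return False
    return True


async def fresh_cm_per_worker(factory, workers: int) -> int:
    """Build a new CM for each worker, which always works."""
    entered = 0
    for _ in range(workers):
        async with factory():
            entered += 1
    return entered


print(asyncio.run(reuse_one_cm(with_db())))          # False
print(asyncio.run(fresh_cm_per_worker(with_db, 3)))  # 3
```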

Sub-flow — Flow

When a Flow appears inside another flow(...), its stages are folded into the parent pipeline at construction. See Composing flows under "Designing a flow" for the full treatment, including how sub-flow stage names work and how to override sub-flow config from the parent.

Router — Router

When a Router appears inside flow(...), each arm becomes its own sub-pipeline. See Routing under "Designing a flow" for the full treatment, including the effective pipeline shape, arm dispatch, and behavior on classifier miss.

Worker lifecycle and scaling

Workers come and go based on the scaling strategy:

  • Worker spawned (scale up). Plain async fn: reference captured. CM factory: factory() called, __aenter__ runs, resource acquired.
  • Worker processes item. Plain async fn: await fn(item). CM factory: await fn(item), using the resource from __aenter__.
  • Worker stopped (scale down). Plain async fn: coroutine cancelled. CM factory: __aexit__ runs, resource released.
  • Stage scales 0 → 1. Plain async fn: first worker spawned, processes pending items. CM factory: first worker spawned, factory called, resource acquired (latency on the first item).
  • Stage scales N → 0. Plain async fn: all workers cancelled. CM factory: every __aexit__ runs; all resources released.
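The "Worker stopped (scale down)" behavior for CM factories follows from ordinary asyncio cancellation: cancelling a task raises CancelledError at its current await, which unwinds the `async with` and runs the teardown. The sketch below demonstrates that with plain asyncio; `worker` and `with_conn` are hypothetical stand-ins, and flowrhythm's actual scale-down mechanism may differ in detail.

```python
import asyncio
from contextlib import asynccontextmanager

log: list[str] = []


@asynccontextmanager
async def with_conn():
    log.append("acquire")
    try:
        async def fn(x):
            return x
        yield fn
    finally:
        log.append("release")         # runs when the worker task is cancelled


async def worker(factory, queue: asyncio.Queue):
    async with factory() as fn:
        while True:
            await fn(await queue.get())   # idles here waiting for the next item


async def scale_down_demo():
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(worker(with_conn, queue))
    await queue.put("a")
    await asyncio.sleep(0)            # let the worker start and take the item
    task.cancel()                     # model a scale-down of this worker
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(scale_down_demo())
print(log)  # ['acquire', 'release']
```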

Error Handler

Pipeline-level. Receives typed events describing failures and drops. One per flow. See Error Handling for the full set of event types and behavior.

import logging
from flowrhythm import TransformerError, SourceError, Dropped

log = logging.getLogger(__name__)

async def on_error(event):
    match event:
        case TransformerError(item, exc, stage):
            log.error("stage %s failed on %r: %s", stage, item, exc)
        case SourceError(exc):
            log.critical("source failed: %s", exc)
        case Dropped(item, stage, reason):
            log.warning("dropped %r at %s: %s", item, stage, reason.name)

Scaling Strategy

Decides how many workers a stage runs, based on a live StageSnapshot. Built-in strategies: FixedScaling, UtilizationScaling. You can also implement the protocol yourself.


Designing a flow

A flow is pure structure — the chain of stages. How items get fed in is decided at activation time (run() or push()). Configuration (scaling, queues) is separate from both structure and activation.

Linear

chain = flow(normalize, db_write)
await chain.run(reader)

Routing

router() dispatches each item to one of several arms based on a classifier function. It's a sub-pipeline, not a function call — each arm becomes its own mini-pipeline inside the parent, with its own input queue and worker pool.

Signature

router(classifier, *, default=None, **arms)
  • classifier: async def (item) -> label, returning the keyword name of the arm to dispatch to
  • **arms: keyword arguments mapping label → Transformer (function, CM factory, chain, or full flow)
  • default: optional fallback Transformer for unmatched labels (if omitted, unmatched items become Dropped events)

Example

heavy_path = flow(decode, heavy)

main = flow(
    normalize,
    router(classify,
        fast=quick,             # plain async function
        slow=heavy_path,        # transform chain (a Flow)
        default=passthrough,    # optional fallback
    ),
    db_write,
)
await main.run(items)

What runs at runtime

The router's classifier runs as a single stage. Each arm becomes a sub-graph that flows back into the same downstream queue:

                             ┌─ quick ─────────────────────────────────┐
normalize → [q] → classify ──┤                                         ├──► [q] → db_write
                             └─ heavy_path.decode → heavy_path.heavy ──┘

Each arm has its own queue and worker pool. Outputs of all arms feed the same downstream queue (the stage after the router). Sub-flow arms are inlined the same way as sub-flow stages elsewhere — see Composing flows.

Behavior on classifier miss

If the classifier returns a label that has no matching arm and no default is set, the item is dropped and the error handler receives a Dropped(item, stage, reason=DropReason.ROUTER_MISS) event. The pipeline continues. Raising from the error handler does not abort the run (the exception is only logged); to abort on misses, count them in the handler and call chain.stop() from the task driving the run. See Error Handling.
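The dispatch rule (use the matching arm, else the default, else drop and report) can be modeled in a few lines. This is a sketch of the decision only, not of the queuing: in flowrhythm each arm has its own queue and workers. `dispatch`, `Miss`, and `DROPPED` are hypothetical names; `Miss` stands in for the real Dropped event.

```python
import asyncio
from dataclasses import dataclass
from typing import Any

DROPPED = object()  # marker meaning "this item left the pipeline here"


@dataclass
class Miss:
    """Hypothetical stand-in for Dropped(item, stage, reason=ROUTER_MISS)."""
    item: Any
    label: str


async def dispatch(item, classifier, arms, default=None, on_miss=None):
    """Model of one router decision: classify, then pick an arm or the default."""
    label = await classifier(item)
    arm = arms.get(label, default)
    if arm is None:                   # unknown label and no default arm
        if on_miss is not None:
            await on_miss(Miss(item, label))
        return DROPPED
    return await arm(item)


async def classify(x):
    return "fast" if x < 10 else "slow" if x < 100 else "huge"

async def quick(x):
    return f"quick:{x}"

async def heavy(x):
    return f"heavy:{x}"


async def demo():
    misses = []

    async def record(event):
        misses.append(event)

    arms = {"fast": quick, "slow": heavy}
    out = [await dispatch(x, classify, arms, on_miss=record) for x in (1, 50, 500)]
    return out, misses


print(asyncio.run(demo()))
```

With `default=quick`, the same "huge" item would be routed to `quick` instead of being reported as a miss.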

Composing flows

A flow can be embedded as a stage in another flow. The framework folds the sub-flow's stages into the parent pipeline at construction — sub-flows are not function calls; their stages become real stages of the parent. Each sub-stage retains its own queue, worker pool, scaling, and config. Activation (run, push) only happens on the outermost flow.

ingest = flow(parse, validate)
ingest.configure("validate", scaling=UtilizationScaling(min_workers=1, max_workers=8))

main = flow(ingest, db_write)
await main.run(items)

What runs at runtime

The effective graph that runs is:

items → [queue] → ingest.parse → [queue] → ingest.validate → [queue] → db_write

Three stages, three queues. ingest.validate keeps its 1–8 worker pool exactly as ingest.configure specified; composition does not change scaling. Each ingest.parse worker pushes its output into ingest.validate's input queue and moves on, so items flow through autonomously.

Stage names in the parent

Sub-flow stages get the sub-flow's variable name as a prefix in the parent: ingest.parse, ingest.validate. You can override their config from the parent using the dotted name:

main.configure("ingest.parse", scaling=FixedScaling(workers=2))

Sub-flows can contain sub-flows; names compose dotted (outer.middle.inner.stage).

The same flow runs standalone or composed

await ingest.run(items)   # standalone — last stage (validate) is the sink

The same ingest definition behaves identically whether you run() it directly or embed it in another flow. Composition is purely structural; nothing about the sub-flow's behavior changes.

Naming

Stage names are auto-derived from function names. Collisions get a numeric suffix:

chain = flow(normalize, normalize, db_write)
# stage names: "normalize", "normalize_2", "db_write"

Override when needed:

from flowrhythm import stage

chain = flow(
    stage(normalize, name="parse"),
    db_write,
)
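The auto-naming rule above (function name, then a numeric suffix on collisions) can be sketched as follows. `derive_stage_names` is a hypothetical helper, not flowrhythm's implementation; it also guards against the case where a suffixed name is already taken by a function with that literal name.

```python
def derive_stage_names(funcs) -> list[str]:
    """Hypothetical sketch: name stages after functions, suffixing repeats."""
    used: set[str] = set()
    names: list[str] = []
    for fn in funcs:
        base = getattr(fn, "__name__", type(fn).__name__)
        name, n = base, 1
        while name in used:           # collision: try base_2, base_3, ...
            n += 1
            name = f"{base}_{n}"
        used.add(name)
        names.append(name)
    return names


async def normalize(x):
    return x

async def db_write(x):
    return None


print(derive_stage_names([normalize, normalize, db_write]))
# ['normalize', 'normalize_2', 'db_write']
```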

Configuring a flow

Configuration is operational — it tunes how the flow runs without changing what it does. Two equivalent styles are available: constructor keywords (one-shot setup) and methods (incremental, e.g. reading from a config file).

Constructor keywords (shorthand)

from flowrhythm import flow, FixedScaling, priority_queue

chain = flow(
    normalize, db_write,
    on_error=on_error,
    default_scaling=FixedScaling(workers=2),
    default_queue=priority_queue,
    default_queue_size=10,
)
await chain.run(items)

on_error, default_scaling, default_queue, and default_queue_size cover flow-level configuration. Per-stage configuration (different scaling for one specific stage) needs the method form.

Methods (incremental)

from flowrhythm import flow, FixedScaling, UtilizationScaling, priority_queue

chain = flow(normalize, db_write)

# Pipeline-wide defaults
chain.configure_default(scaling=FixedScaling(workers=2))

# Per-stage tuning (only via method — no constructor shorthand)
chain.configure("normalize", scaling=UtilizationScaling(max_workers=8))
chain.configure("normalize", queue=priority_queue, queue_size=10)
chain.configure("db_write", queue_size=20)   # bump just the buffer; default queue type

# Error handler
chain.set_error_handler(on_error)

await chain.run(items)

The two forms are fully equivalent — pick whichever fits your code shape.

Same chain, different environments

def build():
    return flow(normalize, db_write)

dev = build()
dev.configure_default(scaling=FixedScaling(workers=1))
await dev.run(test_source)              # batch from a small fixture

prod = build()
prod.configure("normalize", scaling=UtilizationScaling(max_workers=32))
await prod.run(kafka_source)            # stream from production

Scaling Strategies

FixedScaling

Constant worker count. Requires workers >= 1 (use UtilizationScaling for scale-to-zero).

FixedScaling(workers=4)

UtilizationScaling

Adjusts worker count based on the busy/idle ratio. Supports scale-to-zero via min_workers=0.

UtilizationScaling(
    min_workers=0,           # never scale below this
    max_workers=8,           # never scale above this
    lower_utilization=0.2,   # if busy/total < 0.2, consider scaling down
    upper_utilization=0.8,   # if busy/total > 0.8, consider scaling up
    upscaling_rate=2,        # max workers added per scale-up decision
    downscaling_rate=1,      # max workers removed per scale-down decision
    cooldown_seconds=5.0,    # minimum seconds between scale events (anti-flap)
    dampening=0.5,           # multiplier on rate — 0.5 means add/remove half
    sampling_period=2.0,     # only consider scaling every N seconds
    sampling_events=50,      # only consider scaling every N items
)

How it works

On every item enqueue/dequeue, the strategy checks whether to scale. Two gates run first:

  1. Sampling — if sampling_period or sampling_events is set, scaling decisions are throttled (only consider every N seconds or every N events). This avoids checking on every single item in high-throughput stages. If both are set, both must pass.
  2. Cooldown — if a scale event happened within the last cooldown_seconds, do nothing. Prevents flapping between scale-up and scale-down.

If both gates pass, the strategy reads utilization = busy_workers / total_workers and decides:

  • utilization > upper_utilization and workers < max_workers → scale up by upscaling_rate × dampening (rounded down, minimum 1)
  • utilization < lower_utilization and workers > min_workers → scale down by downscaling_rate × dampening (rounded down, minimum 1)
  • Otherwise → no change

Tuning guide

  • Bursty workload — set higher upscaling_rate (e.g. 4-8) and lower cooldown_seconds (e.g. 1.0) to react quickly to spikes
  • Steady workload — keep defaults; cooldown_seconds=5.0 and rate of 1-2 is calm and predictable
  • Expensive workers (subprocesses, GPU models) — keep dampening low (0.3-0.5) to scale conservatively
  • High-throughput stage — set sampling_events=100 or sampling_period=1.0 to avoid overhead from per-item scaling decisions

Custom strategy

Implement the protocol — note the methods are synchronous:

class MyStrategy:
    def initial_workers(self) -> int: ...
    def on_enqueue(self, stats: StageSnapshot) -> int: ...
    def on_dequeue(self, stats: StageSnapshot) -> int: ...

Return positive to add workers, negative to remove, 0 for no change.

Strategies must be sync. Scaling decisions are called on every item event (potentially millions/sec); awaiting external services here would block the hot path. If you need external state for a decision, refresh it in a background task and read it synchronously here.
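A minimal sketch of both points: a custom strategy with the three synchronous methods, plus a background task that refreshes an external value the strategy reads as a plain attribute. The real StageSnapshot fields are not listed here, so `Snapshot` (with `queue_length` and `workers`) is a hypothetical stand-in, and `BacklogScaling`, `refresh_cap`, and `fetch_cap` are illustrative names. This strategy never scales below one worker.

```python
import asyncio
import contextlib
import math
from dataclasses import dataclass


@dataclass
class Snapshot:
    """Hypothetical stand-in for StageSnapshot; the real fields may differ."""
    queue_length: int
    workers: int


class BacklogScaling:
    """Sketch: target one worker per `per_worker_backlog` queued items,
    capped by `max_workers`, which a background task keeps up to date."""

    def __init__(self, per_worker_backlog: int = 10, max_workers: int = 8):
        self.per_worker_backlog = per_worker_backlog
        self.max_workers = max_workers

    def initial_workers(self) -> int:
        return 1

    def _delta(self, stats: Snapshot) -> int:
        wanted = max(1, math.ceil(stats.queue_length / self.per_worker_backlog))
        wanted = min(wanted, self.max_workers)   # plain attribute read, no await
        return wanted - stats.workers            # >0 add, <0 remove, 0 keep

    def on_enqueue(self, stats: Snapshot) -> int:
        return self._delta(stats)

    def on_dequeue(self, stats: Snapshot) -> int:
        return self._delta(stats)


async def refresh_cap(strategy: BacklogScaling, fetch_cap, interval: float) -> None:
    """Background task: the slow, awaitable lookup lives here, off the hot path."""
    while True:
        strategy.max_workers = await fetch_cap()
        await asyncio.sleep(interval)


async def demo() -> int:
    strategy = BacklogScaling()

    async def fetch_cap() -> int:     # hypothetical external quota lookup
        return 3

    task = asyncio.create_task(refresh_cap(strategy, fetch_cap, interval=60))
    await asyncio.sleep(0)            # let the first refresh run
    delta = strategy.on_enqueue(Snapshot(queue_length=100, workers=1))
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return delta


print(asyncio.run(demo()))  # 2: the backlog wants 10 workers, the cap is 3, 1 is running
```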

Worker rules

  • The source (when passed to run()) is consumed by exactly one task — async generators cannot be safely consumed concurrently
  • All stages in the chain can scale, including down to zero
  • A worker holds its async context manager for its lifetime; releasing happens on shutdown
  • First item after 0→1 transition pays the resource-acquire cost

Error Handling

Errors and drops are reported to a single pipeline-level error handler as typed events. The handler is observer-only — it logs, accounts, or forwards events, but it does not control pipeline flow:

  • Returns normally → pipeline continues; the failed item is dropped
  • Raises → the exception is logged to stderr; pipeline still continues; the failed item stays dropped

This means a buggy handler (a logger that throws, a metric backend that's down) cannot abort your pipeline. To stop a run based on what the handler observes, call chain.stop() from outside — see Aborting from inside the handler below.

Two layers of error handling, in order:

  1. Inside the transformer (preferred). Catch and handle there: retry, return a sentinel value, drop silently. Most error logic should live here because the transformer has full context.
  2. Pipeline error handler (last resort). Whatever escapes the transformer (or comes from the source / framework decisions like Dropped) is routed here.

Event types

from dataclasses import dataclass
from enum import Enum, auto
from typing import Any

@dataclass
class TransformerError:
    item: Any
    exception: Exception
    stage: str            # name of the stage that failed

@dataclass
class SourceError:
    exception: Exception  # raised inside the source generator

class DropReason(Enum):
    UPSTREAM_TERMINATED = auto()  # Last(value) upstream caused this item to be discarded
    ROUTER_MISS = auto()          # router classifier returned an unknown arm and no default

@dataclass
class Dropped:
    item: Any
    stage: str
    reason: DropReason

Writing a handler

import logging
from flowrhythm import flow, TransformerError, SourceError, Dropped

log = logging.getLogger(__name__)

async def on_error(event):
    match event:
        case TransformerError(item, exc, stage):
            log.error("stage %s failed on %r: %s", stage, item, exc)
        case SourceError(exc):
            log.critical("source failed: %s", exc)
            # source is exhausted; pipeline drains naturally
        case Dropped(item, stage, reason):
            log.warning("dropped %r at %s (%s)", item, stage, reason.name)

chain = flow(normalize, db_write)
chain.set_error_handler(on_error)
await chain.run(items)

Default behavior (no handler set)

  • TransformerError: logged to stderr; the pipeline continues
  • SourceError: logged to stderr; the pipeline drains (the source is treated as exhausted)
  • Dropped: silent; the pipeline continues

Set a handler whenever you need different behavior for any of these.

Aborting from inside the handler

The handler cannot abort the pipeline directly — raising just gets logged. To stop a run based on what the handler observes (e.g. "too many errors"), track state and call chain.stop() from the task driving the run:

import asyncio
from flowrhythm import TransformerError

errors = 0

async def on_error(event):
    global errors                     # update the module-level counter
    if isinstance(event, TransformerError):
        errors += 1

chain.set_error_handler(on_error)
run_task = asyncio.create_task(chain.run(items))
while not run_task.done():
    if errors > 100:
        await chain.stop()
        break
    await asyncio.sleep(0.5)
await run_task

This separates "observe errors" (handler) from "decide to stop" (caller).


Driving a flow

The flow definition (flow(...)) describes the chain of stages. To make items actually flow through it, you have to activate the flow. There are three ways:

  • Bounded (await chain.run(source)): items come from an async generator you supply. Ends when the generator is exhausted; the chain then drains and run() returns.
  • Unbounded (await chain.run()): the framework emits None signals indefinitely. Ends on an external chain.stop() or when the first stage raises.
  • Push (async with chain.push() as handle, then await handle.send(item)): you push items through a PushHandle. Ends on handle.complete(), called explicitly or by leaving the async with block.

Push mode returns a PushHandle from chain.push(); send() and complete() live on the handle, not on Flow. stop() is always available on Flow for graceful shutdown.

Bounded — run(source)

You pass an async generator — the source. The framework iterates it, pushing each yielded item into the chain's first queue. When the generator is exhausted, the pipeline drains and exits.

async def items():
    for i in range(100):
        yield i

chain = flow(normalize, db_write)
await chain.run(items)        # ← pass the function, not items()

Pass the generator function itself (items), not a call to it (items()). The framework owns iteration — this lets it manage the source lifecycle (close cleanly on drain(), re-iterate on retry in future versions). Passing items() raises with a clear error pointing this out.
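Telling the two cases apart is cheap with the standard inspect module: a called generator is an async generator object, which `inspect.isasyncgen` recognizes. The sketch below is a hypothetical version of such a check; flowrhythm's real validation also has to accept CM-factory sources and may be implemented differently.

```python
import inspect


def check_source(source) -> None:
    """Hypothetical sketch of the check behind run(items()) raising TypeError."""
    if inspect.isasyncgen(source):    # an already-called generator object
        raise TypeError("pass the generator function, not the called generator")


async def items():
    for i in range(3):
        yield i


check_source(items)                   # accepted: the function itself
try:
    check_source(items())             # rejected: a generator object
except TypeError as exc:
    print(exc)
```

Because it receives the function, the framework can create the generator itself at the moment consumption starts and hold the only reference to it, which is what makes closing it on drain() straightforward.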

Why is the source consumed by exactly one task?

The framework consumes the source generator with exactly one task — never more. Async generators hold internal iteration state (where in the loop, last value yielded), and they cannot be safely consumed by multiple tasks:

  • Two tasks calling the generator function → each gets its own independent generator → both yield the same sequence → duplicates downstream.
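  • Two tasks sharing one generator instance → both compete for the same iteration state. CPython raises RuntimeError ("asynchronous generator is already running") when one task tries to advance the generator while another is mid-step, and even with locking, which task receives which item is unpredictable.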

So if you want to ingest from multiple sources in parallel, see parallel ingestion below — don't try to scale the source itself.
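Both failure modes in the bullets above can be reproduced with plain asyncio. In the sketch, `source` and `drain_into` are illustrative names; the `await asyncio.sleep(0)` stands in for the I/O a real source would await, which is what lets two consumers overlap.

```python
import asyncio


async def source():
    for i in range(3):
        await asyncio.sleep(0)        # a real source would await I/O here
        yield i


async def drain_into(agen, out: list) -> None:
    async for item in agen:
        out.append(item)


async def two_generators() -> list[int]:
    """Two tasks each call the generator function: every item arrives twice."""
    out: list[int] = []
    await asyncio.gather(drain_into(source(), out), drain_into(source(), out))
    return sorted(out)


async def shared_generator() -> list:
    """Two tasks share one generator instance: the overlapping step is rejected."""
    agen = source()
    out: list[int] = []
    return await asyncio.gather(
        drain_into(agen, out), drain_into(agen, out), return_exceptions=True
    )


print(asyncio.run(two_generators()))    # [0, 0, 1, 1, 2, 2]
print(asyncio.run(shared_generator()))  # the second task's result is a RuntimeError
```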

Source can be a CM factory

If your source needs setup/teardown (open a Kafka connection, then stream from it), pass a no-arg factory returning an AsyncContextManager whose __aenter__ yields an async generator:

@asynccontextmanager
async def kafka_source():
    consumer = await connect_kafka()   # connect_kafka: your own async client setup
    try:
        async def gen():
            async for msg in consumer:
                yield msg
        yield gen()
    finally:
        await consumer.close()         # runs on drain, stop, or error

await chain.run(kafka_source)

Unbounded — run() (no source)

When you don't pass a source, the framework auto-emits None signals into the first stage's queue indefinitely. Useful for "just keep working" pipelines where the first stage knows how to fetch its own data.

async def fetch(_):
    return await kafka.next()

chain = flow(fetch, normalize, db_write)
task = asyncio.create_task(chain.run())   # keep a reference so the task isn't garbage-collected
# ... later, when you want to stop
await chain.stop()
await task

Push — chain.push() + send()

You drive the flow yourself by pushing items via a PushHandle. Useful for embedding flowrhythm into a web server, WebSocket handler, or any event-driven context where items arrive from outside the flow's control.

chain = flow(normalize, db_write)

async with chain.push() as handle:
    await handle.send(item1)
    await handle.send(item2)
# on exit: handle.complete() is called automatically; chain drains; workers shut down

handle.send() suspends while the first stage's input queue is full, which gives you natural backpressure.
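The backpressure comes from the bounded queue itself. The sketch below shows the mechanism with a plain asyncio.Queue(maxsize=1) and a slow consumer; it is an analogy for what send() does, not flowrhythm code, and `backpressure_demo` is an illustrative name. The exact interleaving depends on scheduling, but the producer can never get more than one queued item ahead, so "sent 2" always appears after "processed 0".

```python
import asyncio


async def backpressure_demo() -> list[str]:
    log: list[str] = []
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)   # a tiny bounded input queue

    async def producer():
        for i in range(3):
            await queue.put(i)        # suspends while the queue is full
            log.append(f"sent {i}")
        await queue.put(None)         # end-of-stream marker

    async def consumer():
        while (item := await queue.get()) is not None:
            await asyncio.sleep(0.01)  # a slow first stage
            log.append(f"processed {item}")

    await asyncio.gather(producer(), consumer())
    return log


print(asyncio.run(backpressure_demo()))
```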

If you need to signal end-of-stream before the async with exits (e.g., the producing loop ended but you still want to do work after), call await handle.complete() explicitly. Subsequent send() calls will raise.

Parallel ingestion

If your bottleneck is fetching from an upstream system (Kafka with high throughput, paginated API with many pages, SQS poller), don't try to parallelize the source. Instead, put the parallelism in a fetcher transformer — the first stage of the chain — which can be scaled freely.

Option 1: tiny trigger source + fetcher transformer

async def trigger():
    while True:
        yield None              # one signal per fetch attempt

async def fetch_message(_):
    return await kafka.poll()   # external state; safe to call from N workers

chain = flow(fetch_message, normalize, db_write)
chain.configure("fetch_message", scaling=UtilizationScaling(min_workers=4, max_workers=32))
await chain.run(trigger)

Option 2: omit the source; framework auto-emits None

chain = flow(fetch_message, normalize, db_write)
chain.configure("fetch_message", scaling=UtilizationScaling(min_workers=4, max_workers=32))
await chain.run()              # no source arg → framework emits None forever

Same outcome, less boilerplate. The fetcher stage scales to as many workers as you need; each calls the upstream system independently.

Stopping from inside a transformer — Last(value)

Sometimes the chain itself knows when to stop — a transformer that processes items until it sees a terminator marker, or a "process until X" pattern. Wrap the return value in Last(value) to signal "this is the final item":

from flowrhythm import Last

async def process_until_done(item):
    if item.is_terminator:
        return Last(process(item))      # process and signal end
    return process(item)

What the user observes:

  1. The wrapped value (process(item)) flows downstream as the final item the sink will see.
  2. The chain immediately stops accepting new items. Anything still upstream of this transformer is dropped — those items will never reach the sink.
  3. Items already past this transformer continue to the sink in order.
  4. chain.run(...) returns once the sink has received the final item.

The framework guarantees: the value wrapped in Last(value) is the absolute last item the sink processes. Nothing comes after it.

Dropped upstream items are reported to the error handler as Dropped(item, stage, reason=DropReason.UPSTREAM_TERMINATED) events. Without a handler, drops are silent; set one if you need to log or capture them.
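The contract is easiest to see in a sequential model with a single item in flight: the wrapped value is delivered last and everything still upstream is dropped. In the real pipeline, items already past the transformer also continue to the sink; this model has none. `Last` here is a hypothetical stand-in class, and `run_sequential` is an illustrative helper, not flowrhythm's implementation.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class Last:
    """Hypothetical stand-in for flowrhythm.Last: wraps the final value."""
    value: Any


async def run_sequential(items, transform):
    """Sequential model of the Last(value) contract (no concurrency)."""
    delivered, dropped = [], []
    it = iter(items)
    for item in it:
        result = await transform(item)
        if isinstance(result, Last):
            delivered.append(result.value)   # the final item the sink sees
            dropped.extend(it)               # everything still upstream is dropped
            break
        delivered.append(result)
    return delivered, dropped


async def process_until_done(x):
    return Last(x * 10) if x == 3 else x * 10


print(asyncio.run(run_sequential(range(6), process_until_done)))
# ([0, 10, 20, 30], [4, 5])
```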

Stopping a running flow

The three activation modes above are the only public ways to drive a flow. Combined with stop() (abort) and drain() (graceful shutdown), they cover starting, finishing, and aborting a run.

# Graceful shutdown
task = asyncio.create_task(chain.run(source))
# ...
await chain.drain()       # waits for items in flight to finish
await task                # task returns normally

# Abort
task = asyncio.create_task(chain.run(source))
# ...
await chain.stop()        # cancels workers, drops in-flight items
await task
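The difference between the two can be modeled with plain asyncio: a graceful shutdown waits on queue.join() until every queued item has been marked done, while an abort cancels the workers right away. This is a sketch of the semantics with hypothetical names, not flowrhythm's implementation; it also starts with the source already exhausted, whereas drain() first closes the source.

```python
import asyncio


async def run_workers(n_items: int, graceful: bool) -> int:
    """Plain-asyncio model of drain() (graceful) vs stop() (abort)."""
    queue: asyncio.Queue[int] = asyncio.Queue()
    processed = 0

    async def worker():
        nonlocal processed
        while True:
            await queue.get()
            try:
                await asyncio.sleep(0.01)    # simulated work
                processed += 1
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(2)]
    for i in range(n_items):
        queue.put_nowait(i)

    if graceful:
        await queue.join()        # drain: wait until every queued item is done
    for w in workers:             # then (or immediately, for an abort) cancel workers
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return processed


print(asyncio.run(run_workers(10, graceful=True)))   # 10
print(asyncio.run(run_workers(10, graceful=False)))  # 0: nothing had finished yet
```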

What drain() does to your source

  • run(source): the framework calls aclose() on your generator, so any try/finally cleanup inside it runs. In-flight items then finish.
  • run() (no source): the framework stops emitting None signals. In-flight items finish.
  • chain.push(): the next send() raises. In-flight items finish. (You usually don't need to call drain() explicitly here; exit the async with block instead.)

In all cases, drain() returns once every item that entered the chain has reached the sink or the error handler.

Push-mode shortcuts

Exiting the async with chain.push() as h: block automatically calls h.complete() and waits for the chain to drain — no explicit drain() needed. To abort instead of draining, call chain.stop() from another task.


Inspecting a flow

chain.dump() renders the expanded pipeline graph for debugging. Three formats:

import json

print(chain.dump())                       # text (default)
print(chain.dump(format="mermaid"))       # paste into a markdown file
data = json.loads(chain.dump(format="json"))

For a flow like:

chain = flow(
    parse,
    validate,
    stage(router(classify, fast=fast, slow=heavy_path), name="dispatch"),
    db_write,
)

chain.dump() shows:

flow (5 stages):
  [0] parse                              → [1]                          FixedScaling(workers=1) fifo_queue(maxsize=1)
  [1] validate                           → [2]                          FixedScaling(workers=1) fifo_queue(maxsize=1)
  [2] dispatch [classifier]              arms: {fast→[3], slow→[4]}     FixedScaling(workers=1) fifo_queue(maxsize=1)
  [3] dispatch.fast (arm-end)            → [5] (merge)                  FixedScaling(workers=1) fifo_queue(maxsize=1)
  [4] dispatch.slow (arm-end)            → [5] (merge)                  FixedScaling(workers=1) fifo_queue(maxsize=1)
  [5] db_write                           (sink)                         FixedScaling(workers=1) fifo_queue(maxsize=1)

Sub-flow stages and router arms appear with dotted names (dispatch.fast, ingest.parse) — composition is visible at a glance.

For live runtime stats, use chain.dump(mode="stats"). The stats are readable both during a run and after it finishes:

print(chain.dump(mode="stats"))
# flow stats (completed):
#   [0] parse              0 alive (0 busy, 0 idle)  queue=0 [drained]  processed=20  errors=0
#   [1] boom_sometimes     0 alive (0 busy, 0 idle)  queue=0 [drained]  processed=16  errors=4
#   [2] dispatch           0 alive (0 busy, 0 idle)  queue=0 [drained]  processed=16  errors=0
#   ...
#
# events:
#   transformer errors: 4
#   source errors:      0
#   drops: 0

The stats view shows per-stage processed and errors counters plus aggregate event totals. Use it to spot bottlenecks (look at queue lengths and busy/idle ratios) and to check health (errors and drops). Mermaid format isn't supported for stats, since stats aren't graph-shaped.


Troubleshooting

Common failure modes and what to do about them. Each entry is symptom → why → fix.

TypeError: pass the generator function, not the called generator

Symptom:

await chain.run(items())   # raises TypeError

Why: The framework owns iteration over your source — that's what lets it close the generator on drain(), manage retries, and otherwise control the source lifecycle. A pre-instantiated generator can only be iterated once and from one place; the framework can't hand it back.

Fix: Pass the function itself (no parentheses):

await chain.run(items)
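One reason the function is needed rather than a generator object: an async generator object is single-use. Once exhausted, iterating it again yields nothing, while calling the generator function produces a fresh generator each time. A plain-asyncio sketch (no flowrhythm involved; `collect` is a hypothetical helper):

```python
import asyncio

async def items():
    for i in range(3):
        yield i

async def collect(gen):
    return [x async for x in gen]

async def main():
    gen = items()                      # a called generator: one-shot
    first = await collect(gen)
    second = await collect(gen)        # already exhausted, yields nothing
    fresh = await collect(items())     # calling the function again starts over
    return first, second, fresh

first, second, fresh = asyncio.run(main())
print(first, second, fresh)   # [0, 1, 2] [] [0, 1, 2]
```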

TypeError: transformer 'foo' is sync

Symptom:

def foo(x):           # def, not async def
    return x.upper()

flow(foo)             # raises at construction

Why: Sync functions block the event loop, freezing every other stage. The framework rejects them at construction so the failure mode is loud, not silent.

Fix: Either rewrite as async def, or wrap with sync_stage() for code that has to stay sync (CPU-bound work, third-party libs):

from flowrhythm import sync_stage

flow(sync_stage(foo))   # runs each call in a thread via asyncio.to_thread
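A minimal sketch of what a sync_stage()-style wrapper does, assuming only what is stated above (each call runs in a thread via asyncio.to_thread). `sync_stage_sketch` and `shout` are hypothetical names, not flowrhythm's implementation. The wrapper is a coroutine function, so it passes an "is this async?" check, while the blocking work runs off the event-loop thread.

```python
import asyncio
import functools
import inspect
import threading

def sync_stage_sketch(fn):
    """Wrap a sync callable so each call runs in a worker thread."""
    @functools.wraps(fn)
    async def wrapper(item):
        # asyncio.to_thread runs fn in the default executor, so the
        # event loop keeps serving other stages while fn blocks.
        return await asyncio.to_thread(fn, item)
    return wrapper

def shout(text):
    # Also report whether we ran on the main (event-loop) thread.
    return text.upper(), threading.current_thread() is threading.main_thread()

async def main():
    return await sync_stage_sketch(shout)("hello")

result, on_main_thread = asyncio.run(main())
is_async = inspect.iscoroutinefunction(sync_stage_sketch(shout))
print(result, on_main_thread, is_async)   # HELLO False True
```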

Items disappear without reaching the sink

Symptom: Some inputs never arrive at the last stage. No exception, no obvious error.

Why: Items can vanish for several reasons. The framework records each one as an observable event, but by default it only logs errors to stderr; drops stay silent unless you set a handler or check the stats:

| What happened | How to spot it |
|---|---|
| A transformer raised; the default handler logged it to stderr and the pipeline continued | Check stderr, or chain.dump(mode="stats") shows errors > 0 and events.transformer_errors > 0 |
| The router classifier returned an unknown label and there was no default arm | Stats show events.drops.ROUTER_MISS > 0 |
| Last(value) fired upstream and cut off items already in flight | Stats show events.drops.UPSTREAM_TERMINATED > 0 |
| The source generator raised; the default handler logged it and drained | Stats show events.source_errors > 0 |

Fix: Set an on_error handler so you see what's being dropped:

import logging

from flowrhythm import Dropped, SourceError, TransformerError, flow

log = logging.getLogger(__name__)

async def on_error(event):
    match event:
        case TransformerError(item, exc, stage):
            log.error("stage %s failed on %r: %s", stage, item, exc)
        case Dropped(item, stage, reason):
            log.warning("dropped %r at %s: %s", item, stage, reason.name)
        case SourceError(exc):
            log.critical("source died: %s", exc)

chain = flow(..., on_error=on_error)

Pipeline hangs and never returns

Symptom: await chain.run(...) doesn't return, even though the source was exhausted long ago.

Why: Almost always a transformer that never finishes — infinite loop, blocking I/O, deadlock on an external lock.

Fix: Use a watchdog to dump stats while the run is in progress. Look for stages with high busy count that don't change between dumps — those are stuck:

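import asyncio

async def watchdog(interval: float = 5.0):
    while True:
        await asyncio.sleep(interval)
        print(chain.dump(mode="stats"))

dog = asyncio.create_task(watchdog())   # keep a reference so the task isn't garbage-collected
try:
    async with asyncio.timeout(60):     # bound the run while debugging
        await chain.run(items)
finally:
    dog.cancel()                        # stop dumping once the run ends or times out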

If you can't reproduce it locally, wrap the run in asyncio.timeout(...) in production and call chain.stop() on timeout — that cancels stuck workers and cleanly runs their resource cleanup (__aexit__).

Last(value) isn't actually the last item the sink sees

Symptom: You return Last(value) from a transformer, but the sink processes items after value.

Why: With multiple workers in a downstream stage, item completion order isn't preserved across workers. The framework guarantees value enters the destination queue last; with a single-worker downstream that's also the last item processed, but with N>1 workers two items can be in flight simultaneously and finish in either order.

Fix: Configure the receiving stage with one worker:

from flowrhythm import FixedScaling

chain.configure("sink", scaling=FixedScaling(workers=1))

This applies to the stage that consumes value — usually the sink, sometimes a single-worker formatting stage right before it.
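Why a single worker matters, shown with plain asyncio rather than flowrhythm: two workers share one queue, the first item is slow, and the item enqueued last (standing in for Last(value)) finishes first. `run_stage` and the delays are hypothetical.

```python
import asyncio

async def run_stage(items, workers):
    """Feed (name, delay) items through a stage with N workers; return finish order."""
    queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)
    finished = []

    async def worker():
        while not queue.empty():
            name, delay = queue.get_nowait()
            await asyncio.sleep(delay)     # simulated processing time
            finished.append(name)

    await asyncio.gather(*(worker() for _ in range(workers)))
    return finished

items = [("a", 0.05), ("last", 0.0)]      # "last" is enqueued last
one_worker = asyncio.run(run_stage(items, workers=1))
two_workers = asyncio.run(run_stage(items, workers=2))
print(one_worker, two_workers)   # ['a', 'last'] ['last', 'a']
```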

RuntimeError: cannot send() after complete() in push mode

Symptom:

async with chain.push() as h:
    await h.send(1)
await h.send(2)               # raises RuntimeError

Or:

async with chain.push() as h:
    await h.send(1)
    await h.complete()
    await h.send(2)            # raises RuntimeError

Why: complete() is irreversible — it signals end-of-stream and starts the drain cascade. Exiting the async with block calls it automatically.

Fix: Keep all send() calls inside the async with block (or before any explicit complete()). The async with exit handles complete() for you:

async with chain.push() as h:
    for item in source:
        await h.send(item)
    # complete() called automatically here

Worker count stays high under UtilizationScaling after load drops

Symptom: After a burst of items, dump(mode="stats") shows the worker count hasn't shrunk back.

Why (most common): Scaling decisions fire on item enqueue/dequeue events. If items stop flowing entirely, no decision happens — the worker count stays at whatever it was. Workers are idle (waiting on get) but alive.

Less common reasons:

  • Cooldown period. UtilizationScaling(cooldown_seconds=5.0) prevents another scale event within 5 seconds of the last one.
  • Sampling gates. If you set sampling_period or sampling_events, decisions are throttled to those rates.
  • min_workers > 0. Scaling never goes below min_workers. For full scale-to-zero, set min_workers=0; this requires UtilizationScaling, since FixedScaling requires workers >= 1.

Fix: Check your strategy parameters. To force a scale-down decision, send another item (or use a periodic "tick" trigger). To allow scale-to-zero, use UtilizationScaling(min_workers=0, ...).
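To see why an idle pipeline keeps its workers, here is a hypothetical event-driven rule with a cooldown and a min_workers floor. It is not UtilizationScaling's actual algorithm, and the 0.8/0.2 thresholds are made up; the point is that the rule only runs when an event calls it, so with no events the worker count never changes.

```python
from dataclasses import dataclass

@dataclass
class SketchScaler:
    """Hypothetical utilization rule; evaluated only when on_event() is called."""
    min_workers: int = 0
    max_workers: int = 8
    cooldown_seconds: float = 5.0
    last_change: float = float("-inf")

    def on_event(self, now: float, alive: int, busy: int) -> int:
        """Return the desired worker count after an enqueue/dequeue event."""
        if now - self.last_change < self.cooldown_seconds:
            return alive                           # still cooling down
        utilization = busy / alive if alive else 1.0   # 0 workers: scale up
        target = alive
        if utilization > 0.8 and alive < self.max_workers:
            target = alive + 1
        elif utilization < 0.2 and alive > self.min_workers:
            target = alive - 1
        if target != alive:
            self.last_change = now
        return target

s = SketchScaler(min_workers=0, cooldown_seconds=5.0)
r1 = s.on_event(now=0.0, alive=4, busy=0)   # idle: scale down to 3
r2 = s.on_event(now=1.0, alive=3, busy=0)   # within cooldown: stays 3
r3 = s.on_event(now=7.0, alive=3, busy=0)   # cooldown elapsed: down to 2
print(r1, r2, r3)   # 3 3 2
# With no further events, nothing calls on_event(), so 2 workers stay alive.
```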

Pipeline doesn't backpressure — memory grows unbounded

Symptom: Memory usage grows during long runs even though the sink can keep up.

Why: A stage with queue_size=0 (unbounded) doesn't backpressure — its upstream can push items faster than it processes them. Default queue size is 1 precisely to prevent this. If you've explicitly opted into a larger or unbounded queue, you're trading backpressure for buffering.

Fix: Default to small queues and only increase per stage where bursty buffering helps:

chain.configure("bursty_stage", queue_size=100)   # explicit, narrow scope

Avoid queue_size=0 (unbounded) unless you have an external bound on input rate.
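Backpressure from a bounded queue, sketched with plain asyncio.Queue (no flowrhythm): with maxsize=1 a fast producer suspends on put() until the slow consumer takes the previous item, so the queue never holds more than one item. With maxsize=0 (unbounded) put() never waits, and the whole input piles up. The producer, consumer, and delays are hypothetical.

```python
import asyncio

async def peak_queue_length(maxsize: int, n: int = 20) -> int:
    """Largest queue length seen while a fast producer feeds a slow consumer."""
    queue = asyncio.Queue(maxsize=maxsize)   # maxsize=0 means unbounded
    peak = 0

    async def producer():
        nonlocal peak
        for i in range(n):
            await queue.put(i)               # suspends only while the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(None)                # end-of-stream marker

    async def consumer():
        while await queue.get() is not None:
            await asyncio.sleep(0.001)       # a slow stage

    await asyncio.gather(producer(), consumer())
    return peak

bounded_peak = asyncio.run(peak_queue_length(maxsize=1))
unbounded_peak = asyncio.run(peak_queue_length(maxsize=0))
print(bounded_peak, unbounded_peak)   # 1 20
```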

Where to look first

If something's wrong:

  1. chain.dump(mode="structure") — does the topology match what you wrote? Are stage names what you expected?
  2. chain.dump(mode="stats") — are items flowing? Where are they piling up? How many errors / drops?
  3. Set an on_error handler if you haven't — silent drops become loud.
  4. Wrap the run in asyncio.timeout(...) when debugging hangs, and call chain.stop() on timeout to inspect cleanup.

Public API

Construction

| Symbol | Kind | Purpose |
|---|---|---|
| flow(*stages, on_error=None, default_scaling=None, default_queue=None, default_queue_size=None) | function | Construct a flow from a sequence of stages. The optional kwargs are shorthand for set_error_handler / configure_default |
| router(classifier, **arms, default=...) | function | Construct a router for branching |
| stage(fn, name=...) | function | Override the auto-derived name of a stage |
| sync_stage(fn) | function | Wrap a sync function with asyncio.to_thread so it can be used as an async stage |

Activating a flow

| Method on Flow | Purpose |
|---|---|
| flow.run(source=None) | Run autonomously. With an async generator function as source: bounded; without a source: unbounded (the framework emits None signals) |
| flow.push() | Enter push mode; returns an AsyncContextManager[PushHandle] |
| flow.drain() | Graceful shutdown: stop the source (or aclose() your generator), wait for in-flight items to finish, then return |
| flow.stop() | Abort: cancel workers, drop in-flight items, release resources |

| Method on PushHandle (returned by flow.push()) | Purpose |
|---|---|
| handle.send(item) | Push an item into the flow's first queue (blocks while the queue is full) |
| handle.complete() | Signal end-of-stream (called automatically on async with exit) |

Configuration and inspection (Flow methods)

| Method | Purpose |
|---|---|
| flow.configure(name, scaling=..., queue=..., queue_size=...) | Per-stage tuning. queue= is a queue factory (fifo_queue, priority_queue, …); queue_size= is the queue's maxsize. Either or both may be set independently. |
| flow.configure_default(scaling=..., queue=..., queue_size=...) | Pipeline-wide defaults; same kwargs as configure() |
| flow.set_error_handler(handler) | Set the error sink for uncaught transformer exceptions |
| flow.dump(mode="structure", format="text") | Inspect the flow. mode is "structure" (default) or "stats"; format is "text" (default), "mermaid", or "json". mode="structure" renders the pipeline graph. mode="stats" renders live runtime stats: per-stage worker counts, queue lengths, processed/error counters, drops by reason. Stats support text and JSON only. |

Types and helpers

| Symbol | Kind | Purpose |
|---|---|---|
| Flow | class | Type hint only (def helper(f: Flow) -> Flow). Construct via flow(). |
| Router | class | Type hint only; produced by router() |
| PushHandle | class | Type hint only; returned by flow.push(), provides send() and complete() |
| Last | class | Wrap a transformer's return value: return Last(result) to mark it as the final item |
| TransformerError, SourceError, Dropped | dataclasses | Event types passed to the error handler |
| DropReason | enum | Reasons items get dropped (UPSTREAM_TERMINATED, ROUTER_MISS) |
| FixedScaling, UtilizationScaling | classes | Built-in scaling strategies |
| ScalingStrategy, StageSnapshot | protocol / dataclass | For implementing custom strategies |
| fifo_queue, lifo_queue, priority_queue | functions | Queue factories |

Guarantees

Promises the framework makes to the user. If any of these are violated, it's a bug.

Item processing

  • Every item that enters the chain reaches a terminal state before run() returns or async with chain.push() exits. A terminal state is: consumed by the sink, or routed to the error handler.
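  • Last(value) is final. If a transformer returns Last(value), value is the last item to enter the queue of the stage that consumes it. When that stage runs a single worker, the sink sees value as its last item and no item can reach the sink after it; with N > 1 workers there, completion order is not preserved (see Troubleshooting).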

Resources

  • Per-worker context managers always have __aexit__ called when the worker stops, even on stop() (immediate abort) or unhandled exceptions.
  • Resources are released before run() (or stop()) returns. When the call returns, no workers are alive and no resources are held.
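How a per-worker resource can be guaranteed to close, sketched with plain asyncio: the worker enters the factory's context once and reuses the yielded callable per item, so cancelling the worker (as an abort would) still runs the context's exit. `connection_factory` and `worker` are hypothetical names, not flowrhythm internals.

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def connection_factory():
    events.append("open")                 # e.g. open a DB connection
    try:
        async def handle(item):
            return item * 10
        yield handle                      # callable used for each item
    finally:
        events.append("close")            # runs on normal exit and on cancel

async def worker(queue, results):
    async with connection_factory() as handle:
        while True:
            item = await queue.get()
            results.append(await handle(item))

async def main():
    queue, results = asyncio.Queue(), []
    task = asyncio.create_task(worker(queue, results))
    for i in range(3):
        await queue.put(i)
    await asyncio.sleep(0.01)             # let the worker process the items
    task.cancel()                         # abort the worker
    try:
        await task
    except asyncio.CancelledError:
        pass
    return results

results = asyncio.run(main())
print(results, events)   # [0, 10, 20] ['open', 'close']
```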

Termination

  • chain.run(source) returns naturally when the source generator completes and the pipeline finishes draining.
  • chain.run(source) does not re-raise source exceptions. A source-generator failure is delivered to the error handler as a SourceError event; the pipeline drains and run() returns normally. To abort on source failure, call chain.stop() from outside (see Error Handling).
  • chain.stop() returns only after every worker has exited and every per-worker resource has been released.
  • chain.drain() returns only after the pipeline is fully drained — no items in flight, all workers idle/exited.

Sources

  • Async generators are consumed by exactly one task. The framework never forks a generator.
  • The same flow definition behaves identically standalone or composed. Embedding a flow as a stage in another flow does not change its per-stage scaling, queues, or configuration.

Order

  • Within a single-worker stage, item order is preserved. Items leave in the same order they arrived.
  • Across multi-worker stages, order is not preserved. With N > 1 workers, items may complete in any order. If you need order, use FixedScaling(workers=1) for that stage.

Backpressure

  • Slow downstream stages naturally throttle upstream. A stage with a full input queue causes the upstream stage to block on put(), which propagates back to the source. There is no item buffering beyond configured queue sizes.

What is not guaranteed

  • Exactly-once delivery. If a transformer raises and the error handler routes it to a log, the item is gone — no retry, no checkpointing.
  • Persistence across process restart. Items in flight when the process dies are lost.
  • Order across router arms. If two arms have different latencies, items from the faster arm may interleave with items from the slower arm in the downstream stage.

Architecture

Design Principles

  • Orchestrator, not a worker. flowrhythm coordinates external heavy work (subprocesses, services, I/O). It is not for running CPU-bound computation inside the Python process.
  • Stream processing pipeline, not a workflow engine. The graph is a DAG — no cycles. Items flow forward and terminate at a sink.
  • Structure and configuration are separate. flow() defines what flows where. .configure() tunes how it runs.
  • Composable. A flow plugs into another flow as a transformer. Complex topologies are built by composition, not by complicating the DSL.
  • Routing via router(). Branching is a regular transformer that dispatches to sub-pipelines by label. Arms converge after the router.
  • Retry/iteration belongs inside a stage, not in the graph topology.

Component overview

The full type/relationship class diagram lives in DESIGN.md. For day-to-day use, the Pipeline Flow diagram below is enough — it shows how items move through the system.

How flow() processes each kind at construction

flow() walks its arguments at construction. Some kinds are sub-pipelines (expanded into the parent pipeline); others are per-item transformers that occupy a single stage.

import inspect

from flowrhythm import Flow, Router

match stage:
    case Flow() as sub_flow:
        # SUB-PIPELINE: fold sub_flow's stages into the parent pipeline.
        # Stage names get the sub-flow's name as a prefix (e.g. "ingest.parse").
        # Sub-flow's per-stage configuration is preserved unless overridden by the parent.
        ...
    case Router() as r:
        # SUB-PIPELINE: each arm becomes its own sub-pipeline. The classifier
        # runs as a single stage that dispatches to arm pipelines.
        ...
    case ctx if callable(ctx) and len(inspect.signature(ctx).parameters) == 0:
        # PER-ITEM: CM factory; framework calls factory() per worker,
        # enters the context, uses yielded callable per item.
        ...
    case fn if callable(fn) and len(inspect.signature(fn).parameters) == 1:
        # PER-ITEM: plain async function; called per item.
        ...

After expansion, every stage in the runtime pipeline is a single per-item transformer (plain async fn, CM factory, or router classifier). Each stage owns one input queue, a worker pool, and a scaling strategy.

Pipeline Flow

flowchart TD
    Src[/Item source/] --> B[Transformer]
    B --> C{Exception?}
    C -- No --> D[Next stage / last stage drops]
    C -- Yes --> E[Error Sink]
    B -. "router()" .-> F{Classifier}
    F -- arm 1 --> G[Transformer / Chain / Flow]
    F -- arm 2 --> H[Transformer / Chain / Flow]
    F -- unknown --> I[default arm, or Dropped → Error Handler]
    G --> D
    H --> D
    I --> D
    I -. drop .-> E

Item source is one of:

  • A source generator passed to run(source) — bounded
  • Auto-emitted None signals from run() — unbounded
  • Items pushed via handle.send() after async with chain.push() as handle — push mode

Project status

flowrhythm is in early development. The DSL and runtime are still settling — see DESIGN.md for design decisions and open questions, and todos/INDEX.md for active plans (in priority order).


License

MIT License. See LICENSE.


Author

Andrey Maximov GitHub



Download files


Source Distribution

flowrhythm-0.1.0.tar.gz (100.7 kB)

Uploaded Source

Built Distribution


flowrhythm-0.1.0-py3-none-any.whl (44.1 kB)

Uploaded Python 3

File details

Details for the file flowrhythm-0.1.0.tar.gz.

File metadata

  • Download URL: flowrhythm-0.1.0.tar.gz
  • Size: 100.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv 0.11.7

File hashes

Hashes for flowrhythm-0.1.0.tar.gz
Algorithm Hash digest
SHA256 f1df2b0ceec5c87e54994265d33df26a5cb5e26ebaf8a403d145e8873716fb80
MD5 373eeb32a1e18f5c45a6acb3794f7850
BLAKE2b-256 b1867b65ae76dcfe7fa0afc4cfce3d9f63c4ccda793fdb5a20953845d2858b40


File details

Details for the file flowrhythm-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: flowrhythm-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 44.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv 0.11.7

File hashes

Hashes for flowrhythm-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 d83454c384bf6f3d763d2e5f30366c3d3c6680ad893c9bc2da74a2842077c289
MD5 54e921443a7008507e81dc546db51d2f
BLAKE2b-256 9a828338c47a796f87bf8457a4c2c32105ce137ce81d5b641e91db2fcce806d0

