Asynchronous job processing framework with dynamic worker scaling
flowrhythm
Asynchronous, auto-scaling job pipeline for Python
flowrhythm is an asyncio-native framework for stream processing pipelines. Define a pipeline as a sequence of plain async functions, then tune scaling and queues per stage at runtime.
Contents
- When to use flowrhythm — fit signals, and "why not Celery / Faust / asyncio.Queue"
- Installation
- Real-world example — pointer to examples/video_pipeline.py
- Quick Start
- Stages — what's inside a stage, the four transformer shapes, how each is invoked
- Designing a flow — linear, reusable chains, routing, sub-flow composition, naming
- Configuring a flow — per-stage scaling, queues, error handler
- Scaling Strategies — FixedScaling, UtilizationScaling, custom
- Error Handling — typed events, observer-only handler
- Driving a flow — run, push, drain, stop; bounded vs unbounded vs push
- Inspecting a flow — dump(mode="structure") and dump(mode="stats")
- Troubleshooting — common failure modes; where to look first
- Public API — full method and type reference
- Guarantees — what the framework promises, and what it doesn't
- Architecture — design principles, component overview, pipeline flow
- Project status — roadmap and active plans
When to use flowrhythm
flowrhythm is the right fit when:
- You need to process a stream of items through several async stages
- The work is I/O- or external-process-bound (HTTP calls, subprocesses, DB writes)
- You want per-stage auto-scaling so slow stages don't stall the pipeline
- You need to stay in-process — no broker, no external queue infrastructure
It's the wrong fit when:
- Items are independent fire-and-forget tasks with no chain — use Celery or arq
- The work is CPU-bound in Python — use multiprocessing or hand off to subprocesses
- You need distributed processing across machines — use Dask, Ray, or Beam
- You need persistent / replayable streams — use Kafka + Faust or a real stream processor
Why not just asyncio.Queue + tasks?
You can build a pipeline with asyncio.Queue and a few create_task calls, and for a one-stage pipeline that's the right call. flowrhythm becomes worth it when you need multiple stages with different throughput, per-stage auto-scaling, branching, per-worker resources (DB connections, subprocesses), and graceful drain — wiring those by hand is tedious and easy to get wrong.
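For comparison, here is roughly what the hand-wired version looks like with only asyncio. It is a minimal sketch with two stages, a fixed worker count per stage, and a sentinel object (_DONE, a name chosen for this example) to propagate end-of-stream. It has no scaling, routing, per-worker resources, or error reporting, which is the wiring flowrhythm is meant to take over.

```python
import asyncio

_DONE = object()  # sentinel marking end-of-stream on a queue


async def run_stage(fn, inbox, outbox, workers):
    """Run `workers` tasks that pull from inbox, apply fn, and push results to outbox."""

    async def worker():
        while True:
            item = await inbox.get()
            if item is _DONE:
                await inbox.put(_DONE)  # re-post so sibling workers also see end-of-stream
                return
            result = await fn(item)
            if outbox is not None:
                await outbox.put(result)

    await asyncio.gather(*(worker() for _ in range(workers)))
    if outbox is not None:
        await outbox.put(_DONE)  # forward end-of-stream once every worker has exited


async def main():
    results = []

    async def double(x):
        return x * 2

    async def store(x):
        results.append(x)

    q1 = asyncio.Queue(maxsize=10)
    q2 = asyncio.Queue(maxsize=10)
    stages = asyncio.gather(
        run_stage(double, q1, q2, workers=3),
        run_stage(store, q2, None, workers=1),
    )
    for i in range(10):
        await q1.put(i)  # waits when q1 is full: backpressure
    await q1.put(_DONE)
    await stages
    return sorted(results)  # sorted: three workers may finish out of order


print(asyncio.run(main()))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

Every additional stage, scaling rule, or shutdown path multiplies this bookkeeping.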
Why not Celery or arq?
Those are task queues — items don't flow through stages, they're independent jobs. flowrhythm is pipeline-shaped: each item goes through the same chain of transformations. If your work is "process this one job, return a result," use a task queue. If it's "stream items through filter → transform → enrich → store," use flowrhythm.
Why not Faust or Apache Beam?
Those are stream-processing frameworks for real Kafka-scale stream workloads, with persistent state, exactly-once semantics, and distributed execution. flowrhythm is in-process, asyncio-native, and explicitly does not provide persistence or exactly-once delivery (see Guarantees). It's the right choice when your stream lives entirely in one process and you want a lightweight async-native API.
Installation
pip install flowrhythm
Requires Python 3.13+. Zero runtime dependencies.
Real-world example
For a full worked example showing per-stage scaling, CM-factory resource handling, a router with sub-flow arms, error handling, and dump() output — see examples/video_pipeline.py. It models a video-processing job (fetch metadata → conditionally transcode with ffmpeg → upload → record in DB) with all external calls mocked, so it runs without yt-dlp / ffmpeg / S3 set up.
python examples/video_pipeline.py
Quick Start
A flow is a chain of async stages. The last stage consumes the items (the "sink"). Activate the chain by passing a source generator to run().
import asyncio
from flowrhythm import flow
async def double(x):
    return x * 2
async def write(x):
    print("stored:", x)
async def items():
    for i in range(10):
        yield i
async def main():
    await flow(double, write).run(items)
asyncio.run(main())
That's it — three async functions, one chain, one call to drive it. Real-world pipelines add branching, scaling, error handling, and more — see Designing a flow and Driving a flow.
Stages
A flow is a sequence of stages. Items enter at the first stage's input queue, flow through each stage, and the last stage consumes them (the sink). Where items come from is decided when you activate the flow — see Driving a flow.
What's inside a stage
Each stage owns:
- Input queue — items waiting to be processed (configurable: FIFO, LIFO, priority)
- Worker pool — N async tasks pulling from the input queue, processing items, pushing results into the next stage's input queue
- Scaling strategy — decides N based on live stats (queue length, worker utilization)
A queue lives between two stages. The downstream stage owns it. Configuring a queue (flow.configure("normalize", queue=priority_queue, queue_size=10)) configures that stage's input queue — which is the upstream stage's destination.
               stage owns input queue
                   ┌─────────────┐
                   │             │
                   ▼             ▼
┌─────────────┐  queue1  ┌──────────────┐  queue2  ┌──────────────┐  queue3  ┌──────────┐
│ item source │ ───────▶ │ transformer1 │ ───────▶ │ transformer2 │ ───────▶ │   sink   │
│ (external)  │          │   (N wkrs)   │          │   (M wkrs)   │          │ (K wkrs) │
└─────────────┘          └──────────────┘          └──────────────┘          └──────────┘
                                 │                         │                       │
                                 ▼                         ▼                       ▼
                                                  scaling strategy
- 3 stages → 3 queues (one fronts each stage; the source feeds queue1)
- queue1 is transformer1's input (and the item source's destination)
- queue3 is the sink's input (and transformer2's destination)
- The sink has no output queue — items terminate there
The item source is external to the flow. It's whatever drives items into queue1 — see Driving a flow.
Async-only. All stages must be async. Sync functions and sync context managers are rejected at construction. For sync code, wrap with asyncio.to_thread or use the sync_stage() helper (see Public API).
Transformer
A middle stage. Multiple workers, auto-scaled (including scale-to-zero).
There are two per-item transformer shapes (the framework invokes them once per item):
| Shape you write | When to use |
|---|---|
| async def f(x) -> y | Simple stateless processing — parse, transform, enrich |
| Factory () -> AsyncContextManager whose __aenter__ returns an async def fn(x) -> y | Per-worker resource lifecycle — subprocess, DB connection, HTTP session |
And two sub-pipelines (multi-stage units that the framework expands into the parent pipeline at construction):
| Shape you write | What happens |
|---|---|
| A Flow (from flow(...)) | Sub-flow's stages are stitched into the parent pipeline; each keeps its own queue + workers + scaling. See Composing flows. |
| A Router (from router(...)) | Each arm becomes a sub-pipeline; classifier dispatches items to the chosen arm. See Routing. |
Plain async function
async def normalize(x):
    return x.strip().lower()
CM factory — function form
Use when the stage needs setup/teardown per worker (a connection, a subprocess):
from contextlib import asynccontextmanager
@asynccontextmanager
async def with_db():
    db = await connect_db()
    try:
        async def fn(x):
            return await db.process(x)
        yield fn
    finally:
        await db.close()  # runs on normal exit and when the worker is cancelled or fails
CM factory — class form
A class with no-arg constructor and __aenter__ / __aexit__ is also a factory:
class WithDB:
    async def __aenter__(self):
        self.db = await connect_db()
        async def fn(x):
            return await self.db.process(x)
        return fn
    async def __aexit__(self, *exc):
        await self.db.close()
Sub-flow
A Flow used as a stage. Inlined into the parent at construction:
preprocess = flow(decode, validate) # see Composing flows
Router
A Router used as a stage. Dispatches items to one of several arms:
split = router(classify, fast=quick, slow=heavy) # see Routing
Sink (implicit — last stage)
There is no separate "sink" type. The last stage in flow() plays the sink role when run autonomously: its return value is dropped, and it is what the items "land on."
async def db_write(x):
    await db.insert(x)
chain = flow(normalize, db_write)   # db_write is the last stage → sink under run()
await chain.run(reader)             # reader's items → normalize → db_write (consumes)
The same chain composed inside another flow has no sink — its last stage's output flows into the parent's downstream queue:
inner = flow(normalize, enrich) # last stage = enrich
outer = flow(inner, db_write) # inner is a transformer; enrich's output → db_write
await outer.run(reader)
Same flow(normalize, enrich) definition; sink behavior is determined by context, not declaration.
How each shape behaves at runtime
Knowing how each shape executes helps you reason about resources, lifecycle, and side effects. This section describes what you should expect — not the framework's internals.
Plain async function — async def fn(x) -> y
Called once per item. No setup, no teardown.
The framework guarantees:
- Your function receives one item, returns one result.
- Multiple workers call your function concurrently — design it to be safe under concurrency, or use the CM factory shape for per-worker state.
- Function-local state (counters, buffers) does NOT persist across calls. If you need per-worker state, use the CM factory shape.
CM factory — () -> AsyncContextManager[Callable]
Called once per worker to set up resources, then the yielded callable is invoked once per item until the worker stops.
The framework guarantees:
- Your factory is called as factory() once at worker startup; __aenter__ runs; the yielded callable is reused for every item that worker processes.
- __aexit__ runs once when the worker stops (whether on normal shutdown, scale-down, stop(), or even unhandled exceptions).
- Anything you set up in the factory body lives for the worker's lifetime — that's where you bind per-worker state (a connection, a subprocess, a model).
- Each worker has its own context — N workers means N independent setup/teardown cycles. No sharing.
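To make the per-worker lifecycle concrete, the sketch below models it with plain asyncio rather than flowrhythm itself. Each worker calls the factory once, enters it with async with, reuses the callable for every item it pulls, and exits when it receives a stop signal. The with_resource factory, the worker loop, and the None stop signal are assumptions of this example, not the framework's internals.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # records setup/teardown so the lifecycle is observable


@asynccontextmanager
async def with_resource():
    # Hypothetical resource standing in for a DB connection or subprocess.
    events.append("setup")
    try:
        async def fn(x):
            return x + 1

        yield fn
    finally:
        events.append("teardown")  # runs on normal exit, cancellation, or error


async def worker(factory, inbox, results):
    # factory() is called once per worker; fn is reused for every item.
    async with factory() as fn:
        while (item := await inbox.get()) is not None:
            results.append(await fn(item))


async def main(n_workers=2):
    inbox = asyncio.Queue()
    for i in range(6):
        inbox.put_nowait(i)
    for _ in range(n_workers):
        inbox.put_nowait(None)  # one stop signal per worker
    results = []
    await asyncio.gather(*(worker(with_resource, inbox, results) for _ in range(n_workers)))
    return sorted(results)


print(asyncio.run(main()))  # [1, 2, 3, 4, 5, 6]
print(events.count("setup"), events.count("teardown"))  # 2 2
```

However the items end up split between workers, there is exactly one setup and one teardown per worker.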
You pass the factory itself, not a built CM:
flow(with_db, db_write) # ✓ pass the factory
flow(with_db(), db_write) # ✗ pass a built CM (only enterable once)
Both @asynccontextmanager-decorated functions and classes implementing __aenter__/__aexit__ (with a no-arg constructor) satisfy the factory shape.
Sub-flow — Flow
When a Flow appears inside another flow(...), its stages are folded into the parent pipeline at construction. See Composing flows under "Designing a flow" for the full treatment, including how sub-flow stage names work and how to override sub-flow config from the parent.
Router — Router
When a Router appears inside flow(...), each arm becomes its own sub-pipeline. See Routing under "Designing a flow" for the full treatment, including the effective pipeline shape, arm dispatch, and behavior on classifier miss.
Worker lifecycle and scaling
Workers come and go based on the scaling strategy:
| Event | Plain async fn | CM factory |
|---|---|---|
| Worker spawned (scale up) | Reference captured | factory() called → __aenter__ runs → resource acquired |
| Worker processes item | await fn(item) | await fn(item) (using resource from __aenter__) |
| Worker stopped (scale down) | Coroutine cancelled | __aexit__ runs → resource released |
| Stage scales 0 → 1 | First worker spawned, processes pending items | First worker spawned, factory called, resource acquired (latency on first item) |
| Stage scales N → 0 | All workers cancelled | All __aexit__ run; all resources released |
Error Handler
Pipeline-level. Receives typed events describing failures and drops. One per flow. See Error Handling for the full set of event types and behavior.
import logging
from flowrhythm import TransformerError, SourceError, Dropped
log = logging.getLogger(__name__)
async def on_error(event):
    match event:
        case TransformerError(item, exc, stage):
            log.error("stage %s failed on %r: %s", stage, item, exc)
        case SourceError(exc):
            log.critical("source failed: %s", exc)  # observe only; raising here does not abort the run
        case Dropped(item, stage, reason):
            log.warning("dropped %r at %s: %s", item, stage, reason.name)
Scaling Strategy
Decides how many workers a stage runs based on a live StageSnapshot. Built-in: FixedScaling, UtilizationScaling. Or implement the protocol yourself.
Designing a flow
A flow is pure structure — the chain of stages. How items get fed in is decided at activation time (run() or push()). Configuration (scaling, queues) is separate from both structure and activation.
Linear
chain = flow(normalize, db_write)
await chain.run(reader)
Routing
router() dispatches each item to one of several arms based on a classifier function. It's a sub-pipeline, not a function call — each arm becomes its own mini-pipeline inside the parent, with its own input queue and worker pool.
Signature
router(classifier, **arms, default=None)
- classifier — async def (item) -> label — returns the keyword name of the arm to dispatch to
- **arms — keyword args mapping label → Transformer (function, CM factory, chain, or full flow)
- default — optional fallback Transformer for unmatched labels (if omitted, unmatched items become Dropped events)
Example
heavy_path = flow(decode, heavy)
main = flow(
    normalize,
    router(classify,
        fast=quick,             # plain async function
        slow=heavy_path,        # transform chain (a Flow)
        default=passthrough,    # optional fallback
    ),
    db_write,
)
await main.run(items)
What runs at runtime
The router's classifier runs as a single stage. Each arm becomes a sub-graph that flows back into the same downstream queue:
                             ┌─ quick ────────────────────────────────┐
normalize → [q] → classify ──┤                                        ├──► [q] → db_write
                             └─ heavy_path.decode → heavy_path.heavy ─┘
Each arm has its own queue and worker pool. Outputs of all arms feed the same downstream queue (the stage after the router). Sub-flow arms are inlined the same way as sub-flow stages elsewhere — see Composing flows.
Behavior on classifier miss
If the classifier returns a label that has no matching arm and no default is set, the item is dropped and the error handler receives a Dropped(item, stage, reason=DropReason.ROUTER_MISS) event. The pipeline continues by default. If you want to abort on misses, count them in your error handler and call chain.stop() from the task driving the run (raising from the handler is only logged); see Error Handling.
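The dispatch rule can be sketched in a few lines of plain Python. This is a simplified, single-item model, not flowrhythm's router. It picks the arm whose name matches the classifier's label, falls back to default, and otherwise hands a Dropped event with a ROUTER_MISS reason to the error handler. Dropped and DropReason here are local stand-ins that mirror the shapes documented under Error Handling.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum, auto


class DropReason(Enum):  # local stand-in for flowrhythm's DropReason
    ROUTER_MISS = auto()


@dataclass
class Dropped:  # local stand-in for flowrhythm's Dropped event
    item: object
    stage: str
    reason: DropReason


async def dispatch(item, classifier, arms, default=None, on_error=None, stage="router"):
    """Send one item to the arm named by classifier(item), else default, else drop it."""
    label = await classifier(item)
    arm = arms.get(label, default)
    if arm is None:
        if on_error is not None:
            await on_error(Dropped(item, stage, DropReason.ROUTER_MISS))
        return None  # the item goes no further
    return await arm(item)


async def main():
    drops = []

    async def classify(x):
        return "fast" if x < 10 else "slow" if x < 100 else "huge"

    async def quick(x):
        return ("quick", x)

    async def heavy(x):
        return ("heavy", x)

    async def record(event):
        drops.append(event)

    arms = {"fast": quick, "slow": heavy}
    out = [await dispatch(x, classify, arms, on_error=record) for x in (1, 50, 500)]
    return out, drops


out, drops = asyncio.run(main())
print(out)  # [('quick', 1), ('heavy', 50), None]
print(drops[0].reason.name)  # ROUTER_MISS
```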
Composing flows
A flow can be embedded as a stage in another flow. The framework folds the sub-flow's stages into the parent pipeline at construction — sub-flows are not function calls; their stages become real stages of the parent. Each sub-stage retains its own queue, worker pool, scaling, and config. Activation (run, push) only happens on the outermost flow.
ingest = flow(parse, validate)
ingest.configure("validate", scaling=UtilizationScaling(min_workers=1, max_workers=8))
main = flow(ingest, db_write)
await main.run(items)
What runs at runtime
The effective graph that runs is:
items → [queue] → ingest.parse → [queue] → ingest.validate → [queue] → db_write
Three stages, three queues. ingest.validate keeps its 1–8 worker pool exactly as ingest.configure specified — composition does not change scaling. Each ingest.parse worker pushes its output into ingest.validate's input queue and moves on; items flow through autonomously.
Stage names in the parent
Sub-flow stages get the sub-flow's variable name as a prefix in the parent: ingest.parse, ingest.validate. You can override their config from the parent using the dotted name:
main.configure("ingest.parse", scaling=FixedScaling(workers=2))
Sub-flows can contain sub-flows; names compose dotted (outer.middle.inner.stage).
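The dotted naming rule composes recursively. As a rough model (sub-flows represented here as (name, [stages]) tuples, purely a device of this sketch and not how flowrhythm stores them), expanding a nested structure into stage names looks like this:

```python
def expand(stages, prefix=""):
    """Flatten (name, [children]) tuples into dotted stage names, depth-first."""
    names = []
    for s in stages:
        if isinstance(s, tuple):  # a named sub-flow
            sub_name, children = s
            names.extend(expand(children, f"{prefix}{sub_name}."))
        else:  # a plain stage name
            names.append(f"{prefix}{s}")
    return names


ingest = ("ingest", ["parse", "validate"])
print(expand([ingest, "db_write"]))
# ['ingest.parse', 'ingest.validate', 'db_write']
print(expand([("outer", [("middle", [("inner", ["stage"])])])]))
# ['outer.middle.inner.stage']
```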
The same flow runs standalone or composed
await ingest.run(items) # standalone — last stage (validate) is the sink
The same ingest definition behaves identically whether you run() it directly or embed it in another flow. Composition is purely structural; nothing about the sub-flow's behavior changes.
Naming
Stage names are auto-derived from function names. Collisions get a numeric suffix:
chain = flow(normalize, normalize, db_write)
# stage names: "normalize", "normalize_2", "db_write"
Override when needed:
from flowrhythm import stage
chain = flow(
    stage(normalize, name="parse"),
    db_write,
)
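The collision rule above (normalize, normalize_2) can be modelled as a counter over derived names. This is an illustrative sketch that reproduces the documented example. It does not cover edge cases such as an explicit name that happens to equal a generated suffix, where the library's behavior isn't described here.

```python
from collections import Counter


def derive_names(fns):
    """Name each stage after its function; repeats get _2, _3, ... suffixes."""
    seen = Counter()
    names = []
    for fn in fns:
        base = fn.__name__
        seen[base] += 1
        names.append(base if seen[base] == 1 else f"{base}_{seen[base]}")
    return names


async def normalize(x):
    return x


async def db_write(x):
    pass


print(derive_names([normalize, normalize, db_write]))
# ['normalize', 'normalize_2', 'db_write']
```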
Configuring a flow
Configuration is operational — it tunes how the flow runs without changing what it does. Two equivalent styles are available: constructor keywords (one-shot setup) and methods (incremental, e.g. reading from a config file).
Constructor keywords (shorthand)
from flowrhythm import flow, FixedScaling, priority_queue
chain = flow(
    normalize, db_write,
    on_error=on_error,
    default_scaling=FixedScaling(workers=2),
    default_queue=priority_queue,
    default_queue_size=10,
)
await chain.run(items)
on_error, default_scaling, default_queue, and default_queue_size cover flow-level configuration. Per-stage configuration (different scaling for one specific stage) needs the method form.
Methods (incremental)
from flowrhythm import flow, FixedScaling, UtilizationScaling, priority_queue
chain = flow(normalize, db_write)
# Pipeline-wide defaults
chain.configure_default(scaling=FixedScaling(workers=2))
# Per-stage tuning (only via method — no constructor shorthand)
chain.configure("normalize", scaling=UtilizationScaling(max_workers=8))
chain.configure("normalize", queue=priority_queue, queue_size=10)
chain.configure("db_write", queue_size=20) # bump just the buffer; default queue type
# Error handler
chain.set_error_handler(on_error)
await chain.run(items)
The two forms are fully equivalent — pick whichever fits your code shape.
Same chain, different environments
def build():
    return flow(normalize, db_write)
dev = build()
dev.configure_default(scaling=FixedScaling(workers=1))
await dev.run(test_source) # batch from a small fixture
prod = build()
prod.configure("normalize", scaling=UtilizationScaling(max_workers=32))
await prod.run(kafka_source) # stream from production
Scaling Strategies
FixedScaling
Constant worker count. Requires workers >= 1 (use UtilizationScaling for scale-to-zero).
FixedScaling(workers=4)
UtilizationScaling
Adjusts worker count based on the busy/idle ratio. Supports scale-to-zero via min_workers=0.
UtilizationScaling(
    min_workers=0,            # never scale below this
    max_workers=8,            # never scale above this
    lower_utilization=0.2,    # if busy/total < 0.2, consider scaling down
    upper_utilization=0.8,    # if busy/total > 0.8, consider scaling up
    upscaling_rate=2,         # max workers added per scale-up decision
    downscaling_rate=1,       # max workers removed per scale-down decision
    cooldown_seconds=5.0,     # minimum seconds between scale events (anti-flap)
    dampening=0.5,            # multiplier on rate — 0.5 means add/remove half
    sampling_period=2.0,      # only consider scaling every N seconds
    sampling_events=50,       # only consider scaling every N items
)
How it works
On every item enqueue/dequeue, the strategy checks whether to scale. Two gates run first:
- Sampling — if sampling_period or sampling_events is set, scaling decisions are throttled (only considered every N seconds or every N events). This avoids checking on every single item in high-throughput stages. If both are set, both must pass.
- Cooldown — if a scale event happened within the last cooldown_seconds, do nothing. Prevents flapping between scale-up and scale-down.
If both gates pass, the strategy reads utilization = busy_workers / total_workers and decides:
- utilization > upper_utilization and workers < max_workers → scale up by upscaling_rate × dampening (rounded down, minimum 1)
- utilization < lower_utilization and workers > min_workers → scale down by downscaling_rate × dampening (rounded down, minimum 1)
- Otherwise → no change
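The decision step (after both gates have passed) reduces to a small pure function. This is a sketch of the rule as described above, not flowrhythm's source. It assumes the step is also clamped so the pool never crosses min_workers or max_workers, and it only handles a pool with at least one worker, since utilization is undefined at zero; the 0 → 1 wake-up path is not modelled here.

```python
import math


def scaling_delta(busy, total, *, min_workers=0, max_workers=8,
                  lower_utilization=0.2, upper_utilization=0.8,
                  upscaling_rate=2, downscaling_rate=1, dampening=0.5):
    """Return +n, -n, or 0 workers for one scaling decision (gates not included)."""
    if total < 1:
        raise ValueError("sketch only covers pools with at least one worker")
    utilization = busy / total
    if utilization > upper_utilization and total < max_workers:
        step = max(1, math.floor(upscaling_rate * dampening))
        return min(step, max_workers - total)  # assumed clamp: never exceed max_workers
    if utilization < lower_utilization and total > min_workers:
        step = max(1, math.floor(downscaling_rate * dampening))
        return -min(step, total - min_workers)  # assumed clamp: never drop below min_workers
    return 0


print(scaling_delta(4, 4))                    # 1  (2 x 0.5 = 1)
print(scaling_delta(0, 4))                    # -1 (1 x 0.5 = 0.5, floor 0, minimum 1)
print(scaling_delta(2, 4))                    # 0  (0.5 is inside the band)
print(scaling_delta(6, 6, upscaling_rate=8))  # 2  (8 x 0.5 = 4, clamped to 8 - 6)
```

The last call shows why the minimum-1 and clamping rules matter at the edges of the band.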
Tuning guide
- Bursty workload — set a higher upscaling_rate (e.g. 4-8) and a lower cooldown_seconds (e.g. 1.0) to react quickly to spikes
- Steady workload — keep defaults; cooldown_seconds=5.0 and a rate of 1-2 is calm and predictable
- Expensive workers (subprocesses, GPU models) — keep dampening low (0.3-0.5) to scale conservatively
- High-throughput stage — set sampling_events=100 or sampling_period=1.0 to avoid overhead from per-item scaling decisions
Custom strategy
Implement the protocol — note the methods are synchronous:
class MyStrategy:
    def initial_workers(self) -> int: ...
    def on_enqueue(self, stats: StageSnapshot) -> int: ...
    def on_dequeue(self, stats: StageSnapshot) -> int: ...
Return positive to add workers, negative to remove, 0 for no change.
Strategies must be sync. Scaling decisions are called on every item event (potentially millions/sec); awaiting external services here would block the hot path. If you need external state for a decision, refresh it in a background task and read it synchronously here.
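As an illustration of the protocol shape, here is a queue-depth strategy. The StageSnapshot below is a local stand-in with hypothetical field names (queue_length, busy_workers, total_workers); check the real StageSnapshot in Public API before relying on any of them. All three methods are synchronous and only read the snapshot, in line with the hot-path constraint above.

```python
from dataclasses import dataclass


@dataclass
class StageSnapshot:  # hypothetical stand-in; real field names may differ
    queue_length: int
    busy_workers: int
    total_workers: int


class QueueDepthScaling:
    """Add a worker when the backlog per worker is too deep; shed one when idle."""

    def __init__(self, min_workers=1, max_workers=8, backlog_per_worker=10):
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.backlog_per_worker = backlog_per_worker

    def initial_workers(self) -> int:
        return self.min_workers

    def on_enqueue(self, stats: StageSnapshot) -> int:
        per_worker = stats.queue_length / max(stats.total_workers, 1)
        if per_worker > self.backlog_per_worker and stats.total_workers < self.max_workers:
            return 1  # backlog is growing faster than workers drain it
        return 0

    def on_dequeue(self, stats: StageSnapshot) -> int:
        idle = stats.total_workers - stats.busy_workers
        if stats.queue_length == 0 and idle > 0 and stats.total_workers > self.min_workers:
            return -1  # nothing waiting and at least one worker is idle
        return 0


s = QueueDepthScaling()
print(s.on_enqueue(StageSnapshot(queue_length=25, busy_workers=2, total_workers=2)))  # 1
print(s.on_dequeue(StageSnapshot(queue_length=0, busy_workers=1, total_workers=3)))   # -1
```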
Worker rules
- The source (when passed to run()) is consumed by exactly one task — async generators cannot be safely consumed concurrently
- All stages in the chain can scale, including down to zero
- A worker holds its async context manager for its lifetime; the resource is released when the worker stops (scale-down or shutdown)
- The first item after a 0→1 transition pays the resource-acquire cost
Error Handling
Errors and drops are reported to a single pipeline-level error handler as typed events. The handler is observer-only — it logs, accounts, or forwards events, but it does not control pipeline flow:
- Returns normally → pipeline continues; the failed item is dropped
- Raises → the exception is logged to stderr; pipeline still continues; the failed item stays dropped
This means a buggy handler (a logger that throws, a metric backend that's down) cannot abort your pipeline. To stop a run based on what the handler observes, call chain.stop() from outside — see Aborting from inside the handler below.
Two layers of error handling, in order:
- Inside the transformer (preferred). Catch and handle there: retry, return a sentinel value, drop silently. Most error logic should live here because the transformer has full context.
- Pipeline error handler (last resort). Whatever escapes the transformer (or comes from the source / framework decisions like Dropped) is routed here.
Event types
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any
@dataclass
class TransformerError:
    item: Any
    exception: Exception
    stage: str                      # name of the stage that failed
@dataclass
class SourceError:
    exception: Exception            # raised inside the source generator
class DropReason(Enum):
    UPSTREAM_TERMINATED = auto()    # Last(value) upstream caused this item to be discarded
    ROUTER_MISS = auto()            # router classifier returned an unknown arm and no default
@dataclass
class Dropped:
    item: Any
    stage: str
    reason: DropReason
Writing a handler
import logging
from flowrhythm import flow, TransformerError, SourceError, Dropped
log = logging.getLogger(__name__)
async def on_error(event):
    match event:
        case TransformerError(item, exc, stage):
            log.error("stage %s failed on %r: %s", stage, item, exc)
        case SourceError(exc):
            log.critical("source failed: %s", exc)
            # source is exhausted; pipeline drains naturally
        case Dropped(item, stage, reason):
            log.warning("dropped %r at %s (%s)", item, stage, reason.name)
chain = flow(normalize, db_write)
chain.set_error_handler(on_error)
await chain.run(items)
Default behavior (no handler set)
| Event | Default |
|---|---|
| TransformerError | Logged to stderr, pipeline continues |
| SourceError | Logged to stderr, pipeline drains (source is treated as exhausted) |
| Dropped | Silent continue |
Set a handler whenever you need different behavior for any of these.
Aborting from inside the handler
The handler cannot abort the pipeline directly — raising just gets logged. To stop a run based on what the handler observes (e.g. "too many errors"), track state and call chain.stop() from the task driving the run:
import asyncio
from flowrhythm import TransformerError
errors = 0
async def on_error(event):
    global errors
    if isinstance(event, TransformerError):
        errors += 1
chain.set_error_handler(on_error)
run_task = asyncio.create_task(chain.run(items))
while not run_task.done():
    if errors > 100:
        await chain.stop()
        break
    await asyncio.sleep(0.5)
await run_task
This separates "observe errors" (handler) from "decide to stop" (caller).
Driving a flow
The flow definition (flow(...)) describes the chain of stages. To make items actually flow through it, you have to activate the flow. There are three ways:
| Mode | What feeds items into the chain | Termination |
|---|---|---|
| Bounded — await chain.run(source) | An async generator you supply | Generator exhausts → drain → exit |
| Unbounded — await chain.run() | The framework emits None signals indefinitely | External chain.stop() or first stage raises |
| Push — async with chain.push() as handle: await handle.send(item) | You push items via a PushHandle | handle.complete() (explicit or via async with exit) |
Push mode returns a PushHandle from chain.push(); send() and complete() live on the handle, not on Flow. stop() is always available on Flow for graceful shutdown.
Bounded — run(source)
You pass an async generator — the source. The framework iterates it, pushing each yielded item into the chain's first queue. When the generator is exhausted, the pipeline drains and exits.
async def items():
    for i in range(100):
        yield i
chain = flow(normalize, db_write)
await chain.run(items)      # ← pass the function, not items()
Pass the generator function itself (items), not a call to it (items()). The framework owns iteration — this lets it manage the source lifecycle (close cleanly on drain(), re-iterate on retry in future versions). Passing items() raises with a clear error pointing this out.
Why is the source consumed by exactly one task?
The framework consumes the source generator with exactly one task — never more. Async generators hold internal iteration state (where in the loop, last value yielded), and they cannot be safely consumed by multiple tasks:
- Two tasks calling the generator function → each gets its own independent generator → both yield the same sequence → duplicates downstream.
- Two tasks sharing one generator instance → race conditions on the iteration state → undefined behavior, lost or repeated items.
So if you want to ingest from multiple sources in parallel, see parallel ingestion below — don't try to scale the source itself.
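The first failure mode is easy to reproduce with plain asyncio: two tasks that each call the generator function get independent generators, so every item shows up twice. This is a standalone demonstration of Python's generator semantics, not of flowrhythm.

```python
import asyncio


async def items():
    for i in range(3):
        yield i


async def consume(gen_fn):
    # Calling gen_fn() creates a brand-new generator with its own iteration state.
    return [x async for x in gen_fn()]


async def main():
    a, b = await asyncio.gather(consume(items), consume(items))
    return a + b


print(asyncio.run(main()))  # [0, 1, 2, 0, 1, 2]
```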
Source can be a CM factory
If your source needs setup/teardown (open a Kafka connection, then stream from it), pass a no-arg factory returning an AsyncContextManager whose __aenter__ yields an async generator:
@asynccontextmanager
async def kafka_source():
    consumer = await connect_kafka()
    try:
        async def gen():
            async for msg in consumer:
                yield msg
        yield gen()
    finally:
        await consumer.close()  # runs even if the run is stopped or fails
await chain.run(kafka_source)
Unbounded — run() (no source)
When you don't pass a source, the framework auto-emits None signals into the first stage's queue indefinitely. Useful for "just keep working" pipelines where the first stage knows how to fetch its own data.
async def fetch(_):
    return await kafka.next()
chain = flow(fetch, normalize, db_write)
run_task = asyncio.create_task(chain.run())  # keep a reference so the task isn't garbage-collected
# ... later, when you want to stop
await chain.stop()
await run_task
Push — chain.push() + send()
You drive the flow yourself by pushing items via a PushHandle. Useful for embedding flowrhythm into a web server, WebSocket handler, or any event-driven context where items arrive from outside the flow's control.
chain = flow(normalize, db_write)
async with chain.push() as handle:
    await handle.send(item1)
    await handle.send(item2)
# on exit: handle.complete() is called automatically; chain drains; workers shut down
handle.send() blocks if the downstream queue is full — natural backpressure.
If you need to signal end-of-stream before the async with exits (e.g., the producing loop ended but you still want to do work after), call await handle.complete() explicitly. Subsequent send() calls will raise.
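The backpressure behavior of send() is the same mechanism asyncio.Queue provides with a maxsize. The sketch below uses a bare bounded queue as a stand-in for the first stage's input queue (an assumption of this example, not flowrhythm's PushHandle) and shows that a put on a full queue does not complete until space frees up.

```python
import asyncio


async def main():
    q = asyncio.Queue(maxsize=2)  # stand-in for a full stage input queue
    await q.put("a")
    await q.put("b")
    try:
        # The third put waits for a free slot; nobody consumes, so it times out.
        await asyncio.wait_for(q.put("c"), timeout=0.05)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True
    return blocked, q.qsize()


print(asyncio.run(main()))  # (True, 2)
```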
Parallel ingestion
If your bottleneck is fetching from an upstream system (Kafka with high throughput, paginated API with many pages, SQS poller), don't try to parallelize the source. Instead, put the parallelism in a fetcher transformer — the first stage of the chain — which can be scaled freely.
Option 1: tiny trigger source + fetcher transformer
async def trigger():
    while True:
        yield None                  # one signal per fetch attempt
async def fetch_message(_):
    return await kafka.poll()       # external state; safe to call from N workers
chain = flow(fetch_message, normalize, db_write)
chain.configure("fetch_message", scaling=UtilizationScaling(min_workers=4, max_workers=32))
await chain.run(trigger)
Option 2: omit the source; framework auto-emits None
chain = flow(fetch_message, normalize, db_write)
chain.configure("fetch_message", scaling=UtilizationScaling(min_workers=4, max_workers=32))
await chain.run() # no source arg → framework emits None forever
Same outcome, less boilerplate. The fetcher stage scales to as many workers as you need; each calls the upstream system independently.
Stopping from inside a transformer — Last(value)
Sometimes the chain itself knows when to stop — a transformer that processes items until it sees a terminator marker, or a "process until X" pattern. Wrap the return value in Last(value) to signal "this is the final item":
from flowrhythm import Last
async def process_until_done(item):
    if item.is_terminator:
        return Last(process(item))  # process and signal end
    return process(item)
What the user observes:
- The wrapped value (process(item)) flows downstream as the final item the sink will see.
- The chain immediately stops accepting new items. Anything still upstream of this transformer is dropped — those items will never reach the sink.
- Items already past this transformer continue to the sink in order.
- chain.run(...) returns once the sink has received the final item.
The framework guarantees: the value wrapped in Last(value) is the absolute last item the sink processes. Nothing comes after it.
Dropped upstream items are reported to the error handler as Dropped(item, stage, reason=DropReason.UPSTREAM_TERMINATED) events. Without a handler these drops are silent; if you need to log or capture them, set an error handler.
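A sequential model helps pin down the ordering promise. The sketch below is a single-worker, synchronous simplification with a local Last stand-in (not the framework's class). Results are delivered until a transformer returns Last(value), the wrapped value is delivered last, and everything still upstream is collected as dropped. In the real pipeline, items already past the transformer still reach the sink; with only one item in flight at a time that set is empty, so this model doesn't show it.

```python
from dataclasses import dataclass


@dataclass
class Last:  # local stand-in for flowrhythm.Last
    value: object


def run_linear(source, transform):
    """Deliver transform(item) for each item until a Last(...) appears."""
    delivered, dropped = [], []
    it = iter(source)
    for item in it:
        out = transform(item)
        if isinstance(out, Last):
            delivered.append(out.value)  # the wrapped value is the final delivery
            dropped.extend(it)  # items still upstream never reach the sink
            break
        delivered.append(out)
    return delivered, dropped


def tag_until_three(x):
    return Last(x * 10) if x == 3 else x * 10


print(run_linear(range(6), tag_until_three))  # ([0, 10, 20, 30], [4, 5])
```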
Stopping a running flow
The three activation modes above are the only public ways to drive a flow. Combined with stop() (abort) and drain() (graceful), they cover every legitimate scenario.
# Graceful shutdown
task = asyncio.create_task(chain.run(source))
# ...
await chain.drain() # waits for items in flight to finish
await task # task returns normally
# Abort
task = asyncio.create_task(chain.run(source))
# ...
await chain.stop() # cancels workers, drops in-flight items
await task
What drain() does to your source
| Mode | What the user observes |
|---|---|
| run(source) | Framework calls source.aclose() on your generator; if you have try/finally cleanup in the generator, it runs. Then in-flight items finish. |
| run() (no source) | Framework stops emitting None signals. In-flight items finish. |
| chain.push() | The next send() raises. In-flight items finish. (Usually you don't need to call drain() explicitly here — exit the async with instead.) |
In all cases, drain() returns once every item that entered the chain has reached the sink or the error handler.
Push-mode shortcuts
Exiting the async with chain.push() as h: block automatically calls h.complete() and waits for the chain to drain — no explicit drain() needed. To abort instead of draining, call chain.stop() from another task.
Inspecting a flow
chain.dump() renders the expanded pipeline graph for debugging. Three formats:
import json
print(chain.dump())                             # text (default)
print(chain.dump(format="mermaid"))             # paste into a markdown file
data = json.loads(chain.dump(format="json"))
For a flow like:
chain = flow(
    parse,
    validate,
    stage(router(classify, fast=fast, slow=heavy_path), name="dispatch"),
    db_write,
)
chain.dump() shows:
flow (5 stages):
[0] parse → [1] FixedScaling(workers=1) fifo_queue(maxsize=1)
[1] validate → [2] FixedScaling(workers=1) fifo_queue(maxsize=1)
[2] dispatch [classifier] arms: {fast→[3], slow→[4]} FixedScaling(workers=1) fifo_queue(maxsize=1)
[3] dispatch.fast (arm-end) → [5] (merge) FixedScaling(workers=1) fifo_queue(maxsize=1)
[4] dispatch.slow (arm-end) → [5] (merge) FixedScaling(workers=1) fifo_queue(maxsize=1)
[5] db_write (sink) FixedScaling(workers=1) fifo_queue(maxsize=1)
Sub-flow stages and router arms appear with dotted names (dispatch.fast, ingest.parse) — composition is visible at a glance.
For live runtime stats, use chain.dump(mode="stats"). The stats can be read both during a run and after it finishes:
print(chain.dump(mode="stats"))
# flow stats (completed):
# [0] parse 0 alive (0 busy, 0 idle) queue=0 [drained] processed=20 errors=0
# [1] boom_sometimes 0 alive (0 busy, 0 idle) queue=0 [drained] processed=16 errors=4
# [2] dispatch 0 alive (0 busy, 0 idle) queue=0 [drained] processed=16 errors=0
# ...
#
# events:
# transformer errors: 4
# source errors: 0
# drops: 0
The output shows per-stage processed and errors counters plus aggregate event totals. Use it to spot bottlenecks (look at queue lengths and busy/idle counts) and to check health (errors and drops). Mermaid format isn't supported for stats, because stats aren't graph-shaped.
Troubleshooting
Common failure modes and what to do about them. Each entry is symptom → why → fix.
TypeError: pass the generator function, not the called generator
Symptom:
await chain.run(items()) # raises TypeError
Why: The framework owns iteration over your source — that's what lets it close the generator on drain(), manage retries, and otherwise control the source lifecycle. A pre-instantiated generator can only be iterated once and from one place; the framework can't hand it back.
Fix: Pass the function itself (no parentheses):
await chain.run(items)
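The difference comes from plain Python semantics. A generator object is single-use, but a generator function is a factory that produces a fresh generator each time it is called. The stdlib-only sketch below shows both behaviours. It does not use flowrhythm, and items is illustrative.

```python
import asyncio


async def items():
    for i in range(3):
        yield i


async def collect(gen):
    return [x async for x in gen]


async def main():
    gen = items()                    # one generator object
    first = await collect(gen)
    second = await collect(gen)      # already exhausted: yields nothing
    fresh = await collect(items())   # calling the function again starts over
    return first, second, fresh


first, second, fresh = asyncio.run(main())
print(first, second, fresh)  # [0, 1, 2] [] [0, 1, 2]
```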
TypeError: transformer 'foo' is sync
Symptom:
def foo(x):  # def, not async def
    return x.upper()

flow(foo)  # raises at construction
Why: Sync functions block the event loop, freezing every other stage. The framework rejects them at construction so the failure mode is loud, not silent.
Fix: Either rewrite as async def, or wrap with sync_stage() for code that has to stay sync (CPU-bound work, third-party libs):
from flowrhythm import sync_stage
flow(sync_stage(foo)) # runs each call in a thread via asyncio.to_thread
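sync_stage() is described as running each call through asyncio.to_thread. Below is a minimal sketch of that kind of wrapper. to_async_stage is a hypothetical name, and this is not flowrhythm's actual sync_stage implementation. functools.wraps keeps the original __name__, which matters if stage names are derived from function names.

```python
import asyncio
import functools
import threading


def to_async_stage(fn):
    """Hypothetical stand-in for a sync_stage-style wrapper."""

    @functools.wraps(fn)
    async def wrapper(item):
        # Run the blocking call in a worker thread so the event loop stays free.
        return await asyncio.to_thread(fn, item)

    return wrapper


def foo(x):
    # Also report whether we ran on the main (event loop) thread.
    return x.upper(), threading.current_thread() is threading.main_thread()


async def main():
    stage_fn = to_async_stage(foo)
    return await stage_fn("hello")


result, ran_on_main = asyncio.run(main())
print(result, ran_on_main)  # HELLO False
```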
Items disappear without reaching the sink
Symptom: Some inputs never arrive at the last stage. No exception, no obvious error.
Why: Items can vanish for several reasons. The framework reports each one as an observable event, but without your own handler these events are easy to miss:
| What happened | How to spot it |
|---|---|
| Transformer raised; the default handler logged to stderr but the pipeline continued | Check stderr, or chain.dump(mode="stats") shows errors > 0 and events.transformer_errors > 0 |
| Router classifier returned an unknown label and there was no default arm | Stats shows events.drops.ROUTER_MISS > 0 |
| Last(value) fired upstream; items already in flight were cut off | Stats shows events.drops.UPSTREAM_TERMINATED > 0 |
| Source generator raised; the default handler logged it and the pipeline drained | Stats shows events.source_errors > 0 |
Fix: Set an on_error handler so you see what's being dropped:
import logging

from flowrhythm import Dropped, SourceError, TransformerError, flow

log = logging.getLogger(__name__)

async def on_error(event):
    match event:
        case TransformerError(item, exc, stage):
            log.error("stage %s failed on %r: %s", stage, item, exc)
        case Dropped(item, stage, reason):
            log.warning("dropped %r at %s: %s", item, stage, reason.name)
        case SourceError(exc):
            log.critical("source died: %s", exc)

chain = flow(..., on_error=on_error)
Pipeline hangs and never returns
Symptom: await chain.run(...) doesn't return. Source long since exhausted.
Why: Almost always a transformer that never finishes — infinite loop, blocking I/O, deadlock on an external lock.
Fix: Use a watchdog to dump stats while the run is in progress. Look for stages with high busy count that don't change between dumps — those are stuck:
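import asyncio

async def watchdog():
    while True:
        await asyncio.sleep(5)
        print(chain.dump(mode="stats"))

watchdog_task = asyncio.create_task(watchdog())  # keep a reference so it isn't garbage-collected
try:
    async with asyncio.timeout(60):  # bound the run while debugging (Python 3.11+)
        await chain.run(items)
finally:
    watchdog_task.cancel()  # stop printing once the run ends or times out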
If you can't reproduce it locally, wrap the run in asyncio.timeout(...) in production and call chain.stop() on timeout — that cancels stuck workers and cleanly runs their resource cleanup (__aexit__).
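You can package the "bound the run, then stop on timeout" advice as a small helper. In the sketch below, run_with_deadline is hypothetical. It assumes only that run and stop are zero-argument coroutine functions, for example lambda: chain.run(items) and chain.stop. Whether chain.stop() must be awaited is an assumption here, so adjust the call if it is not a coroutine. stuck_run and fake_stop stand in for the real calls. asyncio.wait_for is used so the helper also runs on Python versions older than 3.11.

```python
import asyncio


async def run_with_deadline(run, stop, seconds):
    """Await run(); if it takes longer than `seconds`, await stop() and report it.

    Hypothetical helper: `run` and `stop` are zero-argument coroutine functions.
    """
    try:
        await asyncio.wait_for(run(), timeout=seconds)
        return "finished"
    except asyncio.TimeoutError:
        # wait_for has already cancelled run(); stop() releases what remains.
        await stop()
        return "timed out"


events = []


async def stuck_run():
    # Simulates a transformer that never finishes.
    await asyncio.Event().wait()


async def fake_stop():
    events.append("stop called")


outcome = asyncio.run(run_with_deadline(stuck_run, fake_stop, seconds=0.05))
print(outcome, events)  # timed out ['stop called']
```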
Last(value) isn't actually the last item the sink sees
Symptom: You return Last(value) from a transformer, but the sink processes items after value.
Why: With multiple workers in a downstream stage, item completion order isn't preserved across workers. The framework guarantees value enters the destination queue last; with a single-worker downstream that's also the last item processed, but with N>1 workers two items can be in flight simultaneously and finish in either order.
Fix: Configure the receiving stage with one worker (use that stage's name in place of "sink"):
from flowrhythm import FixedScaling

chain.configure("sink", scaling=FixedScaling(workers=1))
This applies to the stage that consumes value — usually the sink, sometimes a single-worker formatting stage right before it.
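The reordering is easy to reproduce with plain asyncio. In the sketch below, one stage is a queue drained by N worker tasks. The item named "last" is enqueued last, but with two workers it finishes before a slow earlier item. run_stage is a simplified illustration and not flowrhythm's worker loop: its workers simply exit when the queue is empty.

```python
import asyncio


async def run_stage(items, workers):
    """Simplified stage: a pre-filled queue drained by `workers` tasks.

    Each item is (name, seconds_of_work); the completion order is returned.
    """
    queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)  # "last" enters the queue last
    done = []

    async def worker():
        while True:
            try:
                name, cost = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            await asyncio.sleep(cost)  # simulated work
            done.append(name)

    await asyncio.gather(*(worker() for _ in range(workers)))
    return done


items = [("a", 0.05), ("b", 0.0), ("last", 0.0)]
print(asyncio.run(run_stage(items, workers=1)))  # ['a', 'b', 'last']
print(asyncio.run(run_stage(items, workers=2)))  # ['b', 'last', 'a']
```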
RuntimeError: cannot send() after complete() in push mode
Symptom:
async with chain.push() as h:
    await h.send(1)
await h.send(2)  # raises RuntimeError
Or:
async with chain.push() as h:
    await h.send(1)
    await h.complete()
    await h.send(2)  # raises RuntimeError
Why: complete() is irreversible — it signals end-of-stream and starts the drain cascade. Exiting the async with block calls it automatically.
Fix: Keep all send() calls inside the async with block (or before any explicit complete()). The async with exit handles complete() for you:
async with chain.push() as h:
    for item in source:
        await h.send(item)
# complete() called automatically here
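The sketch below models the send()/complete() contract to make it concrete. SketchPushHandle and push are hypothetical, not flowrhythm's classes. Using None as the end-of-stream marker is also an assumption of the sketch. Exiting the async with block always calls complete(), and any later send() raises.

```python
import asyncio
from contextlib import asynccontextmanager


class SketchPushHandle:
    """Hypothetical model of the send()/complete() contract; not flowrhythm's class."""

    def __init__(self, queue):
        self._queue = queue
        self._completed = False

    async def send(self, item):
        if self._completed:
            raise RuntimeError("cannot send() after complete()")
        await self._queue.put(item)  # waits if a bounded queue is full

    async def complete(self):
        if not self._completed:  # a second call is a no-op in this sketch
            self._completed = True
            await self._queue.put(None)  # end-of-stream marker (assumed)


@asynccontextmanager
async def push(queue):
    handle = SketchPushHandle(queue)
    try:
        yield handle
    finally:
        await handle.complete()  # leaving the block always completes the stream


async def main():
    queue = asyncio.Queue()
    async with push(queue) as h:
        await h.send(1)
    error = None
    try:
        await h.send(2)  # the block has exited, so complete() already ran
    except RuntimeError as exc:
        error = str(exc)
    return [queue.get_nowait() for _ in range(queue.qsize())], error


received, error = asyncio.run(main())
print(received, error)  # [1, None] cannot send() after complete()
```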
Worker count stays high under UtilizationScaling after load drops
Symptom: After a burst of items, dump(mode="stats") shows the worker count hasn't shrunk back.
Why (most common): Scaling decisions fire on item enqueue/dequeue events. If items stop flowing entirely, no decision happens — the worker count stays at whatever it was. Workers are idle (waiting on get) but alive.
Less common reasons:
- Cooldown period. UtilizationScaling(cooldown_seconds=5.0) prevents another scale event within 5 seconds of the last one.
- Sampling gates. If you set sampling_period or sampling_events, decisions are throttled to those rates.
- min_workers > 0. Scaling never goes below min_workers. For full scale-to-zero, set min_workers=0 (and you must use UtilizationScaling — FixedScaling requires workers >= 1).
Fix: Check your strategy parameters. To force a scale-down decision, send another item (or use a periodic "tick" trigger). To allow scale-to-zero, use UtilizationScaling(min_workers=0, ...).
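It helps to model the decision as a function that runs only when an event calls it. That explains why an idle stage keeps its workers: with no events, nothing re-evaluates the count. The rule below is a hypothetical utilization policy with a cooldown and a min/max clamp. Its thresholds, names and formula are assumptions for illustration, not UtilizationScaling's actual algorithm.

```python
from dataclasses import dataclass


@dataclass
class ScaleState:
    workers: int
    last_change: float = float("-inf")  # time of the last scale event


def decide(state, busy, now, *, low=0.2, high=0.8, cooldown=5.0,
           min_workers=0, max_workers=8):
    """Hypothetical rule; returns the new worker count. Only runs when an event fires."""
    if now - state.last_change < cooldown:
        return state.workers  # still cooling down: no change
    # With zero workers, treat any event as full utilization so the stage scales up.
    utilization = busy / state.workers if state.workers else 1.0
    target = state.workers
    if utilization > high:
        target += 1
    elif utilization < low:
        target -= 1
    target = max(min_workers, min(max_workers, target))
    if target != state.workers:
        state.workers, state.last_change = target, now
    return state.workers


state = ScaleState(workers=4)
# Idle events at t=0, 1 and 6: scale down, blocked by cooldown, scale down again.
history = [decide(state, busy=0, now=t) for t in (0.0, 1.0, 6.0)]
print(history)  # [3, 3, 2]
```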
Pipeline doesn't backpressure — memory grows unbounded
Symptom: Memory usage grows during long runs even though the sink can keep up.
Why: A stage with queue_size=0 (unbounded) doesn't backpressure — its upstream can push items faster than it processes them. Default queue size is 1 precisely to prevent this. If you've explicitly opted into a larger or unbounded queue, you're trading backpressure for buffering.
Fix: Default to small queues and only increase per stage where bursty buffering helps:
chain.configure("bursty_stage", queue_size=100) # explicit, narrow scope
Avoid queue_size=0 (unbounded) unless you have an external bound on input rate.
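The same trade-off shows up with a bare asyncio.Queue, where maxsize=0 also means unbounded. In the stdlib sketch below, a fast producer feeds a slow consumer and records the deepest the queue ever got. With maxsize=1 the producer blocks on put() and the depth never exceeds 1. With an unbounded queue, every item piles up before the consumer runs. This illustrates the principle and is not flowrhythm's queue code.

```python
import asyncio


async def peak_queue_depth(maxsize, n_items=20):
    queue = asyncio.Queue(maxsize=maxsize)  # maxsize=0 means unbounded
    peak = 0

    async def producer():
        nonlocal peak
        for i in range(n_items):
            await queue.put(i)  # blocks while the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(None)  # end-of-stream marker

    async def slow_consumer():
        while (await queue.get()) is not None:
            await asyncio.sleep(0.001)  # slower than the producer

    await asyncio.gather(producer(), slow_consumer())
    return peak


print(asyncio.run(peak_queue_depth(maxsize=1)))  # 1
print(asyncio.run(peak_queue_depth(maxsize=0)))  # 20
```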
Where to look first
If something's wrong:
- chain.dump(mode="structure") — does the topology match what you wrote? Are stage names what you expected?
- chain.dump(mode="stats") — are items flowing? Where are they piling up? How many errors / drops?
- Set an on_error handler if you haven't — silent drops become loud.
- Wrap the run in asyncio.timeout(...) when debugging hangs, and call chain.stop() on timeout to inspect cleanup.
Public API
Construction
| Symbol | Kind | Purpose |
|---|---|---|
| flow(*stages, on_error=None, default_scaling=None, default_queue=None, default_queue_size=None) | function | Construct a flow from a sequence of stages. Optional kwargs are shorthand for set_error_handler / configure_default |
| router(classifier, **arms, default=...) | function | Construct a router for branching |
| stage(fn, name=...) | function | Override the auto-derived name of a stage |
| sync_stage(fn) | function | Wrap a sync function with asyncio.to_thread so it can be used as an async stage |
Activating a flow
| Method on Flow | Purpose |
|---|---|
| flow.run(source=None) | Run autonomously. With a source (an async generator function, not a called generator): bounded; without a source: unbounded (the framework emits None signals) |
| flow.push() | Enter push mode — returns an AsyncContextManager[PushHandle] |
| flow.drain() | Graceful shutdown: stop the source (or aclose() your generator), wait for in-flight items to finish, then return |
| flow.stop() | Abort: cancel workers, drop in-flight items, release resources |
| Method on PushHandle (returned by flow.push()) | Purpose |
|---|---|
| handle.send(item) | Push an item into the flow's first queue (blocks if queue is full) |
| handle.complete() | Signal end-of-stream (called automatically on async with exit) |
Configuration and inspection (Flow methods)
| Method | Purpose |
|---|---|
| flow.configure(name, scaling=..., queue=..., queue_size=...) | Per-stage tuning. queue= is a queue factory (fifo_queue, priority_queue, …); queue_size= is the queue's maxsize. Either or both may be set independently. |
| flow.configure_default(scaling=..., queue=..., queue_size=...) | Pipeline-wide defaults; same kwargs as configure() |
| flow.set_error_handler(handler) | Set the error handler, which receives TransformerError, SourceError and Dropped events |
| flow.dump(mode="structure"\|"stats", format="text"\|"mermaid"\|"json") | Inspect the flow. mode="structure" renders the pipeline graph (text/mermaid/JSON). mode="stats" renders live runtime stats: per-stage worker counts, queue lengths, processed/error counters, drops by reason. Stats supports text + JSON only. |
Types and helpers
| Symbol | Kind | Purpose |
|---|---|---|
| Flow | class | Type hint only — def helper(f: Flow) -> Flow. Construct via flow(). |
| Router | class | Type hint only — produced by router() |
| PushHandle | class | Type hint only — returned by flow.push(); provides send() and complete() |
| Last | class | Wrap a transformer's return value: return Last(result) to mark this as the final item |
| TransformerError, SourceError, Dropped | dataclasses | Event types passed to the error handler |
| DropReason | enum | Reasons items get dropped (UPSTREAM_TERMINATED, ROUTER_MISS) |
| FixedScaling, UtilizationScaling | classes | Built-in scaling strategies |
| ScalingStrategy, StageSnapshot | protocol / dataclass | For implementing custom strategies |
| fifo_queue, lifo_queue, priority_queue | functions | Queue factories |
Guarantees
Promises the framework makes to the user. If any of these are violated, it's a bug.
Item processing
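- Every item that enters the chain reaches a terminal state before run() returns or async with chain.push() exits. A terminal state is: consumed by the sink, or routed to the error handler.
- Last(value) is final. If a transformer returns Last(value), value is the last item enqueued downstream; nothing is enqueued after it. With a single-worker receiving stage (for example FixedScaling(workers=1) on the sink), value is also the last item the sink processes; with N > 1 workers, items already in flight may finish after it.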
Resources
- Per-worker context managers always have __aexit__ called when the worker stops, even on stop() (immediate abort) or unhandled exceptions.
- Resources are released before run() (or stop()) returns. When the call returns, no workers are alive and no resources are held.
Termination
- chain.run(source) returns naturally when the source generator completes and the pipeline finishes draining.
- chain.run(source) does not re-raise source exceptions. A source-generator failure is delivered to the error handler as a SourceError event; the pipeline drains and run() returns normally. To abort on source failure, call chain.stop() from outside (see Error Handling).
- chain.stop() returns only after every worker has exited and every per-worker resource has been released.
- chain.drain() returns only after the pipeline is fully drained — no items in flight, all workers idle/exited.
Sources
- Async generators are consumed by exactly one task. The framework never forks a generator.
- The same flow definition behaves identically standalone or composed. Embedding a flow as a stage in another flow does not change its per-stage scaling, queues, or configuration.
Order
- Within a single-worker stage, item order is preserved. Items leave in the same order they arrived.
- Across multi-worker stages, order is not preserved. With N > 1 workers, items may complete in any order. If you need order, use FixedScaling(workers=1) for that stage.
Backpressure
- Slow downstream stages naturally throttle upstream. A stage with a full input queue causes the upstream stage to block on put(), which propagates back to the source. There is no item buffering beyond configured queue sizes.
What is not guaranteed
- Exactly-once delivery. If a transformer raises and the error handler routes it to a log, the item is gone — no retry, no checkpointing.
- Persistence across process restart. Items in flight when the process dies are lost.
- Order across router arms. If two arms have different latencies, items from the faster arm may interleave with items from the slower arm in the downstream stage.
Architecture
Design Principles
- Orchestrator, not a worker. flowrhythm coordinates external heavy work (subprocesses, services, I/O). It is not for running CPU-bound computation inside the Python process.
- Stream processing pipeline, not a workflow engine. The graph is a DAG — no cycles. Items flow forward and terminate at a sink.
- Structure and configuration are separate. flow() defines what flows where. .configure() tunes how it runs.
- Composable. A flow plugs into another flow as a transformer. Complex topologies are built by composition, not by complicating the DSL.
- Routing via router(). Branching is a regular transformer that dispatches to sub-pipelines by label. Arms converge after the router.
- Retry/iteration belongs inside a stage, not in the graph topology.
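Keeping retries inside a stage means the transformer loops internally while the graph stays a DAG. Below is a minimal sketch. with_retries is a hypothetical decorator, not part of flowrhythm. It retries an async transformer with exponential backoff and then re-raises the last exception, so the error handler sees the failure. flaky_fetch simulates a transient external error.

```python
import asyncio
import functools


def with_retries(attempts=3, base_delay=0.01):
    """Hypothetical decorator: retry an async transformer before giving up."""

    def decorate(fn):
        @functools.wraps(fn)
        async def wrapper(item):
            for attempt in range(attempts):
                try:
                    return await fn(item)
                except Exception:
                    if attempt == attempts - 1:
                        raise  # out of attempts: let the error handler see it
                    await asyncio.sleep(base_delay * 2 ** attempt)  # backoff

        return wrapper

    return decorate


calls = []


@with_retries(attempts=3)
async def flaky_fetch(item):
    # Fails twice, then succeeds.
    calls.append(item)
    if len(calls) < 3:
        raise ConnectionError("transient")
    return f"fetched {item}"


result = asyncio.run(flaky_fetch("a"))
print(result, len(calls))  # fetched a 3
```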
Component overview
The full type/relationship class diagram lives in DESIGN.md. For day-to-day use, the Pipeline Flow diagram below is enough — it shows how items move through the system.
How flow() processes each kind at construction
flow() walks its arguments at construction. Some kinds are sub-pipelines (expanded into the parent pipeline); others are per-item transformers that occupy a single stage.
import inspect

for stage in stages:
    match stage:
        case Flow() as sub_flow:
            # SUB-PIPELINE: fold sub_flow's stages into the parent pipeline.
            # Stage names get the sub-flow's name as a prefix (e.g. "ingest.parse").
            # Sub-flow's per-stage configuration is preserved unless overridden by the parent.
            ...
        case Router() as r:
            # SUB-PIPELINE: each arm becomes its own sub-pipeline. The classifier
            # runs as a single stage that dispatches to arm pipelines.
            ...
        case ctx if callable(ctx) and len(inspect.signature(ctx).parameters) == 0:
            # PER-ITEM: CM factory; framework calls factory() per worker,
            # enters the context, uses yielded callable per item.
            ...
        case fn if callable(fn) and len(inspect.signature(fn).parameters) == 1:
            # PER-ITEM: plain async function; called per item.
            ...
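You can check the parameter-count dispatch on its own. In the runnable sketch below, kind_of is a hypothetical helper and not flowrhythm's code. Raising TypeError for other parameter counts is a choice made for the sketch. inspect.signature follows functools.wraps, so a factory built with contextlib.asynccontextmanager still reports zero parameters and is classified as a CM factory.

```python
import inspect
from contextlib import asynccontextmanager


def kind_of(obj):
    """Hypothetical: classify a callable by parameter count, as in the walk above."""
    if not callable(obj):
        raise TypeError(f"not a stage: {obj!r}")
    n = len(inspect.signature(obj).parameters)
    if n == 0:
        return "cm-factory"
    if n == 1:
        return "per-item"
    raise TypeError(f"stage must take 0 or 1 parameters, got {n}")


async def parse(item):
    return item.strip()


@asynccontextmanager
async def db_writer():
    # Per-worker setup would go here (e.g. opening a connection).
    async def write(item):
        return item

    yield write  # the yielded callable is used per item
    # Per-worker teardown would run here when the context exits.


print(kind_of(parse), kind_of(db_writer))  # per-item cm-factory
```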
After expansion, every stage in the runtime pipeline is a single per-item transformer (plain async fn, CM factory, or router classifier). Each stage owns one input queue, a worker pool, and a scaling strategy.
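That runtime shape can be modelled in a few lines of plain asyncio. In the sketch below, each stage owns a bounded input queue and a fixed pool of workers. A stage sends one end-of-stream sentinel per downstream worker only after all of its own workers have exited, so the pipeline drains in order. The fixed pool, the sentinel protocol and the run_pipeline name are assumptions of the sketch. It illustrates the model and is not flowrhythm's implementation, which adds scaling, routing and error events.

```python
import asyncio

STOP = object()  # end-of-stream sentinel, one per worker


async def run_pipeline(source, transforms, workers=2, queue_size=1):
    """Run items through async transforms; each stage owns a queue and a worker pool."""
    queues = [asyncio.Queue(maxsize=queue_size) for _ in transforms]
    results = []

    async def worker(fn, inbox, outbox):
        while (item := await inbox.get()) is not STOP:
            out = await fn(item)
            if outbox is None:
                results.append(out)  # the last stage acts as the sink
            else:
                await outbox.put(out)  # blocks if the next stage is full

    async def stage(i, fn):
        outbox = queues[i + 1] if i + 1 < len(queues) else None
        await asyncio.gather(*(worker(fn, queues[i], outbox) for _ in range(workers)))
        if outbox is not None:
            for _ in range(workers):  # all of this stage's workers have exited,
                await outbox.put(STOP)  # so it is safe to stop the next stage

    async def feed():
        async for item in source():  # source is the generator function
            await queues[0].put(item)
        for _ in range(workers):
            await queues[0].put(STOP)

    await asyncio.gather(feed(), *(stage(i, fn) for i, fn in enumerate(transforms)))
    return results


async def numbers():
    for i in range(5):
        yield i


async def double(x):
    return x * 2


async def add_one(x):
    return x + 1


results = asyncio.run(run_pipeline(numbers, [double, add_one]))
print(sorted(results))  # [1, 3, 5, 7, 9]
```

Sorting before printing matters because, with two workers per stage, completion order is not guaranteed.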
Pipeline Flow
flowchart TD
Src[/Item source/] --> B[Transformer]
B --> C{Exception?}
C -- No --> D[Next stage / last stage drops]
C -- Yes --> E[Error Sink]
B -. "router()" .-> F{Classifier}
F -- arm 1 --> G[Transformer / Chain / Flow]
F -- arm 2 --> H[Transformer / Chain / Flow]
F -- unknown --> I[default arm, or Dropped → Error Handler]
G --> D
H --> D
I --> D
I -. drop .-> E
Item source is one of:
- A source generator passed to run(source) — bounded
- Auto-emitted None signals from run() — unbounded
- Items pushed via handle.send() after async with chain.push() as handle — push mode
Project status
flowrhythm is in early development. The DSL and runtime are still settling — see DESIGN.md for design decisions and open questions, and todos/INDEX.md for active plans (in priority order).
License
MIT License. See LICENSE.
Author
Andrey Maximov GitHub
Download files
Source Distribution
Built Distribution
File details
Details for the file flowrhythm-0.2.0.tar.gz.
File metadata
- Download URL: flowrhythm-0.2.0.tar.gz
- Upload date:
- Size: 105.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9d14e6ee5af4773c2c28febd42ea0cf34c10c35477dc5f74ff70a271d6aef1c2 |
| MD5 | d37b28c28a3168cca8a10a2f171e0d69 |
| BLAKE2b-256 | 7f4dada82d4d923654496ccb3720221ee8135d12400e69b8932d705d4eaa1c22 |
File details
Details for the file flowrhythm-0.2.0-py3-none-any.whl.
File metadata
- Download URL: flowrhythm-0.2.0-py3-none-any.whl
- Upload date:
- Size: 44.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 59fad278f7d6d47176e527e38dd269cd62bd097f083bbf0c5585afae58b286ab |
| MD5 | 9e1e85a0fd75ba22a76478f008e24eb8 |
| BLAKE2b-256 | 6ed3315a1746488707751b7da2783001f912f84a5bfd49a5563809c65c586272 |