Stageflow
A DAG-based pipeline orchestration framework for building observable, composable stage pipelines in Python.
Features
- DAG Execution: Stages execute as soon as dependencies resolve, maximizing parallelism
- Fluent Pipeline Builder: Type-safe, composable pipeline definitions
- Interceptor Framework: Middleware pattern for cross-cutting concerns (timeouts, circuit breakers, tracing, metrics)
- Observable by Design: Structured events, correlation IDs, and logging built-in
- Protocol-Based Extension: Clean abstractions for persistence, configuration, and events
- Async-First: Built on asyncio for high-performance concurrent execution
- Zero Dependencies: Core library has no external dependencies
Installation
Latest release: v0.2.0
```shell
pip install stageflow
```
For development:
```shell
pip install "stageflow[dev]"
```
Quick Start
```python
import asyncio

from stageflow import (
    Pipeline,
    Stage,
    StageContext,
    StageOutput,
    StageKind,
    PipelineTimer,
)
from stageflow.context import ContextSnapshot, RunIdentity
from stageflow.stages import StageInputs


# Define a stage
class GreetStage:
    name = "greet"
    kind = StageKind.TRANSFORM

    async def execute(self, ctx: StageContext) -> StageOutput:
        name = ctx.snapshot.input_text or "World"
        return StageOutput.ok(greeting=f"Hello, {name}!")


# Define another stage that depends on the first
class ShoutStage:
    name = "shout"
    kind = StageKind.TRANSFORM

    async def execute(self, ctx: StageContext) -> StageOutput:
        # Access output from dependency via StageInputs
        greeting = ctx.inputs.get_from("greet", "greeting", default="Hello!")
        return StageOutput.ok(shouted=greeting.upper())


# Build the pipeline
pipeline = (
    Pipeline()
    .with_stage("greet", GreetStage, StageKind.TRANSFORM)
    .with_stage("shout", ShoutStage, StageKind.TRANSFORM, dependencies=("greet",))
)


# Execute
async def main():
    graph = pipeline.build()
    snapshot = ContextSnapshot(run_id=RunIdentity(), input_text="World")
    base_inputs = StageInputs(snapshot=snapshot)
    ctx = StageContext(
        snapshot=snapshot,
        inputs=base_inputs,
        stage_name="pipeline_entry",
        timer=PipelineTimer(),
    )
    results = await graph.run(ctx)
    print(results["shout"].data["shouted"])  # "HELLO, WORLD!"


asyncio.run(main())
```
Core Concepts
Stages
A Stage is a unit of work with a defined input/output contract:
```python
from typing import Protocol


class Stage(Protocol):
    name: str
    kind: StageKind

    async def execute(self, ctx: StageContext) -> StageOutput: ...
```
Stage kinds categorize behavior:
- `TRANSFORM` - Change input form (STT, TTS, LLM)
- `ENRICH` - Add context (profile, memory, skills)
- `ROUTE` - Select execution path
- `GUARD` - Validate (guardrails, policy)
- `WORK` - Side effects (persist, assess)
- `AGENT` - Main interaction logic
Pipelines
Pipelines compose stages into a DAG using a fluent builder:
```python
pipeline = (
    Pipeline()
    .with_stage("stt", SttStage, StageKind.TRANSFORM)
    .with_stage("enrich", EnrichStage, StageKind.ENRICH, dependencies=("stt",))
    .with_stage("llm", LlmStage, StageKind.TRANSFORM, dependencies=("enrich",))
    .with_stage("tts", TtsStage, StageKind.TRANSFORM, dependencies=("llm",))
)
```
Pipelines can be composed:
```python
core_pipeline = Pipeline().with_stage(...)
voice_pipeline = core_pipeline.compose(
    Pipeline().with_stage("tts", TtsStage, StageKind.TRANSFORM, dependencies=("llm",))
)
```
Interceptors
Interceptors wrap stage execution for cross-cutting concerns:
```python
from stageflow.interceptors import BaseInterceptor, InterceptorResult


class AuthInterceptor(BaseInterceptor):
    name = "auth"
    priority = 5  # Lower = runs first

    async def before(self, stage_name: str, ctx: PipelineContext) -> InterceptorResult | None:
        if not ctx.data.get("authenticated"):
            return InterceptorResult(stage_ran=False, error="Not authenticated")
        return None

    async def after(self, stage_name: str, result: StageResult, ctx: PipelineContext) -> None:
        pass
```
Built-in interceptors:
- `TimeoutInterceptor` - Per-stage timeouts
- `CircuitBreakerInterceptor` - Failure isolation
- `TracingInterceptor` - OpenTelemetry spans
- `MetricsInterceptor` - Stage duration/success metrics
- `LoggingInterceptor` - Structured JSON logging
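The middleware pattern itself is simple enough to sketch standalone. The helper below is illustrative (`run_with_interceptors` and `TimingInterceptor` are made-up names, not Stageflow internals): `before` hooks run in ascending priority and may short-circuit the stage, then `after` hooks run once the stage finishes.

```python
import asyncio


class TimingInterceptor:
    priority = 10

    def __init__(self):
        self.events = []

    async def before(self, stage_name, ctx):
        self.events.append(f"before:{stage_name}")
        return None  # None means "proceed with the stage"

    async def after(self, stage_name, result, ctx):
        self.events.append(f"after:{stage_name}")


async def run_with_interceptors(stage_name, stage_fn, ctx, interceptors):
    # before() in ascending priority; a non-None return skips the stage.
    for ic in sorted(interceptors, key=lambda i: i.priority):
        short_circuit = await ic.before(stage_name, ctx)
        if short_circuit is not None:
            return short_circuit
    result = await stage_fn(ctx)
    # after() unwinds in reverse priority, like nested middleware.
    for ic in sorted(interceptors, key=lambda i: i.priority, reverse=True):
        await ic.after(stage_name, result, ctx)
    return result


async def echo_stage(ctx):
    return ctx["text"]


timing = TimingInterceptor()
out = asyncio.run(run_with_interceptors("echo", echo_stage, {"text": "hi"}, [timing]))
print(out, timing.events)  # hi ['before:echo', 'after:echo']
```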
Event Sinks
EventSink is a protocol for event persistence:
```python
import asyncio

from stageflow import EventSink


class MyEventSink(EventSink):
    async def emit(self, *, type: str, data: dict | None) -> None:
        # Persist to your storage
        await db.insert("events", {"type": type, "data": data})

    def try_emit(self, *, type: str, data: dict | None) -> None:
        # Fire-and-forget variant
        asyncio.create_task(self.emit(type=type, data=data))
```
Architecture
Stageflow follows SOLID principles, with a clear separation between the core library and application-level adapters:
```
┌─────────────────────────────────────────────────────────┐
│                    Your Application                     │
├─────────────────────────────────────────────────────────┤
│             Adapters (implement protocols)              │
│   - DatabaseEventSink                                   │
│   - PostgresRunStore                                    │
│   - EnvConfigProvider                                   │
├─────────────────────────────────────────────────────────┤
│                    stageflow (core)                     │
│  ┌──────────┐ ┌───────┐ ┌──────────────┐ ┌────────┐     │
│  │ Pipeline │ │ Graph │ │ Interceptors │ │ Events │     │
│  └──────────┘ └───────┘ └──────────────┘ └────────┘     │
│  ┌───────────────────────────────────────────────────┐  │
│  │              Ports (protocols)                    │  │
│  │       EventSink | RunStore | ConfigProvider       │  │
│  └───────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘
```
Event Taxonomy
Stageflow emits structured events for observability:
| Event Type | When |
|---|---|
| `pipeline.created` | Pipeline run initialized |
| `pipeline.started` | Execution begins |
| `pipeline.completed` | All stages finished |
| `pipeline.failed` | Unrecoverable error |
| `pipeline.cancelled` | Graceful termination |
| `stage.{name}.started` | Stage execution begins |
| `stage.{name}.completed` | Stage finished successfully |
| `stage.{name}.failed` | Stage threw an error |
| `stage.{name}.skipped` | Conditional stage skipped |
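Because stage-level events embed the stage name in the type string, a consumer can slice an event stream with a plain prefix check. The helper below is illustrative (`stage_events` is not part of Stageflow):

```python
def stage_events(events, name):
    """Return only the events emitted by the stage with the given name."""
    prefix = f"stage.{name}."
    return [e for e in events if e["type"].startswith(prefix)]


events = [
    {"type": "pipeline.started"},
    {"type": "stage.greet.started"},
    {"type": "stage.greet.completed"},
    {"type": "stage.shout.started"},
]
greet_types = [e["type"] for e in stage_events(events, "greet")]
print(greet_types)  # ['stage.greet.started', 'stage.greet.completed']
```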
License
MIT
Contributing
Contributions welcome! Please read the contributing guide first.