Stageflow

A DAG-based pipeline orchestration framework for building observable, composable stage pipelines in Python.

Features

  • DAG Execution: Stages execute as soon as dependencies resolve, maximizing parallelism
  • Fluent Pipeline Builder: Type-safe, composable pipeline definitions
  • Interceptor Framework: Middleware pattern for cross-cutting concerns (timeouts, circuit breakers, tracing, metrics)
  • Observable by Design: Structured events, correlation IDs, and logging are built in
  • Protocol-Based Extension: Clean abstractions for persistence, configuration, and events
  • Async-First: Built on asyncio for high-performance concurrent execution
  • Zero Dependencies: Core library has no external dependencies

Installation

Latest release: v0.5.0

pip install stageflow-core

For development:

pip install "stageflow-core[dev]"
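
To confirm which version ended up in the active environment, the standard library's importlib.metadata can be queried with the distribution name used above. This is a minimal sketch; the helper name installed_version is purely illustrative and is not part of Stageflow.

```python
from __future__ import annotations

from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name: str = "stageflow-core") -> str | None:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    print(installed_version() or "stageflow-core is not installed")
```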

Quick Start

import asyncio
from stageflow import (
    Pipeline,
    Stage,
    StageContext,
    StageOutput,
    StageKind,
    PipelineTimer,
)
from stageflow.context import ContextSnapshot, RunIdentity
from stageflow.stages import StageInputs

# Define a stage
class GreetStage:
    name = "greet"
    kind = StageKind.TRANSFORM

    async def execute(self, ctx: StageContext) -> StageOutput:
        name = ctx.snapshot.input_text or "World"
        return StageOutput.ok(greeting=f"Hello, {name}!")

# Define another stage that depends on the first
class ShoutStage:
    name = "shout"
    kind = StageKind.TRANSFORM

    async def execute(self, ctx: StageContext) -> StageOutput:
        # Access output from dependency via StageInputs
        greeting = ctx.inputs.get_from("greet", "greeting", default="Hello!")
        return StageOutput.ok(shouted=greeting.upper())

# Build the pipeline
pipeline = (
    Pipeline()
    .with_stage("greet", GreetStage, StageKind.TRANSFORM)
    .with_stage("shout", ShoutStage, StageKind.TRANSFORM, dependencies=("greet",))
)

# Execute
async def main():
    graph = pipeline.build()
    snapshot = ContextSnapshot(run_id=RunIdentity(), input_text="World")
    base_inputs = StageInputs(snapshot=snapshot)
    ctx = StageContext(
        snapshot=snapshot,
        inputs=base_inputs,
        stage_name="pipeline_entry",
        timer=PipelineTimer(),
    )
    results = await graph.run(ctx)
    print(results["shout"].data["shouted"])  # "HELLO, WORLD!"

asyncio.run(main())

Core Concepts

Stages

A Stage is a unit of work with a defined input/output contract:

from typing import Protocol

class Stage(Protocol):
    name: str
    kind: StageKind

    async def execute(self, ctx: StageContext) -> StageOutput: ...

Stage kinds categorize behavior:

  • TRANSFORM - Convert input from one form to another (STT, TTS, LLM)
  • ENRICH - Add context (profile, memory, skills)
  • ROUTE - Select execution path
  • GUARD - Validate (guardrails, policy)
  • WORK - Side effects (persist, assess)
  • AGENT - Main interaction logic
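
Because Stage is a Protocol, a class should be able to satisfy it structurally, without inheriting from anything. The sketch below illustrates the idea using only the standard library. StageKind and StageOutput here are simplified stand-ins, not the real Stageflow classes, and the @runtime_checkable decorator is an assumption added so the check can be shown with isinstance.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Protocol, runtime_checkable


class StageKind(Enum):  # stand-in for stageflow.StageKind
    TRANSFORM = "transform"
    GUARD = "guard"


@dataclass
class StageOutput:  # stand-in; the real StageOutput is richer
    data: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def ok(cls, **data: Any) -> StageOutput:
        return cls(data=data)


@runtime_checkable
class Stage(Protocol):
    name: str
    kind: StageKind

    async def execute(self, ctx: Any) -> StageOutput: ...


class UppercaseStage:  # note: no base class
    name = "uppercase"
    kind = StageKind.TRANSFORM

    async def execute(self, ctx: Any) -> StageOutput:
        return StageOutput.ok(text=str(ctx).upper())


stage = UppercaseStage()
# Structural check: passes because name, kind and execute are all present.
assert isinstance(stage, Stage)
result = asyncio.run(stage.execute("hello"))
print(result.data["text"])  # HELLO
```

Note that a runtime isinstance check only verifies that the members exist; signatures and types are left to a static type checker.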

Pipelines

Pipelines compose stages into a DAG using a fluent builder:

pipeline = (
    Pipeline()
    .with_stage("stt", SttStage, StageKind.TRANSFORM)
    .with_stage("enrich", EnrichStage, StageKind.ENRICH, dependencies=("stt",))
    .with_stage("llm", LlmStage, StageKind.TRANSFORM, dependencies=("enrich",))
    .with_stage("tts", TtsStage, StageKind.TRANSFORM, dependencies=("llm",))
)
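
The scheduling idea behind a DAG pipeline (each stage starts as soon as its own dependencies finish, rather than waiting for a global barrier) can be sketched with plain asyncio. This is an illustration of the concept only, not Stageflow's implementation; run_dag, make and the diamond-shaped stage layout are hypothetical, and error handling and cycle detection are omitted.

```python
import asyncio
from typing import Awaitable, Callable

StageFn = Callable[[dict[str, str]], Awaitable[str]]


async def run_dag(
    stages: dict[str, StageFn],
    deps: dict[str, tuple[str, ...]],
) -> dict[str, str]:
    """Run each stage as soon as all of its dependencies have finished."""
    results: dict[str, str] = {}
    done = {name: asyncio.Event() for name in stages}

    async def run_one(name: str) -> None:
        # Wait only for this stage's own dependencies.
        for dep in deps.get(name, ()):
            await done[dep].wait()
        upstream = {dep: results[dep] for dep in deps.get(name, ())}
        results[name] = await stages[name](upstream)
        done[name].set()

    await asyncio.gather(*(run_one(name) for name in stages))
    return results


order: list[str] = []  # records the order in which stages finish


def make(name: str, delay: float) -> StageFn:
    async def fn(upstream: dict[str, str]) -> str:
        await asyncio.sleep(delay)
        order.append(name)
        return f"{name}({', '.join(sorted(upstream))})"

    return fn


stages = {
    "stt": make("stt", 0.02),
    "profile": make("profile", 0.01),
    "llm": make("llm", 0.0),
}
deps = {"llm": ("stt", "profile")}  # stt and profile are independent

results = asyncio.run(run_dag(stages, deps))
print(order)           # ['profile', 'stt', 'llm']
print(results["llm"])  # llm(profile, stt)
```

The two independent stages overlap in time, and the dependent stage starts only once both have completed.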

Pipelines can also be composed with one another, so that a shared core can be extended with additional stages:

core_pipeline = Pipeline().with_stage(...)
voice_pipeline = core_pipeline.compose(
    Pipeline().with_stage("tts", TtsStage, StageKind.TRANSFORM, dependencies=("llm",))
)
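
In a composition like the one above, a stage in the added pipeline depends on a stage defined in the core pipeline, so dependencies can only be validated on the merged result. The sketch below models that with plain dictionaries and the standard library's graphlib. It is not Stageflow's compose(); in particular, rejecting duplicate stage names is an assumption made for the example.

```python
from graphlib import TopologicalSorter

Spec = dict[str, tuple[str, ...]]  # stage name -> names it depends on


def compose(base: Spec, extra: Spec) -> Spec:
    """Merge two stage specs, rejecting duplicate stage names."""
    clash = base.keys() & extra.keys()
    if clash:
        raise ValueError(f"duplicate stages: {sorted(clash)}")
    return {**base, **extra}


def execution_order(spec: Spec) -> list[str]:
    """Validate the merged spec and return one valid execution order."""
    missing = {d for deps in spec.values() for d in deps} - spec.keys()
    if missing:
        raise ValueError(f"unknown dependencies: {sorted(missing)}")
    # static_order() raises graphlib.CycleError if the graph has a cycle.
    return list(TopologicalSorter(spec).static_order())


core = {"stt": (), "enrich": ("stt",), "llm": ("enrich",)}
voice = compose(core, {"tts": ("llm",)})
print(execution_order(voice))  # ['stt', 'enrich', 'llm', 'tts']
```

On its own, the {"tts": ("llm",)} fragment would fail validation because "llm" is unknown; it becomes valid only after being merged with the core spec.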

Interceptors

Interceptors wrap stage execution for cross-cutting concerns:

from stageflow.interceptors import BaseInterceptor, InterceptorResult
# PipelineContext and StageResult must be imported as well; their module path is not shown here.

class AuthInterceptor(BaseInterceptor):
    name = "auth"
    priority = 5  # Lower = runs first

    async def before(self, stage_name: str, ctx: PipelineContext) -> InterceptorResult | None:
        if not ctx.data.get("authenticated"):
            return InterceptorResult(stage_ran=False, error="Not authenticated")
        return None

    async def after(self, stage_name: str, result: StageResult, ctx: PipelineContext) -> None:
        pass

Built-in interceptors:

  • TimeoutInterceptor - Per-stage timeouts
  • CircuitBreakerInterceptor - Failure isolation
  • TracingInterceptor - OpenTelemetry spans
  • MetricsInterceptor - Stage duration/success metrics
  • LoggingInterceptor - Structured JSON logging
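
To make the middleware pattern concrete, here is a self-contained sketch of how a chain of interceptors might wrap one stage: interceptors are sorted by priority, any before hook can short-circuit the stage by returning a result, and after hooks unwind in reverse order. Outcome, Interceptor and run_stage are stand-ins invented for this example; the real BaseInterceptor machinery, and in particular its unwinding behavior, may differ.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class Outcome:  # stand-in for a stage/interceptor result
    stage_ran: bool
    data: Any = None
    error: str | None = None


class Interceptor:  # stand-in base class with no-op hooks
    name = "base"
    priority = 100

    async def before(self, stage_name: str, ctx: dict) -> Outcome | None:
        return None

    async def after(self, stage_name: str, outcome: Outcome, ctx: dict) -> None:
        return None


class Auth(Interceptor):
    name, priority = "auth", 5

    async def before(self, stage_name, ctx):
        if not ctx.get("authenticated"):
            return Outcome(stage_ran=False, error="Not authenticated")
        return None


class Trace(Interceptor):
    name, priority = "trace", 50

    async def before(self, stage_name, ctx):
        ctx.setdefault("trace", []).append(f"before:{stage_name}")

    async def after(self, stage_name, outcome, ctx):
        ctx.setdefault("trace", []).append(f"after:{stage_name}")


async def run_stage(stage_name, stage, ctx, interceptors) -> Outcome:
    ordered = sorted(interceptors, key=lambda i: i.priority)  # lower runs first
    entered = []
    outcome = None
    for icpt in ordered:
        outcome = await icpt.before(stage_name, ctx)
        if outcome is not None:  # short-circuit: the stage itself is skipped
            break
        entered.append(icpt)
    if outcome is None:
        outcome = Outcome(stage_ran=True, data=await stage(ctx))
    for icpt in reversed(entered):  # unwind in reverse order
        await icpt.after(stage_name, outcome, ctx)
    return outcome


async def greet(ctx):
    return "hello"


denied_ctx: dict = {}
denied = asyncio.run(run_stage("greet", greet, denied_ctx, [Trace(), Auth()]))
print(denied.stage_ran, denied.error)  # False Not authenticated

allowed_ctx = {"authenticated": True}
allowed = asyncio.run(run_stage("greet", greet, allowed_ctx, [Trace(), Auth()]))
print(allowed.data, allowed_ctx["trace"])  # hello ['before:greet', 'after:greet']
```

Because Auth has the lower priority value, it runs before Trace even though it was registered second, and the denied run never reaches the tracing hooks.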

Event Sinks

EventSink is a protocol for event persistence:

import asyncio

from stageflow import EventSink

class MyEventSink(EventSink):
    async def emit(self, *, type: str, data: dict | None) -> None:
        # Persist to your storage (`db` is a placeholder for your own client)
        await db.insert("events", {"type": type, "data": data})

    def try_emit(self, *, type: str, data: dict | None) -> None:
        # Fire-and-forget variant; requires a running event loop
        asyncio.create_task(self.emit(type=type, data=data))
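
One caveat with fire-and-forget emits: the event loop keeps only weak references to tasks, so the asyncio documentation recommends holding a reference to each task until it finishes. A minimal in-memory sketch of a sink with the same emit/try_emit shape follows. InMemoryEventSink and its drain helper are hypothetical, and the class deliberately does not import Stageflow so that it runs on its own.

```python
from __future__ import annotations

import asyncio


class InMemoryEventSink:
    """Collects events in a list; a stand-in for a real storage backend."""

    def __init__(self) -> None:
        self.events: list[dict] = []
        self._pending: set[asyncio.Task] = set()

    async def emit(self, *, type: str, data: dict | None) -> None:
        self.events.append({"type": type, "data": data})

    def try_emit(self, *, type: str, data: dict | None) -> None:
        # Keep a strong reference so the task is not garbage-collected mid-flight.
        task = asyncio.create_task(self.emit(type=type, data=data))
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    async def drain(self) -> None:
        """Wait for outstanding fire-and-forget emits (hypothetical helper)."""
        if self._pending:
            await asyncio.gather(*self._pending, return_exceptions=True)


async def demo() -> list[str]:
    sink = InMemoryEventSink()
    await sink.emit(type="pipeline.started", data=None)
    sink.try_emit(type="stage.greet.completed", data={"ms": 3})
    await sink.drain()
    return [event["type"] for event in sink.events]


print(asyncio.run(demo()))  # ['pipeline.started', 'stage.greet.completed']
```

A drain step like this is mainly useful at shutdown or in tests, where pending emits would otherwise be cancelled when the loop closes.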

Architecture

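Stageflow follows SOLID principles, with a clear separation between the core library, the ports (protocols) it defines, and the adapters your application supplies: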

┌─────────────────────────────────────────────────────────────┐
│                        Your Application                     │
├─────────────────────────────────────────────────────────────┤
│  Adapters (implement protocols)                             │
│  - DatabaseEventSink                                        │
│  - PostgresRunStore                                         │
│  - EnvConfigProvider                                        │
├─────────────────────────────────────────────────────────────┤
│                     stageflow (core)                        │
│  ┌─────────┐  ┌─────────┐  ┌─────────────┐  ┌─────────┐     │
│  │ Pipeline│  │  Graph  │  │ Interceptors│  │ Events  │     │
│  └─────────┘  └─────────┘  └─────────────┘  └─────────┘     │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                    Ports (protocols)                │    │
│  │  EventSink | RunStore | ConfigProvider              │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
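
The ports-and-adapters split in the diagram can be sketched in a few lines: the core depends only on a protocol, and the application plugs in a concrete adapter. The names ConfigProvider and EnvConfigProvider come from the diagram, but the get method, the STAGEFLOW_ prefix and the stage_timeout key are assumptions made for this example, not the library's documented interface.

```python
from __future__ import annotations

import os
from typing import Mapping, Protocol


class ConfigProvider(Protocol):  # port; the `get` signature is an assumption
    def get(self, key: str, default: str | None = None) -> str | None: ...


class EnvConfigProvider:
    """Adapter that reads settings from environment variables."""

    def __init__(
        self,
        prefix: str = "STAGEFLOW_",  # hypothetical prefix
        env: Mapping[str, str] | None = None,
    ) -> None:
        self._prefix = prefix
        self._env = os.environ if env is None else env

    def get(self, key: str, default: str | None = None) -> str | None:
        return self._env.get(f"{self._prefix}{key.upper()}", default)


def stage_timeout_seconds(config: ConfigProvider) -> float:
    """Core-side code depends only on the port, never on a concrete adapter."""
    raw = config.get("stage_timeout") or "30"
    return float(raw)


# Passing a plain mapping keeps the example independent of the real environment.
config = EnvConfigProvider(env={"STAGEFLOW_STAGE_TIMEOUT": "2.5"})
print(stage_timeout_seconds(config))  # 2.5
```

Swapping in a different adapter (for example one backed by a file or a database) would not require any change to stage_timeout_seconds.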

Event Taxonomy

Stageflow emits structured events for observability:

| Event Type | When |
|---|---|
| pipeline.created | Pipeline run initialized |
| pipeline.started | Execution begins |
| pipeline.completed | All stages finished |
| pipeline.failed | Unrecoverable error |
| pipeline.cancelled | Graceful termination |
| stage.{name}.started | Stage execution begins |
| stage.{name}.completed | Stage finished successfully |
| stage.{name}.failed | Stage threw error |
| stage.{name}.skipped | Conditional stage skipped |
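
Since the event types in the table are dotted strings with the stage name in the middle, consumers can build and filter them with simple string tools. A small sketch using the standard library's fnmatch; stage_event and matches are hypothetical helpers, not Stageflow functions.

```python
from fnmatch import fnmatchcase


def stage_event(stage_name: str, phase: str) -> str:
    """Build an event type such as 'stage.greet.completed'."""
    return f"stage.{stage_name}.{phase}"


def matches(event_type: str, pattern: str) -> bool:
    """Glob-style match, e.g. 'stage.*.failed' selects failures of any stage."""
    return fnmatchcase(event_type, pattern)


events = [
    "pipeline.started",
    stage_event("greet", "started"),
    stage_event("greet", "completed"),
    stage_event("shout", "failed"),
    "pipeline.failed",
]

print([e for e in events if matches(e, "stage.*.failed")])
# ['stage.shout.failed']
print([e for e in events if matches(e, "*.failed")])
# ['stage.shout.failed', 'pipeline.failed']
```

Glob matching treats dots as ordinary characters, so a stage name that itself contains a dot would make such patterns ambiguous.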

License

MIT

Contributing

Contributions welcome! Please read the contributing guide first.
