A lightweight Python framework for readable multi-agent pipelines.

Orchflow

Orchflow is a lightweight Python framework for readable multi-agent pipelines. It gives you sequential, parallel, conditional, retryable, and observable orchestration without forcing every workflow into a heavy graph runtime.

pip install orchflow

from orchflow import Flow, StepContext, step


@step
async def research(input: str, context: StepContext) -> str:
    return f"research about {input}"


@step
async def write(input: str, context: StepContext) -> str:
    return f"draft based on {context.previous}"


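# Top-level await needs a running event loop (for example `python -m asyncio` or a notebook).
# In a plain script, move these lines into an async function and call it with asyncio.run(...).
result = await Flow([research, write]).run("agent orchestration")
print(result.output)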

Why Orchflow Exists

Plain Python function chaining is easy to read, but it becomes fragile as soon as a workflow needs retries, parallel work, branching, shared state, or traces. Large graph frameworks are powerful, but they can add more abstraction than a small agent pipeline needs.

Orchflow sits in the middle: the user writes normal Python functions, while the framework handles orchestration mechanics that should be reliable and inspectable.

flowchart LR
    A["flow.run(input)"] --> B["Step: research"]
    B --> C{"Parallel group"}
    C --> D["Step: web_research"]
    C --> E["Step: docs_research"]
    D --> F["Merge outputs"]
    E --> F
    F --> G{"Condition"}
    G --> H["technical_writer"]
    G --> I["general_writer"]
    H --> J["FlowResult + traces"]
    I --> J

What It Demonstrates

Orchflow is intentionally small, but it is built like a real package:

  • Async-first execution with sync-step support through worker threads
  • Sequential pipelines, parallel fan-out, and conditional routing
  • Retry policy at both flow and step level
  • Shared run state with explicit StepContext
  • Flat StepTrace records for every attempt, including failures
  • Live lifecycle events with Flow.events(...)
  • Lightweight human input gates with callback or stdin providers
  • Optional LiteLLM-backed Agent without making LiteLLM a core dependency
  • Offline test helpers under orchflow.testing
  • Typed package metadata, CI, TestPyPI/PyPI release workflows, and tag releases
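
Two of the mechanics listed above, sync-step support through worker threads and per-step retry, can be sketched with nothing but asyncio. This is an illustration under stated assumptions, not Orchflow's implementation: `run_step` and `max_attempts` are hypothetical names, and the sketch retries on any exception without backoff.

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_step(func: Callable[..., Any], value: Any, max_attempts: int = 1) -> Any:
    """Run a sync or async callable, retrying on any exception (hypothetical helper)."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error = None
    for _attempt in range(max_attempts):
        try:
            if inspect.iscoroutinefunction(func):
                return await func(value)
            # Sync callables run in a worker thread so the event loop stays free.
            return await asyncio.to_thread(func, value)
        except Exception as exc:  # a real framework would record a trace here
            last_error = exc
    raise last_error


calls = {"count": 0}


def flaky_upper(text: str) -> str:
    """Sync step that fails on its first call, then succeeds."""
    calls["count"] += 1
    if calls["count"] == 1:
        raise RuntimeError("transient failure")
    return text.upper()


async def shout(text: str) -> str:
    """Async step that runs directly on the event loop."""
    return text + "!"


sync_result = asyncio.run(run_step(flaky_upper, "ok", max_attempts=2))
async_result = asyncio.run(run_step(shout, "ok"))
print(sync_result, async_result)
```

The sync step succeeds on its second attempt, and the async step never touches a worker thread.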

Core Concepts

Orchflow keeps the public model deliberately small.

| Concept | Purpose |
| --- | --- |
| Agent | Stateless role-based LLM helper with optional LiteLLM support |
| @step | Decorator for a unit of workflow work |
| StepContext | Carries previous output, original input, metadata, and shared state |
| Flow | Orchestrates sequential, parallel, and conditional execution |
| FlowResult | Final output, traces, state, timing, and failure details |
| FlowEvent | Live lifecycle event emitted while a flow runs |
| human_input | Step helper for pausing a flow and collecting reviewer text |

Sequential Flow

from orchflow import Flow, StepContext, step


@step(name="research", retry=2)
async def research(input: str, context: StepContext) -> str:
    return f"notes about {input}"


@step(name="draft")
async def draft(input: str, context: StepContext) -> str:
    return f"article based on {context.previous}"


result = await Flow([research, draft], name="content-pipeline").run(
    "AI agent orchestration"
)

print(result.output)
print([trace.step_name for trace in result.traces])

Important data-flow rule: the first input argument is always the original flow.run(...) input. Previous step output is available as context.previous.
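
The rule is easier to see in a tiny stand-alone runner. The sketch below does not use Orchflow: `MiniContext` and `run_sequence` are hypothetical stand-ins that mimic only the behavior described above, where every step receives the original input and the previous output travels on the context.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class MiniContext:
    """Hypothetical stand-in for StepContext; only the fields used here."""

    previous: Any = None
    state: dict = field(default_factory=dict)


Step = Callable[[str, MiniContext], Awaitable[Any]]


async def run_sequence(steps: list, original_input: str) -> Any:
    """Run steps in order, handing each one the same original input."""
    context = MiniContext()
    for current in steps:
        # Only context.previous changes between steps; the first argument never does.
        context.previous = await current(original_input, context)
    return context.previous


async def research(input: str, context: MiniContext) -> str:
    return f"notes about {input}"


async def draft(input: str, context: MiniContext) -> str:
    return f"article on {input} based on {context.previous}"


output = asyncio.run(run_sequence([research, draft], "AI agent orchestration"))
print(output)
```

Here `draft` still sees "AI agent orchestration" as its first argument, even though `research` has already produced a different value.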

Parallel Flow

Wrap independent steps in a list to run them concurrently.

flow = Flow([
    plan,
    [web_research, docs_research],
    synthesize,
])

result = await flow.run("workflow frameworks")

The next step receives a dictionary keyed by step name:

{
    "web_research": "...",
    "docs_research": "..."
}

Parallel steps produce separate flat trace entries with the same parallel_group_id.
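
For intuition, the fan-out and merge can be approximated with `asyncio.gather`. This is a sketch of the general technique, not Orchflow's internals; `run_parallel` is a hypothetical helper, and the steps here take only the input to keep the example short.

```python
import asyncio
from typing import Any, Awaitable, Callable

AsyncStep = Callable[[str], Awaitable[Any]]


async def run_parallel(steps: dict, original_input: str) -> dict:
    """Run named steps concurrently and merge outputs into a dict keyed by name."""
    names = list(steps)
    # gather returns results in argument order, so they line up with names.
    results = await asyncio.gather(*(steps[name](original_input) for name in names))
    return dict(zip(names, results))


async def web_research(topic: str) -> str:
    await asyncio.sleep(0.02)  # finishes last, but the merged order is unchanged
    return f"web notes on {topic}"


async def docs_research(topic: str) -> str:
    await asyncio.sleep(0.01)
    return f"docs notes on {topic}"


merged = asyncio.run(
    run_parallel(
        {"web_research": web_research, "docs_research": docs_research},
        "workflow frameworks",
    )
)
print(merged)
```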

Conditional Flow

from orchflow import Flow, condition

flow = Flow([
    classify,
    condition(
        when=lambda ctx: ctx.previous == "technical",
        then=technical_writer,
        otherwise=general_writer,
    ),
])

The predicate receives the current StepContext, so routing can use context.previous, shared state, or run metadata.
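
A minimal sketch of predicate-based routing, assuming a context object with `previous` and `state` attributes. `MiniContext` and `route` are hypothetical names used for illustration only; the real `condition(...)` helper may work differently.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class MiniContext:
    """Hypothetical stand-in for StepContext."""

    previous: Any = None
    state: dict = field(default_factory=dict)


Handler = Callable[[MiniContext], str]


def route(
    context: MiniContext,
    when: Callable[[MiniContext], bool],
    then: Handler,
    otherwise: Handler,
) -> str:
    """Evaluate the predicate once and run exactly one branch."""
    chosen = then if when(context) else otherwise
    return chosen(context)


def technical_writer(context: MiniContext) -> str:
    return "technical_writer"


def general_writer(context: MiniContext) -> str:
    return "general_writer"


# Route on the previous step's output.
by_previous = route(
    MiniContext(previous="technical"),
    when=lambda ctx: ctx.previous == "technical",
    then=technical_writer,
    otherwise=general_writer,
)

# Route on shared state instead.
by_state = route(
    MiniContext(previous="draft text", state={"audience": "general"}),
    when=lambda ctx: ctx.state.get("audience") == "expert",
    then=technical_writer,
    otherwise=general_writer,
)
print(by_previous, by_state)
```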

Human Review

Use human_input(...) when a pipeline needs a lightweight review point without adding checkpointing, queues, or a separate UI.

from orchflow import Flow, StepContext, condition, human_input, step


@step
async def draft(input: str, context: StepContext) -> str:
    text = f"Draft about {input}"
    context.state["draft"] = text
    return text


review = human_input(
    lambda ctx: f"Review this draft:\n{ctx.previous}\n\nDecision: ",
    name="human_review",
)


@step
async def publish(input: str, context: StepContext) -> str:
    return f"Published: {context.state['draft']}"


@step
async def revise(input: str, context: StepContext) -> str:
    return f"Revision requested: {context.previous}"


flow = Flow([
    draft,
    review,
    condition(
        when=lambda ctx: str(ctx.previous).strip().lower() == "approve",
        then=publish,
        otherwise=revise,
    ),
])

By default, human_input(...) reads from stdin. Applications and tests can pass a sync or async provider(prompt, context) callback instead. The human response is normal step output, so it is available as context.previous to the next step.
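
Accepting either a sync or an async provider usually comes down to one check on the returned value. The sketch below shows that pattern in isolation; `ask`, `auto_approve`, and `slow_reviewer` are hypothetical names, and how Orchflow wires a provider into `human_input(...)` is not shown here.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Union

Provider = Callable[[str, Any], Union[str, Awaitable[str]]]


async def ask(provider: Provider, prompt: str, context: Any) -> str:
    """Call a provider and await the result only if it is awaitable."""
    answer = provider(prompt, context)
    if inspect.isawaitable(answer):
        answer = await answer
    return str(answer)


def auto_approve(prompt: str, context: Any) -> str:
    """Sync provider, handy in tests."""
    return "approve"


async def slow_reviewer(prompt: str, context: Any) -> str:
    """Async provider, standing in for a web form or chat callback."""
    await asyncio.sleep(0)
    return "needs work"


sync_answer = asyncio.run(ask(auto_approve, "Decision: ", None))
async_answer = asyncio.run(ask(slow_reviewer, "Decision: ", None))
print(sync_answer, async_answer)
```

A canned provider like `auto_approve` keeps tests offline and deterministic.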

Live Events

Flow.events(...) lets applications observe a workflow while it runs.

async for event in flow.events("agent observability"):
    print(event.type, event.step_name, event.attempt)

Event types:

  • flow_started
  • step_started
  • step_completed
  • step_failed
  • retry_scheduled
  • flow_completed
  • flow_failed

Events are orchestration lifecycle events, not token streaming. The final flow_completed or flow_failed event carries a FlowResult.
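
A consumer of such a stream is typically a plain `async for` loop. The sketch below tallies event types from a hand-written stream; `MiniEvent` and `fake_events` are hypothetical stand-ins for `FlowEvent` and `Flow.events(...)`, and the exact ordering and attempt numbering shown are assumptions.

```python
import asyncio
from collections import Counter
from dataclasses import dataclass
from typing import AsyncIterator, Optional


@dataclass
class MiniEvent:
    """Hypothetical event with the three attributes printed in the loop above."""

    type: str
    step_name: Optional[str] = None
    attempt: Optional[int] = None


async def fake_events() -> AsyncIterator[MiniEvent]:
    """Hand-written stream: one step fails once, is retried, then succeeds."""
    for event in [
        MiniEvent("flow_started"),
        MiniEvent("step_started", "research", 1),
        MiniEvent("step_failed", "research", 1),
        MiniEvent("retry_scheduled", "research", 2),
        MiniEvent("step_started", "research", 2),
        MiniEvent("step_completed", "research", 2),
        MiniEvent("flow_completed"),
    ]:
        yield event


async def count_event_types() -> Counter:
    """Tally how often each event type appears in the stream."""
    counts: Counter = Counter()
    async for event in fake_events():
        counts[event.type] += 1
    return counts


counts = asyncio.run(count_event_types())
print(dict(counts))
```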

Trace Output

Every step attempt creates a flat StepTrace.

result = await flow.run("topic")

for trace in result.traces:
    print(trace.to_dict())

Example shape:

{
    "step_name": "draft",
    "input": "topic",
    "output": "article...",
    "error": None,
    "attempt": 1,
    "parallel_group_id": None,
    "duration_seconds": 0.42,
    "started_at": "2026-05-10T03:13:48.994932+00:00",
    "ended_at": "2026-05-10T03:13:49.414932+00:00",
    "success": True,
}
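
Because traces are flat, summarizing them needs no tree walking. The sketch below aggregates dictionaries of the shape shown above; the sample values are invented for illustration, and `summarize` is a hypothetical helper, not part of Orchflow.

```python
from collections import defaultdict

# Invented sample data using a subset of the trace keys shown above.
traces = [
    {"step_name": "research", "attempt": 1, "duration_seconds": 0.10, "success": False},
    {"step_name": "research", "attempt": 2, "duration_seconds": 0.25, "success": True},
    {"step_name": "draft", "attempt": 1, "duration_seconds": 0.42, "success": True},
]


def summarize(trace_dicts: list) -> dict:
    """Group flat trace dicts by step name and total attempts, failures, and time."""
    summary = defaultdict(lambda: {"attempts": 0, "failures": 0, "seconds": 0.0})
    for trace in trace_dicts:
        entry = summary[trace["step_name"]]
        entry["attempts"] += 1
        entry["failures"] += 0 if trace["success"] else 1
        entry["seconds"] += trace["duration_seconds"]
    return dict(summary)


summary = summarize(traces)
for name, stats in summary.items():
    print(name, stats)
```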

Optional LLM Agent

Core Orchflow has no runtime dependencies. The public Agent uses LiteLLM only when you install the optional extra:

pip install "orchflow[litellm]"

from orchflow import Agent

writer = Agent(
    name="writer",
    role="You write concise technical explanations.",
    model="gpt-4o-mini",
)

text = await writer.run("Explain lightweight orchestration")

Tool-calling loops, memory, and durable agent state are intentionally outside the current scope. Orchflow focuses on orchestration first.
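
Keeping a heavy library out of the core dependencies is commonly done with a lazy import that fails with an actionable message. The sketch below shows that general pattern; `require_optional` is a hypothetical helper, and Orchflow's own handling of the missing extra may differ.

```python
import importlib
from types import ModuleType


def require_optional(module_name: str, extra: str) -> ModuleType:
    """Import a module on demand, pointing at the install extra if it is missing."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name!r} is required for this feature. "
            f'Install it with: pip install "orchflow[{extra}]"'
        ) from exc


# A module that exists imports normally.
json_module = require_optional("json", "litellm")

# A missing module produces the install hint instead of a bare ImportError.
try:
    require_optional("module_that_is_not_installed_xyz", "litellm")
except ImportError as exc:
    message = str(exc)

print(message)
```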

Examples

uv run python examples/basic_sequential.py
uv run python examples/parallel_steps.py
uv run python examples/conditional_flow.py
uv run python examples/live_events.py
uv run python examples/human_review.py

Docs:

Development

uv sync --extra dev
uv run pytest
uv run ruff check
uv run ruff format --check
uv run pyright
uv build

Release Process

TestPyPI publishing is manual through .github/workflows/publish-testpypi.yml. Real PyPI publishing is tag-based through .github/workflows/publish-pypi.yml.

git tag -a v0.3.0 -m "Release v0.3.0"
git push origin v0.3.0

The release workflow verifies that the Git tag matches pyproject.toml, uploads to PyPI through trusted publishing, and creates a GitHub Release.

Roadmap

  • 0.3.x: human input polish and docs improvements
  • 0.4.0: lightweight checkpoint/resume
  • 0.5.0: richer optional agent adapters

Source Of Truth

Project decisions live in AGENTS.md. Implementation follows that document.


