A lightweight Python framework for readable multi-agent pipelines.

Orchflow

Orchflow is a lightweight Python framework for readable multi-agent pipelines. It provides sequential, parallel, conditional, retryable, and observable orchestration, plus lightweight human review and JSON checkpoint resume, without forcing every workflow into a heavy graph runtime.

pip install orchflow

import asyncio

from orchflow import Flow, StepContext, step


@step
async def research(input: str, context: StepContext) -> str:
    return f"research about {input}"


@step
async def write(input: str, context: StepContext) -> str:
    return f"draft based on {context.previous}"


async def main() -> None:
    # Top-level await only works in an async REPL or notebook, so wrap the run.
    result = await Flow([research, write]).run("agent orchestration")
    print(result.output)


asyncio.run(main())

Why Orchflow Exists

Plain Python function chaining is easy to read, but it becomes fragile as soon as a workflow needs retries, parallel work, branching, shared state, or traces. Large graph frameworks are powerful, but they can add more abstraction than a small agent pipeline needs.

Orchflow sits in the middle: the user writes normal Python functions, while the framework handles orchestration mechanics that should be reliable and inspectable.

flowchart LR
    A["flow.run(input)"] --> B["Step: research"]
    B --> C{"Parallel group"}
    C --> D["Step: web_research"]
    C --> E["Step: docs_research"]
    D --> F["Merge outputs"]
    E --> F
    F --> G{"Condition"}
    G --> H["technical_writer"]
    G --> I["general_writer"]
    H --> J["FlowResult + traces"]
    I --> J

What It Demonstrates

Orchflow is intentionally small, but it is built like a real package:

  • Async-first execution with sync-step support through worker threads
  • Sequential pipelines, parallel fan-out, and conditional routing
  • Retry policy at both flow and step level
  • Shared run state with explicit StepContext
  • Flat StepTrace records for every attempt, including failures
  • Live lifecycle events with Flow.events(...)
  • Lightweight human input gates with callback or stdin providers
  • JSON checkpoints and resume for practical long-running workflows
  • Optional LiteLLM-backed Agent without making LiteLLM a core dependency
  • Structured agent outputs with JSON schema or optional Pydantic models
  • Offline test helpers under orchflow.testing
  • Typed package metadata, CI, TestPyPI/PyPI release workflows, and tag-based releases
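
The first bullet, sync-step support through worker threads, can be pictured with the standard library alone. The sketch below is not Orchflow's implementation: `call_step` is a hypothetical helper, and the assumption is simply that coroutine functions are awaited directly while plain functions are handed to `asyncio.to_thread` so they cannot block the event loop.

```python
import asyncio
import inspect
import threading
from typing import Any, Callable


async def call_step(fn: Callable[..., Any], *args: Any) -> Any:
    """Hypothetical helper: await async steps inline, run sync steps in a thread."""
    if inspect.iscoroutinefunction(fn):
        return await fn(*args)
    # A blocking function would stall the event loop, so use a worker thread.
    return await asyncio.to_thread(fn, *args)


async def async_step(text: str) -> str:
    return f"async:{text}"


def sync_step(text: str) -> str:
    # Report whether the step ran on the main thread.
    on_main = threading.current_thread() is threading.main_thread()
    return f"sync:{text}:main={on_main}"


async def demo() -> list[str]:
    return [await call_step(async_step, "a"), await call_step(sync_step, "b")]


results = asyncio.run(demo())
print(results)
```

The sync step reports that it did not run on the main thread, which is the property that keeps other steps responsive while it blocks.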

Core Concepts

Orchflow keeps the public model deliberately small.

| Concept | Purpose |
| --- | --- |
| Agent | Stateless role-based LLM helper with optional LiteLLM support |
| AgentConfig | Typed provider configuration for Agent calls |
| @step | Decorator for a unit of workflow work |
| StepContext | Carries previous output, original input, metadata, and shared state |
| Flow | Orchestrates sequential, parallel, and conditional execution |
| FlowResult | Final output, traces, state, timing, and failure details |
| FlowEvent | Live lifecycle event emitted while a flow runs |
| human_input | Step helper for pausing a flow and collecting reviewer text |
| JsonCheckpointStore | Local JSON checkpoint store for resume |

Sequential Flow

import asyncio

from orchflow import Flow, StepContext, step


@step(name="research", retry=2)
async def research(input: str, context: StepContext) -> str:
    return f"notes about {input}"


@step(name="draft")
async def draft(input: str, context: StepContext) -> str:
    return f"article based on {context.previous}"


async def main() -> None:
    # Top-level await only works in an async REPL or notebook, so wrap the run.
    result = await Flow([research, draft], name="content-pipeline").run(
        "AI agent orchestration"
    )
    print(result.output)
    print([trace.step_name for trace in result.traces])


asyncio.run(main())

Important data-flow rule: the first input argument is always the original flow.run(...) input. Previous step output is available as context.previous.
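
To make the rule concrete, here is a toy runner written with the standard library only. It is a sketch of the rule, not Orchflow's code: `ToyContext` and `run_sequential` are hypothetical names, and the context carries only the two fields used here.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class ToyContext:
    """Hypothetical stand-in for StepContext with only the fields used here."""

    previous: Any = None
    state: dict[str, Any] = field(default_factory=dict)


ToyStep = Callable[[str, ToyContext], Awaitable[Any]]


async def run_sequential(steps: list[ToyStep], original_input: str) -> Any:
    context = ToyContext()
    for current in steps:
        # Every step gets the same original input; only context.previous changes.
        context.previous = await current(original_input, context)
    return context.previous


async def research(input: str, context: ToyContext) -> str:
    return f"notes about {input}"


async def draft(input: str, context: ToyContext) -> str:
    return f"article on {input} using [{context.previous}]"


output = asyncio.run(run_sequential([research, draft], "agents"))
print(output)
```

Both steps see "agents" as `input`, while `draft` reads the research notes from `context.previous`.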

Parallel Flow

Wrap independent steps in a list to run them concurrently.

flow = Flow([
    plan,
    [web_research, docs_research],
    synthesize,
])

result = await flow.run("workflow frameworks")

The next step receives a dictionary keyed by step name:

{
    "web_research": "...",
    "docs_research": "..."
}

Parallel steps produce separate flat trace entries with the same parallel_group_id.
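
One plausible way to produce such a name-keyed dictionary is `asyncio.gather`, sketched below with the standard library. `run_group` is a hypothetical helper and the steps take only the original input; how Orchflow merges outputs internally is not shown in this README.

```python
import asyncio
from typing import Awaitable, Callable

AsyncStep = Callable[[str], Awaitable[str]]


async def run_group(steps: dict[str, AsyncStep], original_input: str) -> dict[str, str]:
    """Run every step concurrently and key each output by its step name."""
    names = list(steps)
    outputs = await asyncio.gather(*(steps[name](original_input) for name in names))
    # gather preserves argument order, so names and outputs line up.
    return dict(zip(names, outputs))


async def web_research(topic: str) -> str:
    await asyncio.sleep(0.02)  # finishes last, but still lands under its own key
    return f"web notes on {topic}"


async def docs_research(topic: str) -> str:
    await asyncio.sleep(0.01)
    return f"docs notes on {topic}"


merged = asyncio.run(
    run_group(
        {"web_research": web_research, "docs_research": docs_research},
        "workflow frameworks",
    )
)
print(merged)
```

Because the result is keyed by name rather than by completion order, the following step can read each branch without caring which one finished first.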

Conditional Flow

from orchflow import Flow, condition

flow = Flow([
    classify,
    condition(
        when=lambda ctx: ctx.previous == "technical",
        then=technical_writer,
        otherwise=general_writer,
    ),
])

The predicate receives the current StepContext, so routing can use context.previous, shared state, or run metadata.
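
A named predicate is easier to unit test than a lambda. The sketch below only assumes that the context exposes `.previous` and `.state`, as the other examples do; the `force_technical` key is hypothetical, and `SimpleNamespace` objects stand in for a real StepContext.

```python
from types import SimpleNamespace
from typing import Any


def wants_technical(ctx: Any) -> bool:
    """Route on the classifier label, with a shared-state override."""
    # "force_technical" is a hypothetical key an earlier step might have set.
    if ctx.state.get("force_technical"):
        return True
    return str(ctx.previous).strip().lower() == "technical"


# Stand-ins for StepContext, exposing only .previous and .state.
by_label = SimpleNamespace(previous=" Technical ", state={})
by_state = SimpleNamespace(previous="general", state={"force_technical": True})
neither = SimpleNamespace(previous="general", state={})

decisions = [wants_technical(c) for c in (by_label, by_state, neither)]
print(decisions)
```

A function like this could then be supplied as `when=wants_technical` in place of the inline lambda.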

Human Review

Use human_input(...) when a pipeline needs a lightweight review point without adding checkpointing, queues, or a separate UI.

from orchflow import Flow, StepContext, condition, human_input, step


@step
async def draft(input: str, context: StepContext) -> str:
    text = f"Draft about {input}"
    context.state["draft"] = text
    return text


review = human_input(
    lambda ctx: f"Review this draft:\n{ctx.previous}\n\nDecision: ",
    name="human_review",
)


@step
async def publish(input: str, context: StepContext) -> str:
    return f"Published: {context.state['draft']}"


@step
async def revise(input: str, context: StepContext) -> str:
    return f"Revision requested: {context.previous}"


flow = Flow([
    draft,
    review,
    condition(
        when=lambda ctx: str(ctx.previous).strip().lower() == "approve",
        then=publish,
        otherwise=revise,
    ),
])

By default, human_input(...) reads from stdin. Applications and tests can pass a sync or async provider(prompt, context) callback instead. The human response is normal step output, so it is available as context.previous to the next step.
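
The sketch below shows what a sync and an async provider with the `provider(prompt, context)` shape might look like. It deliberately does not call `human_input(...)`, because the keyword used to pass a provider is not shown in this README; `ask` is a hypothetical caller that accepts either kind of callback.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Union

Provider = Callable[[str, Any], Union[str, Awaitable[str]]]


def scripted_provider(answers: list[str]) -> Provider:
    """Build a sync provider that replays canned answers, useful in tests."""
    remaining = iter(answers)

    def provider(prompt: str, context: Any) -> str:
        return next(remaining)

    return provider


async def approving_provider(prompt: str, context: Any) -> str:
    """Async provider, standing in for one that awaits a chat or web form reply."""
    await asyncio.sleep(0)
    return "approve"


async def ask(provider: Provider, prompt: str, context: Any) -> str:
    """Hypothetical caller that accepts both sync and async providers."""
    reply = provider(prompt, context)
    if inspect.isawaitable(reply):
        reply = await reply
    return reply


async def demo() -> list[str]:
    scripted = scripted_provider(["revise", "approve"])
    return [
        await ask(scripted, "Decision: ", None),
        await ask(scripted, "Decision: ", None),
        await ask(approving_provider, "Decision: ", None),
    ]


replies = asyncio.run(demo())
print(replies)
```

A scripted provider like this lets a test drive both the approve and the revise branch without touching stdin.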

Checkpoint And Resume

Use JsonCheckpointStore when a flow should survive a transient failure without rerunning completed top-level work.

from orchflow import Flow, JsonCheckpointStore

store = JsonCheckpointStore("orchflow-checkpoint.json")
flow = Flow([collect, draft, publish], name="checkpointed-pipeline")

first = await flow.run(
    "AI agent orchestration",
    checkpoint=store,
    raise_on_error=False,
)

if not first.success:
    resumed = await flow.resume(store)
    print(resumed.output)

Checkpoints are plain JSON and are saved after each completed top-level item: single steps, selected condition branches, or complete parallel groups. A failed parallel group resumes by rerunning the whole group. Successful flows keep the checkpoint file and mark it completed for inspection.
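
The save-after-each-item idea can be sketched with the standard library. This is an illustration under stated assumptions, not Orchflow's checkpoint format: the `completed_items` and `previous` fields, the synchronous items, and the helper names are all hypothetical.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Callable


def save_checkpoint(path: Path, completed: int, previous: Any) -> None:
    """Write the checkpoint atomically so a crash never leaves half a file."""
    payload = {"completed_items": completed, "previous": previous}
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(payload), encoding="utf-8")
    os.replace(tmp, path)  # atomic rename on the same filesystem


def load_checkpoint(path: Path) -> dict[str, Any]:
    if not path.exists():
        return {"completed_items": 0, "previous": None}
    return json.loads(path.read_text(encoding="utf-8"))


def run_items(items: list[Callable[[Any], Any]], path: Path) -> Any:
    """Run top-level items in order, skipping those already checkpointed."""
    checkpoint = load_checkpoint(path)
    previous = checkpoint["previous"]
    for index, item in enumerate(items):
        if index < checkpoint["completed_items"]:
            continue  # finished in an earlier run
        previous = item(previous)
        save_checkpoint(path, index + 1, previous)
    return previous


calls: list[str] = []


def collect(previous: Any) -> str:
    calls.append("collect")
    return "notes"


def flaky_publish(previous: Any) -> str:
    calls.append("publish")
    if calls.count("publish") == 1:
        raise RuntimeError("transient failure")
    return f"published {previous}"


with tempfile.TemporaryDirectory() as folder:
    checkpoint_path = Path(folder) / "checkpoint.json"
    try:
        run_items([collect, flaky_publish], checkpoint_path)
    except RuntimeError:
        pass  # first run fails after "collect" was checkpointed
    resumed = run_items([collect, flaky_publish], checkpoint_path)

print(calls, resumed)
```

On the second run `collect` is skipped and only the failed item is repeated, which is the behavior the paragraph above describes for top-level items.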

Live Events

Flow.events(...) lets applications observe a workflow while it runs.

async for event in flow.events("agent observability"):
    print(event.type, event.step_name, event.attempt)

Event types:

  • flow_started
  • step_started
  • step_completed
  • step_failed
  • retry_scheduled
  • flow_completed
  • flow_failed
  • checkpoint_saved
  • checkpoint_loaded

Events are orchestration lifecycle events, not token streaming. The final flow_completed or flow_failed event carries a FlowResult.
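
A consumer of such a stream can be written against the event attributes alone. The sketch below tallies events by `type` and remembers the terminal one; `summarize_events` is a hypothetical helper, and the fake stream (including its event order) is illustrative rather than captured from a real run.

```python
import asyncio
from collections import Counter
from types import SimpleNamespace
from typing import Any, AsyncIterator


async def summarize_events(events: AsyncIterator[Any]) -> tuple[Counter, str]:
    """Count events by type and report which terminal event ended the run."""
    counts: Counter = Counter()
    terminal = ""
    async for event in events:
        counts[event.type] += 1
        if event.type in ("flow_completed", "flow_failed"):
            terminal = event.type
    return counts, terminal


async def fake_events() -> AsyncIterator[Any]:
    """Stand-in stream; a real one would come from flow.events(...)."""
    kinds = (
        "flow_started",
        "step_started",
        "step_failed",
        "retry_scheduled",
        "step_started",
        "step_completed",
        "flow_completed",
    )
    for kind in kinds:
        yield SimpleNamespace(type=kind)


counts, terminal = asyncio.run(summarize_events(fake_events()))
print(dict(counts), terminal)
```

The same function should work unchanged on any async iterator whose items expose a `type` attribute.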

Trace Output

Every step attempt creates a flat StepTrace.

result = await flow.run("topic")

for trace in result.traces:
    print(trace.to_dict())

Example shape:

{
    "step_name": "draft",
    "input": "topic",
    "output": "article...",
    "error": None,
    "attempt": 1,
    "parallel_group_id": None,
    "duration_seconds": 0.42,
    "started_at": "2026-05-10T03:13:48.994932+00:00",
    "ended_at": "2026-05-10T03:13:49.414932+00:00",
    "success": True,
}
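
Because the records are flat, they aggregate with a few lines of plain Python. The sketch below groups trace dictionaries by `step_name`; `summarize_traces` is a hypothetical helper, and the sample rows are hand-written in the shape shown above rather than real Orchflow output.

```python
from collections import defaultdict
from typing import Any


def summarize_traces(traces: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Aggregate flat per-attempt trace dicts into one row per step."""
    summary: dict[str, dict[str, Any]] = defaultdict(
        lambda: {"attempts": 0, "failures": 0, "seconds": 0.0}
    )
    for trace in traces:
        row = summary[trace["step_name"]]
        row["attempts"] += 1
        row["failures"] += 0 if trace["success"] else 1
        row["seconds"] += trace["duration_seconds"]
    return dict(summary)


# Hand-written sample data, not captured from a real run.
sample = [
    {"step_name": "research", "attempt": 1, "success": False, "duration_seconds": 0.25},
    {"step_name": "research", "attempt": 2, "success": True, "duration_seconds": 0.5},
    {"step_name": "draft", "attempt": 1, "success": True, "duration_seconds": 0.25},
]
report = summarize_traces(sample)
print(report)
```

With real data, the input would be `[trace.to_dict() for trace in result.traces]`.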

Optional LLM Agent

Core Orchflow has no runtime dependencies. The public Agent uses LiteLLM only when you install the optional extra:

pip install "orchflow[litellm]"

from orchflow import Agent

writer = Agent(
    name="writer",
    role="You write concise technical explanations.",
    model="gpt-4o-mini",
)

text = await writer.run("Explain lightweight orchestration")

For structured workflows, use AgentConfig and run_structured(...):

from orchflow import Agent, AgentConfig

extractor = Agent(
    name="extractor",
    role="Extract structured data. Return only JSON.",
    config=AgentConfig(model="gpt-4o-mini", temperature=0),
)

person = await extractor.run_structured(
    "Ada works at OpenAI.",
    schema={
        "title": "person",
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "company": {"type": "string"},
        },
        "required": ["name", "company"],
    },
)

run_structured(...) returns parsed JSON for schema dictionaries and a Pydantic model instance when a Pydantic model class is passed. Tool-calling loops, memory, and durable agent state are intentionally outside the current scope. Orchflow focuses on orchestration first.

Examples

uv run python examples/basic_sequential.py
uv run python examples/parallel_steps.py
uv run python examples/conditional_flow.py
uv run python examples/live_events.py
uv run python examples/human_review.py
uv run python examples/checkpoint_resume.py

Optional LiteLLM-backed examples after installing orchflow[litellm] and configuring a provider API key:

uv run python examples/litellm_agent.py
uv run python examples/structured_agent.py

Development

uv sync --extra dev
uv run pytest
uv run ruff check
uv run ruff format --check
uv run pyright
uv build

Release Process

TestPyPI publishing is manual through .github/workflows/publish-testpypi.yml. Real PyPI publishing is tag-based through .github/workflows/publish-pypi.yml.

git tag -a v0.5.0 -m "Release v0.5.0"
git push origin v0.5.0

The release workflow verifies that the Git tag matches pyproject.toml, uploads to PyPI through trusted publishing, and creates a GitHub Release.

Roadmap

  • 0.5.x: structured agent polish and docs improvements
  • 0.6.0: evaluate one-turn tool execution

Source Of Truth

Project decisions live in AGENTS.md. Implementation follows that document.
