The orchestration engine for production-grade agentic workflows
Project description
Graflow
Stop wrestling graph composition. Start writing Pythonic agent pipelines. Compose tasks with >> and |.
Graflow is a Python-based agent orchestration framework with a workflow-first, defined-by-run approach focused on simplicity and developer experience. Compose tasks — from ETL jobs to multi-agent LLM pipelines — using plain Python functions and two intuitive operators.
Why Graflow?
Most agent frameworks make you think in framework concepts — state machines, conditional edges, graph builders, node factories. Graflow lets you think in Python. Define tasks, connect them with >> and |, and run.
uv add graflow # or: pip install graflow
from graflow import task, workflow
@task
def fetch():
return {"users": 1200}
@task
def validate(users: int): # auto-resolved from upstream channel
assert users > 0
return users
@task
def notify(users: int):
print(f"Pipeline done: {users} users processed")
with workflow("my_pipeline") as ctx:
fetch >> validate >> notify # that's it
ctx.execute("fetch")
3 decorators, 1 operator, zero boilerplate.
Try hands-on in the browser — no install required:
Before / After
Tired of writing add_conditional_edges, builder patterns, and explicit state objects? Here's the same agent loop in LangGraph vs. Graflow.
Define-and-compile vs. define-by-run. LangGraph makes you declare every edge and conditional transition before execution — the graph is frozen at
compile()time, and all possible routes must be known upfront. Graflow is define-by-run (the same paradigm PyTorch uses vs. TensorFlow 1.x): the graph shape is whatever the Python code does at runtime. Control flow is justif/return/context.next_task()— no separate routing DSL, noENDsentinels, no conditional-edge registry. You debug workflows withpdb, not with a graph inspector.
| LangGraph (conditional routing) | Graflow |
|---|---|
from langgraph.graph import StateGraph, END
builder = StateGraph(State)
builder.add_node("agent", agent_node)
builder.add_node("tool", tool_node)
builder.add_conditional_edges(
"agent",
should_continue,
{"continue": "tool", "end": END},
)
builder.add_edge("tool", "agent")
builder.set_entry_point("agent")
graph = builder.compile()
result = graph.invoke({"input": "hello"})
|
from graflow import task, workflow
with workflow("agent_loop") as ctx:
@task(inject_context=True)
def agent(context):
out = call_llm(context.get_channel().get("input"))
if not should_continue(out):
context.terminate_workflow("done")
return out
@task
def tool(out: str):
return run_tool(out)
agent >> tool >> agent # loop
ctx.execute("agent")
|
Key Features
| Feature | What it does |
|---|---|
>> and | operators |
Chain tasks sequentially or run them in parallel — no graph builder API |
| Auto data flow | Task parameters are automatically resolved from upstream channels |
| Fan-out / Fan-in | fetch >> (transform_a | transform_b) >> store — diamond patterns in one line |
| Dynamic tasks | Add tasks at runtime via ctx.next_task() / ctx.next_iteration() — true defined-by-run |
| Distributed execution | Swap to Redis-backed queues with one config change; add workers with python -m graflow.worker.main |
| Checkpoint & resume | Pause long-running workflows, resume from exact state |
| Human-in-the-loop | Approvals, text input, selections mid-workflow — with Slack/webhook notifications |
| LLM agents | Plug in LiteLLM, Pydantic AI, or Google ADK agents and inject them into tasks |
| Tracing | Langfuse integration for observability across distributed workflows |
Workflow Patterns
Linear pipeline: a >> b >> c >> d
Fan-out: source >> (proc_a | proc_b | proc_c)
Fan-in: (input_a | input_b) >> aggregate
Diamond: fetch >> (transform_a | transform_b) >> store
Multi-stage: (load_a | load_b) >> validate >> (proc_a | proc_b) >> store
A diamond workflow visualized:
graph LR
fetch --> transform_a
fetch --> transform_b
transform_a --> store
transform_b --> store
Real-World Snippets
ETL Pipeline
with workflow("etl") as ctx:
@task
def extract():
return db.query("SELECT * FROM raw_events")
@task
def validate(events: list):
return [e for e in events if e["timestamp"]]
@task
def load(events: list):
warehouse.bulk_insert(events)
extract >> validate >> load
ctx.execute("extract")
Parallel Enrichment
with workflow("enrich") as ctx:
fetch_data >> (enrich_demographics | enrich_geo | enrich_behavioral) >> merge_and_store
ctx.execute("fetch_data")
Multi-Agent LLM Workflow
Register LLM agents once, then inject them directly into tasks with inject_llm_agent="<name>" — no manual lookup, no boilerplate.
from graflow import task, workflow
from graflow.llm.agents.base import LLMAgent
with workflow("research") as ctx:
# Register agents once; factories receive the ExecutionContext
ctx.register_llm_agent("researcher", lambda _: build_researcher_agent())
ctx.register_llm_agent("analyst", lambda _: build_analyst_agent())
@task(inject_llm_agent="researcher")
def research(agent: LLMAgent):
return agent.run("Summarize recent ML papers")["output"]
@task(inject_llm_agent="analyst")
def analyze(agent: LLMAgent, research: str): # `research` auto-resolved from channel
return agent.run(f"Analyze: {research}")["output"]
@task
def publish(analyze: str):
send_report(analyze)
research >> analyze >> publish
ctx.execute("research")
Dynamic Workflows: next_task and next_iteration
Graflow is defined-by-run — tasks can spawn new tasks or loop themselves based on runtime data.
next_task — conditionally add a downstream task at runtime:
from graflow import task, workflow
from graflow.core.task import TaskWrapper
with workflow("router") as ctx:
@task(id="classify", inject_context=True)
def classify(context, value: int = 150):
if value > 100:
context.next_task(TaskWrapper("high", lambda: print(f"high-value: {value * 2}")))
else:
context.next_task(TaskWrapper("low", lambda: print(f"low-value: {value + 10}")))
ctx.execute("classify")
next_task(..., goto=True) — branch to a task and skip the statically-declared successors:
with workflow("pipeline") as ctx:
@task(id="validate", inject_context=True)
def validate(context, record: dict):
if not record.get("ok"):
# Jump to error handler and SKIP process/publish below
context.next_task(error_handler, goto=True)
return
# Fall through: process >> publish will run normally
@task
def process(): ...
@task
def publish(): ...
@task
def error_handler():
print("recovery path")
validate >> process >> publish
ctx.execute("validate", value={"ok": False})
Semantics at a glance:
| Call | New task? | Runs static successors? |
|---|---|---|
next_task(new_task) |
creates | ✅ yes (normal fan-out) |
next_task(existing_task) |
— | ❌ no (auto-goto) |
next_task(task, goto=True) |
either | ❌ no (explicit goto) |
Use goto=True for error recovery, early exits, or replacing the rest of the pipeline with a different branch — it's the Graflow equivalent of a conditional edge pointing at END and then jumping elsewhere, in one line.
next_iteration — loop the current task with new data until a convergence condition:
with workflow("optimize") as ctx:
@task(id="optimizer", inject_context=True, max_cycles=20)
def optimize(context, data=None):
params = data or {"iteration": 0, "accuracy": 0.5}
params = {"iteration": params["iteration"] + 1,
"accuracy": min(params["accuracy"] + 0.15, 0.95)}
print(f"Iter {params['iteration']}: accuracy={params['accuracy']:.2f}")
if params["accuracy"] >= 0.9:
return params # converged → workflow ends
context.next_iteration(params) # loop with new data
ctx.execute("optimizer")
Use next_task for branching, next_iteration for cycles — both respect checkpointing, retries, and tracing.
Run Tasks in Docker
Need environment isolation, pinned Python versions, or GPU runtimes? Tag a task with handler="<name>" and register a DockerTaskHandler bound to that name — the same workflow code now runs each task inside a container.
Each handler is bound to one image, so pick a container per task by registering multiple handlers under distinct names:
from graflow import task, workflow
from graflow.core.context import ExecutionContext
from graflow.core.engine import WorkflowEngine
from graflow.core.handlers.docker import DockerTaskHandler
with workflow("docker_pipeline") as wf:
@task
def local_step():
return "ran on host"
@task(handler="py311") # -> python:3.11-slim
def etl_step():
import sys
return f"etl on python {sys.version_info[:2]}"
@task(handler="pytorch_gpu") # -> pytorch/pytorch:latest
def train_step():
import torch
return f"cuda available: {torch.cuda.is_available()}"
local_step >> etl_step >> train_step
engine = WorkflowEngine()
# Register one handler per image; task's `handler=` picks which one
engine.register_handler("py311", DockerTaskHandler(image="python:3.11-slim"))
engine.register_handler("pytorch_gpu", DockerTaskHandler(image="pytorch/pytorch:latest"))
engine.execute(ExecutionContext.create(wf.graph))
Mix and match freely: CPU vs. GPU, Python 3.10 vs. 3.12, custom-built images with pre-installed dependencies — it's all just "register a handler, tag the task."
💡 Custom handlers.
dockeris just one built-in handler. You can implement your ownTaskHandlerfor Kubernetes Jobs, AWS Lambda, Modal, Ray, SSH-to-remote-host, or any other execution backend — same workflow code, custom runtime. See the Task Handlers guide for the handler interface and full examples.
Scale to Distributed
The same workflow code runs locally or across a Redis-backed worker pool — just change the backend.
from graflow.core.context import create_execution_context
from graflow.queue.factory import QueueBackend
context = create_execution_context(
queue_backend=QueueBackend.REDIS,
channel_backend="redis",
)
# Start workers in separate terminals
python -m graflow.worker.main --worker-id worker-1
python -m graflow.worker.main --worker-id worker-2
Documentation
- Product Documentation — Guides, API reference, architecture
- Examples — Production-ready examples organized by complexity
- CHANGELOG
Contributing
Contributions are welcome — see CONTRIBUTING.md for setup, development workflow, and PR guidelines.
Ways to contribute:
- 🐛 Report bugs or request features via GitHub Issues
- 🔌 Add a new backend (queue / channel / handler) — follow the factory pattern in
graflow/queue/andgraflow/channels/ - 📚 Contribute an example to
examples/— especially real-world agent/LLM workflows - 📖 Improve the docs at graflow.ai
- 💬 Discuss on r/GraflowAI
License
This project is licensed under the Apache License 2.0 — see the LICENSE file for details.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file graflow-0.1.9.tar.gz.
File metadata
- Download URL: graflow-0.1.9.tar.gz
- Upload date:
- Size: 155.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
599f992064b04aab6af8b2026337d627108fd4410be9b801db9cf5f62f0a12c4
|
|
| MD5 |
6479bae0d4fbbaf212c9fcec546ac81d
|
|
| BLAKE2b-256 |
e533ba8d3686603701421d310b9f8d5e06b34471f91c3c86fa112fc645f80131
|
File details
Details for the file graflow-0.1.9-py3-none-any.whl.
File metadata
- Download URL: graflow-0.1.9-py3-none-any.whl
- Upload date:
- Size: 202.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e2ae971cde87ed705383a8adc2e46ccc55c2bc84a91423bd8fcddd419c42584d
|
|
| MD5 |
5da5a622f140ae215fa03670f7cd7b13
|
|
| BLAKE2b-256 |
eb3e3d5bf62782427ccc3524e3563bc44bd973c7dddbe5d4e9375a1434fe1f5c
|