Define agents (tools + system prompt) in YAML and run them as an orchestrated pipeline.
agentic-flow
A tiny framework for declaring agents in YAML — each with a system prompt and a set of tools — and running them as a graph over a shared data state. Runs on Gemini (Google) by default, with Claude (Anthropic) also built in and a pluggable provider interface for the rest.
The idea
You declare a graph of nodes (each wrapping an agent or a tool) and edges that
decide the next node — or a linear pipeline: that lowers to one. The framework
owns how it runs: a scheduler walks a frontier of ready nodes, writes each
node's result to a versioned channel, checkpoints at every node boundary, and
drives the tool-use loop inside each agent.
```text
YAML ─► build_program ─► Graph ─► runtime scheduler
entry ─► gather (agent) ─► analyze (agent) ─► write (tool) ─► __end__
channels: sources ─► claims ─► draft (value + version + reducer)
```
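The channel idea in the diagram (a value, a version, and a reducer) can be illustrated with a minimal sketch. `Channel`, `replace`, and `append` are hypothetical names chosen for this example, not agentic-flow's Store API; the real reducers and versioning rules may differ.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


def replace(old: Any, new: Any) -> Any:
    """Hypothetical reducer that keeps only the newest write."""
    return new


def append(old: Optional[list], new: Any) -> list:
    """Hypothetical reducer that accumulates every write into a list."""
    return (old or []) + [new]


@dataclass
class Channel:
    """A named slot: current value, a version counter, and a reducer."""
    reducer: Callable[[Any, Any], Any] = replace
    value: Any = None
    version: int = 0

    def write(self, update: Any) -> None:
        # Fold the update into the current value, then bump the version
        # so a reader can tell a fresh write from one it has already seen.
        self.value = self.reducer(self.value, update)
        self.version += 1


channels: Dict[str, Channel] = {
    "sources": Channel(reducer=append),
    "draft": Channel(),
}
channels["sources"].write("https://example.com/a")
channels["sources"].write("https://example.com/b")
channels["draft"].write("v1")
channels["draft"].write("v2")
print(channels["sources"].value, channels["sources"].version)
print(channels["draft"].value, channels["draft"].version)
```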
A pipeline: [a, b, c] is sugar the loader lowers to entry: a; a→b→c→__end__, so
every existing pipeline runs unchanged. The edges can also branch (when:),
loop (a back-edge), or — when the author opts in — let the model route to
the next node or replan the remaining graph.
Core modules:
| File | Responsibility |
|---|---|
| tools/ | @tool decorator + ToolRegistry (named groups); builtin/ holds the built-in tools (state_*, send_message, the todo_* board), one module per concern. |
| agent.py | Agent = system prompt + tools + the model's tool-use loop. |
| graph.py | The graph IR: Node + three Edge kinds + Graph, structural validate(), the dry-run renderer, and the replan edit ops. |
| store.py | Store = versioned channels + reducers + message log + todo board for a run. |
| runtime.py | The flat-frontier scheduler: run/resume, edge resolution (incl. route), the per-node checkpoint, replan, the budget. |
| cursor_format.py | The graph-cursor checkpoint format (CheckpointRecord). |
| authority.py | Replan (authority L2): the guard-gated planner hook + the atomic batch validator. |
| loader.py | build_program: YAML → a validated Graph + the agent map; lowers pipeline: to a graph. |
| pipeline.py | The pipeline grammar (the lowering front-end): Step / Loop types + the YAML → Step parser. |
| program.py | Orchestrator: the ergonomic from_yaml/run/resume facade over the loader + runtime. |
| retry.py / limits.py | RetryPolicy (backoff) and capacity/timeout/visit caps. |
| providers/ | Provider abstraction + gemini and anthropic implementations. |
| checkpoint.py | Checkpointer storage seam (in-memory + JSON-file) for durable runs & resume. |
An agent's tools: list accepts individual tool names or a group name — e.g.
tools: [todos] grants the whole todo board, tools: [state, send_message]
mixes a group with a single tool.
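One way the group-or-tool lookup in a tools: list could resolve, sketched under assumptions: the `GROUPS` registry, its member names, and `resolve_tools` are all hypothetical, and group names are checked before single tool names.

```python
from typing import Dict, List

# Hypothetical registry contents; the member names are made up for illustration.
GROUPS: Dict[str, List[str]] = {
    "todos": ["todo_add", "todo_list", "todo_done"],
    "state": ["state_get", "state_set"],
}
SINGLE_TOOLS = {"send_message", "save_report"}


def resolve_tools(entries: List[str]) -> List[str]:
    """Expand each entry (group or single tool) into tool names, in order, without duplicates."""
    resolved: List[str] = []
    for name in entries:
        if name in GROUPS:
            members = GROUPS[name]  # a group grants all of its members
        elif name in SINGLE_TOOLS or any(name in g for g in GROUPS.values()):
            members = [name]  # an individual tool name
        else:
            raise KeyError(f"unknown tool or group: {name!r}")
        for member in members:
            if member not in resolved:
                resolved.append(member)
    return resolved


print(resolve_tools(["state", "send_message"]))
# ['state_get', 'state_set', 'send_message']
```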
Documentation
This README is the usage guide; deeper docs live under
docs/:
| Doc | Read it for |
|---|---|
| docs/CONTEXT.md | The glossary — the domain language (with _Avoid_ synonym lists), the layering, and the key decisions. Start here for vocabulary. |
| docs/ARCHITECTURE.md | Module map, core abstractions, run lifecycle, design decisions. |
| docs/FEATURES.md | The full feature catalog — every feature, where it's configured, which example shows it. |
| docs/INTERNALS.md | The logic under the hood — the tool-use loop, exactly-once delivery, schema derivation, retries, the provider seam. |
| CHANGELOG.md | Release notes, per version. |
Visual studio (author pipelines without writing YAML by hand)
A no-build, static web app for visualizing and authoring pipeline YAML lives
in the sibling studio repo at ../studio/builder/. Open
it directly — no npm, no build, no server, and js-yaml is vendored so it works
offline:
```sh
open ../studio/builder/index.html   # macOS (or: xdg-open ../studio/builder/index.html)
# or serve it: python -m http.server -d ../studio/builder 8000
```
It renders a pipeline: as a top-to-bottom flowchart (loops and when branches
nest visually), lets you edit agents and steps in an inspector, validates with the
same parse-time rules as the pipeline grammar, shows each step's reads/writes and
the required initial state, and round-trips to/from YAML (Open · Export · Copy).
It's authoring-only — it does not run pipelines. The public website and external
docs live alongside it under ../studio/. See ../studio/builder/README.md.
Install & run
pip install gives you the importable library (import agentic_flow) and the
agentic-flow CLI. The distribution is published on PyPI as agentlead-flow-core:
```sh
pip install agentlead-flow-core
```
The examples and docs referenced below are not shipped in the wheel — they live
in the GitHub repo. Clone it to run them (the examples run on Gemini, so set
GEMINI_API_KEY):
```sh
git clone https://github.com/ai-agent-lead/agentic-flow
cd agentic-flow
pip install -e .                      # editable install from the clone
export GEMINI_API_KEY=...
python -m examples.quickstart.run     # smallest: one agent, one tool, one step
python -m examples.shared_state.run   # multi-agent fan-out → fan-in over one Store

# To run an agent on Claude, set provider: anthropic on it and:
export ANTHROPIC_API_KEY=sk-ant-...
```
Each example is a self-contained, single-concept folder under examples/,
meant to be read in order — see examples/README.md for the
full 15-step ladder (quickstart, structured_output, tool_steps, shared_state,
refine_loop, approval, graph_branch, router, subgraph_intake, delegation,
shared_transcript, team_todos, secrets_demo, incident_replan, data_pipeline).
Four run with no API key: subgraph_intake, approval, secrets_demo, data_pipeline.
CLI
Installing the package adds an agentic-flow command. Point it at a
pipeline.yaml and its tools module:
```sh
agentic-flow examples/shared_state/pipeline.yaml --tools examples.shared_state.tools                      # run
agentic-flow examples/shared_state/pipeline.yaml --tools examples.shared_state.tools --dry-run            # print the plan, no model
agentic-flow my/pipeline.yaml --tools my.tools --set topic="Q3 sales"                                      # seed state
agentic-flow examples/shared_state/pipeline.yaml --tools examples.shared_state.tools --log-level DEBUG    # trace via logging
```
- --tools MODULE imports a module that registers @tool functions (repeatable).
- --set KEY=VALUE seeds initial state.
- --dry-run builds and renders the graph (nodes, edges grouped by source, (back-edge) flags, the authority footer) without calling any model; a pipeline: is shown as the graph it lowers to.
- --log-level LEVEL routes the trace through logging.
- For durable runs, --checkpoint SPEC / --run-id / --resume / --human-input* persist and resume a run (with exit codes 0/1/2); see Durable runs & human-in-the-loop below.
Docker
```sh
docker build -t agentic-flow .
docker run --rm -e GEMINI_API_KEY agentic-flow   # runs the shared_state example

# Or with docker compose (reads keys from .env):
cp .env.example .env                   # then put GEMINI_API_KEY in .env
docker compose run --rm agentic-flow   # shared_state (default)
docker compose run --rm graph_branch   # native graph: example; live-mounts examples/graph_branch (rebuild the image to pick up framework changes)
```
Generated artifacts land on the host under examples/<name>/out/ (mounted by
compose). The default image runs the Gemini shared_state example via the CLI.
Testing (in Docker)
The test suites run in the container, not your local Python env. They're offline (no API key), and the source is mounted, so they test your live code with no rebuild:
```sh
docker compose run --rm tests   # runs every tests/test_*.py in Docker → exit 0 on success
```
The suites are script-style (each tests/test_*.py runs its own assertions), so
they're invoked with python, not pytest. Adding a suite? Add it to the tests
service command in docker-compose.yml.
Defining tools
Any function with type hints becomes a tool; the JSON Schema is derived for you —
the docstring summary becomes the description, Args: lines the parameter docs.
```python
from agentic_flow import tool

@tool
def save_report(filename: str, content: str) -> str:
    """Write a report to disk and return the path.
    Args:
        filename: File name to write, e.g. "report.md".
        content: The full text of the report.
    """
    ...
```
Register related tools under a group with @tool(group="…") so an agent can grant
them all by listing the group name (see the tools: note above).
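The derivation described above (type hints to JSON Schema types, docstring summary to description, Args: lines to parameter docs) can be approximated with the standard library's `inspect` and `typing`. `derive_schema` is a hypothetical, deliberately small sketch: it maps only str/int/float/bool and assumes a Google-style Args: block; it is not the framework's implementation.

```python
import inspect
import typing
from typing import Any, Callable, Dict, List

# A deliberately small mapping from Python annotations to JSON Schema types.
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def derive_schema(fn: Callable[..., Any]) -> Dict[str, Any]:
    """Build a JSON-Schema-style tool spec from type hints and a Google-style docstring."""
    hints = typing.get_type_hints(fn)
    lines = (inspect.getdoc(fn) or "").splitlines()
    summary = lines[0] if lines else ""
    # Collect "name: text" entries from the indented block after "Args:".
    arg_docs: Dict[str, str] = {}
    in_args = False
    for line in lines:
        if line.strip() == "Args:":
            in_args = True
            continue
        if in_args:
            if not line.startswith(" "):  # a dedented line ends the Args: block
                break
            name, _, text = line.strip().partition(":")
            arg_docs[name] = text.strip()
    properties: Dict[str, Dict[str, str]] = {}
    required: List[str] = []
    for name, param in inspect.signature(fn).parameters.items():
        prop = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        if name in arg_docs:
            prop["description"] = arg_docs[name]
        properties[name] = prop
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default means the model must supply it
    return {
        "name": fn.__name__,
        "description": summary,
        "parameters": {"type": "object", "properties": properties, "required": required},
    }


def save_report(filename: str, content: str) -> str:
    """Write a report to disk and return the path.

    Args:
        filename: File name to write, e.g. "report.md".
        content: The full text of the report.
    """
    return filename


schema = derive_schema(save_report)
print(schema["description"])
print(schema["parameters"]["required"])  # ['filename', 'content']
```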
Defining agents & the pipeline (YAML)
```yaml
provider: gemini          # default provider for all agents
model: gemini-3.5-flash   # default model for all agents

agents:
  analyst:
    system: "You are a data analyst..."
    tools: [fetch_sales_data, calculator]
  writer:
    system: "You are an executive writer..."
    tools: [save_report]

pipeline:
  - name: analyze
    agent: analyst
    input: "Analyze this year's sales..."
    output: analysis      # writes result to state["analysis"]
  - name: write
    agent: writer
    input: |              # {analysis} is pulled from state
      Write a summary of:
      {analysis}
    output: summary
```
A pipeline: is sugar the loader lowers to a graph (so it runs on the same
engine as a hand-authored graph:). Two leaf step kinds: an agent step
(agent: + input:, model-driven) and a tool step (tool: + args:, called
directly, no model). input/args are templated with the current state ({key},
{key[field]}), so each step consumes what earlier steps produced. Per-agent
overrides: model, max_tokens, thinking, output_schema, retry,
tool_retry, limits.
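Python's own str.format_map already resolves both placeholder shapes the text mentions, so a rough stand-in for step templating might look like this. `render` is a hypothetical helper; the real engine's handling of missing keys and literal braces may differ.

```python
from typing import Any, Mapping


def render(template: str, state: Mapping[str, Any]) -> str:
    """Fill {key} and {key[field]} placeholders from the run state."""
    # format_map looks up {key} as state["key"] and {key[field]} as
    # state["key"]["field"]; a missing key raises KeyError.
    return template.format_map(state)


state = {"analysis": "Revenue rose in Q3.", "report": {"title": "Q3 Review"}}
print(render("Write a summary of:\n{analysis}", state))
print(render("Title: {report[title]}", state))  # Title: Q3 Review
```

One consequence of this particular approach: a literal brace in a template (JSON, for instance) would have to be doubled as {{ }}.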
Control flow — the flow isn't strictly linear. A loop step repeats a
body: until an until: condition goes truthy (or a max_iterations: cap), and a
conditional step (when: + then: / else:) branches. Conditions are plain
state templates judged for truthiness (""/"false"/"0"/"no"/"none"/"off"
are falsy) — no expression language; the boolean comes from a tool or a
structured-output agent. The refine_loop example shows a reviewer agent's
{approved} verdict driving both the loop and the final branch.
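A sketch of the truthiness rule quoted above. Treating the comparison as case-insensitive and whitespace-trimmed is an assumption here (it is what would make a rendered Python False or None count as falsy); `is_truthy` is a hypothetical name.

```python
FALSY = {"", "false", "0", "no", "none", "off"}


def is_truthy(rendered: str) -> bool:
    """Anything that is not one of the falsy spellings counts as true."""
    # Assumption: case and surrounding whitespace are ignored.
    return rendered.strip().lower() not in FALSY


# A structured-output verdict becomes a string once it is rendered through a template.
verdict = "{review[approved]}".format_map({"review": {"approved": False}})
print(repr(verdict), is_truthy(verdict))  # 'False' False
print(is_truthy("approve"))               # True
```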
Author the graph directly when a readable sequence can't express the flow —
any-to-any handoff, cycles, fan-in, or letting the model choose. A graph: block
declares an entry, a nodes: mapping, and an edges: list (static {from, to},
conditional {from, to, when}, or model-driven {from, route: {by, to}}). The
model never owns flow by default; you opt into the authority spectrum —
route (an edge whose successor an agent picks from your candidate set) or
replan (an authority.replan: policy that lets a planner edit the not-yet-run
subgraph from your declared vocabulary). See docs/FEATURES.md §2–§2c.
```yaml
pipeline:
  - name: refine
    loop:
      max_iterations: 5             # hard cap (the runaway guard)
      until: "{review[approved]}"   # checked after each pass; truthy → stop early
      body:
        - { name: draft,  agent: writer,   input: "Revise. Last review: {review}", output: draft }
        - { name: review, agent: reviewer, input: "Review: {draft}", output: review }   # → {approved}
  - name: ship
    when: "{review[approved]}"
    then: [ { name: publish, tool: save_report, args: { content: "{draft}" } } ]
    else: [ { name: escalate, agent: editor, input: "Not yet: {review}" } ]
```
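The loop semantics above (check until: after each pass, stop at max_iterations:) reduce to an ordinary bounded loop. This sketch reuses format_map rendering and the falsy spellings from the prose; `run_loop` and the toy reviewer are hypothetical, and the real engine instead lowers the loop to a back-edge plus conditional edges.

```python
from typing import Any, Callable, Dict

FALSY = {"", "false", "0", "no", "none", "off"}


def run_loop(body: Callable[[Dict[str, Any]], None], until: str,
             state: Dict[str, Any], max_iterations: int) -> int:
    """Run `body` until `until` renders truthy or the cap is reached.

    Returns the number of passes actually made.
    """
    for passes in range(1, max_iterations + 1):
        body(state)  # one pass over the loop body (e.g. draft, then review)
        # The condition is checked after each pass, never before the first one.
        if until.format_map(state).strip().lower() not in FALSY:
            return passes
    return max_iterations  # the cap is the runaway guard


def body(state: Dict[str, Any]) -> None:
    # Toy stand-in for draft + review: the reviewer approves on the third pass.
    state["attempt"] = state.get("attempt", 0) + 1
    state["review"] = {"approved": state["attempt"] >= 3}


state: Dict[str, Any] = {}
print(run_loop(body, "{review[approved]}", state, max_iterations=5))  # 3
```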
Durable runs & human-in-the-loop
A run can be checkpointed and resumed later — across a crash or a pause for
a person. The engine checkpoints at every node boundary, so a crash resumes at
the last completed node (inside loops and branches too). A human: node (a
{prompt, output} mapping) pauses the run: the runtime checkpoints a marker and
raises NodePaused; the host collects the answer and calls resume(...), which
writes it to store[output] and continues. (A human: node needs a checkpointer —
a pause is durable by definition — and is valid anywhere, including inside loops and
branches.)
```yaml
pipeline:
  - name: draft
    tool: make_draft
    args: { topic: "{topic}" }
    output: draft
  - name: approve          # the human gate — pauses the run here
    human:
      prompt: "Approve this draft? Reply 'approve' to publish:\n\n{draft}"
      output: decision     # resume() writes the human's answer here
  - name: ship
    when: "{decision}"     # truthiness branch on the verdict
    then: [ { name: publish, tool: publish, args: { draft: "{draft}" } } ]
    else: [ { name: shelve, tool: shelve, args: { draft: "{draft}" } } ]
```
From code — pass a run_id + a Checkpointer to run, catch the pause, and
resume with the answer (here InMemoryCheckpointer, so it runs in one process):
```python
from agentic_flow import Orchestrator, InMemoryCheckpointer, NodePaused, registry

orch = Orchestrator.from_yaml("pipeline.yaml", registry)
cp = InMemoryCheckpointer()      # or JsonFileCheckpointer("./runs")

try:
    orch.run({"topic": "Q3"}, run_id="R1", checkpointer=cp)   # checkpoints at every node boundary
except NodePaused as paused:
    print(paused.prompt)         # show the human what to answer
    answer = "approve"           # …collected out of band…
    store = orch.resume("R1", cp, human_input=answer)         # injects + continues
    print(store["decision"])     # the answer resume() wrote to the human node's output
```
From the CLI — name a backend with --checkpoint; a paused run exits 2 and
prints a copy-pasteable resume command:
```sh
agentic-flow pipeline.yaml --tools my.tools --checkpoint json:./runs --run-id R1   # run → exit 2 (paused)
agentic-flow pipeline.yaml --tools my.tools --checkpoint json:./runs --run-id R1 \
  --resume --human-input "approve"                                                 # resume → exit 0 (done)
```
Exit codes: 0 completed, 1 error, 2 paused. The answer can also come as
--human-input-json (parsed JSON) or --human-input-file; on a TTY, --resume
prompts for it interactively. Channel values are JSON-serializable and
exactly-once delivery survives a resume. A runnable, no-key
demo: python -m examples.approval.run. More in
docs/FEATURES.md §16–§17 and
docs/INTERNALS.md §12.
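To show why checkpointing at every node boundary makes both crash recovery and human pauses resumable, here is a minimal single-process sketch. `run`, `Paused`, the `(name, kind, payload)` node tuples, and the `saved` dict (standing in for a Checkpointer) are all hypothetical; only the state and a next-node cursor are persisted, as JSON.

```python
import json
from typing import Any, Dict, List, Optional, Tuple

Node = Tuple[str, str, Dict[str, Any]]  # (name, kind, payload); kind is "tool" or "human"


class Paused(Exception):
    """Raised at a human node; carries the rendered prompt."""

    def __init__(self, prompt: str):
        super().__init__(prompt)
        self.prompt = prompt


def run(nodes: List[Node], state: Dict[str, Any], saved: Dict[str, str],
        run_id: str, human_input: Optional[str] = None) -> Dict[str, Any]:
    """Run nodes in order, checkpointing after each; resume from `saved` if present."""
    start = 0
    if run_id in saved:  # resuming: the checkpointed cursor and state win
        record = json.loads(saved[run_id])
        start, state = record["next"], record["state"]
    for i in range(start, len(nodes)):
        _name, kind, payload = nodes[i]
        if kind == "human":
            if human_input is None:
                # Checkpoint a marker pointing at this node, then pause.
                saved[run_id] = json.dumps({"next": i, "state": state})
                raise Paused(payload["prompt"].format_map(state))
            state[payload["output"]] = human_input
            human_input = None  # the answer is consumed once
        else:
            state[payload["output"]] = payload["fn"](state)
        # Node boundary: persist the next node to run plus the state.
        saved[run_id] = json.dumps({"next": i + 1, "state": state})
    return state


nodes: List[Node] = [
    ("draft", "tool", {"fn": lambda s: f"Draft about {s['topic']}", "output": "draft"}),
    ("approve", "human", {"prompt": "Approve? {draft}", "output": "decision"}),
    ("ship", "tool", {"fn": lambda s: s["decision"] == "approve", "output": "shipped"}),
]
saved: Dict[str, str] = {}
try:
    run(nodes, {"topic": "Q3"}, saved, "R1")
except Paused as p:
    print(p.prompt)  # Approve? Draft about Q3
final = run(nodes, {}, saved, "R1", human_input="approve")
print(final["shipped"])  # True
```

Because the pause checkpoint points at the human node itself, the resume call re-enters that node, consumes the answer, and continues from there; a crash between nodes would resume the same way from the last boundary.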
Providers (Gemini & Anthropic)
gemini (Google) is the default; anthropic (Claude) is also built in. Set a
pipeline default and override per agent:
```yaml
provider: gemini              # pipeline default
model: gemini-3.5-flash

agents:
  analyst:                    # uses the defaults above
    system: "..."
  auditor:
    provider: anthropic       # this agent runs on Claude
    model: claude-opus-4-8    # optional — omit to use the provider's default
    system: "..."
```
Agents on different providers share one Store and can delegate to each other, so
one graph can mix both. Credentials come from the environment: GEMINI_API_KEY
(or GOOGLE_API_KEY) / ANTHROPIC_API_KEY. Add a provider by implementing the
four-method Provider interface and calling register_provider("name", Cls).
Gemini caveat: output_schema (JSON mode) and tool use don't combine on one Gemini agent. Split the work across two agents, or run that agent on Anthropic.
Agent & pipeline features
Each is covered in depth in docs/FEATURES.md; the essentials:
Structured output — give an agent an output_schema (JSON Schema) and its
answer comes back as a dict (the framework adds additionalProperties: false
for you). Later steps address fields directly: {report[title]}.
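A sketch of what adding additionalProperties: false might involve, assuming it is applied to every object schema, including nested ones under properties and items, while an explicit setting by the author is left alone. `close_schema` is hypothetical; whether the framework recurses this way is not stated above.

```python
import copy
from typing import Any, Dict


def close_schema(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy in which every object schema forbids undeclared keys."""
    out = copy.deepcopy(schema)  # never mutate the author's schema

    def visit(node: Any) -> None:
        if not isinstance(node, dict):
            return
        if node.get("type") == "object":
            # setdefault keeps an explicit additionalProperties the author wrote.
            node.setdefault("additionalProperties", False)
        for child in node.get("properties", {}).values():
            visit(child)
        if "items" in node:
            visit(node["items"])

    visit(out)
    return out


schema = {
    "type": "object",
    "properties": {
        "title": {"type": "string"},
        "tags": {"type": "array",
                 "items": {"type": "object", "properties": {"name": {"type": "string"}}}},
    },
}
closed = close_schema(schema)
print(closed["additionalProperties"])                                # False
print(closed["properties"]["tags"]["items"]["additionalProperties"])  # False
```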
Agents as tools (delegation) — list other agents under an agent's agents:
key and they become tools it can call; the sub-agent runs its own loop and shares
the same Store. Cycles are bounded by Agent.MAX_STEPS.
```yaml
reporter:
  tools: [save_report]
  agents:
    - analyst                                            # bare name → tool "analyst"
    - { name: analyst, as: crunch, description: "..." }  # or rename
```
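A sketch of how the two agents: entry forms (bare name, or a mapping with as: and description:) might map to tool specs. `delegate_tools` and the spec dict shape are hypothetical, and falling back to the sub-agent's system prompt when no description is given is purely an assumption.

```python
from typing import Any, Dict, List, Union

Entry = Union[str, Dict[str, Any]]


def delegate_tools(entries: List[Entry],
                   agents: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Map an agent's agents: entries to tool specs keyed by tool name."""
    tools: Dict[str, Dict[str, Any]] = {}
    for entry in entries:
        if isinstance(entry, str):  # bare name -> tool of the same name
            entry = {"name": entry}
        target = entry["name"]
        if target not in agents:
            raise KeyError(f"unknown agent: {target!r}")
        tool_name = entry.get("as", target)  # as: renames the tool
        if tool_name in tools:
            raise ValueError(f"duplicate tool name: {tool_name!r}")
        tools[tool_name] = {
            "agent": target,
            # Assumption: fall back to the sub-agent's system prompt as the description.
            "description": entry.get("description", agents[target].get("system", "")),
        }
    return tools


agents = {"analyst": {"system": "You are a data analyst..."}}
tools = delegate_tools(
    ["analyst", {"name": "analyst", "as": "crunch", "description": "Crunch the numbers."}],
    agents,
)
print(sorted(tools))  # ['analyst', 'crunch']
```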
Shared state & messaging — one Store (versioned channels + message log + todo
board) is threaded through every agent, tool, and sub-agent. Three ways to move
data: node output: keys (read via {key}), the model-driven built-ins (state_*,
send_message/read_messages, the todo_* board), and a state/ctx parameter
injected into any @tool (hidden from the model's schema). Messages and assigned
todos use exactly-once push delivery — surfaced into the recipient's prompt
without polling. A graph: agent node can also add emit: <transcript-channel>
(an add_messages channel): its answer is appended as a turn {id, sender, content}
a later node reads via {transcript} templating — a shared, replayable blackboard
alongside the push log.
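Exactly-once push delivery can be modeled as an append-only log plus a per-recipient cursor: each delivery returns only messages past the cursor and then advances it. `MessageLog` is a hypothetical sketch, not the Store's real structure; keeping the cursor alongside the log is what would let delivery state survive a checkpoint and resume.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class MessageLog:
    """Append-only message log with a per-recipient delivery cursor."""
    messages: List[Dict[str, str]] = field(default_factory=list)
    cursor: Dict[str, int] = field(default_factory=dict)  # recipient -> next index to scan

    def send(self, sender: str, to: str, content: str) -> None:
        self.messages.append({"sender": sender, "to": to, "content": content})

    def deliver(self, recipient: str) -> List[Dict[str, str]]:
        """Return messages for `recipient` not yet delivered, then advance its cursor."""
        start = self.cursor.get(recipient, 0)
        fresh = [m for m in self.messages[start:] if m["to"] == recipient]
        self.cursor[recipient] = len(self.messages)
        return fresh


log = MessageLog()
log.send("analyst", "writer", "Numbers are in.")
print(len(log.deliver("writer")))  # 1
print(len(log.deliver("writer")))  # 0: already delivered
log.send("editor", "writer", "Check the Q2 baseline too.")
print([m["content"] for m in log.deliver("writer")])  # ['Check the Q2 baseline too.']
```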
Secrets — a credential is named in YAML, never embedded. A top-level
secrets: block maps a local name to a pluggable backend ({provider: env, var: X} /
{provider: file, path: P} / shorthand name: VAR); the value is fetched late at
point-of-use, wrapped in a masked Secret (prints Secret(****)), and never written
to the Store — so it can't reach a checkpoint. Reference it as {secret: name} in an
agent's api_key: or an MCP server's env:/headers: (which also accept ${VAR} /
${VAR:-default}), or read it in a tool via ctx.secrets.get(name).reveal(). env and
file backends ship; a vault/cloud backend is a registry entry away. A typo'd reference
or unknown backend fails at load; a missing var/file is a clear error. See
docs/FEATURES.md §3b (offline demo: python -m examples.secrets_demo.run).
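A sketch of the two mechanisms described above: a wrapper whose repr and str are masked, and ${VAR} / ${VAR:-default} expansion. `Secret` here mirrors only the behavior described (masked printing, reveal()), not the framework's class; `expand` is hypothetical, follows POSIX shell semantics for :- (the default also covers a set but empty variable), and raising on an unset variable without a default is an assumption.

```python
import re
from typing import Mapping


class Secret:
    """Wraps a credential so it never shows up in repr/str output."""
    __slots__ = ("_value",)

    def __init__(self, value: str):
        self._value = value

    def __repr__(self) -> str:
        return "Secret(****)"

    __str__ = __repr__  # print() and f-strings are masked too

    def reveal(self) -> str:
        """Return the raw value; call only at the point of use."""
        return self._value


_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def expand(text: str, env: Mapping[str, str]) -> str:
    """Expand ${VAR} and ${VAR:-default} against `env`."""
    def substitute(match: "re.Match[str]") -> str:
        name, default = match.group(1), match.group(2)
        value = env.get(name)
        if default is not None:
            # As in POSIX sh, ':-' also uses the default when VAR is set but empty.
            return value if value else default
        if value is None:
            raise KeyError(f"{name} is not set and has no default")
        return value
    return _VAR.sub(substitute, text)


s = Secret("sk-123")
print(s, f"{s}")                                    # Secret(****) Secret(****)
print(expand("Bearer ${TOKEN}", {"TOKEN": "abc"}))  # Bearer abc
print(expand("${REGION:-us-east-1}", {}))           # us-east-1
```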
Retries & limits — a retry: policy (attempts + exponential backoff) at two
scopes: tool retry (per-@tool(retry=…) or an agent's tool_retry:) and
agent retry (retry:, defaulting to transient API errors + OutputParseError;
refusals aren't retried). A limits: block caps steps, output tokens, and
wall-clock — per agent and for the whole run (plus a per-node max_visits cycle bound).
⚠️ Agent retry re-runs tools already called — for side-effecting tools, prefer tool retry.
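A minimal sketch of an attempts-plus-exponential-backoff policy, assuming the delay doubles after each failed attempt and no jitter is applied. `with_retry` and its parameters are hypothetical (the framework's RetryPolicy may be configured differently); `sleep` is injectable so the backoff can be observed without waiting.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_retry(fn: Callable[[], T], attempts: int = 3, base_delay: float = 0.5,
               factor: float = 2.0,
               retry_on: Tuple[Type[BaseException], ...] = (Exception,),
               sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, retrying on `retry_on` with exponential backoff between attempts."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(delay)
            delay *= factor
    raise AssertionError("unreachable")


calls = {"n": 0}


def flaky() -> str:
    # Fails twice with a transient error, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"


delays = []
result = with_retry(flaky, attempts=4, retry_on=(ConnectionError,), sleep=delays.append)
print(result, delays)  # ok [0.5, 1.0]
```

Wrapping a single side-effecting call like this is the scope of a tool retry; wrapping a whole agent turn would re-run every tool that turn already called, which is the hazard the warning above describes.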
Driving it from code
```python
from agentic_flow import Orchestrator, registry

orch = Orchestrator.from_yaml("examples/shared_state/pipeline.yaml", registry)
store = orch.run({"product": "Orbit"})   # seed initial state if you want
print(store["summary"])                  # Store is dict-like; store.log / store.todos too
```
Pass on_event=... to from_yaml to trace every node, tool call, and result.
Observability
on_event(event, data) is the single seam: every node_*, tool_*, retry,
checkpoint, human, route, replan / replan_rejected, and
messages_delivered event flows through it. Three sinks ship on top:
- Console printer: console_tracer() (agentic_flow/console.py), the one console sink the CLI and examples/_common.py share.
- Standard logging: event_logger() returns an on_event hook (INFO for node boundaries, WARNING for retries/tool errors, DEBUG for per-call detail); call configure_logging() to opt in. From the CLI: --log-level DEBUG.
- EventBus: a pub/sub fan-out that is an on_event hook, with filtered subscribers and a thread-safe real-time stream() for watching a run live:
```python
from agentic_flow import EventBus, Orchestrator, registry

initial = {}  # seed initial state for the run here
bus = EventBus()
orch = Orchestrator.from_yaml("pipeline.yaml", registry, on_event=bus)
bus.run_in_background(lambda: orch.run(initial))   # closes the bus when the run ends
for event, data in bus.stream():                   # blocks, yields events live
    if event == "tool_result":
        print(data["agent"], data["tool"], "→", data["result"])
```
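The fan-out idea behind EventBus can be sketched as a callable that satisfies the on_event(event, data) shape and forwards each event to filtered subscribers. `FanOut` is hypothetical and synchronous; it omits the thread-safe stream() and background runner the shipped EventBus provides, and the node_start event name below is made up for illustration.

```python
from typing import Any, Callable, Dict, List, Optional, Set, Tuple

Handler = Callable[[str, Dict[str, Any]], None]


class FanOut:
    """An on_event-compatible callable that forwards each event to subscribers."""

    def __init__(self) -> None:
        self._subs: List[Tuple[Optional[Set[str]], Handler]] = []

    def subscribe(self, handler: Handler, events: Optional[Set[str]] = None) -> None:
        """Register a handler; events=None means it receives every event."""
        self._subs.append((events, handler))

    def __call__(self, event: str, data: Dict[str, Any]) -> None:
        for wanted, handler in self._subs:
            if wanted is None or event in wanted:
                handler(event, data)


bus = FanOut()
seen: List[str] = []
results: List[Any] = []
bus.subscribe(lambda e, d: seen.append(e))                                    # every event
bus.subscribe(lambda e, d: results.append(d["result"]), events={"tool_result"})  # filtered
bus("node_start", {"node": "analyze"})
bus("tool_result", {"agent": "analyst", "tool": "calculator", "result": 42})
print(seen, results)  # ['node_start', 'tool_result'] [42]
```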
A runnable demo: python -m examples.team_todos.run_stream. The logging/bus internals
are in docs/INTERNALS.md.
Extending
- New tools: add @tool functions in any imported module (group with group=).
- New agents / nodes / edges: edit the YAML; no code change.
- Delegation / structured output / retries / limits: set agents: / output_schema / retry: / limits: on an agent.
- Shared state: the state_* / send_message tools, or a state/ctx param on your own tools.
- New providers: implement the Provider interface and register_provider.
- Observability: pass on_event=...; route to logging (event_logger()) or an EventBus.
- Branching / loops: loop: / when: steps in a pipeline: (lowered to a back-edge + conditional edges), or author graph: conditional/back edges directly; conditions are state templates judged for truthiness.
- Let the model drive flow: add a route: on an edge (the model picks the next node) or an authority.replan: policy (the model rewrites the not-yet-run subgraph); both compose only from your declared vocabulary.
- Pause for a human / persist & resume: add a human: ({prompt, output}) node and run with run(run_id=, checkpointer=); catch NodePaused and resume(..., human_input=...). Add a backend by subclassing Checkpointer + register_checkpointer.
File details
Details for the file agentlead_flow_core-0.1.1.tar.gz.
File metadata
- Download URL: agentlead_flow_core-0.1.1.tar.gz
- Size: 128.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.13

File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 23dd9af2289b919b5523f4b2a2d1773ca11692635c79b69a40194b6d57465690 |
| MD5 | ce75381ee7ba825bcd1ccd6d25b587a7 |
| BLAKE2b-256 | a20cab00fdb3706b791ecab25b842b70473f252c7cf25210287155469e8c96b2 |
File details
Details for the file agentlead_flow_core-0.1.1-py3-none-any.whl.
File metadata
- Download URL: agentlead_flow_core-0.1.1-py3-none-any.whl
- Size: 126.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.13

File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6c3e0f87b35dcaa4718b0e8b209d41bb51e7fb566943c3f9c3c8c35c21e9f14d |
| MD5 | cd5bf446fe8cc0c3483cc4e144b71795 |
| BLAKE2b-256 | 79b18841974ba2a6aa4ef60c9e616dad70fafdb5e5f37845271812e96dd7f048 |