Define agents (tools + system prompt) in YAML and run them as an orchestrated pipeline.

Project description

agentic-flow

A tiny framework for declaring agents in YAML — each with a system prompt and a set of tools — and running them as a graph over a shared data state. Runs on Gemini (Google) by default, with Claude (Anthropic) also built in and a pluggable provider interface for the rest.

The idea

You declare a graph of nodes (each wrapping an agent or a tool) and edges that decide the next node — or a linear pipeline: that lowers to one. The framework owns how it runs: a scheduler walks a frontier of ready nodes, writes each node's result to a versioned channel, checkpoints at every node boundary, and drives the tool-use loop inside each agent.

YAML  ─►  build_program  ─►  Graph  ─►  runtime scheduler
   entry ─► gather (agent) ─► analyze (agent) ─► write (tool) ─► __end__
                channels:  sources ─► claims ─► draft   (value + version + reducer)

A pipeline: [a, b, c] is sugar the loader lowers to entry: a; a→b→c→__end__, so every existing pipeline runs unchanged. The edges can also branch (when:), loop (a back-edge), or — when the author opts in — let the model route to the next node or replan the remaining graph.
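The lowering rule for a linear pipeline fits in a few lines. This is a minimal sketch in plain Python. It assumes a graph is just an entry name plus a list of (source, target) pairs. `lower_pipeline` and that dict shape are hypothetical, not the loader's real API, and the real loader also lowers loops and branches.

```python
from typing import Dict, List, Tuple, Union

END = "__end__"  # the terminal node name used in the diagram above

Graph = Dict[str, Union[str, List[Tuple[str, str]]]]

def lower_pipeline(steps: List[str]) -> Graph:
    """Lower a linear pipeline [a, b, c] to an entry plus static edges (hypothetical shape)."""
    if not steps:
        raise ValueError("a pipeline needs at least one step")
    # Pair each step with its successor; the last step flows to __end__.
    successors = steps[1:] + [END]
    edges = list(zip(steps, successors))
    return {"entry": steps[0], "edges": edges}

print(lower_pipeline(["a", "b", "c"]))
# {'entry': 'a', 'edges': [('a', 'b'), ('b', 'c'), ('c', '__end__')]}
```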

Core modules:

File Responsibility
tools/ @tool decorator + ToolRegistry (named groups); builtin/ holds the built-in tools (state_*, send_message, the todo_* board), one module per concern.
agent.py Agent = system prompt + tools + the model's tool-use loop.
graph.py The graph IR: Node + three Edge kinds + Graph, structural validate(), the dry-run renderer, and the replan edit ops.
store.py Store = versioned channels + reducers + message log + todo board for a run.
runtime.py The flat-frontier scheduler: run/resume, edge resolution (incl. route), the per-node checkpoint, replan, the budget.
cursor_format.py The graph-cursor checkpoint format (CheckpointRecord).
authority.py Replan (authority L2): the guard-gated planner hook + the atomic batch validator.
loader.py build_program — YAML → a validated Graph + the agent map; lowers pipeline: to a graph.
pipeline.py The pipeline grammar (the lowering front-end): Step / Loop types + the YAML → Step parser.
program.py Orchestrator — the ergonomic from_yaml/run/resume facade over the loader + runtime.
retry.py / limits.py RetryPolicy (backoff) and capacity/timeout/visit caps.
providers/ Provider abstraction + gemini and anthropic implementations.
checkpoint.py Checkpointer storage seam (in-memory + JSON-file) for durable runs & resume.
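The store.py row describes channels that each carry a value, a version, and a reducer. Here is a minimal sketch of that idea, assuming two hypothetical reducers (overwrite and append). `MiniStore` is illustrative only, not the real Store API. The version counter is the kind of signal a scheduler can use to tell which nodes have fresh input.

```python
from typing import Any, Callable, Dict

Reducer = Callable[[Any, Any], Any]

def overwrite(old: Any, new: Any) -> Any:
    return new  # last write wins

def append(old: Any, new: Any) -> Any:
    return (old or []) + [new]  # accumulate every write into a list

class Channel:
    def __init__(self, reducer: Reducer = overwrite) -> None:
        self.value: Any = None
        self.version = 0
        self.reducer = reducer

    def write(self, new: Any) -> None:
        self.value = self.reducer(self.value, new)
        self.version += 1  # each write bumps the version

class MiniStore:
    """Hypothetical, dict-like store of versioned channels."""

    def __init__(self) -> None:
        self.channels: Dict[str, Channel] = {}

    def declare(self, name: str, reducer: Reducer = overwrite) -> None:
        self.channels[name] = Channel(reducer)

    def write(self, name: str, value: Any) -> None:
        if name not in self.channels:
            self.declare(name)  # undeclared channels default to overwrite (assumption)
        self.channels[name].write(value)

    def __getitem__(self, name: str) -> Any:
        return self.channels[name].value

    def version(self, name: str) -> int:
        return self.channels[name].version

s = MiniStore()
s.declare("claims", append)
s.write("claims", "c1")
s.write("claims", "c2")
s.write("draft", "v1")
s.write("draft", "v2")
print(s["claims"], s.version("claims"))  # ['c1', 'c2'] 2
print(s["draft"], s.version("draft"))    # v2 2
```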

An agent's tools: list accepts individual tool names or a group name — e.g. tools: [todos] grants the whole todo board, tools: [state, send_message] mixes a group with a single tool.
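Resolving a tools: list is a small expansion step: each entry is either a group (expanded to its members) or a single tool name. This is a minimal sketch. The group contents and tool names below (state_get, todo_add, and so on) are hypothetical placeholders for the state_* and todo_* families, and `resolve_tools` is not the ToolRegistry API.

```python
from typing import Dict, List, Set

# Hypothetical registry contents: group name -> member tools.
GROUPS: Dict[str, List[str]] = {
    "state": ["state_get", "state_set"],
    "todos": ["todo_add", "todo_list", "todo_done"],
}
TOOLS: Set[str] = {"send_message", "save_report"} | {t for ts in GROUPS.values() for t in ts}

def resolve_tools(entries: List[str]) -> List[str]:
    """Expand a tools: list, where each entry is a tool name or a group name."""
    resolved: List[str] = []
    for entry in entries:
        for name in GROUPS.get(entry, [entry]):  # a group expands; a tool name stands alone
            if name not in TOOLS:
                raise KeyError(f"unknown tool or group: {name!r}")
            if name not in resolved:             # keep first-seen order, drop duplicates
                resolved.append(name)
    return resolved

print(resolve_tools(["state", "send_message"]))  # ['state_get', 'state_set', 'send_message']
```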

Documentation

This README is the usage guide; deeper docs live under docs/:

Doc | Read it for
docs/CONTEXT.md | The glossary: the domain language (with _Avoid_ synonym lists), the layering, and the key decisions. Start here for vocabulary.
docs/ARCHITECTURE.md | Module map, core abstractions, run lifecycle, design decisions.
docs/FEATURES.md | The full feature catalog: every feature, where it's configured, which example shows it.
docs/INTERNALS.md | The logic under the hood: the tool-use loop, exactly-once delivery, schema derivation, retries, the provider seam.
CHANGELOG.md | Release notes, per version.

Visual studio (author pipelines without writing YAML by hand)

A no-build, static web app for visualizing and authoring pipeline YAML lives in the sibling studio repo at ../studio/builder/. Open it directly — no npm, no build, no server, and js-yaml is vendored so it works offline:

open ../studio/builder/index.html          # macOS  (or: xdg-open ../studio/builder/index.html)
# or serve it:  python -m http.server -d ../studio/builder 8000

It renders a pipeline: as a top-to-bottom flowchart (loops and when branches nest visually), lets you edit agents and steps in an inspector, validates with the same parse-time rules as the pipeline grammar, shows each step's reads/writes and the required initial state, and round-trips to/from YAML (Open · Export · Copy). It's authoring-only — it does not run pipelines. The public website and external docs live alongside it under ../studio/. See ../studio/builder/README.md.

Install & run

pip install gives you the importable library (import agentic_flow) and the agentic-flow CLI. The distribution is published on PyPI as agentlead-flow-core:

pip install agentlead-flow-core

The examples and docs referenced below are not shipped in the wheel — they live in the GitHub repo. Clone it to run them (the examples run on Gemini, so set GEMINI_API_KEY):

git clone https://github.com/ai-agent-lead/agentic-flow
cd agentic-flow
pip install -e .                 # editable install from the clone
export GEMINI_API_KEY=...
python -m examples.quickstart.run        # smallest: one agent, one tool, one step
python -m examples.shared_state.run      # multi-agent fan-out → fan-in over one Store

# To run an agent on Claude, set provider: anthropic on it and:
export ANTHROPIC_API_KEY=sk-ant-...

Each example is a self-contained, single-concept folder under examples/, meant to be read in order — see examples/README.md for the full 15-step ladder (quickstart, structured_output, tool_steps, shared_state, refine_loop, approval, graph_branch, router, subgraph_intake, delegation, shared_transcript, team_todos, secrets_demo, incident_replan, data_pipeline). Four run with no API key: subgraph_intake, approval, secrets_demo, data_pipeline.

CLI

Installing the package adds an agentic-flow command. Point it at a pipeline.yaml and its tools module:

agentic-flow examples/shared_state/pipeline.yaml --tools examples.shared_state.tools             # run
agentic-flow examples/shared_state/pipeline.yaml --tools examples.shared_state.tools --dry-run   # print the plan, no model
agentic-flow my/pipeline.yaml --tools my.tools --set topic="Q3 sales"                            # seed state
agentic-flow examples/shared_state/pipeline.yaml --tools examples.shared_state.tools --log-level DEBUG  # trace via logging

  • --tools MODULE imports a module that registers @tool functions (repeatable).
  • --set KEY=VALUE seeds initial state.
  • --dry-run builds and renders the graph (nodes, edges grouped by source, (back-edge) flags, the authority footer) without calling any model; a pipeline: is shown as the graph it lowers to.
  • --log-level LEVEL routes the trace through logging.
  • For durable runs, --checkpoint SPEC / --run-id / --resume / --human-input* persist and resume a run (with exit codes 0/1/2); see Durable runs & human-in-the-loop below.

Docker

docker build -t agentic-flow .
docker run --rm -e GEMINI_API_KEY agentic-flow      # runs the shared_state example

# Or with docker compose (reads keys from .env):
cp .env.example .env                                # then put GEMINI_API_KEY in .env
docker compose run --rm agentic-flow                # shared_state (default)
docker compose run --rm graph_branch                # native graph: example; live-mounts examples/graph_branch (rebuild the image to pick up framework changes)

Generated artifacts land on the host under examples/<name>/out/ (mounted by compose). The default image runs the Gemini shared_state example via the CLI.

Testing (in Docker)

The test suites run in the container, not your local Python env. They're offline (no API key), and the source is mounted, so they test your live code with no rebuild:

docker compose run --rm tests        # runs every tests/test_*.py in Docker → exit 0 on success

The suites are script-style (each tests/test_*.py runs its own assertions), so they're invoked with python, not pytest. Adding a suite? Add it to the tests service command in docker-compose.yml.

Defining tools

Any function with type hints becomes a tool, and its JSON Schema is derived for you: the docstring summary becomes the tool description, and the Args: lines become the parameter docs.

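from pathlib import Path

from agentic_flow import tool

@tool
def save_report(filename: str, content: str) -> str:
    """Write a report to disk and return the path.

    Args:
        filename: File name to write, e.g. "report.md".
        content: The full text of the report.
    """
    path = Path(filename)
    path.write_text(content, encoding="utf-8")  # create or overwrite the file
    return str(path)                             # returned to the model as the tool result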

Register related tools under a group with @tool(group="…") so an agent can grant them all by listing the group name (see the tools: note above).
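Here is roughly how a schema can be derived from type hints and a Google-style docstring, using only `inspect` and `typing`. This is a minimal sketch, not agentic_flow's implementation. The type mapping covers only plain scalars and containers, and `derive_schema` plus its output layout (name, description, parameters) are assumptions. The plain `save_report` below is an undecorated stub of the tool above.

```python
import inspect
from typing import Any, Callable, Dict, List, get_type_hints

# Plain-Python type -> JSON Schema type (assumption: simple scalars/containers only).
PY_TO_JSON = {str: "string", int: "integer", float: "number",
              bool: "boolean", list: "array", dict: "object"}
SECTION_HEADERS = {"Returns:", "Raises:", "Yields:", "Examples:"}

def derive_schema(fn: Callable[..., Any]) -> Dict[str, Any]:
    hints = get_type_hints(fn)
    lines = (inspect.getdoc(fn) or "").splitlines()
    summary = lines[0].strip() if lines else ""

    # Read "name: description" lines from the Args: section of the docstring.
    arg_docs: Dict[str, str] = {}
    in_args = False
    for line in lines[1:]:
        stripped = line.strip()
        if stripped == "Args:":
            in_args = True
        elif stripped in SECTION_HEADERS:
            in_args = False
        elif in_args and ":" in stripped:
            name, desc = stripped.split(":", 1)
            arg_docs[name.strip()] = desc.strip()

    properties: Dict[str, Any] = {}
    required: List[str] = []
    for name, param in inspect.signature(fn).parameters.items():
        prop: Dict[str, Any] = {"type": PY_TO_JSON.get(hints.get(name, str), "string")}
        if name in arg_docs:
            prop["description"] = arg_docs[name]
        properties[name] = prop
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default, so the model must supply it
    return {"name": fn.__name__, "description": summary,
            "parameters": {"type": "object", "properties": properties, "required": required}}

def save_report(filename: str, content: str) -> str:
    """Write a report to disk and return the path.

    Args:
        filename: File name to write, e.g. "report.md".
        content: The full text of the report.
    """
    return filename  # stub body; only the signature and docstring matter here

print(derive_schema(save_report)["parameters"]["required"])  # ['filename', 'content']
```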

Defining agents & the pipeline (YAML)

provider: gemini                 # default provider for all agents
model: gemini-3.5-flash          # default model for all agents

agents:
  analyst:
    system: "You are a data analyst..."
    tools: [fetch_sales_data, calculator]
  writer:
    system: "You are an executive writer..."
    tools: [save_report]

pipeline:
  - name: analyze
    agent: analyst
    input: "Analyze this year's sales..."
    output: analysis               # writes result to state["analysis"]

  - name: write
    agent: writer
    input: |                       # {analysis} is pulled from state
      Write a summary of:
      {analysis}
    output: summary

A pipeline: is sugar the loader lowers to a graph (so it runs on the same engine as a hand-authored graph:). Two leaf step kinds: an agent step (agent: + input:, model-driven) and a tool step (tool: + args:, called directly, no model). input/args are templated with the current state ({key}, {key[field]}), so each step consumes what earlier steps produced. Per-agent overrides: model, max_tokens, thinking, output_schema, retry, tool_retry, limits.
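Python's `str.format_map` already understands both template forms, which makes it a quick way to see what a step's input becomes. This is a sketch only: whether agentic_flow renders templates this way internally is an assumption, and `render` is a hypothetical helper. Note that plain `format_map` would trip over literal braces (for example inline JSON) in a prompt, so those would need doubling as `{{ }}`.

```python
from typing import Any, Mapping

def render(template: str, state: Mapping[str, Any]) -> str:
    # str.format_map resolves {key} and {key[field]} directly against the mapping;
    # a missing key raises KeyError, so a typo'd reference fails loudly.
    return template.format_map(state)

state = {"analysis": "Sales up 12% YoY", "report": {"title": "Q3 review"}}
print(render("Write a summary of:\n{analysis}", state))
print(render("Title: {report[title]}", state))  # Title: Q3 review
```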

Control flow — the flow isn't strictly linear. A loop step repeats a body: until an until: condition goes truthy (or a max_iterations: cap), and a conditional step (when: + then: / else:) branches. Conditions are plain state templates judged for truthiness (""/"false"/"0"/"no"/"none"/"off" are falsy) — no expression language; the boolean comes from a tool or a structured-output agent. The refine_loop example shows a reviewer agent's {approved} verdict driving both the loop and the final branch.
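The truthiness rule can be written down directly: render the condition template, then check the text against the falsy words. This is a minimal sketch. Trimming whitespace and lower-casing before the check is an assumption, and `is_truthy` / `condition` are hypothetical names.

```python
from typing import Any, Mapping

FALSY = {"", "false", "0", "no", "none", "off"}

def is_truthy(rendered: Any) -> bool:
    # Assumption: the rendered text is trimmed and lower-cased before the check.
    return str(rendered).strip().lower() not in FALSY

def condition(template: str, state: Mapping[str, Any]) -> bool:
    # A condition is just a state template, rendered and then judged for truthiness.
    return is_truthy(template.format_map(state))

state = {"review": {"approved": False}}
print(condition("{review[approved]}", state))  # False ("False" renders into the falsy set)
state["review"]["approved"] = True
print(condition("{review[approved]}", state))  # True
print(is_truthy("reject"))                     # True: any text outside the falsy set counts
```

The last line is worth keeping in mind: a free-text answer such as "reject" is truthy, so a when: gate is safest when fed a boolean from a tool or a structured-output agent, or a reply restricted to the falsy words.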

Author the graph directly when a readable sequence can't express the flow: any-to-any handoff, cycles, fan-in, or letting the model choose. A graph: block declares an entry, a nodes: mapping, and an edges: list (static {from, to}, conditional {from, to, when}, or model-driven {from, route: {by, to}}). The model never owns flow by default; you opt into one of two points on the authority spectrum, route (an edge whose successor an agent picks from your candidate set) or replan (an authority.replan: policy that lets a planner edit the not-yet-run subgraph from your declared vocabulary). See docs/FEATURES.md §2–§2c.

pipeline:
  - name: refine
    loop:
      max_iterations: 5            # hard cap (the runaway guard)
      until: "{review[approved]}"  # checked after each pass; truthy → stop early
      body:
        - { name: draft,  agent: writer,   input: "Revise. Last review: {review}", output: draft }
        - { name: review, agent: reviewer, input: "Review: {draft}", output: review }   # → {approved}
  - name: ship
    when: "{review[approved]}"
    then: [ { name: publish, tool: save_report, args: { content: "{draft}" } } ]
    else: [ { name: escalate, agent: editor, input: "Not yet: {review}" } ]
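The three edge kinds a graph: block declares (static, conditional with when:, and model-routed) can be sketched as a single resolver. All of it is hypothetical. Edges are checked in declaration order and the first match wins. A routed edge delegates to a `choose` callback standing in for the agent. Each node has exactly one successor. The real runtime walks a frontier and also supports fan-out and fan-in, which this sketch omits.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

END = "__end__"
FALSY = {"", "false", "0", "no", "none", "off"}

@dataclass
class Edge:
    src: str
    to: str = END                                   # target of a static/conditional edge
    when: Optional[str] = None                      # condition template, e.g. "{review[approved]}"
    route: List[str] = field(default_factory=list)  # candidates for a model-routed edge

def next_node(node: str, edges: List[Edge], state: Dict[str, Any],
              choose: Callable[[List[str]], str]) -> str:
    """Return the successor of `node`; the first matching edge in declaration order wins."""
    for edge in edges:
        if edge.src != node:
            continue
        if edge.route:
            pick = choose(edge.route)  # stands in for the routing agent's decision
            if pick not in edge.route:
                raise ValueError(f"route picked {pick!r}, outside {edge.route}")
            return pick
        if edge.when is None:
            return edge.to             # static edge
        if edge.when.format_map(state).strip().lower() not in FALSY:
            return edge.to             # conditional edge whose condition is truthy
    return END

edges = [
    Edge("draft", to="review"),
    Edge("review", to="ship", when="{review[approved]}"),
    Edge("review", to="draft"),                     # back-edge: loop until approved
    Edge("ship", route=["publish", "escalate"]),
]
state = {"review": {"approved": False}}
print(next_node("review", edges, state, choose=lambda c: c[0]))  # draft
state["review"]["approved"] = True
print(next_node("review", edges, state, choose=lambda c: c[0]))  # ship
print(next_node("ship", edges, state, choose=lambda c: c[-1]))   # escalate
```

Listing the unconditional back-edge after the conditional one is what expresses "otherwise" without an expression language.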

Durable runs & human-in-the-loop

A run can be checkpointed and resumed later — across a crash or a pause for a person. The engine checkpoints at every node boundary, so a crash resumes at the last completed node (inside loops and branches too). A human: node (a {prompt, output} mapping) pauses the run: the runtime checkpoints a marker and raises NodePaused; the host collects the answer and calls resume(...), which writes it to store[output] and continues. (A human: node needs a checkpointer — a pause is durable by definition — and is valid anywhere, including inside loops and branches.)

pipeline:
  - name: draft
    tool: make_draft
    args: { topic: "{topic}" }
    output: draft
  - name: approve            # the human gate — pauses the run here
    human:
      prompt: "Approve this draft? Reply 'approve' to publish, or 'no' to shelve:\n\n{draft}"
      output: decision       # resume() writes the human's answer here
  - name: ship
    when: "{decision}"       # truthiness branch on the verdict
    then: [ { name: publish, tool: publish, args: { draft: "{draft}" } } ]
    else: [ { name: shelve,  tool: shelve,  args: { draft: "{draft}" } } ]

From code — pass a run_id + a Checkpointer to run, catch the pause, and resume with the answer (here InMemoryCheckpointer, so it runs in one process):

from agentic_flow import Orchestrator, InMemoryCheckpointer, NodePaused, registry

orch = Orchestrator.from_yaml("pipeline.yaml", registry)
cp = InMemoryCheckpointer()                                   # or JsonFileCheckpointer("./runs")
try:
    store = orch.run({"topic": "Q3"}, run_id="R1", checkpointer=cp)  # checkpoints at every node boundary
except NodePaused as paused:
    print(paused.prompt)                                      # show the human what to answer
    answer = "approve"                                        # …collected out of band…
    store = orch.resume("R1", cp, human_input=answer)         # writes store["decision"], then continues
print(store["decision"])                                      # the human's answer

From the CLI — name a backend with --checkpoint; a paused run exits 2 and prints a copy-pasteable resume command:

agentic-flow pipeline.yaml --tools my.tools --checkpoint json:./runs --run-id R1   # run → exit 2 (paused)
agentic-flow pipeline.yaml --tools my.tools --checkpoint json:./runs --run-id R1 \
    --resume --human-input "approve"                                               # resume → exit 0 (done)

Exit codes: 0 completed, 1 error, 2 paused. The answer can also come as --human-input-json (parsed JSON) or --human-input-file; on a TTY, --resume prompts for it interactively. Channel values are JSON-serializable and exactly-once delivery survives a resume. A runnable, no-key demo: python -m examples.approval.run. More in docs/FEATURES.md §16–§17 and docs/INTERNALS.md §12.
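The pause/resume contract can be sketched end to end with an in-memory dict as the checkpointer. Everything here is hypothetical and deliberately smaller than the real runtime. `Paused` stands in for NodePaused. Nodes run as a fixed list rather than over a graph. `exit_code` only mirrors the documented 0/1/2 mapping.

```python
import json
from typing import Any, Dict, List, Tuple

class Paused(Exception):
    """Raised when a human node is reached; carries the rendered prompt."""
    def __init__(self, prompt: str) -> None:
        super().__init__(prompt)
        self.prompt = prompt

# Hypothetical node shape: (kind, payload, output), where kind is "tool" or "human".
Node = Tuple[str, Any, str]
Checkpoints = Dict[str, Dict[str, Any]]

def run(nodes: List[Node], state: Dict[str, Any], run_id: str,
        cp: Checkpoints, start: int = 0) -> Dict[str, Any]:
    for i in range(start, len(nodes)):
        kind, payload, output = nodes[i]
        if kind == "human":
            # Persist a pause marker, then stop; the answer arrives via resume().
            cp[run_id] = {"next": i, "state": json.loads(json.dumps(state)), "waiting": output}
            raise Paused(payload.format_map(state))
        state[output] = payload(state)
        # Checkpoint at the node boundary; the JSON round-trip enforces serializable values.
        cp[run_id] = {"next": i + 1, "state": json.loads(json.dumps(state))}
    return state

def resume(nodes: List[Node], run_id: str, cp: Checkpoints, human_input: Any = None) -> Dict[str, Any]:
    record = cp[run_id]
    state = json.loads(json.dumps(record["state"]))
    start = record["next"]
    if "waiting" in record:
        state[record["waiting"]] = human_input  # write the answer to the human node's output
        start += 1                              # the human node now counts as done
    return run(nodes, state, run_id, cp, start)

def exit_code(nodes: List[Node], state: Dict[str, Any], run_id: str, cp: Checkpoints) -> int:
    try:
        run(nodes, state, run_id, cp)
        return 0  # completed
    except Paused:
        return 2  # paused, waiting for a human
    except Exception:
        return 1  # any other error

nodes: List[Node] = [
    ("tool", lambda s: f"Draft about {s['topic']}", "draft"),
    ("human", "Approve this draft?\n\n{draft}", "decision"),
    ("tool", lambda s: f"shipped ({s['decision']})", "result"),
]
cp: Checkpoints = {}
try:
    run(nodes, {"topic": "Q3"}, "R1", cp)
except Paused as paused:
    print(paused.prompt)
state = resume(nodes, "R1", cp, human_input="approve")
print(state["result"])  # shipped (approve)
```

A record without a pause marker simply restarts after the last completed node, which is the crash-recovery case.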

Providers (Gemini & Anthropic)

gemini (Google) is the default; anthropic (Claude) is also built in. Set a pipeline default and override per agent:

provider: gemini             # pipeline default
model: gemini-3.5-flash

agents:
  analyst:                   # uses the defaults above
    system: "..."
  auditor:
    provider: anthropic      # this agent runs on Claude
    model: claude-opus-4-8   # optional — omit to use the provider's default
    system: "..."

Agents on different providers share one Store and can delegate to each other, so one graph can mix both. Credentials come from the environment: GEMINI_API_KEY (or GOOGLE_API_KEY) / ANTHROPIC_API_KEY. Add a provider by implementing the four-method Provider interface and calling register_provider("name", Cls).

Gemini caveat: output_schema (JSON mode) and tool use don't combine on one Gemini agent — split into two agents, or run that agent on Anthropic.
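The registration pattern behind register_provider("name", Cls) is a name-to-class map plus a lookup that honors the per-agent provider: override. This is a standalone sketch that does not import agentic_flow. Its one-method `Provider` base and `EchoProvider` are hypothetical, since the four real interface methods aren't named in this README.

```python
from typing import Dict, Type

class Provider:
    """Hypothetical stand-in. The real interface has four methods; this sketch uses one."""
    def complete(self, system: str, prompt: str) -> str:
        raise NotImplementedError

_PROVIDERS: Dict[str, Type[Provider]] = {}

def register_provider(name: str, cls: Type[Provider]) -> None:
    if not (isinstance(cls, type) and issubclass(cls, Provider)):
        raise TypeError(f"{cls!r} is not a Provider subclass")
    _PROVIDERS[name] = cls

def provider_for(agent: Dict[str, str], default: str) -> Provider:
    # A per-agent `provider:` overrides the pipeline default, as in the YAML above.
    name = agent.get("provider", default)
    if name not in _PROVIDERS:
        raise KeyError(f"unknown provider {name!r}; registered: {sorted(_PROVIDERS)}")
    return _PROVIDERS[name]()

class EchoProvider(Provider):
    def complete(self, system: str, prompt: str) -> str:
        return f"[{system}] {prompt}"

register_provider("echo", EchoProvider)
print(provider_for({"system": "analyst"}, default="echo").complete("analyst", "hi"))  # [analyst] hi
```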

Agent & pipeline features

Each is covered in depth in docs/FEATURES.md; the essentials:

Structured output — give an agent an output_schema (JSON Schema) and its answer comes back as a dict (the framework adds additionalProperties: false for you). Later steps address fields directly: {report[title]}.
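Adding additionalProperties: false is a small schema transform. This sketch applies it to every nested object schema, not only the top level. Whether the framework recurses the same way is an assumption. It leaves any explicit author choice in place. `close_objects` is a hypothetical name.

```python
import copy
from typing import Any, Dict

def close_objects(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `schema` with additionalProperties: false on every object schema."""
    out = copy.deepcopy(schema)  # never mutate the author's schema

    def walk(node: Any) -> None:
        if isinstance(node, dict):
            if node.get("type") == "object":
                node.setdefault("additionalProperties", False)  # keep an explicit choice
            for value in node.values():
                walk(value)
        elif isinstance(node, list):
            for item in node:
                walk(item)

    walk(out)
    return out

report = {
    "type": "object",
    "properties": {
        "title": {"type": "string"},
        "meta": {"type": "object", "properties": {"pages": {"type": "integer"}}},
    },
    "required": ["title"],
}
closed = close_objects(report)
print(closed["additionalProperties"], closed["properties"]["meta"]["additionalProperties"])  # False False
```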

Agents as tools (delegation) — list other agents under an agent's agents: key and they become tools it can call; the sub-agent runs its own loop and shares the same Store. Cycles are bounded by Agent.MAX_STEPS.

reporter:
  tools: [save_report]
  agents:
    - analyst                                           # bare name → tool "analyst"
    - { name: analyst, as: crunch, description: "..." } # or rename

Shared state & messaging — one Store (versioned channels + message log + todo board) is threaded through every agent, tool, and sub-agent. Three ways to move data: node output: keys (read via {key}), the model-driven built-ins (state_*, send_message/read_messages, the todo_* board), and a state/ctx parameter injected into any @tool (hidden from the model's schema). Messages and assigned todos use exactly-once push delivery: they are surfaced into the recipient's prompt without polling. A graph: agent node can also add emit: <transcript-channel> (an add_messages channel): its answer is appended as a turn {id, sender, content} that a later node reads via {transcript} templating, giving a shared, replayable blackboard alongside the push log.
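Exactly-once push delivery can be sketched as a shared append-only log plus one read cursor per recipient. The names (`MessageLog`, `deliver`, `surface`) are hypothetical. Keeping the cursors in the same state that gets checkpointed is what would let delivery survive a resume without replaying messages.

```python
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class Message:
    sender: str
    recipient: str
    content: str

@dataclass
class MessageLog:
    messages: List[Message] = field(default_factory=list)
    # recipient -> index of the first log entry it has not been shown yet
    cursors: Dict[str, int] = field(default_factory=dict)

    def send(self, sender: str, recipient: str, content: str) -> None:
        self.messages.append(Message(sender, recipient, content))

    def deliver(self, recipient: str) -> List[Message]:
        start = self.cursors.get(recipient, 0)
        fresh = [m for m in self.messages[start:] if m.recipient == recipient]
        self.cursors[recipient] = len(self.messages)  # advance, so nothing is shown twice
        return fresh

def surface(prompt: str, log: MessageLog, agent: str) -> str:
    """Push unseen messages into the agent's prompt; the agent never has to poll."""
    fresh = log.deliver(agent)
    if not fresh:
        return prompt
    inbox = "\n".join(f"- from {m.sender}: {m.content}" for m in fresh)
    return f"{prompt}\n\nNew messages:\n{inbox}"

log = MessageLog()
log.send("analyst", "writer", "Q3 numbers are in")
print(surface("Draft the summary.", log, "writer"))
print(surface("Draft the summary.", log, "writer"))  # unchanged: already delivered
```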

Secrets — a credential is named in YAML, never embedded. A top-level secrets: block maps a local name to a pluggable backend ({provider: env, var: X} / {provider: file, path: P} / shorthand name: VAR); the value is fetched late at point-of-use, wrapped in a masked Secret (prints Secret(****)), and never written to the Store — so it can't reach a checkpoint. Reference it as {secret: name} in an agent's api_key: or an MCP server's env:/headers: (which also accept ${VAR} / ${VAR:-default}), or read it in a tool via ctx.secrets.get(name).reveal(). env and file backends ship; a vault/cloud backend is a registry entry away. A typo'd reference or unknown backend fails at load; a missing var/file is a clear error. See docs/FEATURES.md §3b (offline demo: python -m examples.secrets_demo.run).
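The masking behavior, and the ${VAR} / ${VAR:-default} interpolation accepted in env:/headers:, can be sketched in plain Python. This is hypothetical throughout. This `Secret` only imitates the documented Secret(****) rendering and reveal() call. `expand` assumes shell-style semantics, where the default also applies when the variable is set but empty. This README does not state those semantics.

```python
import os
import re
from typing import Mapping, Optional

class Secret:
    """Masked wrapper: printing, logging, or formatting shows Secret(****)."""
    __slots__ = ("_value",)

    def __init__(self, value: str) -> None:
        self._value = value

    def __repr__(self) -> str:
        return "Secret(****)"

    __str__ = __repr__

    def reveal(self) -> str:
        return self._value  # the only way to get the plaintext back

# Matches ${VAR} or ${VAR:-default}
_REF = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")

def expand(text: str, env: Optional[Mapping[str, str]] = None) -> str:
    source: Mapping[str, str] = os.environ if env is None else env

    def substitute(match: "re.Match[str]") -> str:
        name, default = match.group(1), match.group(2)
        value = source.get(name)
        if value:                    # set and non-empty
            return value
        if default is not None:      # ${VAR:-default}: unset or empty -> default
            return default
        if value is not None:        # set but empty, no default
            return value
        raise KeyError(f"environment variable {name!r} is not set")

    return _REF.sub(substitute, text)

key = Secret("sk-live-123")
print(key, f"{key}")                                   # Secret(****) Secret(****)
print(expand("Bearer ${TOKEN:-anon}", env={}))         # Bearer anon
print(expand("Bearer ${TOKEN}", env={"TOKEN": "t0"}))  # Bearer t0
```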

Retries & limits — a retry: policy (attempts + exponential backoff) at two scopes: tool retry (per-@tool(retry=…) or an agent's tool_retry:) and agent retry (retry:, defaulting to transient API errors + OutputParseError; refusals aren't retried). A limits: block caps steps, output tokens, and wall-clock — per agent and for the whole run (plus a per-node max_visits cycle bound).
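The attempts-plus-exponential-backoff shape of a retry: policy can be sketched as a wrapper around one call. This is a minimal sketch. `retry_call` and its parameters (base delay, factor, cap) are hypothetical rather than RetryPolicy's actual fields. Production backoff usually adds random jitter. It is omitted here to keep the delays predictable.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")

def retry_call(fn: Callable[[], T], attempts: int = 3, base: float = 0.5,
               factor: float = 2.0, max_delay: float = 30.0,
               retry_on: Tuple[Type[BaseException], ...] = (Exception,),
               sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, retrying matching errors with delays base, base*factor, base*factor**2, ..."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(min(max_delay, base * factor ** (attempt - 1)))
    raise AssertionError("unreachable")

calls = {"n": 0}

def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")  # fails twice, then succeeds
    return "ok"

delays = []
print(retry_call(flaky, attempts=4, retry_on=(ConnectionError,), sleep=delays.append))  # ok
print(delays)  # [0.5, 1.0]
```

Wrapping a single tool call like this re-runs only that call. Wrapping a whole agent turn would re-run every tool it already invoked, which is why agent retry is the riskier scope for side-effecting tools.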

⚠️ Agent retry re-runs tools already called — for side-effecting tools, prefer tool retry.

Driving it from code

from agentic_flow import Orchestrator, registry

orch = Orchestrator.from_yaml("examples/shared_state/pipeline.yaml", registry)
store = orch.run({"product": "Orbit"})    # seed initial state if you want
print(store["summary"])                    # Store is dict-like; store.log / store.todos too

Pass on_event=... to from_yaml to trace every node, tool call, and result.

Observability

on_event(event, data) is the single seam — every node_*, tool_*, retry, checkpoint, human, route, replan / replan_rejected, and messages_delivered flows through it. Three sinks ship on top:

  • Console printer: console_tracer() (agentic_flow/console.py), the one console sink the CLI and examples/_common.py share.
  • Standard logging: event_logger() returns an on_event hook (INFO for node boundaries, WARNING for retries/tool errors, DEBUG for per-call detail); call configure_logging() to opt in. From the CLI: --log-level DEBUG.
  • EventBus: a pub/sub fan-out that is itself an on_event hook, with filtered subscribers and a thread-safe real-time stream() for watching a run live:

from agentic_flow import EventBus, Orchestrator, registry

bus = EventBus()
orch = Orchestrator.from_yaml("pipeline.yaml", registry, on_event=bus)
initial = {"topic": "Q3"}                          # seed state for the run (example values)
bus.run_in_background(lambda: orch.run(initial))   # closes the bus when the run ends
for event, data in bus.stream():                   # blocks, yields events live
    if event == "tool_result":
        print(data["agent"], data["tool"], "→", data["result"])

A runnable demo: python -m examples.team_todos.run_stream. The logging/bus internals are in docs/INTERNALS.md.
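To see how one object can be both an on_event hook and a live stream, here is a minimal sketch of the pattern. It is a callable that fans events out to filtered subscribers and onto a thread-safe queue, with a sentinel to end the stream. `MiniBus` is hypothetical and only mirrors the documented shape (the bus passed as on_event, run_in_background, stream()). It is not agentic_flow's EventBus.

```python
import queue
import threading
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple

Handler = Callable[[str, Dict[str, Any]], None]
_CLOSED = object()  # sentinel that ends stream()

class MiniBus:
    """Hypothetical pub/sub bus that is itself an on_event(event, data) hook."""

    def __init__(self) -> None:
        self._subs: List[Tuple[Optional[str], Handler]] = []
        self._queue: "queue.Queue[Any]" = queue.Queue()  # thread-safe hand-off to stream()

    def __call__(self, event: str, data: Dict[str, Any]) -> None:
        for wanted, handler in self._subs:
            if wanted is None or wanted == event:  # filtered subscribers
                handler(event, data)
        self._queue.put((event, data))

    def subscribe(self, handler: Handler, event: Optional[str] = None) -> None:
        self._subs.append((event, handler))

    def close(self) -> None:
        self._queue.put(_CLOSED)

    def stream(self) -> Iterator[Tuple[str, Dict[str, Any]]]:
        while True:
            item = self._queue.get()  # blocks until the next event arrives
            if item is _CLOSED:
                return
            yield item

    def run_in_background(self, target: Callable[[], Any]) -> threading.Thread:
        def runner() -> None:
            try:
                target()
            finally:
                self.close()  # end the stream even if the run raises
        thread = threading.Thread(target=runner, daemon=True)
        thread.start()
        return thread

bus = MiniBus()
errors: List[str] = []
bus.subscribe(lambda e, d: errors.append(d["tool"]), event="tool_error")

def fake_run() -> None:  # stands in for orch.run(initial)
    bus("node_start", {"node": "gather"})
    bus("tool_result", {"agent": "analyst", "tool": "calculator", "result": 42})
    bus("tool_error", {"agent": "analyst", "tool": "fetch_sales_data"})

bus.run_in_background(fake_run)
for event, data in bus.stream():
    if event == "tool_result":
        print(data["agent"], data["tool"], "→", data["result"])  # analyst calculator → 42
```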

Extending

  • New tools — add @tool functions in any imported module (group with group=).
  • New agents / nodes / edges — edit the YAML; no code change.
  • Delegation / structured output / retries / limits — set agents: / output_schema / retry: / limits: on an agent.
  • Shared state — the state_*/send_message tools, or a state/ctx param on your own tools.
  • New providers — implement the Provider interface and register_provider.
  • Observability — pass on_event=...; route to logging (event_logger()) or an EventBus.
  • Branching / loops — a pipeline:'s loop: / when: (lowered to a back-edge + conditional edges), or author graph: conditional/back edges directly; conditions are state templates judged for truthiness.
  • Let the model drive flow — add a route: on an edge (the model picks the next node) or an authority.replan: policy (the model rewrites the not-yet-run subgraph) — both compose only from your declared vocabulary.
  • Pause for a human / persist & resume — add a human: ({prompt, output}) node and run with run(run_id=, checkpointer=); catch NodePaused and resume(..., human_input=...). Add a backend by subclassing Checkpointer + register_checkpointer.

Download files

Source Distribution

agentlead_flow_core-0.1.0.tar.gz (394.3 kB, source)

Built Distribution

agentlead_flow_core-0.1.0-py3-none-any.whl (126.1 kB, Python 3)

File details

Details for the file agentlead_flow_core-0.1.0.tar.gz.

File metadata

  • Download URL: agentlead_flow_core-0.1.0.tar.gz
  • Size: 394.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.13

File hashes

Hashes for agentlead_flow_core-0.1.0.tar.gz
Algorithm Hash digest
SHA256 b812e1dc9591275a76d6bdf6083319c3d3603a2635246734559be9ed8ce3e047
MD5 9f975498ab3b0fb066a11366c505b987
BLAKE2b-256 a68ecd020915bd05b6f5aae1a4c6e09502a35954c39a0cebb4a1bb0fb21f719d

File details

Details for the file agentlead_flow_core-0.1.0-py3-none-any.whl.

File hashes

Hashes for agentlead_flow_core-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 996598a279cc54b36e4de99a988b19137e69fb336f10ce3b4ab0817efb02a9a0
MD5 6aeffb36a7512eb67ce074cce112c200
BLAKE2b-256 2146681c25c07ebb2d219406917364d398d708c04955b39b4d7bd58e05310eb8
