Turn any LangChain create_agent into a multi-agent workflow orchestrator (Claude Code Workflow, ported).

workflow-middleware

Claude Code's Workflow tool as a drop-in LangChain middleware.

Deep Research Live — the orchestrator authors a workflow at runtime; the graph and phase rail are drawn live from real agent-spawn events.


What It Is

A 1:1 port of Claude Code's Workflow tool as a LangChain middleware. Drop it onto any create_agent and the agent gains a single tool — run_workflow — that lets it write a Python orchestration script and execute it: fan out mini-agents (leaves), read their structured results, and summarise.

Like Claude Code, the orchestrator is a real agent (a full loop, not a single LLM call): it writes the script, runs it, reads the structured result, decides whether more work is needed, and summarises. The leaves are create_agent instances that inherit the orchestrator's tools and model by default — so the workflows the agent can write are specialised by the tools you give it. Same engine, different capabilities.

It is opt-in, never forced: the middleware appends discretionary guidance to the system prompt, but the model decides whether a task is worth orchestrating. On trivial tasks it just answers directly.

Warning: This package executes a Python script written by the LLM, and by default that script runs in your own process. The runtime guard (reduced builtins plus an AST guard that blocks __class__ / __subclasses__ / getattr-style dunder escapes) stops the common breakouts, but it is not OS-level isolation: it does not bound CPU or memory and is not hardened against a determined adversary. Use it with an orchestrator model you control; do not run scripts from untrusted sources without a real sandbox. See SECURITY.md.


How It Works

The model never sees @task or @entrypoint. It writes a plain script — a meta = {...} literal then async def main(): ... — using only the injected primitives. The runtime executes that script on top of the LangGraph Functional API:

flowchart TD
    A["Orchestrator (create_agent)"] -->|"writes a Python script"| B["run_workflow tool"]
    B --> C["executor: AST-validate meta\nexec with reduced builtins"]
    C --> D["@entrypoint runs main()"]
    D -->|"agent(...) calls"| E["@task-wrapped leaf"]
    E --> F["Leaf agent\n(inherits tools + model)"]
    F -->|"final text / structured object"| D
    D -->|"result + summary"| A

    style A fill:#fff,stroke:#333,color:#333
    style F fill:#2d6a4f,color:#fff
    style C fill:#c0392b,color:#fff
  1. Model writes the script. A meta dict literal (name, description, phases) then async def main() using agent / parallel / pipeline / phase / log / args / budget.
  2. Executor validates and execs it. meta is extracted via AST and ast.literal_eval (literal-only — names, calls, f-strings, comprehensions, starred-unpacking and conditional expressions are rejected). The body is exec'd with injected globals and a reduced __builtins__ (no __import__, open, eval, exec, compile, getattr, setattr, type, no os / sys), and a static AST guard rejects any dunder access (__class__ / __subclasses__ / __globals__ …) so the common sandbox-escape chains raise SecurityError before the script runs. The three obvious nondeterministic entry points (clock reader, RNG, no-arg date/datetime) are intercepted by name and raise — a best-effort guard against accidental nondeterminism that would break resume, not a sandbox (see Caveats).
  3. main() runs under the Functional API. It executes inside an @entrypoint with an in-memory checkpointer and a thread_id. Each agent() call routes through a @task, so leaf results are memoized and checkpointed — an interrupted run replays completed leaves from the checkpoint on resume.
  4. Leaves run as create_agent instances that inherit the orchestrator's tools and model. With a schema, the leaf uses response_format and returns the validated object; otherwise it returns its final text. Token usage is aggregated for the budget.
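
The literal-only meta extraction from step 2 can be sketched with the standard library alone. This is a minimal illustration, not the package's executor: the function name extract_meta and its error messages are assumptions, and only the combination of AST parsing with ast.literal_eval comes from the description above.

```python
import ast


def extract_meta(source: str) -> dict:
    """Return the top-level `meta` dict, accepting pure literals only.

    Hypothetical helper: the real executor's name and errors may differ.
    """
    tree = ast.parse(source)
    for node in tree.body:
        is_meta_assign = (
            isinstance(node, ast.Assign)
            and len(node.targets) == 1
            and isinstance(node.targets[0], ast.Name)
            and node.targets[0].id == "meta"
        )
        if not is_meta_assign:
            continue
        try:
            # literal_eval never executes code: calls, names, f-strings and
            # comprehensions make it raise instead of being evaluated.
            value = ast.literal_eval(node.value)
        except (ValueError, TypeError, SyntaxError) as exc:
            raise ValueError("meta must be a pure literal") from exc
        if not isinstance(value, dict):
            raise ValueError("meta must be a dict literal")
        return value
    raise ValueError("script has no top-level meta assignment")


GOOD_SCRIPT = (
    'meta = {"name": "demo", "phases": [{"title": "Review"}]}\n'
    "async def main():\n"
    "    return None\n"
)
BAD_SCRIPT = (
    'meta = {"name": str(1)}\n'  # a call inside meta is not a literal
    "async def main():\n"
    "    return None\n"
)

if __name__ == "__main__":
    print(extract_meta(GOOD_SCRIPT))
```

Because the value is read from the syntax tree and never executed, a script cannot smuggle side effects into meta; the body of main() is handled by a separate exec step.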

The concurrency cap is ours: an asyncio.Semaphore(min(16, cpu-2)) (configurable), plus a lifetime cap of 1000 leaf spawns per run. The semaphore lives host-side on the Runtime and never crosses the @task boundary.
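
The two caps can be pictured as a semaphore plus a counter. The sketch below is a toy model under stated assumptions: SpawnLimiter, its attributes, and the floor of 1 on the default are hypothetical names and choices, while the min(16, cpu-2) default, the 1000-spawn lifetime cap, and the AgentCapExceeded error name come from this README.

```python
import asyncio
import os
from typing import Awaitable, Callable, Optional


class AgentCapExceeded(RuntimeError):
    """Raised when a run tries to spawn more leaves than the lifetime cap."""


def default_concurrency() -> int:
    # min(16, cpu-2) as described above; the floor of 1 is an assumption so
    # that machines with one or two cores still get a usable semaphore.
    cpus = os.cpu_count() or 1
    return max(1, min(16, cpus - 2))


class SpawnLimiter:
    """Hypothetical host-side limiter: concurrency gate plus lifetime counter."""

    def __init__(self, max_concurrency: Optional[int] = None,
                 max_agents_total: int = 1000) -> None:
        self._sem = asyncio.Semaphore(max_concurrency or default_concurrency())
        self._max_total = max_agents_total
        self.spawned = 0   # lifetime spawns in this run
        self.peak = 0      # highest number of leaves seen running at once
        self._active = 0

    async def run(self, make_leaf: Callable[[], Awaitable[object]]) -> object:
        async with self._sem:                  # concurrency cap
            if self.spawned >= self._max_total:  # lifetime cap
                raise AgentCapExceeded(f"cap of {self._max_total} reached")
            self.spawned += 1
            self._active += 1
            self.peak = max(self.peak, self._active)
            try:
                return await make_leaf()
            finally:
                self._active -= 1


async def demo():
    # Build the limiter inside the running loop so the semaphore binds to it.
    limiter = SpawnLimiter(max_concurrency=2, max_agents_total=5)

    async def leaf(i: int) -> int:
        await asyncio.sleep(0.01)  # stand-in for a model call
        return i * 2

    results = await asyncio.gather(
        *(limiter.run(lambda i=i: leaf(i)) for i in range(5))
    )
    try:
        await limiter.run(lambda: leaf(99))
        capped = False
    except AgentCapExceeded:
        capped = True
    return results, limiter.peak, capped


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping the limiter on the host side, as the paragraph above describes, means only plain results need to cross the checkpoint boundary; the semaphore itself is never serialized.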


Quick Start

Install

pip install workflow-middleware

# Or from source
pip install git+https://github.com/emanueleielo/workflow-middleware.git

Offline — no keys, FakeLeafBackend

You can drive the runtime directly with a deterministic offline backend — no model, no network, no API key. This is examples/basic_usage.py:

import asyncio
from workflow_middleware import FakeLeafBackend, WorkflowConfig, run_workflow_script

SCRIPT = '''\
meta = {
    "name": "demo-fanout",
    "description": "Fan out three reviewers and collect their findings",
    "phases": [{"title": "Review", "detail": "one reviewer per dimension"}],
}

async def main():
    phase("Review")
    findings = await parallel([
        (lambda d=d: agent(f"Review for {d}", label=d))
        for d in ("correctness", "security", "perf")
    ])
    return {"findings": [f for f in findings if f]}
'''

async def run() -> None:
    result, summary = await run_workflow_script(
        SCRIPT,
        args={"target": "auth.py"},
        backend=FakeLeafBackend(),
        config=WorkflowConfig(),
        thread_id="demo-1",
    )
    print("result:", result)
    print("summary:", summary)

if __name__ == "__main__":
    asyncio.run(run())

This exercises meta extraction, the parallel barrier, the error→None filter, and budget aggregation — and runs green with zero env vars.

Real model — plug-and-play middleware

Attach WorkflowMiddleware to a real create_agent. The leaves inherit the orchestrator's tools and model. This is examples/real_usage.py:

from langchain.agents import create_agent
from workflow_middleware import WorkflowMiddleware, WorkflowConfig

agent_graph = create_agent(
    "claude-sonnet-4-5",          # any model id; needs ANTHROPIC_API_KEY
    tools=[...],                  # the orchestrator's tools; leaves inherit them
    middleware=[WorkflowMiddleware(WorkflowConfig(include_patterns=True))],
)

if __name__ == "__main__":
    out = agent_graph.invoke({"messages": [
        {"role": "user", "content": "Thoroughly audit auth.py for security bugs using a workflow."}
    ]})
    print(out["messages"][-1].content)

The middleware appends the guidance, the model decides to call run_workflow, the leaves inherit tools + model, and the structured result is summarised back to you.


Live Demo — Deep Research

A full browser demo lives in examples/deep-research-live/ (screenshot above): you type a research question, a real orchestrator agent authors a workflow script, leaf agents run live DuckDuckGo searches, and a graph is drawn live from real agent-spawn events while a cited markdown report streams back. It runs in a keyless fake mode (canned script, fully offline) or a real anthropic mode (Claude + live web search).

cd examples/deep-research-live
uv venv --python 3.13 .venv && source .venv/bin/activate
uv pip install -r requirements.txt          # installs the package via -e ../..

# Keyless — animates the whole UI offline, no API key, no network:
DEEP_RESEARCH_MODE=fake python -m server.app

# Real — orchestrator authors the script, leaves do real searches (needs a key):
cp .env.example .env        # then set ANTHROPIC_API_KEY=...
python -m server.app

Then open http://127.0.0.1:8000 and ask a question. Full walkthrough and the wire protocol are in the demo's README and PROTOCOL.md.


The Primitives

The model's script sees only these. No imports, no filesystem, no @task/@entrypoint.

| Primitive | Signature | Semantics |
| --- | --- | --- |
| agent | await agent(prompt, schema=None, agent_type=None, label=None) | Run one leaf agent. Acquires the concurrency semaphore, checks the lifetime cap, calls the backend, aggregates tokens. Returns the validated object when schema is given, else the final text. A bare await agent(...) propagates errors. |
| parallel | await parallel(thunks) | BARRIER. Gather all thunks; a thunk that raises becomes None in its slot (filter it out). |
| pipeline | await pipeline(items, *stages) | NO barrier. Each item runs independently through every stage as stage(prev, item, index). A stage that raises drops that item to None and skips its remaining stages. |
| phase | phase(title) | Record a progress group (for the summary / streaming). |
| log | log(message) | Append a progress line. |
| args | args | The value passed verbatim to run_workflow (the orchestrator-supplied input). |
| budget | budget | Token-ceiling handle: budget.total, budget.spent(), budget.remaining(). Once spent >= total, the next agent() raises BudgetExceeded. total=None ⇒ unlimited. |

Past the lifetime cap (max_agents_total, default 1000) agent() raises AgentCapExceeded. The error→None policy is applied by parallel / pipeline only — not by agent itself.
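
The error→None policy of parallel and pipeline can be illustrated with plain asyncio. This is a sketch of the described semantics only, not the package's runtime: the helper names _guard and run_item are hypothetical, and passing None as prev to the first stage is an assumption, since the README does not say what the first stage receives.

```python
import asyncio


async def _guard(thunk):
    """Await one thunk; a raised exception becomes None in its slot."""
    try:
        return await thunk()
    except Exception:
        return None


async def parallel(thunks):
    # Barrier: nothing is returned until every thunk has settled.
    return await asyncio.gather(*(_guard(t) for t in thunks))


async def pipeline(items, *stages):
    async def run_item(index, item):
        prev = None  # assumption: the first stage sees prev=None
        try:
            for stage in stages:
                prev = await stage(prev, item, index)
        except Exception:
            return None  # failed item: remaining stages are skipped
        return prev

    # No barrier between stages: each item advances at its own pace.
    return await asyncio.gather(
        *(run_item(i, item) for i, item in enumerate(items))
    )


async def demo():
    async def ok(value):
        return value

    async def boom():
        raise RuntimeError("leaf failed")

    par = await parallel([lambda: ok(1), boom, lambda: ok(3)])

    async def fetch(prev, item, index):
        if item == "b":
            raise ValueError("bad item")
        return item.upper()

    async def tag(prev, item, index):
        return f"{index}:{prev}"

    pipe = await pipeline(["a", "b", "c"], fetch, tag)
    return par, pipe


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In both helpers the caller is expected to filter out None slots, which is what the Quick Start script does with `[f for f in findings if f]`.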


Opt-In Behaviour

Two surfaces, by design (matching Claude Code):

  • Tool description (RUN_WORKFLOW_DESCRIPTION) keeps a hard gate — use it only when explicitly opted in.
  • System-prompt guidance (WORKFLOW_GUIDANCE) is discretionary: appended to the orchestrator's prompt so the model knows the capability exists, but it chooses whether to use it.

The guidance is appended, never overwritten — the original system message is preserved with a "\n\n" separator. Toggle and tune it via WorkflowConfig:

WorkflowConfig(
    inject_workflow_guidance=True,   # False => system message untouched
    include_patterns=True,           # also append the reusable PATTERNS library
    system_prompt="...",             # override the appended text entirely
)
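
How the three options combine can be sketched as a pure function. Everything here is illustrative: GuidanceOptions and build_system_message are hypothetical stand-ins, the two constants hold placeholder text, and the rule that an explicit system_prompt also suppresses the PATTERNS library is an assumption drawn from the word "entirely" in the comment above.

```python
from dataclasses import dataclass
from typing import Optional

WORKFLOW_GUIDANCE = "<discretionary workflow guidance>"  # placeholder text
PATTERNS = "<reusable patterns library>"                 # placeholder text


@dataclass
class GuidanceOptions:
    """Hypothetical stand-in for the three WorkflowConfig fields shown above."""
    inject_workflow_guidance: bool = True
    include_patterns: bool = False
    system_prompt: Optional[str] = None


def build_system_message(original: str, opts: GuidanceOptions) -> str:
    if not opts.inject_workflow_guidance:
        return original  # system message left untouched
    if opts.system_prompt is not None:
        appended = opts.system_prompt  # assumption: override replaces everything
    else:
        appended = WORKFLOW_GUIDANCE
        if opts.include_patterns:
            appended = f"{appended}\n\n{PATTERNS}"
    # Append, never overwrite: the original text stays first.
    return f"{original}\n\n{appended}" if original else appended


if __name__ == "__main__":
    print(build_system_message("You are a code reviewer.", GuidanceOptions()))
```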

Run & Event Tracking

Every completed run_workflow execution is recorded on the middleware instance (keyed by thread, the same instance-level pattern AdvisorMiddleware uses — LangChain 1.2.x ignores Command(update={...}) returned from middleware, so the run summary is stored host-side rather than in graph state). After each run, the middleware appends one WorkflowEvent — the executed script's name, the number of agents_spawned, the aggregated tokens_spent, the thread_id, and whether main() returned without raising (ok) — and bumps a lifetime run counter.

Read the history back off the instance you attached:

mw = WorkflowMiddleware(WorkflowConfig())
agent_graph = create_agent("claude-sonnet-4-5", tools=[...], middleware=[mw])

agent_graph.invoke({"messages": [{"role": "user", "content": "Audit auth.py with a workflow."}]})

for event in mw.get_events():        # list[WorkflowEvent] for the current thread
    print(event["name"], event["agents_spawned"], event["tokens_spent"], event["ok"])

print("workflows run this session:", mw.get_total_runs())

get_events(thread_id=None) returns the events for a thread (defaulting to the current one); get_total_runs(thread_id=None) returns the lifetime count. WorkflowEvent and WorkflowState are exported from the package top level.
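
A host-side store of this kind can be sketched in a few lines. The field names below come from the description above; the field types, the EventLog class, the record method, and the choice that get_total_runs() with no argument counts every thread are all assumptions for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Optional, TypedDict


class WorkflowEventSketch(TypedDict):
    """Assumed shape of one event; field types are guesses."""
    name: str
    agents_spawned: int
    tokens_spent: int
    thread_id: str
    ok: bool


class EventLog:
    """Hypothetical instance-level store, kept outside graph state."""

    def __init__(self) -> None:
        self._events: Dict[str, List[WorkflowEventSketch]] = defaultdict(list)
        self._current_thread: Optional[str] = None

    def record(self, event: WorkflowEventSketch) -> None:
        self._events[event["thread_id"]].append(event)
        self._current_thread = event["thread_id"]

    def get_events(self, thread_id: Optional[str] = None) -> List[WorkflowEventSketch]:
        key = thread_id if thread_id is not None else self._current_thread
        # Return a copy so callers cannot mutate the stored history.
        return list(self._events.get(key, []))

    def get_total_runs(self, thread_id: Optional[str] = None) -> int:
        if thread_id is not None:
            return len(self._events.get(thread_id, []))
        # Assumption: no thread_id means the count across every thread.
        return sum(len(events) for events in self._events.values())


if __name__ == "__main__":
    log = EventLog()
    log.record({"name": "demo-fanout", "agents_spawned": 3,
                "tokens_spent": 1200, "thread_id": "t1", "ok": True})
    print(log.get_events(), log.get_total_runs())
```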


Configuration

WorkflowConfig

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| leaf_model | str \| BaseChatModel \| None | None | Model for leaf agents. None ⇒ inherit the orchestrator's model. |
| max_concurrency | int \| None | None | Cap on concurrent agent() calls. None ⇒ min(16, cpu_count()-2), resolved at runtime. |
| max_agents_total | int | 1000 | Lifetime cap on total agent() spawns per run. |
| budget | BudgetConfig | BudgetConfig() | Token budget controls. |
| inject_workflow_guidance | bool | True | Append WORKFLOW_GUIDANCE to the system prompt in wrap_model_call. |
| include_patterns | bool | False | Also append the PATTERNS library to the injected guidance. |
| system_prompt | str \| None | None | Override text appended instead of WORKFLOW_GUIDANCE. |
| leaf_prompt_addendum | str \| None | None | Override LEAF_PROMPT_ADDENDUM appended to each leaf. |
| sandbox | SandboxConfig | SandboxConfig() | OS-isolation options (Phase 4 stub; see Caveats). |

BudgetConfig

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| total | int \| None | None | Lifetime token ceiling across all leaves in one run. None ⇒ unlimited. |
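
The ceiling is checked before a leaf starts, so a run can overshoot by at most the cost of the leaf that crosses the line. The toy model below shows that behaviour; the add and check methods are hypothetical, and returning None from remaining() for an unlimited budget, as well as clamping it at zero, are assumptions.

```python
from typing import Optional


class BudgetExceeded(RuntimeError):
    """Raised when a leaf is requested after the ceiling has been reached."""


class Budget:
    """Toy token-ceiling handle mirroring total / spent() / remaining()."""

    def __init__(self, total: Optional[int] = None) -> None:
        self.total = total
        self._spent = 0

    def spent(self) -> int:
        return self._spent

    def remaining(self) -> Optional[int]:
        if self.total is None:
            return None  # assumption: unlimited budgets report None
        return max(0, self.total - self._spent)  # assumption: clamp at zero

    def check(self) -> None:
        # Hypothetical hook, called before each leaf is started.
        if self.total is not None and self._spent >= self.total:
            raise BudgetExceeded(f"spent {self._spent} of {self.total} tokens")

    def add(self, tokens: int) -> None:
        # Hypothetical hook, called after a leaf reports its usage.
        self._spent += tokens


def run_leaves(budget: Budget, costs):
    """Run leaves in order until the budget check refuses the next one."""
    completed = 0
    for cost in costs:
        try:
            budget.check()
        except BudgetExceeded:
            break
        budget.add(cost)
        completed += 1
    return completed


if __name__ == "__main__":
    b = Budget(total=100)
    print(run_leaves(b, [60, 60, 60]), b.spent(), b.remaining())
```

With a ceiling of 100 and leaves costing 60 tokens each, the second leaf still runs because only 60 tokens were spent when it was checked; the third is refused.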

SandboxConfig

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | bool | False | When True, route exec through the sandbox seam. Currently a no-op passthrough. |
| backend | "none" \| "srt" | "none" | Isolation backend. "srt" (Seatbelt/bubblewrap) reserved for Phase 4. |

Leaf Backends

agent() calls go through a LeafBackend — a small protocol with a single async def run(...) -> LeafResult.

  • FakeLeafBackend — deterministic, offline, no keys. Echoes a stable transform of the prompt and, when schema is a pydantic model, fills required fields with deterministic placeholders so structured paths run offline. Used by the examples and tests.
  • LangChainLeafBackend — the real one. Builds a leaf via create_agent that inherits the orchestrator's tools and model, appends LEAF_PROMPT_ADDENDUM to the inherited system prompt, passes response_format=schema when given (reading result["structured_response"]), extracts result["messages"][-1].text otherwise, and aggregates usage_metadata for the budget. Optional named subagents map agent_type → a spec that inherits omitted keys.

LeafResult is a plain @dataclass (text, structured, tokens) — msgpack-safe, so it can cross the @task checkpoint boundary.


Caveats

The in-process guard stops common escapes — it is not OS isolation. The script runs with reduced builtins (getattr / setattr / type / __import__ / open / eval / exec all removed) and a static AST guard that rejects any dunder attribute or name (__class__, __bases__, __subclasses__, __globals__, __builtins__, …). Together these block the classic ().__class__.__bases__[0].__subclasses__() and getattr(type(x), "__globals__") breakout chains — a dunder reference raises SecurityError before the script runs. This is a real barrier against the one-line escapes, but it does not bound CPU/memory/recursion and is not a substitute for a process sandbox.
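
The two layers can be sketched with the standard library. This is a simplified illustration of the technique, not the package's executor: check_no_dunders, reduced_builtins, and run_guarded are hypothetical names, the blocked set is copied from the lists in this README, and stripping every dunder entry from the builtins mapping is an assumption. Like the real guard, it offers no OS isolation.

```python
import ast
import builtins


class SecurityError(Exception):
    """Raised when a script references a dunder name or attribute."""


BLOCKED_BUILTINS = {
    "__import__", "open", "eval", "exec", "compile",
    "getattr", "setattr", "type",
}


def is_dunder(name: str) -> bool:
    return len(name) > 4 and name.startswith("__") and name.endswith("__")


def check_no_dunders(source: str) -> None:
    """Static pass: reject dunder attributes and names before anything runs."""
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Attribute) and is_dunder(node.attr):
            raise SecurityError(f"dunder attribute blocked: {node.attr}")
        if isinstance(node, ast.Name) and is_dunder(node.id):
            raise SecurityError(f"dunder name blocked: {node.id}")


def reduced_builtins() -> dict:
    # Assumption: drop the blocked names and every dunder entry. This also
    # removes __build_class__, so class statements fail inside the script.
    return {
        name: value
        for name, value in vars(builtins).items()
        if name not in BLOCKED_BUILTINS and not is_dunder(name)
    }


def run_guarded(source: str, injected: dict) -> dict:
    """Validate, then exec with reduced builtins plus injected primitives."""
    check_no_dunders(source)
    scope = {"__builtins__": reduced_builtins(), **injected}
    exec(compile(source, "<workflow>", "exec"), scope)
    return scope


if __name__ == "__main__":
    print(run_guarded("x = len([1, 2, 3])", {})["x"])
    try:
        run_guarded("y = ().__class__", {})
    except SecurityError as exc:
        print("rejected:", exc)
```

The static pass fails before exec is reached, which matches the "raises SecurityError before the script runs" behaviour described above, while a removed builtin such as open only fails when the script actually reaches that line.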

The OS sandbox is a documented stub. sandbox.py is a no-op passthrough today. The executor already routes its compile/exec step through sandbox.run_in_sandbox(...), so the OS-isolation boundary exists as a future seam — but no OS isolation is enforced yet; the model-written code runs in-process. The planned srt (Seatbelt/bubblewrap) isolation plus the IPC bridge is a future phase. Until that lands, keep sandbox.enabled=False and only run scripts whose orchestrator you control; with enabled=True and a non-none backend, run_in_sandbox raises NotImplementedError. See SECURITY.md.

What is real today: the model-written-script engine on the Functional API, the seven primitives, the AST meta validator, the reduced builtins + best-effort determinism guard, both leaf backends, the budget and concurrency caps, the run/event tracking on the middleware instance (see Run & event tracking), and the opt-in middleware injection. What is not: OS isolation (stub above), the live /workflows TUI (event/log streaming only), and prompt-cache economics.

The determinism guard is best-effort, not a security boundary. Resume requires that the same script with the same args replays the same sequence of agent() calls, so reading the clock, drawing randomness, or constructing a no-arg date in the body is forbidden. The runtime intercepts those three obvious entry points by name and raises on them, which stops the accidental nondeterminism that would silently break resume. Pass timestamps in via args instead, and stamp results after the workflow returns. The guard is not a sandbox: a deliberately crafted script can still reach the real clock via object introspection, so do not treat the raise as a wall. Hard determinism and isolation guarantees need the OS-level sandbox (the stub above, Phase 4). Memoization covers interrupt-then-resume within a thread; re-running a thread that has already finished does not skip work.
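
Why a clock read breaks resume can be seen in a toy replay model. This is not how LangGraph checkpoints tasks; ReplayCache and its keying by call position plus prompt are assumptions made only to illustrate the idea, with a changed timestamp standing in for a script that reads the clock in its body.

```python
import asyncio
import itertools


class ReplayCache:
    """Toy memo: a leaf result is reused when the same call recurs in order."""

    def __init__(self) -> None:
        self.store = {}
        self.leaf_runs = 0  # how many leaves actually executed

    def new_run(self):
        counter = itertools.count()

        async def agent(prompt: str) -> str:
            key = (next(counter), prompt)  # call position + prompt
            if key not in self.store:
                self.leaf_runs += 1        # cache miss: the leaf runs again
                self.store[key] = f"result for {prompt!r}"
            return self.store[key]

        return agent


async def workflow(agent, args):
    # The timestamp arrives through args, so a replay with the same args
    # issues exactly the same prompts in the same order.
    first = await agent(f"summarise logs up to {args['now']}")
    second = await agent("list open issues")
    return [first, second]


async def demo():
    cache = ReplayCache()
    args = {"now": "2025-01-01T00:00:00Z"}
    original = await workflow(cache.new_run(), args)
    runs_after_first = cache.leaf_runs
    replayed = await workflow(cache.new_run(), args)
    runs_after_replay = cache.leaf_runs
    # A different timestamp stands in for a body that reads the clock itself.
    await workflow(cache.new_run(), {"now": "2025-01-01T00:00:05Z"})
    return original == replayed, runs_after_first, runs_after_replay, cache.leaf_runs


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The replay with identical args executes no new leaves, while the run with a shifted timestamp misses on its first call and pays for that leaf again.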


Architecture

workflow_middleware/
├── __init__.py        # Public API: WorkflowMiddleware, WorkflowConfig, run_workflow_script, ...
├── config.py          # WorkflowConfig + BudgetConfig + SandboxConfig dataclasses
├── state.py           # WorkflowState + WorkflowEvent TypedDicts
├── prompts.py         # RUN_WORKFLOW_DESCRIPTION, WORKFLOW_GUIDANCE, LEAF_PROMPT_ADDENDUM, ...
├── runtime.py         # Runtime + the injected primitives + Budget
├── executor.py        # AST meta-extraction, reduced-builtins exec, @entrypoint/@task wiring
├── leaves.py          # LeafBackend protocol, FakeLeafBackend, LangChainLeafBackend
├── sandbox.py         # Phase 4 OS-isolation seam (no-op stub today)
├── middleware.py      # WorkflowMiddleware: run_workflow tool + opt-in guidance
└── py.typed           # PEP 561 type marker

Development

# Install with dev dependencies
pip install -e ".[dev,anthropic]"

# Run tests
pytest

# Lint
ruff check workflow_middleware/

# Type check
mypy workflow_middleware/

License

MIT — Emanuele Ielo
