Turn any LangChain create_agent into a multi-agent workflow orchestrator (Claude Code Workflow, ported).
Project description
workflow-middleware
Claude Code's Workflow tool as a drop-in LangChain middleware.
What it is • How it works • Quick start • Demo • Primitives • Configuration • Caveats
Deep Research Live — the orchestrator authors a workflow at runtime; the graph and phase rail are drawn live from real agent-spawn events.
What It Is
A 1:1 port of Claude Code's Workflow tool as a LangChain middleware. Drop it onto any create_agent and the agent gains a single tool — run_workflow — that lets it write a Python orchestration script and execute it: fan out mini-agents (leaves), read their structured results, and summarise.
Like Claude Code, the orchestrator is a real agent (a full loop, not a single LLM call): it writes the script, runs it, reads the structured result, decides whether more work is needed, and summarises. The leaves are create_agent instances that inherit the orchestrator's tools and model by default — so the workflows the agent can write are specialised by the tools you give it. Same engine, different capabilities.
It is opt-in, never forced: the middleware appends discretionary guidance to the system prompt, but the model decides whether a task is worth orchestrating. On trivial tasks it just answers directly.
[!WARNING] This package executes a Python script written by the LLM. It runs in your own process by default. The runtime guard (reduced builtins + an AST guard that blocks
__class__/__subclasses__/getattr-style dunder escapes) stops the common breakouts, but it is not OS-level isolation — it does not bound CPU/memory and is not hardened against a determined adversary. Use it with an orchestrator model you control; do not run scripts from untrusted sources without a real sandbox. See SECURITY.md.
How It Works
The model never sees @task or @entrypoint. It writes a plain script — a meta = {...} literal then async def main(): ... — using only the injected primitives. The runtime executes that script on top of the LangGraph Functional API:
flowchart TD
A["Orchestrator (create_agent)"] -->|"writes a Python script"| B["run_workflow tool"]
B --> C["executor: AST-validate meta\nexec with reduced builtins"]
C --> D["@entrypoint runs main()"]
D -->|"agent(...) calls"| E["@task-wrapped leaf"]
E --> F["Leaf agent\n(inherits tools + model)"]
F -->|"final text / structured object"| D
D -->|"result + summary"| A
style A fill:#fff,stroke:#333,color:#333
style F fill:#2d6a4f,color:#fff
style C fill:#c0392b,color:#fff
- Model writes the script. A
metadict literal (name, description, phases) thenasync def main()usingagent / parallel / pipeline / phase / log / args / budget. - Executor validates and execs it.
metais extracted via AST andast.literal_eval(literal-only — names, calls, f-strings, comprehensions, starred-unpacking and conditional expressions are rejected). The body isexec'd with injected globals and a reduced__builtins__(no__import__,open,eval,exec,compile,getattr,setattr,type, noos/sys), and a static AST guard rejects any dunder access (__class__/__subclasses__/__globals__…) so the common sandbox-escape chains raiseSecurityErrorbefore the script runs. The three obvious nondeterministic entry points (clock reader, RNG, no-arg date/datetime) are intercepted by name and raise — a best-effort guard against accidental nondeterminism that would break resume, not a sandbox (see Caveats). main()runs under the Functional API. It executes inside an@entrypointwith an in-memory checkpointer and athread_id. Eachagent()call routes through a@task, so leaf results are memoized and checkpointed — an interrupted run replays completed leaves from the checkpoint on resume.- Leaves run as
create_agentinstances that inherit the orchestrator's tools and model. With aschema, the leaf usesresponse_formatand returns the validated object; otherwise it returns its final text. Token usage is aggregated for the budget.
The concurrency cap is ours: an asyncio.Semaphore(min(16, cpu-2)) (configurable), plus a lifetime cap of 1000 leaf spawns per run. The semaphore lives host-side on the Runtime and never crosses the @task boundary.
Quick Start
Install
pip install workflow-middleware
# Or from source
pip install git+https://github.com/emanueleielo/workflow-middleware.git
Offline — no keys, FakeLeafBackend
You can drive the runtime directly with a deterministic offline backend — no model, no network, no API key. This is examples/basic_usage.py:
import asyncio
from workflow_middleware import FakeLeafBackend, WorkflowConfig, run_workflow_script
SCRIPT = '''\
meta = {
"name": "demo-fanout",
"description": "Fan out three reviewers and collect their findings",
"phases": [{"title": "Review", "detail": "one reviewer per dimension"}],
}
async def main():
phase("Review")
findings = await parallel([
(lambda d=d: agent(f"Review for {d}", label=d))
for d in ("correctness", "security", "perf")
])
return {"findings": [f for f in findings if f]}
'''
async def run() -> None:
result, summary = await run_workflow_script(
SCRIPT,
args={"target": "auth.py"},
backend=FakeLeafBackend(),
config=WorkflowConfig(),
thread_id="demo-1",
)
print("result:", result)
print("summary:", summary)
if __name__ == "__main__":
asyncio.run(run())
This exercises meta extraction, the parallel barrier, the error→None filter, and budget aggregation — and runs green with zero env vars.
Real model — plug-and-play middleware
Attach WorkflowMiddleware to a real create_agent. The leaves inherit the orchestrator's tools and model. This is examples/real_usage.py:
from langchain.agents import create_agent
from workflow_middleware import WorkflowMiddleware, WorkflowConfig
agent_graph = create_agent(
"claude-sonnet-4-5", # any model id; needs ANTHROPIC_API_KEY
tools=[...], # the orchestrator's tools; leaves inherit them
middleware=[WorkflowMiddleware(WorkflowConfig(include_patterns=True))],
)
if __name__ == "__main__":
out = agent_graph.invoke({"messages": [
{"role": "user", "content": "Thoroughly audit auth.py for security bugs using a workflow."}
]})
print(out["messages"][-1].content)
The middleware appends the guidance, the model decides to call run_workflow, the leaves inherit tools + model, and the structured result is summarised back to you.
Live Demo — Deep Research
A full browser demo lives in examples/deep-research-live/ (screenshot above): you type a research question, a real orchestrator agent authors a workflow script, leaf agents run live DuckDuckGo searches, and a graph is drawn live from real agent-spawn events while a cited markdown report streams back. It runs in a keyless fake mode (canned script, fully offline) or a real anthropic mode (Claude + live web search).
cd examples/deep-research-live
uv venv --python 3.13 .venv && source .venv/bin/activate
uv pip install -r requirements.txt # installs the package via -e ../..
# Keyless — animates the whole UI offline, no API key, no network:
DEEP_RESEARCH_MODE=fake python -m server.app
# Real — orchestrator authors the script, leaves do real searches (needs a key):
cp .env.example .env # then set ANTHROPIC_API_KEY=...
python -m server.app
Then open http://127.0.0.1:8000 and ask a question. Full walkthrough and the wire protocol are in the demo's README and PROTOCOL.md.
The Primitives
The model's script sees only these. No imports, no filesystem, no @task/@entrypoint.
| Primitive | Signature | Semantics |
|---|---|---|
agent |
await agent(prompt, schema=None, agent_type=None, label=None) |
Run one leaf agent. Acquires the concurrency semaphore, checks the lifetime cap, calls the backend, aggregates tokens. Returns the validated object when schema is given, else the final text. A bare await agent(...) propagates errors. |
parallel |
await parallel(thunks) |
BARRIER. Gather all thunks; a thunk that raises becomes None in its slot (filter it out). |
pipeline |
await pipeline(items, *stages) |
NO barrier. Each item runs independently through every stage as stage(prev, item, index). A stage that raises drops that item to None and skips its remaining stages. |
phase |
phase(title) |
Record a progress group (for the summary / streaming). |
log |
log(message) |
Append a progress line. |
args |
args |
The value passed verbatim to run_workflow (the orchestrator-supplied input). |
budget |
budget |
Token-ceiling handle: budget.total, budget.spent(), budget.remaining(). Once spent >= total, the next agent() raises BudgetExceeded. total=None ⇒ unlimited. |
Past the lifetime cap (max_agents_total, default 1000) agent() raises AgentCapExceeded. The error→None policy is applied by parallel / pipeline only — not by agent itself.
Opt-In Behaviour
Two surfaces, by design (matching Claude Code):
- Tool description (
RUN_WORKFLOW_DESCRIPTION) keeps a hard gate — use it only when explicitly opted in. - System-prompt guidance (
WORKFLOW_GUIDANCE) is discretionary: appended to the orchestrator's prompt so the model knows the capability exists, but it chooses whether to use it.
The guidance is appended, never overwritten — the original system message is preserved with a "\n\n" separator. Toggle and tune it via WorkflowConfig:
WorkflowConfig(
inject_workflow_guidance=True, # False => system message untouched
include_patterns=True, # also append the reusable PATTERNS library
system_prompt="...", # override the appended text entirely
)
Run & Event Tracking
Every completed run_workflow execution is recorded on the middleware instance (keyed by thread, the same instance-level pattern AdvisorMiddleware uses — LangChain 1.2.x ignores Command(update={...}) returned from middleware, so the run summary is stored host-side rather than in graph state). After each run, the middleware appends one WorkflowEvent — the executed script's name, the number of agents_spawned, the aggregated tokens_spent, the thread_id, and whether main() returned without raising (ok) — and bumps a lifetime run counter.
Read the history back off the instance you attached:
mw = WorkflowMiddleware(WorkflowConfig())
agent_graph = create_agent("claude-sonnet-4-5", tools=[...], middleware=[mw])
agent_graph.invoke({"messages": [{"role": "user", "content": "Audit auth.py with a workflow."}]})
for event in mw.get_events(): # list[WorkflowEvent] for the current thread
print(event["name"], event["agents_spawned"], event["tokens_spent"], event["ok"])
print("workflows run this session:", mw.get_total_runs())
get_events(thread_id=None) returns the events for a thread (defaulting to the current one); get_total_runs(thread_id=None) returns the lifetime count. WorkflowEvent and WorkflowState are exported from the package top level.
Configuration
WorkflowConfig
| Parameter | Type | Default | Description |
|---|---|---|---|
leaf_model |
str | BaseChatModel | None |
None |
Model for leaf agents. None ⇒ inherit the orchestrator's model. |
max_concurrency |
int | None |
None |
Cap on concurrent agent() calls. None ⇒ min(16, cpu_count()-2) resolved at runtime. |
max_agents_total |
int |
1000 |
Lifetime cap on total agent() spawns per run. |
budget |
BudgetConfig |
BudgetConfig() |
Token budget controls. |
inject_workflow_guidance |
bool |
True |
Append WORKFLOW_GUIDANCE to the system prompt in wrap_model_call. |
include_patterns |
bool |
False |
Also append the PATTERNS library to the injected guidance. |
system_prompt |
str | None |
None |
Override text appended instead of WORKFLOW_GUIDANCE. |
leaf_prompt_addendum |
str | None |
None |
Override LEAF_PROMPT_ADDENDUM appended to each leaf. |
sandbox |
SandboxConfig |
SandboxConfig() |
OS-isolation options (Phase 4 stub — see Caveats). |
BudgetConfig
| Parameter | Type | Default | Description |
|---|---|---|---|
total |
int | None |
None |
Lifetime token ceiling across all leaves in one run. None ⇒ unlimited. |
SandboxConfig
| Parameter | Type | Default | Description |
|---|---|---|---|
enabled |
bool |
False |
When True, route exec through the sandbox seam. Currently a no-op passthrough. |
backend |
"none" | "srt" |
"none" |
Isolation backend. "srt" (Seatbelt/bubblewrap) reserved for Phase 4. |
Leaf Backends
agent() calls go through a LeafBackend — a small protocol with a single async def run(...) -> LeafResult.
FakeLeafBackend— deterministic, offline, no keys. Echoes a stable transform of the prompt and, whenschemais a pydantic model, fills required fields with deterministic placeholders so structured paths run offline. Used by the examples and tests.LangChainLeafBackend— the real one. Builds a leaf viacreate_agentthat inherits the orchestrator's tools and model, appendsLEAF_PROMPT_ADDENDUMto the inherited system prompt, passesresponse_format=schemawhen given (readingresult["structured_response"]), extractsresult["messages"][-1].textotherwise, and aggregatesusage_metadatafor the budget. Optional named subagents mapagent_type→ a spec that inherits omitted keys.
LeafResult is a plain @dataclass (text, structured, tokens) — msgpack-safe, so it can cross the @task checkpoint boundary.
Caveats
The in-process guard stops common escapes — it is not OS isolation. The script runs with reduced builtins (getattr / setattr / type / __import__ / open / eval / exec all removed) and a static AST guard that rejects any dunder attribute or name (__class__, __bases__, __subclasses__, __globals__, __builtins__, …). Together these block the classic ().__class__.__bases__[0].__subclasses__() and getattr(type(x), "__globals__") breakout chains — a dunder reference raises SecurityError before the script runs. This is a real barrier against the one-line escapes, but it does not bound CPU/memory/recursion and is not a substitute for a process sandbox.
The OS sandbox is a documented stub. sandbox.py is a no-op passthrough today. The executor already routes its compile/exec step through sandbox.run_in_sandbox(...), so the OS-isolation boundary exists as a future seam — but no OS isolation is enforced yet; the model-written code runs in-process. The planned srt (Seatbelt/bubblewrap) isolation plus the IPC bridge is a future phase. Until that lands, keep sandbox.enabled=False and only run scripts whose orchestrator you control; with enabled=True and a non-none backend, run_in_sandbox raises NotImplementedError. See SECURITY.md.
What is real today: the model-written-script engine on the Functional API, the seven primitives, the AST meta validator, the reduced builtins + best-effort determinism guard, both leaf backends, the budget and concurrency caps, the run/event tracking on the middleware instance (see Run & event tracking), and the opt-in middleware injection. What is not: OS isolation (stub above), the live /workflows TUI (event/log streaming only), and prompt-cache economics.
The determinism guard is best-effort, not a security boundary. Resume requires that the same script + same args replays the same sequence of agent() calls, so reading the clock, drawing randomness, or constructing a no-arg date in the body is forbidden. The runtime intercepts those three obvious entry points by name and raises on them — which stops the accidental nondeterminism that would silently break resume. It is not a sandbox: a deliberately crafted script can still reach the real clock via object introspection, so do not treat the raise as a wall. Hard determinism / isolation guarantees need the OS-level sandbox (the stub above / Phase 4). Pass timestamps in via args instead and stamp results after the workflow returns. Memoization is interrupt→resume within a thread, not "re-running a finished thread skips work."
Architecture
workflow_middleware/
├── __init__.py # Public API: WorkflowMiddleware, WorkflowConfig, run_workflow_script, ...
├── config.py # WorkflowConfig + BudgetConfig + SandboxConfig dataclasses
├── state.py # WorkflowState + WorkflowEvent TypedDicts
├── prompts.py # RUN_WORKFLOW_DESCRIPTION, WORKFLOW_GUIDANCE, LEAF_PROMPT_ADDENDUM, ...
├── runtime.py # Runtime + the injected primitives + Budget
├── executor.py # AST meta-extraction, reduced-builtins exec, @entrypoint/@task wiring
├── leaves.py # LeafBackend protocol, FakeLeafBackend, LangChainLeafBackend
├── sandbox.py # Phase 4 OS-isolation seam (no-op stub today)
├── middleware.py # WorkflowMiddleware: run_workflow tool + opt-in guidance
└── py.typed # PEP 561 type marker
Development
# Install with dev dependencies
pip install -e ".[dev,anthropic]"
# Run tests
pytest
# Lint
ruff check workflow_middleware/
# Type check
mypy workflow_middleware/
License
MIT — Emanuele Ielo
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file workflow_middleware-0.1.0.tar.gz.
File metadata
- Download URL: workflow_middleware-0.1.0.tar.gz
- Upload date:
- Size: 1.4 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.21 {"installer":{"name":"uv","version":"0.9.21","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
dbe292e5480267cb55aacc4710b39f1f6d83b1c830d2f05c9a29e39eb8e5f2ea
|
|
| MD5 |
aa811d2fc0bee446664e38cc8c4b6b5c
|
|
| BLAKE2b-256 |
0058d6989e51838233b22d0f3d544dff9186c3ad879865b37bea50fc524e4edb
|
File details
Details for the file workflow_middleware-0.1.0-py3-none-any.whl.
File metadata
- Download URL: workflow_middleware-0.1.0-py3-none-any.whl
- Upload date:
- Size: 47.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.21 {"installer":{"name":"uv","version":"0.9.21","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
18fa0d46f651cc2bfc5688c43dacc46bf02826ff9e130f693b1c69e7fed88782
|
|
| MD5 |
9429c87ee20128f43eea1e6e0299d5c4
|
|
| BLAKE2b-256 |
4addb29abb59a06dada3a3e5ba7009ce1d4bdd5b56cc5335fe8c48863f1ebde6
|