Asyncio DAG workflow engine for AI agents
stagehand
Stagehand orchestrates multi-agent AI workflows in pure Python. Each workflow is a directed acyclic graph (DAG) of tasks. Tasks with no dependencies run in parallel; tasks with dependencies wait until their upstream tasks complete.
Installation
pip install stagehand-ai
Requires Python 3.11+.
Quickstart
import asyncio

from stagehand import WorkflowBuilder
from stagehand.adapters.executor import OllamaExecutor


async def main():
    run_id = await (
        WorkflowBuilder("haiku-pipeline")
        .agent("writer", OllamaExecutor(), model="qwen2.5",
               system_prompt="You are a haiku writer.",
               tools=["write_file"])
        .task("draft", agent="writer", prompt="Write a haiku about the ocean at dawn.")
        .task("refine", agent="writer",
              prompt="Refine this haiku:\n\n{{ tasks.draft }}",
              after=["draft"])
        .run()
    )
    print(f"Done — run {run_id}")


asyncio.run(main())
Concepts
Builder
WorkflowBuilder is the primary API. It lets you define agents and tasks in code, then run the workflow with a single await.
WorkflowBuilder(name, version="1")
    .agent(agent_id, executor, *, model, system_prompt, role, tools)
    .task(task_id, *, agent, prompt, fn, after, outputs, secrets, retry)
    .state_dir(directory)  # where run state is persisted (default: .stagehand/runs)
    .run(inputs={})        # returns run_id
.build() returns a Workflow object without running it, useful if you want to pass it to a Scheduler directly.
Agents
An agent is a named AI persona with an executor, a model, and a system prompt.
.agent(
    "writer",
    OllamaExecutor(),
    model="qwen2.5",
    system_prompt="You are a concise technical writer.",
    tools=["write_file", "read_file"],
)
Different agents in the same workflow can use different executors:
WorkflowBuilder("pipeline")
    .agent("drafter", OllamaExecutor(), model="qwen2.5")
    .agent("reviewer", ClaudeExecutor(api_key="..."), model="claude-opus-4-5")
Tasks
A task is a single unit of work in the DAG. There are two kinds:
Agent task — runs a prompt against an AI agent:
.task("draft", agent="writer", prompt="Write a short intro to Python.")
.task("review", agent="reviewer",
      prompt="Review this draft:\n\n{{ tasks.draft }}",
      after=["draft"])
Deterministic task — runs a plain Python callable (sync or async). No agent or prompt needed:
async def fetch_tickets(ctx):
    return await linear_client.get_tickets()  # returns str or TaskResult

.task("fetch", fn=fetch_tickets)
.task("analyze", agent="analyst",
      prompt="Tickets:\n\n{{ tasks.fetch }}\n\nSummarise.",
      after=["fetch"])
The callable receives a RunContext (access to inputs and previous task results) and must return a str or TaskResult.
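Accepting both sync and async callables usually comes down to checking whether the call returned an awaitable. The following is a minimal sketch of that idea, not Stagehand's actual code: `call_task_fn` is a hypothetical helper, a plain dict stands in for `RunContext`, and only the `str` return type is handled.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_task_fn(fn: Callable[[Any], Any], ctx: Any) -> str:
    """Call a task function that may be sync or async (hypothetical helper)."""
    result = fn(ctx)
    # An async function returns a coroutine here; await it to get the value.
    if inspect.isawaitable(result):
        result = await result
    # This sketch only accepts str; the real library also allows TaskResult.
    if not isinstance(result, str):
        raise TypeError(f"task function returned {type(result).__name__}, expected str")
    return result


def sync_fetch(ctx: dict) -> str:
    return f"tickets for {ctx['team']}"


async def async_fetch(ctx: dict) -> str:
    await asyncio.sleep(0)  # stands in for a network call
    return f"tickets for {ctx['team']}"


# A plain dict is used as the context for illustration only.
sync_result = asyncio.run(call_task_fn(sync_fetch, {"team": "core"}))
async_result = asyncio.run(call_task_fn(async_fetch, {"team": "core"}))
```

Note that a long-running sync callable invoked this way would block the event loop; offloading it with `asyncio.to_thread` is one common alternative.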
| Parameter | Description |
|---|---|
| agent | ID of the agent that runs this task (required unless fn is set) |
| prompt | Message sent to the agent; supports {{ }} expressions (required unless fn is set) |
| fn | Python callable to run directly (required unless agent and prompt are set) |
| after | List of task IDs this task waits for |
| outputs | StaticOutputs, DynamicOutputs, or PatternOutputs |
| secrets | List of secret names to resolve at runtime |
| retry | RetryPolicy: how many times to retry on failure |
Tasks with no after (or whose dependencies are all complete) start immediately. Multiple ready tasks run in parallel.
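The scheduling rule above can be illustrated with a few lines of plain asyncio. This is a sketch under stated assumptions rather than Stagehand's scheduler: `run_dag` and the `make` factory are hypothetical names, tasks are zero-argument coroutine functions, and failure handling is left out.

```python
import asyncio
from typing import Any, Callable, Coroutine


async def run_dag(
    tasks: dict[str, Callable[[], Coroutine[Any, Any, str]]],
    after: dict[str, list[str]],
) -> dict[str, str]:
    """Run each task once, starting it as soon as all its dependencies finish."""
    results: dict[str, str] = {}
    pending = set(tasks)
    running: dict[asyncio.Task, str] = {}

    while pending or running:
        # A task is ready when every upstream task already has a result.
        ready = [t for t in pending if all(d in results for d in after.get(t, []))]
        for task_id in ready:
            pending.discard(task_id)
            running[asyncio.create_task(tasks[task_id]())] = task_id
        if not running:
            raise ValueError("cycle or unknown dependency: " + ", ".join(sorted(pending)))
        # Wake up when any running task finishes, then re-check readiness.
        done, _ = await asyncio.wait(list(running), return_when=asyncio.FIRST_COMPLETED)
        for finished in done:
            results[running.pop(finished)] = finished.result()
    return results


order: list[str] = []  # records completion order for the demo


def make(name: str, delay: float) -> Callable[[], Coroutine[Any, Any, str]]:
    async def _run() -> str:
        await asyncio.sleep(delay)
        order.append(name)
        return name.upper()
    return _run


demo_tasks = {"pros": make("pros", 0.02), "cons": make("cons", 0.01), "summary": make("summary", 0.0)}
demo_after = {"summary": ["pros", "cons"]}
results = asyncio.run(run_dag(demo_tasks, demo_after))
```

Here `pros` and `cons` start together, and `summary` starts only after both have produced a result.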
Retry
Pass a RetryPolicy to a task to retry it automatically on failure. Downstream tasks are only cancelled once all attempts are exhausted.
from stagehand import RetryPolicy
.task(
    "fetch",
    agent="worker",
    prompt="Fetch the latest report.",
    retry=RetryPolicy(max_attempts=3, delay=2.0),
)
| Parameter | Default | Description |
|---|---|---|
| max_attempts | 1 | Total attempts including the first (1 = no retry) |
| delay | 0.0 | Seconds to wait between attempts |
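To make the semantics of the two parameters concrete, here is a small stand-alone sketch of a retry loop. It is an illustration only: `RetrySketch` and `run_with_retry` are hypothetical names that mirror the documented fields, and the real library may differ in how it reports the final failure.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Coroutine


@dataclass
class RetrySketch:
    """Hypothetical stand-in with the same two fields as the table above."""
    max_attempts: int = 1
    delay: float = 0.0


async def run_with_retry(
    fn: Callable[[], Coroutine[Any, Any, str]], policy: RetrySketch
) -> str:
    last_error = None
    for attempt in range(1, policy.max_attempts + 1):
        try:
            return await fn()
        except Exception as exc:
            last_error = exc
            # Sleep only between attempts, not after the final one.
            if attempt < policy.max_attempts:
                await asyncio.sleep(policy.delay)
    # All attempts are exhausted; only now would downstream work be cancelled.
    raise RuntimeError(f"failed after {policy.max_attempts} attempt(s)") from last_error


calls: list[int] = []  # counts how many times the flaky task ran


async def flaky() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("temporary failure")
    return "report"


outcome = asyncio.run(run_with_retry(flaky, RetrySketch(max_attempts=3, delay=0.01)))
```

With `max_attempts=3` the task gets one initial attempt plus two retries, which matches "total attempts including the first".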
Template expressions
Prompts support {{ }} expressions to inject values from previous tasks or runtime inputs.
| Expression | Resolves to |
|---|---|
| {{ input.key }} | A value passed via inputs={"key": "..."} |
| {{ tasks.id }} | The text output of a completed task |
| {{ tasks.id.files }} | Newline-separated list of file paths produced by a task |
| {{ tasks.id.filename_md }} | Path of a specific file, identified by its slug (filename.md → filename_md) |
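The four expression forms can be sketched with a small regex-based renderer. This is not Stagehand's template engine: `render` and `slug` are hypothetical, task results are modelled as plain dicts with `text` and `files` keys, and the slug rule (every run of non-alphanumeric characters becomes `_`) is an assumption that happens to agree with the filename.md → filename_md example.

```python
import re
from pathlib import PurePosixPath

EXPR = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def slug(filename: str) -> str:
    """Assumed slug rule: replace non-alphanumeric runs with an underscore."""
    return re.sub(r"[^0-9A-Za-z]+", "_", filename)


def render(prompt: str, inputs: dict, tasks: dict) -> str:
    """Replace {{ ... }} expressions using inputs and earlier task results."""
    def resolve(match: re.Match) -> str:
        parts = match.group(1).split(".")
        if parts[0] == "input" and len(parts) == 2:
            return str(inputs[parts[1]])
        if parts[0] == "tasks" and len(parts) == 2:
            return tasks[parts[1]]["text"]
        if parts[0] == "tasks" and len(parts) == 3:
            files = tasks[parts[1]]["files"]
            if parts[2] == "files":
                return "\n".join(files)
            # Look a single file up by the slug of its base name.
            by_slug = {slug(PurePosixPath(f).name): f for f in files}
            return by_slug[parts[2]]
        raise KeyError(match.group(1))

    return EXPR.sub(resolve, prompt)


demo_tasks = {"draft": {"text": "An ocean haiku", "files": ["out/draft.md", "out/notes.txt"]}}
rendered = render(
    "Topic: {{ input.topic }}\n{{ tasks.draft }}\n{{ tasks.draft.files }}\n{{ tasks.draft.draft_md }}",
    {"topic": "dawn"},
    demo_tasks,
)
```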
Outputs
The outputs parameter of .task() declares what files a task produces.
from stagehand import StaticOutputs, DynamicOutputs, PatternOutputs
# Exact file names known upfront
.task("t1", agent="a", prompt="...", outputs=StaticOutputs(["report.md"]))
# Agent decides at runtime (default)
.task("t2", agent="a", prompt="...", outputs=DynamicOutputs())
# Collect by glob after the task finishes
.task("t3", agent="a", prompt="...", outputs=PatternOutputs(pattern="**/*.md"))
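As a rough illustration of how the static and pattern modes differ, the sketch below collects files from a working directory in both ways. The helper names `collect_static` and `collect_pattern` are hypothetical, and how Stagehand actually resolves outputs against its storage adapter is not shown here.

```python
import tempfile
from pathlib import Path


def collect_static(root: Path, names: list[str]) -> list[str]:
    """Keep only the declared file names that exist under root."""
    return [name for name in names if (root / name).is_file()]


def collect_pattern(root: Path, pattern: str) -> list[str]:
    """Return files under root that match a glob, as sorted relative paths."""
    return sorted(
        p.relative_to(root).as_posix() for p in root.glob(pattern) if p.is_file()
    )


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "sub").mkdir()
    (root / "report.md").write_text("# Report")
    (root / "sub" / "notes.md").write_text("# Notes")
    (root / "data.txt").write_text("raw")

    static_files = collect_static(root, ["report.md", "missing.md"])
    pattern_files = collect_pattern(root, "**/*.md")
```

With `pathlib`, the `**/*.md` pattern matches Markdown files in the root directory as well as in nested directories.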
Executors
An executor is the AI backend that drives a task. Stagehand ships two:
OllamaExecutor
Runs models locally via Ollama. No API key required.
from stagehand.adapters.executor import OllamaExecutor
from stagehand.adapters.storage.filesystem import FilesystemStorage
executor = OllamaExecutor(
    host="http://localhost:11434",  # default
    storage=FilesystemStorage("./output"),
)
ollama pull qwen2.5
ollama serve
Models with reliable tool use: qwen2.5, llama3.1, llama3.2, mistral-nemo.
ClaudeExecutor
Uses the Anthropic Messages API.
from stagehand.adapters.executor import ClaudeExecutor
from stagehand.adapters.storage.filesystem import FilesystemStorage
executor = ClaudeExecutor(
    api_key="sk-ant-...",  # or set ANTHROPIC_API_KEY
    storage=FilesystemStorage("./output"),
)
The default model is claude-opus-4-5.
Custom tools
Pass extra tools to ClaudeExecutor via extra_tools:
from stagehand import ToolDefinition
from stagehand.adapters.executor import ClaudeExecutor
my_tool = ToolDefinition(
    name="fetch_ticket",
    description="Fetch a Linear ticket by ID.",
    input_schema={"type": "object", "properties": {"id": {"type": "string"}}, "required": ["id"]},
    handler=lambda args: fetch_from_linear(args["id"]),
)
executor = ClaudeExecutor(api_key="...", extra_tools=[my_tool])
Custom executor
Implement AgentExecutor to add any backend:
from stagehand.ports.executor import AgentExecutor, ExecutionRequest, ExecutionResult
class MyExecutor(AgentExecutor):
    async def execute(self, request: ExecutionRequest) -> ExecutionResult:
        output = call_my_model(request.prompt, request.system_prompt)
        return ExecutionResult(output=output)
Resume
A failed or interrupted run can be resumed. Completed tasks are skipped.
from stagehand import Scheduler
scheduler = Scheduler(run_state_directory=".stagehand/runs")
workflow = builder.build()
run_id = await scheduler.run(workflow)
# ... if it fails or you want to retry:
await scheduler.resume(run_id, workflow)
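The "completed tasks are skipped" behaviour can be pictured as a run-state file that is consulted before each task. The sketch below is a simplified, sequential illustration and not Stagehand's persistence format: `run_or_resume`, the JSON layout, and the file name are all assumptions.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable


def run_or_resume(state_file: Path, steps: dict[str, Callable[[], str]]) -> list[str]:
    """Run steps in order, skipping any whose result is already persisted."""
    state = json.loads(state_file.read_text()) if state_file.exists() else {}
    executed = []
    for task_id, step in steps.items():
        if task_id in state:
            continue  # finished in an earlier run
        state[task_id] = step()
        # Persist after every task so an interruption loses at most one result.
        state_file.write_text(json.dumps(state))
        executed.append(task_id)
    return executed


attempts: list[int] = []


def flaky_refine() -> str:
    attempts.append(1)
    if len(attempts) == 1:
        raise RuntimeError("interrupted")
    return "refined text"


with tempfile.TemporaryDirectory() as tmp:
    state_path = Path(tmp) / "run.json"
    steps = {"draft": lambda: "draft text", "refine": flaky_refine}
    try:
        run_or_resume(state_path, steps)  # first run fails at "refine"
    except RuntimeError:
        pass
    resumed = run_or_resume(state_path, steps)  # second run skips "draft"
    final_state = json.loads(state_path.read_text())
```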
Examples
Sequential
run_id = await (
    WorkflowBuilder("sequential")
    .agent("writer", OllamaExecutor(), model="qwen2.5",
           system_prompt="You write haiku.", tools=["write_file"])
    .task("draft", agent="writer", prompt="Write a haiku about the ocean. Save to draft.md.")
    .task("refine", agent="writer",
          prompt="Refine this:\n\n{{ tasks.draft }}\n\nSave to final.md.",
          after=["draft"])
    .run()
)
draft → refine
Parallel
run_id = await (
    WorkflowBuilder("parallel")
    .agent("writer", OllamaExecutor(), model="qwen2.5",
           system_prompt="You write clearly.", tools=["write_file"])
    .task("pros", agent="writer", prompt="Write pros of remote work. Save to pros.md.")
    .task("cons", agent="writer", prompt="Write cons of remote work. Save to cons.md.")
    .task("summary", agent="writer",
          prompt="Combine:\n\nPROS:\n{{ tasks.pros }}\n\nCONS:\n{{ tasks.cons }}\n\nSave to summary.md.",
          after=["pros", "cons"])
    .run()
)
pros ─┐
      ├→ summary
cons ─┘
Architecture
Stagehand uses a ports-and-adapters (hexagonal) architecture. The dependency rule is strict:
core/ → nothing external (stdlib only)
ports/ → nothing (interfaces only)
adapters/ → ports/ only
builder → core/ + ports/
| Package | Responsibility |
|---|---|
| stagehand/core/ | Domain types, DAG, scheduler, run state, template engine |
| stagehand/ports/ | ABCs: AgentExecutor, ArtifactStorage, SecretProvider |
| stagehand/adapters/executor/ | ClaudeExecutor, OllamaExecutor |
| stagehand/adapters/storage/ | FilesystemStorage |
| stagehand/adapters/secrets/ | EnvSecretProvider |
| stagehand/builder.py | WorkflowBuilder, the primary public API |
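The dependency rule is easier to see in miniature. In the sketch below, which uses hypothetical names and is not taken from the Stagehand source, the "core" function depends only on an abstract port, while the adapter implements that port and can be swapped without touching the core.

```python
import asyncio
from abc import ABC, abstractmethod


# Port: an interface only, with no imports from core or adapters.
class ExecutorPort(ABC):
    @abstractmethod
    async def execute(self, prompt: str) -> str:
        """Run a prompt and return the model's text output."""


# Adapter: implements the port; a real one would call an external backend.
class EchoExecutor(ExecutorPort):
    async def execute(self, prompt: str) -> str:
        return f"echo: {prompt}"


# Core: knows the port's shape but nothing about concrete adapters.
async def run_task(executor: ExecutorPort, prompt: str) -> str:
    return await executor.execute(prompt)


reply = asyncio.run(run_task(EchoExecutor(), "hello"))
```

Because `run_task` only sees `ExecutorPort`, a test double such as `EchoExecutor` can stand in for a networked backend.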