Asyncio DAG workflow engine for AI agents

stagehand

Stagehand orchestrates multi-agent AI workflows in pure Python. Each workflow is a directed acyclic graph (DAG) of tasks. Tasks with no dependencies run in parallel; tasks with dependencies wait until their upstream tasks complete.


Installation

pip install stagehand-ai

Requires Python 3.11+.


Quickstart

import asyncio
from stagehand import WorkflowBuilder
from stagehand.adapters.executor import OllamaExecutor

async def main():
    run_id = await (
        WorkflowBuilder("haiku-pipeline")
        .agent("writer", OllamaExecutor(), model="qwen2.5",
               system_prompt="You are a haiku writer.",
               tools=["write_file"])
        .task("draft",  agent="writer", prompt="Write a haiku about the ocean at dawn.")
        .task("refine", agent="writer",
              prompt="Refine this haiku:\n\n{{ tasks.draft }}",
              after=["draft"])
        .run()
    )
    print(f"Done — run {run_id}")

asyncio.run(main())

Concepts

Builder

WorkflowBuilder is the primary API. It lets you define agents and tasks in code, then run the workflow with a single await.

WorkflowBuilder(name, version="1")
  .agent(agent_id, executor, *, model, system_prompt, role, tools)
  .task(task_id, *, agent, prompt, fn, after, outputs, secrets, retry, timeout)
  .state_dir(directory)   # where run state is persisted (default: .stagehand/runs)
  .run(inputs={})         # returns run_id

.build() returns a Workflow object without running it, useful if you want to pass it to a Scheduler directly.


Agents

An agent is a named AI persona with an executor, a model, and a system prompt.

.agent(
    "writer",
    OllamaExecutor(),
    model="qwen2.5",
    system_prompt="You are a concise technical writer.",
    tools=["write_file", "read_file"],
)

Different agents in the same workflow can use different executors:

builder = (
    WorkflowBuilder("pipeline")
    .agent("drafter", OllamaExecutor(), model="qwen2.5")
    .agent("reviewer", ClaudeExecutor(api_key="..."), model="claude-opus-4-5")
)

Tasks

A task is a single unit of work in the DAG. There are two kinds:

Agent task — runs a prompt against an AI agent:

.task("draft",  agent="writer", prompt="Write a short intro to Python.")
.task("review", agent="reviewer",
      prompt="Review this draft:\n\n{{ tasks.draft }}",
      after=["draft"])

Deterministic task — runs a plain Python callable (sync or async). No agent or prompt needed:

async def fetch_tickets(ctx):
    return await linear_client.get_tickets()  # returns str or TaskResult

.task("fetch", fn=fetch_tickets)
.task("analyze", agent="analyst",
      prompt="Tickets:\n\n{{ tasks.fetch }}\n\nSummarise.",
      after=["fetch"])

The callable receives a RunContext (access to inputs and previous task results) and must return a str or TaskResult.

Parameter | Description
--- | ---
agent | ID of the agent that runs this task (required unless fn is set)
prompt | Message sent to the agent; supports {{ }} expressions (required unless fn is set)
fn | Python callable to run directly (required unless agent and prompt are set)
after | List of task IDs this task waits for
outputs | StaticOutputs, DynamicOutputs, or PatternOutputs
secrets | List of secret names to resolve at runtime
retry | RetryPolicy: how many times to retry on failure
timeout | Seconds before a single attempt is cancelled. None = no limit. Sync fn tasks cannot be interrupted.

Tasks with no after (or whose dependencies are all complete) start immediately. Multiple ready tasks run in parallel.
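
The scheduling rule above can be illustrated with a small standalone asyncio sketch. This is not stagehand's scheduler: run_dag and its arguments are hypothetical names, and the sketch assumes the graph is acyclic (a cycle would wait forever).

```python
import asyncio


async def run_dag(tasks, deps):
    """Run async callables in dependency order; ready tasks run concurrently.

    tasks: dict of task id -> async callable taking a dict of upstream results
    deps:  dict of task id -> list of task ids it waits for (like `after`)
    """
    futures = {}

    async def run_one(task_id):
        # Wait for every upstream task, then run this one with their results.
        upstream = deps.get(task_id, [])
        results = await asyncio.gather(*(futures[d] for d in upstream))
        return await tasks[task_id](dict(zip(upstream, results)))

    # All tasks are created before any of them starts executing, so every
    # upstream future already exists when run_one looks it up.
    for task_id in tasks:
        futures[task_id] = asyncio.create_task(run_one(task_id))

    return {task_id: await fut for task_id, fut in futures.items()}


async def demo():
    order = []

    def make(name, delay):
        async def fn(upstream):
            await asyncio.sleep(delay)
            order.append(name)
            return f"{name}({', '.join(sorted(upstream))})"
        return fn

    tasks = {
        "pros": make("pros", 0.02),
        "cons": make("cons", 0.01),
        "summary": make("summary", 0),
    }
    deps = {"summary": ["pros", "cons"]}
    results = await run_dag(tasks, deps)
    return order, results
```

In demo(), pros and cons start together and summary only starts once both have finished, mirroring a task declared with after=["pros", "cons"].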


Retry

Pass a RetryPolicy to a task to retry it automatically on failure. Downstream tasks are only cancelled once all attempts are exhausted.

from stagehand import RetryPolicy

.task(
    "fetch",
    agent="worker",
    prompt="Fetch the latest report.",
    retry=RetryPolicy(max_attempts=3, delay=2.0),
)

Parameter | Default | Description
--- | --- | ---
max_attempts | 1 | Total attempts including the first (1 = no retry)
delay | 0.0 | Seconds to wait between attempts
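
To make the max_attempts and delay semantics concrete, here is a minimal standalone sketch of such a retry loop. run_with_retry is a hypothetical helper written for illustration; it is not how stagehand implements RetryPolicy internally.

```python
import asyncio


async def run_with_retry(fn, max_attempts=1, delay=0.0):
    """Call async `fn` up to `max_attempts` times, sleeping `delay` seconds between attempts."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn()
        except Exception:
            if attempt == max_attempts:
                raise  # attempts exhausted: the failure propagates to the caller
            await asyncio.sleep(delay)


attempts = []


async def flaky():
    # Fails on the first two calls, succeeds on the third.
    attempts.append(len(attempts) + 1)
    if len(attempts) < 3:
        raise RuntimeError("transient failure")
    return "report"


result = asyncio.run(run_with_retry(flaky, max_attempts=3, delay=0.01))
```

With max_attempts=3 the call is tried three times in total, so max_attempts=1 means a single attempt and no retry.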

Logging

By default the scheduler is silent. Pass a Logger to see workflow and task lifecycle events.

import logging
from stagehand import WorkflowBuilder, StdlibLogger

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")

run_id = await (
    WorkflowBuilder("my-pipeline")
    .agent("worker", executor, model="claude-opus-4-5")
    .task("analyse", agent="worker", prompt="...")
    .logger(StdlibLogger())   # <-- add this
    .run()
)

StdlibLogger wraps Python's stdlib logging module and, by default, raises httpx and httpcore to WARNING so their per-request INFO lines don't drown out workflow events.

INFO  workflow 'my-pipeline' started [run=sh-20260520-a1b2]
INFO  task 'analyse' starting
INFO  task 'analyse' done
INFO  workflow 'my-pipeline' finished [run=sh-20260520-a1b2]

Parameter | Default | Description
--- | --- | ---
name | "stagehand" | Logger name used with logging.getLogger
suppress_http_logs | True | Raises httpx/httpcore to WARNING to suppress per-request noise

To silence everything, omit .logger() (the default) or pass NullLogger() explicitly.

When using Scheduler directly, pass the logger to its constructor:

from stagehand import Scheduler, StdlibLogger

scheduler = Scheduler(run_state_directory=".stagehand/runs", logger=StdlibLogger())

To cap the number of tasks running at the same time, pass max_concurrency:

scheduler = Scheduler(run_state_directory=".stagehand/runs", max_concurrency=4)

None (the default) means unlimited — all ready tasks start immediately. Setting it to 1 makes the scheduler execute one task at a time, regardless of the DAG structure.
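
One common way to get this behaviour in asyncio is a semaphore around each task. The sketch below is standalone and only illustrates the idea; run_capped is a hypothetical helper, and nothing here claims that the Scheduler is implemented this way.

```python
import asyncio


async def run_capped(jobs, max_concurrency=None):
    """Run async callables concurrently, at most `max_concurrency` at once (None = unlimited).

    Returns (results, peak), where peak is the highest number of jobs seen running together.
    """
    semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None
    stats = {"running": 0, "peak": 0}

    async def tracked(job):
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        try:
            return await job()
        finally:
            stats["running"] -= 1

    async def guarded(job):
        if semaphore is None:
            return await tracked(job)
        async with semaphore:  # blocks while max_concurrency jobs are already running
            return await tracked(job)

    results = await asyncio.gather(*(guarded(job) for job in jobs))
    return results, stats["peak"]


async def job(i):
    await asyncio.sleep(0.01)
    return i


def make_jobs(n):
    return [lambda i=i: job(i) for i in range(n)]
```

Calling asyncio.run(run_capped(make_jobs(5), max_concurrency=1)) runs the five jobs strictly one after another, while None lets all five start at once.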

To write a custom logger — for example to route events to a structured sink — implement the Logger port:

from stagehand import Logger

class MyLogger(Logger):
    def debug(self, message: str) -> None: ...
    def info(self, message: str) -> None: ...
    def warning(self, message: str) -> None: ...
    def error(self, message: str) -> None: ...
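
As a rough illustration of a structured sink, the sketch below writes one JSON object per event. To stay runnable without stagehand installed it uses a stand-in base class, LoggerPort, with the same four methods; in real code you would subclass stagehand's Logger instead. JsonLinesLogger and the record layout are assumptions made for this example.

```python
import io
import json
from abc import ABC, abstractmethod


class LoggerPort(ABC):
    """Stand-in for stagehand's Logger port so this sketch runs on its own."""

    @abstractmethod
    def debug(self, message: str) -> None: ...

    @abstractmethod
    def info(self, message: str) -> None: ...

    @abstractmethod
    def warning(self, message: str) -> None: ...

    @abstractmethod
    def error(self, message: str) -> None: ...


class JsonLinesLogger(LoggerPort):
    """Writes each event as one JSON line to a text stream."""

    def __init__(self, stream):
        self._stream = stream

    def _emit(self, level: str, message: str) -> None:
        self._stream.write(json.dumps({"level": level, "message": message}) + "\n")

    def debug(self, message: str) -> None:
        self._emit("debug", message)

    def info(self, message: str) -> None:
        self._emit("info", message)

    def warning(self, message: str) -> None:
        self._emit("warning", message)

    def error(self, message: str) -> None:
        self._emit("error", message)


sink = io.StringIO()
logger = JsonLinesLogger(sink)
logger.info("task 'analyse' starting")
logger.error("task 'analyse' failed")
records = [json.loads(line) for line in sink.getvalue().splitlines()]
```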

Template expressions

Prompts support {{ }} expressions to inject values from previous tasks or runtime inputs.

Expression | Resolves to
--- | ---
{{ input.key }} | A value passed via inputs={"key": "..."}
{{ tasks.id }} | The text output of a completed task
{{ tasks.id.files }} | Newline-separated list of file paths produced by a task
{{ tasks.id.filename_md }} | Path of a specific file, identified by its slug (filename.md → filename_md)
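
The resolution rules in the table can be sketched with a small regex-based renderer. This is an illustration only, not stagehand's template engine: render, slugify, and the shape of the tasks dict are hypothetical, and the slug rule (every non-word character becomes an underscore) is an assumption extrapolated from the single filename.md → filename_md example.

```python
import re

# Matches {{ name.with.dots }} with optional whitespace inside the braces.
_EXPR = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def slugify(filename):
    """Assumed slug rule: replace each non-word character with an underscore."""
    return re.sub(r"\W", "_", filename)


def render(template, inputs, tasks):
    """Resolve the four expression forms from the table.

    tasks maps task id -> {"output": str, "files": [paths]} (layout assumed for this sketch).
    """
    def resolve(match):
        parts = match.group(1).split(".")
        if parts[0] == "input" and len(parts) == 2:
            return str(inputs[parts[1]])
        if parts[0] == "tasks" and len(parts) in (2, 3):
            result = tasks[parts[1]]
            if len(parts) == 2:
                return result["output"]
            if parts[2] == "files":
                return "\n".join(result["files"])
            # Look the file up by the slug of its base name.
            by_slug = {slugify(p.rsplit("/", 1)[-1]): p for p in result["files"]}
            return by_slug[parts[2]]
        raise KeyError(f"unknown expression: {match.group(1)}")

    return _EXPR.sub(resolve, template)


tasks = {"draft": {"output": "An old pond", "files": ["out/draft.md", "out/notes.txt"]}}
prompt = render("Refine: {{ tasks.draft }} for {{ input.topic }}", {"topic": "ocean"}, tasks)
```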

Outputs

The outputs parameter of .task() declares what files a task produces.

from stagehand import StaticOutputs, DynamicOutputs, PatternOutputs

# Exact file names known upfront
.task("t1", agent="a", prompt="...", outputs=StaticOutputs(["report.md"]))

# Agent decides at runtime (default)
.task("t2", agent="a", prompt="...", outputs=DynamicOutputs())

# Collect by glob after the task finishes
.task("t3", agent="a", prompt="...", outputs=PatternOutputs(pattern="**/*.md"))
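
To show what "collect by glob" means for a pattern such as **/*.md, here is a standalone sketch using pathlib. collect_outputs is a hypothetical helper; how PatternOutputs actually walks the output directory is not specified here.

```python
import tempfile
from pathlib import Path


def collect_outputs(root, pattern):
    """Return sorted paths (relative to `root`) of files matching a glob pattern."""
    base = Path(root)
    return sorted(
        p.relative_to(base).as_posix() for p in base.glob(pattern) if p.is_file()
    )


with tempfile.TemporaryDirectory() as tmp:
    # Simulate files an agent wrote during a task.
    (Path(tmp) / "docs").mkdir()
    (Path(tmp) / "report.md").write_text("# Report")
    (Path(tmp) / "docs" / "notes.md").write_text("# Notes")
    (Path(tmp) / "data.json").write_text("{}")
    found = collect_outputs(tmp, "**/*.md")
```

In pathlib, ** also matches zero directories, so the pattern picks up report.md at the top level as well as docs/notes.md, and skips data.json.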

Executors

An executor is the AI backend that drives a task. Stagehand ships two:

OllamaExecutor

Runs models locally via Ollama. No API key required.

from stagehand.adapters.executor import OllamaExecutor
from stagehand.adapters.storage.filesystem import FilesystemStorage

executor = OllamaExecutor(
    host="http://localhost:11434",   # default
    storage=FilesystemStorage("./output"),
)

Make sure the model is pulled and the Ollama server is running:

ollama pull qwen2.5
ollama serve

Models with reliable tool use: qwen2.5, llama3.1, llama3.2, mistral-nemo.

ClaudeExecutor

Uses the Anthropic Messages API.

from stagehand.adapters.executor import ClaudeExecutor
from stagehand.adapters.storage.filesystem import FilesystemStorage

executor = ClaudeExecutor(
    api_key="sk-ant-...",            # or set ANTHROPIC_API_KEY
    storage=FilesystemStorage("./output"),
)

The default model is claude-opus-4-5.

Rate-limit retries

ClaudeExecutor automatically retries messages.create calls that receive a 429 response. Only the individual API call is retried — not the entire agentic loop — so a rate-limit mid-task retries only the current step.

Parameter | Default | Description
--- | --- | ---
rate_limit_retries | 3 | Maximum number of attempts per messages.create call (1 = no retries).
rate_limit_delay | 60.0 | Seconds to wait between attempts.
logger | NullLogger() | A Logger instance (e.g. StdlibLogger()) to receive retry warnings.

from stagehand.adapters.executor import ClaudeExecutor
from stagehand.adapters.logger import StdlibLogger

executor = ClaudeExecutor(
    api_key="sk-ant-...",
    rate_limit_retries=5,
    rate_limit_delay=30.0,
    logger=StdlibLogger(),
)

Custom tools

Pass extra tools to ClaudeExecutor via extra_tools:

from stagehand import ToolDefinition
from stagehand.adapters.executor import ClaudeExecutor

my_tool = ToolDefinition(
    name="fetch_ticket",
    description="Fetch a Linear ticket by ID.",
    input_schema={"type": "object", "properties": {"id": {"type": "string"}}, "required": ["id"]},
    handler=lambda args: fetch_from_linear(args["id"]),
)

executor = ClaudeExecutor(api_key="...", extra_tools=[my_tool])

Artifact path validation

FilesystemStorage always rejects paths that contain .. components to prevent path traversal attacks.

Optionally restrict which file extensions are allowed by passing allowed_extensions:

from stagehand import FilesystemStorage

# Only .txt and .md files may be written
storage = FilesystemStorage("./output", allowed_extensions=[".txt", ".md"])

Extension matching is case-insensitive. Any path that violates a constraint raises ValueError before any I/O is performed. Implement validate_path on a custom ArtifactStorage subclass to apply your own rules:

from stagehand import ArtifactStorage

class MyStorage(ArtifactStorage):
    def validate_path(self, path: str) -> None:
        if not path.startswith("safe/"):
            raise ValueError(f"path must be inside safe/: {path!r}")  # raise to block the write
    ...
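
The two built-in rules described above (no .. components, optional case-insensitive extension filter) could look roughly like the following standalone function. It is a sketch of the checks, not FilesystemStorage's actual code; the error messages and the handling of backslashes are assumptions.

```python
from pathlib import PurePosixPath


def validate_path(path, allowed_extensions=None):
    """Raise ValueError for '..' components or disallowed extensions; return None otherwise."""
    # Treat backslashes as separators too, so "a\\..\\b" is also caught.
    normalized = PurePosixPath(path.replace("\\", "/"))
    if ".." in normalized.parts:
        raise ValueError(f"path traversal rejected: {path!r}")
    if allowed_extensions is not None:
        allowed = {ext.lower() for ext in allowed_extensions}
        suffix = normalized.suffix.lower()
        if suffix not in allowed:
            raise ValueError(f"extension {suffix!r} not allowed: {path!r}")


def is_rejected(path, allowed_extensions=None):
    """Small helper for the examples: True if validate_path raises."""
    try:
        validate_path(path, allowed_extensions)
    except ValueError:
        return True
    return False
```

For example, is_rejected("../etc/passwd") is True, while is_rejected("notes/README.MD", [".txt", ".md"]) is False because extension matching ignores case.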

Custom executor

Implement AgentExecutor to add any backend:

from stagehand.ports.executor import AgentExecutor, ExecutionRequest, ExecutionResult

class MyExecutor(AgentExecutor):
    async def execute(self, request: ExecutionRequest) -> ExecutionResult:
        output = call_my_model(request.prompt, request.system_prompt)
        return ExecutionResult(output=output)

Resume

A failed or interrupted run can be resumed. Completed tasks are skipped.

from stagehand import Scheduler

scheduler = Scheduler(run_state_directory=".stagehand/runs")
workflow = builder.build()

run_id = await scheduler.run(workflow)
# ... if it fails or you want to retry:
await scheduler.resume(run_id, workflow)
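
The idea behind resume (persist each completed task, skip it next time) can be shown with a deliberately simplified, sequential sketch. The JSON state file and the run_tasks helper are assumptions for illustration and do not reflect stagehand's real run-state format.

```python
import json
import tempfile
from pathlib import Path


def run_tasks(task_fns, state_file):
    """Run tasks in order, persisting each completion; skip tasks already recorded."""
    path = Path(state_file)
    state = json.loads(path.read_text()) if path.exists() else {}
    executed = []
    for task_id, fn in task_fns.items():
        if task_id in state:
            continue  # completed in an earlier run, so it is skipped
        state[task_id] = fn()  # an exception here leaves earlier results on disk
        path.write_text(json.dumps(state))
        executed.append(task_id)
    return executed, state


def failing_analyze():
    raise RuntimeError("model unavailable")


with tempfile.TemporaryDirectory() as tmp:
    state_file = Path(tmp) / "run.json"

    # First run: "fetch" succeeds, "analyze" fails.
    try:
        run_tasks({"fetch": lambda: "tickets", "analyze": failing_analyze}, state_file)
        first_run_failed = False
    except RuntimeError:
        first_run_failed = True

    # Resume: only the task that did not complete is executed.
    executed, state = run_tasks(
        {"fetch": lambda: "tickets", "analyze": lambda: "summary"}, state_file
    )
```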

Examples

Sequential

run_id = await (
    WorkflowBuilder("sequential")
    .agent("writer", OllamaExecutor(), model="qwen2.5",
           system_prompt="You write haiku.", tools=["write_file"])
    .task("draft",  agent="writer", prompt="Write a haiku about the ocean. Save to draft.md.")
    .task("refine", agent="writer",
          prompt="Refine this:\n\n{{ tasks.draft }}\n\nSave to final.md.",
          after=["draft"])
    .run()
)

draft  →  refine

Parallel

run_id = await (
    WorkflowBuilder("parallel")
    .agent("writer", OllamaExecutor(), model="qwen2.5",
           system_prompt="You write clearly.", tools=["write_file"])
    .task("pros",    agent="writer", prompt="Write pros of remote work. Save to pros.md.")
    .task("cons",    agent="writer", prompt="Write cons of remote work. Save to cons.md.")
    .task("summary", agent="writer",
          prompt="Combine:\n\nPROS:\n{{ tasks.pros }}\n\nCONS:\n{{ tasks.cons }}\n\nSave to summary.md.",
          after=["pros", "cons"])
    .run()
)

pros  ─┐
       ├→  summary
cons  ─┘

Architecture

Stagehand uses a ports-and-adapters (hexagonal) architecture. The dependency rule is strict:

core/     →  nothing external (stdlib only)
ports/    →  nothing (interfaces only)
adapters/ →  ports/ only
builder   →  core/ + ports/

Package | Responsibility
--- | ---
stagehand/core/ | Domain types, DAG, scheduler, run state, template engine
stagehand/ports/ | ABCs: AgentExecutor, ArtifactStorage, SecretProvider
stagehand/adapters/executor/ | ClaudeExecutor, OllamaExecutor
stagehand/adapters/storage/ | FilesystemStorage
stagehand/adapters/secrets/ | EnvSecretProvider
stagehand/builder.py | WorkflowBuilder, the primary public API
