
Asyncio DAG workflow engine for AI agents


stagehand

Stagehand orchestrates multi-agent AI workflows in pure Python. Each workflow is a directed acyclic graph (DAG) of tasks. Tasks with no dependencies run in parallel; tasks with dependencies wait until their upstream tasks complete.
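To make the scheduling rule concrete, the sketch below uses the stdlib graphlib module (not Stagehand's own code) to reject cyclic graphs and to group a hypothetical dependency map into batches of tasks whose dependencies are satisfied. The parallel_batches helper is an illustrative name.

```python
from graphlib import CycleError, TopologicalSorter


def parallel_batches(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks so that every task's upstream tasks sit in an earlier batch."""
    sorter = TopologicalSorter(deps)  # maps each task to the tasks it waits for
    sorter.prepare()  # raises CycleError if the graph contains a cycle
    batches: list[list[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every dependency of these is done
        batches.append(ready)
        sorter.done(*ready)
    return batches


print(parallel_batches({"pros": [], "cons": [], "summary": ["pros", "cons"]}))
# [['cons', 'pros'], ['summary']]

try:
    parallel_batches({"a": ["b"], "b": ["a"]})
except CycleError:
    print("not a DAG")
```

Batches are a simplification: the rule above only says a task waits for its own upstream tasks, so a scheduler can start a task as soon as those finish instead of waiting for a whole batch.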


Installation

pip install stagehand-ai

Requires Python 3.11+.


Quickstart

import asyncio
from stagehand import WorkflowBuilder
from stagehand.adapters.executor import OllamaExecutor

async def main():
    run_id = await (
        WorkflowBuilder("haiku-pipeline")
        .agent("writer", OllamaExecutor(), model="qwen2.5",
               system_prompt="You are a haiku writer.",
               tools=["write_file"])
        .task("draft",  agent="writer", prompt="Write a haiku about the ocean at dawn.")
        .task("refine", agent="writer",
              prompt="Refine this haiku:\n\n{{ tasks.draft }}",
              after=["draft"])
        .run()
    )
    print(f"Done — run {run_id}")

asyncio.run(main())

Concepts

Builder

WorkflowBuilder is the primary API. It lets you define agents and tasks in code, then run the workflow with a single await.

WorkflowBuilder(name, version="1")
  .agent(agent_id, executor, *, model, system_prompt, role, tools)
  .task(task_id, *, agent, prompt, fn, after, outputs, secrets, retry, timeout)
  .state_dir(directory)   # where run state is persisted (default: .stagehand/runs)
  .concurrency(n)         # max tasks running simultaneously (default: unlimited)
  .logger(logger)         # receives workflow and task lifecycle events (default: silent)
  .run(inputs={})         # returns run_id

.build() returns a Workflow object without running it. This is useful when you want to pass the workflow to a Scheduler directly, for example to resume a failed run.


Agents

An agent is a named AI persona with an executor, a model, and a system prompt.

.agent(
    "writer",
    OllamaExecutor(),
    model="qwen2.5",
    system_prompt="You are a concise technical writer.",
    tools=["write_file", "read_file"],
)

Different agents in the same workflow can use different executors:

builder = (
    WorkflowBuilder("pipeline")
    .agent("drafter", OllamaExecutor(), model="qwen2.5")
    .agent("reviewer", ClaudeExecutor(api_key="..."), model="claude-opus-4-5")
)

Tasks

A task is a single unit of work in the DAG. There are two kinds:

Agent task — runs a prompt against an AI agent:

.task("draft",  agent="writer", prompt="Write a short intro to Python.")
.task("review", agent="reviewer",
      prompt="Review this draft:\n\n{{ tasks.draft }}",
      after=["draft"])

Deterministic task — runs a plain Python callable (sync or async). No agent or prompt needed:

async def fetch_tickets(ctx):
    return await linear_client.get_tickets()  # returns str or TaskResult

.task("fetch", fn=fetch_tickets)
.task("analyze", agent="analyst",
      prompt="Tickets:\n\n{{ tasks.fetch }}\n\nSummarise.",
      after=["fetch"])

The callable receives a RunContext, which gives access to the workflow inputs and the results of previous tasks, and must return a str or a TaskResult.
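One plausible way to support both sync and async callables is to await coroutine functions directly and push plain functions onto a worker thread. The sketch below assumes a minimal RunContext stand-in with inputs and results fields; Stagehand's real RunContext and its dispatch logic may differ, and the sketch ignores the TaskResult return type.

```python
import asyncio
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class RunContext:  # hypothetical stand-in, not stagehand's RunContext
    inputs: dict[str, Any] = field(default_factory=dict)
    results: dict[str, str] = field(default_factory=dict)


async def call_task_fn(fn: Callable[[RunContext], Any], ctx: RunContext) -> Any:
    if inspect.iscoroutinefunction(fn):
        return await fn(ctx)  # async def: await the coroutine on the event loop
    # Plain function: run it in a worker thread so it doesn't block other tasks.
    return await asyncio.to_thread(fn, ctx)


def count_inputs(ctx: RunContext) -> str:
    return f"{len(ctx.inputs)} inputs"


async def upper_topic(ctx: RunContext) -> str:
    return ctx.inputs["topic"].upper()


ctx = RunContext(inputs={"topic": "ocean"})
print(asyncio.run(call_task_fn(count_inputs, ctx)))  # 1 inputs
print(asyncio.run(call_task_fn(upper_topic, ctx)))   # OCEAN
```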

Parameter  Description
agent      ID of the agent that runs this task (required unless fn is set)
prompt     Message sent to the agent; supports {{ }} expressions (required unless fn is set)
fn         Python callable to run directly (required unless agent and prompt are set)
after      List of task IDs this task waits for
outputs    StaticOutputs, DynamicOutputs, or PatternOutputs
secrets    List of secret names to resolve at runtime
retry      RetryPolicy: how many times to retry on failure
timeout    Seconds before a single attempt is cancelled; None means no limit. Sync fn tasks cannot be interrupted.

Tasks with no after (or whose dependencies are all complete) start immediately. Multiple ready tasks run in parallel.
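A minimal sketch of this rule in plain asyncio, assuming an already-validated acyclic graph: every task is created up front and first awaits its upstream tasks, so independent tasks overlap in time. run_dag and work are hypothetical names; this is an illustration, not Stagehand's Scheduler.

```python
import asyncio
from typing import Awaitable, Callable


async def run_dag(
    deps: dict[str, list[str]],
    work: Callable[[str, dict[str, str]], Awaitable[str]],
) -> dict[str, str]:
    """Run every task once its upstream tasks are done; assumes deps is acyclic."""
    results: dict[str, str] = {}
    tasks: dict[str, asyncio.Task[None]] = {}

    async def run_one(task_id: str) -> None:
        # Wait for all upstream tasks (returns immediately if there are none).
        await asyncio.gather(*(tasks[dep] for dep in deps[task_id]))
        results[task_id] = await work(task_id, results)

    # Create all tasks before any of them runs, so run_one can find its upstream tasks.
    for task_id in deps:
        tasks[task_id] = asyncio.create_task(run_one(task_id))
    await asyncio.gather(*tasks.values())
    return results


async def work(task_id: str, results: dict[str, str]) -> str:
    await asyncio.sleep(0.1)  # stand-in for an agent call
    return f"{task_id} output"


deps = {"pros": [], "cons": [], "summary": ["pros", "cons"]}
print(asyncio.run(run_dag(deps, work)))
```

Because pros and cons sleep concurrently, the run takes roughly two sleeps (about 0.2 s) rather than three.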


Retry

Pass a RetryPolicy to a task to retry it automatically on failure. Downstream tasks are only cancelled once all attempts are exhausted.

from stagehand import RetryPolicy

.task(
    "fetch",
    agent="worker",
    prompt="Fetch the latest report.",
    retry=RetryPolicy(max_attempts=3, delay=2.0),
)

Parameter     Default  Description
max_attempts  1        Total attempts, including the first (1 = no retry)
delay         0.0      Seconds to wait between attempts
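The attempt counting in the table can be sketched as a simple loop. with_retry is a hypothetical helper, not part of Stagehand: the first call counts as an attempt, the delay applies only between attempts, and the last exception propagates once attempts run out, which is the point at which downstream tasks would be cancelled.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retry(fn: Callable[[], Awaitable[T]], max_attempts: int = 1, delay: float = 0.0) -> T:
    """Call fn up to max_attempts times in total; max_attempts=1 means no retry."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn()
        except Exception:
            if attempt == max_attempts:
                raise  # all attempts exhausted: only now is the task treated as failed
            await asyncio.sleep(delay)  # wait only between attempts
    raise ValueError("max_attempts must be at least 1")


attempts: list[int] = []


async def flaky() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient failure")  # fails on the first two calls
    return "report"


print(asyncio.run(with_retry(flaky, max_attempts=3, delay=0.01)), len(attempts))  # report 3
```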

Logging

By default the scheduler is silent. Pass a Logger to see workflow and task lifecycle events.

import logging
from stagehand import WorkflowBuilder, StdlibLogger

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")

run_id = await (
    WorkflowBuilder("my-pipeline")
    .agent("worker", executor, model="claude-opus-4-5")
    .task("analyse", agent="worker", prompt="...")
    .logger(StdlibLogger())   # <-- add this
    .run()
)

StdlibLogger wraps Python's stdlib logging module. By default it also raises the httpx and httpcore loggers to WARNING, so their per-request INFO lines don't drown out workflow events. Example output:

INFO  workflow 'my-pipeline' started [run=sh-20260520-a1b2]
INFO  task 'analyse' starting
INFO  task 'analyse' done
INFO  workflow 'my-pipeline' finished [run=sh-20260520-a1b2]

Parameter           Default      Description
name                "stagehand"  Logger name passed to logging.getLogger
suppress_http_logs  True         Raises httpx/httpcore to WARNING to suppress per-request noise
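For reference, the effect described for suppress_http_logs=True can be reproduced with the stdlib alone. This is an equivalent configuration written by hand, not StdlibLogger's source.

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")

# Raise the HTTP client loggers to WARNING. Child loggers such as
# "httpcore.http11" have no level of their own, so they inherit it.
for noisy in ("httpx", "httpcore"):
    logging.getLogger(noisy).setLevel(logging.WARNING)

log = logging.getLogger("stagehand")  # the default `name` from the table
log.info("task 'analyse' starting")                        # shown at INFO
logging.getLogger("httpx").info("HTTP Request: POST ...")  # filtered out
```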

To silence everything, omit .logger() (the default) or pass NullLogger() explicitly.

When using Scheduler directly, pass the logger to its constructor:

from stagehand import Scheduler, StdlibLogger

scheduler = Scheduler(run_state_directory=".stagehand/runs", logger=StdlibLogger())

To cap the number of tasks running at the same time, pass max_concurrency:

scheduler = Scheduler(run_state_directory=".stagehand/runs", max_concurrency=4)

None (the default) means unlimited — all ready tasks start immediately. Setting it to 1 makes the scheduler execute one task at a time, regardless of the DAG structure.

To write a custom logger — for example to route events to a structured sink — implement the Logger port:

from stagehand import Logger

class MyLogger(Logger):
    def debug(self, message: str) -> None: ...
    def info(self, message: str) -> None: ...
    def warning(self, message: str) -> None: ...
    def error(self, message: str) -> None: ...
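As an example of a structured sink, the hypothetical JsonLinesLogger below writes one JSON object per event to any text stream. To keep the sketch runnable without Stagehand installed it does not inherit from Logger; in real code you would declare it as class JsonLinesLogger(Logger) with the same four methods.

```python
import io
import json
import time
from typing import TextIO


class JsonLinesLogger:  # in real code: class JsonLinesLogger(Logger)
    def __init__(self, stream: TextIO) -> None:
        self._stream = stream

    def _emit(self, level: str, message: str) -> None:
        # One self-contained JSON object per line, easy to ship to a log pipeline.
        record = {"ts": time.time(), "level": level, "message": message}
        self._stream.write(json.dumps(record) + "\n")

    def debug(self, message: str) -> None:
        self._emit("debug", message)

    def info(self, message: str) -> None:
        self._emit("info", message)

    def warning(self, message: str) -> None:
        self._emit("warning", message)

    def error(self, message: str) -> None:
        self._emit("error", message)


buf = io.StringIO()
JsonLinesLogger(buf).info("task 'analyse' done")
print(buf.getvalue(), end="")
```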

Template expressions

Prompts support {{ }} expressions to inject values from previous tasks or runtime inputs.

Expression                  Resolves to
{{ input.key }}             A value passed via inputs={"key": "..."}
{{ tasks.id }}              The text output of a completed task
{{ tasks.id.files }}        Newline-separated list of file paths produced by a task
{{ tasks.id.filename_md }}  Path of a specific file, identified by its slug (filename.md → filename_md)
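The resolution rules can be sketched as a small regex-based renderer. This is a hypothetical illustration, not Stagehand's template engine: it assumes each task result carries an output string and a list of file paths, and that the slug is the file name with "." replaced by "_", as in the filename.md → filename_md example. The real slug rules may treat other characters differently.

```python
import re
from pathlib import PurePosixPath
from typing import Any

EXPR = re.compile(r"\{\{\s*([\w.-]+)\s*\}\}")


def slug(path: str) -> str:
    # Assumed rule: take the file name and replace "." with "_" (draft.md -> draft_md).
    return PurePosixPath(path).name.replace(".", "_")


def render(template: str, inputs: dict[str, Any], tasks: dict[str, dict[str, Any]]) -> str:
    def resolve(match: re.Match[str]) -> str:
        kind, *rest = match.group(1).split(".")
        if kind == "input" and len(rest) == 1:
            return str(inputs[rest[0]])
        if kind == "tasks" and len(rest) == 1:
            return tasks[rest[0]]["output"]
        if kind == "tasks" and len(rest) == 2:
            files = tasks[rest[0]]["files"]
            if rest[1] == "files":
                return "\n".join(files)
            return {slug(p): p for p in files}[rest[1]]
        raise KeyError(f"unsupported expression: {match.group(0)}")

    return EXPR.sub(resolve, template)


tasks = {"draft": {"output": "Waves fold in light", "files": ["out/draft.md", "out/notes.txt"]}}
print(render("Refine: {{ tasks.draft }} ({{ tasks.draft.draft_md }}) for {{ input.audience }}",
             {"audience": "kids"}, tasks))
# Refine: Waves fold in light (out/draft.md) for kids
```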

Outputs

The outputs parameter of .task() declares what files a task produces.

from stagehand import StaticOutputs, DynamicOutputs, PatternOutputs

# Exact file names known upfront
.task("t1", agent="a", prompt="...", outputs=StaticOutputs(["report.md"]))

# Agent decides at runtime (default)
.task("t2", agent="a", prompt="...", outputs=DynamicOutputs())

# Collect by glob after the task finishes
.task("t3", agent="a", prompt="...", outputs=PatternOutputs(pattern="**/*.md"))
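The PatternOutputs case can be illustrated with pathlib globbing on a temporary directory. collect_outputs is a hypothetical helper, and the assumption that the pattern is matched relative to the storage directory (such as ./output) is not stated above.

```python
import tempfile
from pathlib import Path


def collect_outputs(root: Path, pattern: str) -> list[str]:
    """Return files under root matching pattern, as sorted POSIX paths relative to root."""
    return sorted(p.relative_to(root).as_posix() for p in root.glob(pattern) if p.is_file())


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "notes").mkdir()
    for name in ("report.md", "notes/summary.md", "data.csv"):
        (root / name).write_text("...")
    found = collect_outputs(root, "**/*.md")

print(found)  # ['notes/summary.md', 'report.md']
```

In pathlib, "**" also matches the top-level directory itself, which is why report.md is collected alongside notes/summary.md.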

Executors

An executor is the AI backend that drives a task. Stagehand ships two:

OllamaExecutor

Runs models locally via Ollama. No API key required.

from stagehand.adapters.executor import OllamaExecutor
from stagehand.adapters.storage.filesystem import FilesystemStorage

executor = OllamaExecutor(
    host="http://localhost:11434",   # default
    storage=FilesystemStorage("./output"),
)
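
Before running, start the Ollama server (skip this if the Ollama desktop app is already running) and pull a model from a second terminal:

ollama serve
ollama pull qwen2.5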

Models with reliable tool-calling support include qwen2.5, llama3.1, llama3.2, and mistral-nemo.

ClaudeExecutor

Uses the Anthropic Messages API.

from stagehand.adapters.executor import ClaudeExecutor
from stagehand.adapters.storage.filesystem import FilesystemStorage

executor = ClaudeExecutor(
    api_key="sk-ant-...",            # or set ANTHROPIC_API_KEY
    storage=FilesystemStorage("./output"),
)

The default model is claude-opus-4-5.

Rate-limit retries

ClaudeExecutor automatically retries messages.create calls that receive a 429 response. Only the individual API call is retried — not the entire agentic loop — so a rate-limit mid-task retries only the current step.

Parameter           Default       Description
rate_limit_retries  3             Maximum number of attempts per messages.create call (1 = no retries)
rate_limit_delay    60.0          Seconds to wait between attempts
logger              NullLogger()  A Logger instance (e.g. StdlibLogger()) to receive retry warnings

from stagehand.adapters.logger import StdlibLogger

executor = ClaudeExecutor(
    api_key="sk-ant-...",
    rate_limit_retries=5,
    rate_limit_delay=30.0,
    logger=StdlibLogger(),
)
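The difference between retrying one call and retrying the whole agentic loop can be sketched as follows. Everything here is hypothetical: RateLimited stands in for a 429 error, fake_create for messages.create, and each step is treated as an independent request to keep the example short. Only the failing step is sent again; earlier steps are not repeated.

```python
import asyncio
from typing import Awaitable, Callable

Create = Callable[[str], Awaitable[str]]


class RateLimited(Exception):
    """Hypothetical stand-in for an HTTP 429 response."""


async def create_with_retry(create: Create, request: str, retries: int = 3, delay: float = 60.0) -> str:
    # Retry a single call; `retries` counts total attempts, like rate_limit_retries.
    for attempt in range(1, retries + 1):
        try:
            return await create(request)
        except RateLimited:
            if attempt == retries:
                raise
            await asyncio.sleep(delay)
    raise ValueError("retries must be at least 1")


async def agent_loop(create: Create, steps: list[str], retries: int = 3, delay: float = 60.0) -> list[str]:
    # Each step is a separate call, so a 429 on one step never re-sends earlier steps.
    return [await create_with_retry(create, step, retries, delay) for step in steps]


sent: list[str] = []


async def fake_create(request: str) -> str:
    sent.append(request)
    if request == "step 2" and sent.count("step 2") == 1:
        raise RateLimited()  # only the first try of step 2 is rate-limited
    return f"ok: {request}"


print(asyncio.run(agent_loop(fake_create, ["step 1", "step 2", "step 3"], delay=0.01)))
print(sent)  # ['step 1', 'step 2', 'step 2', 'step 3']
```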

Custom tools

Pass extra tools to ClaudeExecutor via extra_tools:

from stagehand import ToolDefinition
from stagehand.adapters.executor import ClaudeExecutor

my_tool = ToolDefinition(
    name="fetch_ticket",
    description="Fetch a Linear ticket by ID.",
    input_schema={"type": "object", "properties": {"id": {"type": "string"}}, "required": ["id"]},
    handler=lambda args: fetch_from_linear(args["id"]),
)

executor = ClaudeExecutor(api_key="...", extra_tools=[my_tool])
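How ClaudeExecutor routes a tool call to its handler is not described here, so the following is only a sketch of one plausible dispatch step. It uses a local ToolDef dataclass that mirrors the four ToolDefinition fields shown above (it is not the real class) and checks only the schema's required keys rather than performing full JSON Schema validation.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class ToolDef:  # hypothetical mirror of ToolDefinition's fields
    name: str
    description: str
    input_schema: dict[str, Any]
    handler: Callable[[dict[str, Any]], Any]


def dispatch(tools: list[ToolDef], name: str, args: dict[str, Any]) -> Any:
    by_name = {tool.name: tool for tool in tools}
    if name not in by_name:
        raise KeyError(f"unknown tool: {name}")
    tool = by_name[name]
    # Minimal check: only the schema's "required" list, not full JSON Schema.
    missing = [key for key in tool.input_schema.get("required", []) if key not in args]
    if missing:
        raise ValueError(f"{name}: missing required arguments {missing}")
    return tool.handler(args)


fetch_ticket = ToolDef(
    name="fetch_ticket",
    description="Fetch a Linear ticket by ID.",
    input_schema={"type": "object", "properties": {"id": {"type": "string"}}, "required": ["id"]},
    handler=lambda args: f"ticket {args['id']}",  # stand-in for fetch_from_linear
)
print(dispatch([fetch_ticket], "fetch_ticket", {"id": "ENG-42"}))  # ticket ENG-42
```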

Artifact path validation

FilesystemStorage always rejects paths that contain .. components to prevent path traversal attacks.

Optionally restrict which file extensions are allowed by passing allowed_extensions:

from stagehand import FilesystemStorage

# Only .txt and .md files may be written
storage = FilesystemStorage("./output", allowed_extensions=[".txt", ".md"])

Extension matching is case-insensitive. Any path that violates a constraint raises ValueError before any I/O is performed. Implement validate_path on a custom ArtifactStorage subclass to apply your own rules:

from stagehand import ArtifactStorage

class MyStorage(ArtifactStorage):
    def validate_path(self, path: str) -> None:
        if not path.startswith("safe/"):
            raise ValueError(f"path must be inside safe/: {path!r}")  # raise to block the write
    ...

Custom executor

Implement AgentExecutor to add any backend:

from stagehand.ports.executor import AgentExecutor, ExecutionRequest, ExecutionResult

class MyExecutor(AgentExecutor):
    async def execute(self, request: ExecutionRequest) -> ExecutionResult:
        output = call_my_model(request.prompt, request.system_prompt)  # call_my_model: placeholder for your own backend call
        return ExecutionResult(output=output)

Resume

A failed or interrupted run can be resumed. Completed tasks are skipped.

from stagehand import Scheduler

scheduler = Scheduler(run_state_directory=".stagehand/runs")
workflow = builder.build()  # builder: a WorkflowBuilder configured as in the examples above

run_id = await scheduler.run(workflow)
# ... if it fails or you want to retry:
await scheduler.resume(run_id, workflow)

Examples

Sequential

run_id = await (
    WorkflowBuilder("sequential")
    .agent("writer", OllamaExecutor(), model="qwen2.5",
           system_prompt="You write haiku.", tools=["write_file"])
    .task("draft",  agent="writer", prompt="Write a haiku about the ocean. Save to draft.md.")
    .task("refine", agent="writer",
          prompt="Refine this:\n\n{{ tasks.draft }}\n\nSave to final.md.",
          after=["draft"])
    .run()
)
draft  →  refine

Parallel

run_id = await (
    WorkflowBuilder("parallel")
    .agent("writer", OllamaExecutor(), model="qwen2.5",
           system_prompt="You write clearly.", tools=["write_file"])
    .task("pros",    agent="writer", prompt="Write pros of remote work. Save to pros.md.")
    .task("cons",    agent="writer", prompt="Write cons of remote work. Save to cons.md.")
    .task("summary", agent="writer",
          prompt="Combine:\n\nPROS:\n{{ tasks.pros }}\n\nCONS:\n{{ tasks.cons }}\n\nSave to summary.md.",
          after=["pros", "cons"])
    .run()
)
pros  ─┐
       ├→  summary
cons  ─┘

Architecture

Stagehand uses a ports-and-adapters (hexagonal) architecture. The dependency rule is strict:

core/     →  nothing external (stdlib only)
ports/    →  nothing (interfaces only)
adapters/ →  ports/ only
builder   →  core/ + ports/
Package Responsibility
stagehand/core/ Domain types, DAG, scheduler, run state, template engine
stagehand/ports/ ABCs: AgentExecutor, ArtifactStorage, SecretProvider
stagehand/adapters/executor/ ClaudeExecutor, OllamaExecutor
stagehand/adapters/storage/ FilesystemStorage
stagehand/adapters/secrets/ EnvSecretProvider
stagehand/builder.py WorkflowBuilder — primary public API



Download files


Source Distribution

stagehand_ai-0.6.0.tar.gz (38.9 kB)

Uploaded Source

Built Distribution


stagehand_ai-0.6.0-py3-none-any.whl (30.5 kB)

Uploaded Python 3

File details

Details for the file stagehand_ai-0.6.0.tar.gz.

File metadata

  • Download URL: stagehand_ai-0.6.0.tar.gz
  • Upload date:
  • Size: 38.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for stagehand_ai-0.6.0.tar.gz
Algorithm    Hash digest
SHA256       6e5d96217c243e91a110dbc322abffebce3b6a4601d44cea8309542a692c3071
MD5          1af5a2470e1384a34543825e03de76d6
BLAKE2b-256  81690b882eeda580d6cf555894f1fb75f50e32d90c05df16fcceff94005982ed


Provenance

The following attestation bundles were made for stagehand_ai-0.6.0.tar.gz:

Publisher: publish.yml on janmarkuslanger/stagehand

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file stagehand_ai-0.6.0-py3-none-any.whl.

File metadata

  • Download URL: stagehand_ai-0.6.0-py3-none-any.whl
  • Upload date:
  • Size: 30.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for stagehand_ai-0.6.0-py3-none-any.whl
Algorithm    Hash digest
SHA256       b12cc71d87976c733303059e257ff21c76c8a3bd2fb313090caa8ee305aac6f5
MD5          98e85d03388a2bbfd579e77180a460b0
BLAKE2b-256  e9d75c4c992ed18a4001d4734ba55a61b1cda0abae2ac2e00606fea1336d696f


Provenance

The following attestation bundles were made for stagehand_ai-0.6.0-py3-none-any.whl:

Publisher: publish.yml on janmarkuslanger/stagehand

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
