Skip to main content

Asyncio DAG workflow engine for AI agents

Project description

stagehand

Stagehand orchestrates multi-agent AI workflows in pure Python. Each workflow is a directed acyclic graph (DAG) of tasks. Tasks with no dependencies run in parallel; tasks with dependencies wait until their upstream tasks complete.


Installation

pip install stagehand-ai

Requires Python 3.11+.


Quickstart

import asyncio
from stagehand import WorkflowBuilder
from stagehand.adapters.executor import OllamaExecutor

async def main():
    run_id = await (
        WorkflowBuilder("haiku-pipeline")
        .agent("writer", OllamaExecutor(), model="qwen2.5",
               system_prompt="You are a haiku writer.",
               tools=["write_file"])
        .task("draft",  agent="writer", prompt="Write a haiku about the ocean at dawn.")
        .task("refine", agent="writer",
              prompt="Refine this haiku:\n\n{{ tasks.draft }}",
              after=["draft"])
        .run()
    )
    print(f"Done — run {run_id}")

asyncio.run(main())

Concepts

Builder

WorkflowBuilder is the primary API. It lets you define agents and tasks in code, then run the workflow with a single await.

WorkflowBuilder(name, version="1")
  .agent(agent_id, executor, *, model, system_prompt, role, tools)
  .task(task_id, *, agent, prompt, after, outputs, secrets)
  .state_dir(directory)   # where run state is persisted (default: .stagehand/runs)
  .run(inputs={})         # returns run_id

.build() returns a Workflow object without running it, useful if you want to pass it to a Scheduler directly.


Agents

An agent is a named AI persona with an executor, a model, and a system prompt.

.agent(
    "writer",
    OllamaExecutor(),
    model="qwen2.5",
    system_prompt="You are a concise technical writer.",
    tools=["write_file", "read_file"],
)

Different agents in the same workflow can use different executors:

WorkflowBuilder("pipeline")
.agent("drafter", OllamaExecutor(), model="qwen2.5")
.agent("reviewer", ClaudeExecutor(api_key="..."), model="claude-opus-4-5")

Tasks

A task is a single unit of work in the DAG. There are two kinds:

Agent task — runs a prompt against an AI agent:

.task("draft",  agent="writer", prompt="Write a short intro to Python.")
.task("review", agent="reviewer",
      prompt="Review this draft:\n\n{{ tasks.draft }}",
      after=["draft"])

Deterministic task — runs a plain Python callable (sync or async). No agent or prompt needed:

async def fetch_tickets(ctx):
    return await linear_client.get_tickets()  # returns str or TaskResult

.task("fetch", fn=fetch_tickets)
.task("analyze", agent="analyst",
      prompt="Tickets:\n\n{{ tasks.fetch }}\n\nSummarise.",
      after=["fetch"])

The callable receives a RunContext (access to inputs and previous task results) and must return a str or TaskResult.

Parameter Description
agent ID of the agent that runs this task (required unless fn is set)
prompt Message sent to the agent (supports {{ }} expressions) (required unless fn is set)
fn Python callable to run directly (required unless agent/prompt are set)
after List of task IDs this task waits for
outputs StaticOutputs, DynamicOutputs, or PatternOutputs
secrets List of secret names to resolve at runtime
retry RetryPolicy — how many times to retry on failure

Tasks with no after (or whose dependencies are all complete) start immediately. Multiple ready tasks run in parallel.


Retry

Pass a RetryPolicy to a task to retry it automatically on failure. Downstream tasks are only cancelled once all attempts are exhausted.

from stagehand import RetryPolicy

.task(
    "fetch",
    agent="worker",
    prompt="Fetch the latest report.",
    retry=RetryPolicy(max_attempts=3, delay=2.0),
)
Parameter Default Description
max_attempts 1 Total attempts including the first (1 = no retry)
delay 0.0 Seconds to wait between attempts

Template expressions

Prompts support {{ }} expressions to inject values from previous tasks or runtime inputs.

Expression Resolves to
{{ input.key }} A value passed via inputs={"key": "..."}
{{ tasks.id }} The text output of a completed task
{{ tasks.id.files }} Newline-separated list of file paths produced by a task
{{ tasks.id.filename_md }} Path of a specific file, identified by its slug (filename.mdfilename_md)

Outputs

The outputs parameter of .task() declares what files a task produces.

from stagehand import StaticOutputs, DynamicOutputs, PatternOutputs

# Exact file names known upfront
.task("t1", agent="a", prompt="...", outputs=StaticOutputs(["report.md"]))

# Agent decides at runtime (default)
.task("t2", agent="a", prompt="...", outputs=DynamicOutputs())

# Collect by glob after the task finishes
.task("t3", agent="a", prompt="...", outputs=PatternOutputs(pattern="**/*.md"))

Executors

An executor is the AI backend that drives a task. Stagehand ships two:

OllamaExecutor

Runs models locally via Ollama. No API key required.

from stagehand.adapters.executor import OllamaExecutor
from stagehand.adapters.storage.filesystem import FilesystemStorage

executor = OllamaExecutor(
    host="http://localhost:11434",   # default
    storage=FilesystemStorage("./output"),
)
ollama pull qwen2.5
ollama serve

Models with reliable tool use: qwen2.5, llama3.1, llama3.2, mistral-nemo.

ClaudeExecutor

Uses the Anthropic Messages API.

from stagehand.adapters.executor import ClaudeExecutor
from stagehand.adapters.storage.filesystem import FilesystemStorage

executor = ClaudeExecutor(
    api_key="sk-ant-...",            # or set ANTHROPIC_API_KEY
    storage=FilesystemStorage("./output"),
)

The default model is claude-opus-4-5.

Custom tools

Pass extra tools to ClaudeExecutor via extra_tools:

from stagehand import ToolDefinition
from stagehand.adapters.executor import ClaudeExecutor

my_tool = ToolDefinition(
    name="fetch_ticket",
    description="Fetch a Linear ticket by ID.",
    input_schema={"type": "object", "properties": {"id": {"type": "string"}}, "required": ["id"]},
    handler=lambda args: fetch_from_linear(args["id"]),
)

executor = ClaudeExecutor(api_key="...", extra_tools=[my_tool])

Custom executor

Implement AgentExecutor to add any backend:

from stagehand.ports.executor import AgentExecutor, ExecutionRequest, ExecutionResult

class MyExecutor(AgentExecutor):
    async def execute(self, request: ExecutionRequest) -> ExecutionResult:
        output = call_my_model(request.prompt, request.system_prompt)
        return ExecutionResult(output=output)

Resume

A failed or interrupted run can be resumed. Completed tasks are skipped.

from stagehand import Scheduler

scheduler = Scheduler(run_state_directory=".stagehand/runs")
workflow = builder.build()

run_id = await scheduler.run(workflow)
# ... if it fails or you want to retry:
await scheduler.resume(run_id, workflow)

Examples

Sequential

run_id = await (
    WorkflowBuilder("sequential")
    .agent("writer", OllamaExecutor(), model="qwen2.5",
           system_prompt="You write haiku.", tools=["write_file"])
    .task("draft",  agent="writer", prompt="Write a haiku about the ocean. Save to draft.md.")
    .task("refine", agent="writer",
          prompt="Refine this:\n\n{{ tasks.draft }}\n\nSave to final.md.",
          after=["draft"])
    .run()
)
draft  →  refine

Parallel

run_id = await (
    WorkflowBuilder("parallel")
    .agent("writer", OllamaExecutor(), model="qwen2.5",
           system_prompt="You write clearly.", tools=["write_file"])
    .task("pros",    agent="writer", prompt="Write pros of remote work. Save to pros.md.")
    .task("cons",    agent="writer", prompt="Write cons of remote work. Save to cons.md.")
    .task("summary", agent="writer",
          prompt="Combine:\n\nPROS:\n{{ tasks.pros }}\n\nCONS:\n{{ tasks.cons }}\n\nSave to summary.md.",
          after=["pros", "cons"])
    .run()
)
pros  ─┐
       ├→  summary
cons  ─┘

Architecture

Stagehand uses a ports-and-adapters (hexagonal) architecture. The dependency rule is strict:

core/     →  nothing external (stdlib only)
ports/    →  nothing (interfaces only)
adapters/ →  ports/ only
builder   →  core/ + ports/
Package Responsibility
stagehand/core/ Domain types, DAG, scheduler, run state, template engine
stagehand/ports/ ABCs: AgentExecutor, ArtifactStorage, SecretProvider
stagehand/adapters/executor/ ClaudeExecutor, OllamaExecutor
stagehand/adapters/storage/ FilesystemStorage
stagehand/adapters/secrets/ EnvSecretProvider
stagehand/builder.py WorkflowBuilder — primary public API

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

stagehand_ai-0.2.0.tar.gz (29.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

stagehand_ai-0.2.0-py3-none-any.whl (26.2 kB view details)

Uploaded Python 3

File details

Details for the file stagehand_ai-0.2.0.tar.gz.

File metadata

  • Download URL: stagehand_ai-0.2.0.tar.gz
  • Upload date:
  • Size: 29.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for stagehand_ai-0.2.0.tar.gz
Algorithm Hash digest
SHA256 8bea5a4e61448657eb5985bef60c9cafb93151eee25ce4e195c2600590b5d2c8
MD5 a416410048cfe1af3eac8351cb0b57db
BLAKE2b-256 0bbbf10f79fd4552ce91de8ef9dd3b3d19bb50b12d7a8e1909731f510cac81cf

See more details on using hashes here.

Provenance

The following attestation bundles were made for stagehand_ai-0.2.0.tar.gz:

Publisher: publish.yml on janmarkuslanger/stagehand

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file stagehand_ai-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: stagehand_ai-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 26.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for stagehand_ai-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2f3c1db94683b5a24882d9de7d5dcbcfc544dfd4874883a039249e5d0901eb20
MD5 a9bf966b00ee16f6e6ab905f324d831e
BLAKE2b-256 275e1d706bd06f3caa3bf483d99edbf2b822098b9ce852022cab56b9149e33ca

See more details on using hashes here.

Provenance

The following attestation bundles were made for stagehand_ai-0.2.0-py3-none-any.whl:

Publisher: publish.yml on janmarkuslanger/stagehand

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page