Skip to main content

Temporal-backed workflow runtime for Claude Code skills

Project description

sagaflow

CI PyPI Python 3.11+ License: MIT

Durable execution for long-running agent workflows, on top of Temporal.

You write a Python workflow that calls models, runs tools, and writes artifacts. sagaflow runs each step as a Temporal activity, so when the worker dies — or a 40-minute fan-out crashes halfway — the next launch resumes from the last completed step instead of starting over. Results land in ~/.sagaflow/INBOX.md whether or not you're still attached to the session that started them.

Quick start

pip install sagaflow
temporal server start-dev &
export ANTHROPIC_API_KEY=sk-ant-...

sagaflow launch hello-world --name alice --await
# → hello, alice

Kill the terminal mid-run and re-launch the same workflow ID: it picks up from where the worker died.

What you get

  • Resumes after crashes. Activity-level checkpointing via Temporal — workers, sessions, and laptops can all die without losing in-flight work.
  • Decoupled from the caller. Fire-and-forget submissions land in an append-only inbox; a session-start hook surfaces unread results next time you open Claude Code.
  • Provider-agnostic transport. Anthropic SDK by default; point ANTHROPIC_BASE_URL at Bedrock, a model gateway, or any compatible proxy.
  • Auto-managed worker. First sagaflow launch spawns a worker daemon; sagaflow doctor reports health.

Install

pip install sagaflow

Requirements:

  • Python 3.11+
  • Temporal CLI running locally: brew install temporal && temporal server start-dev
  • An Anthropic API key (or a compatible proxy via ANTHROPIC_BASE_URL)

Authoring a skill

A skill is a directory under ~/.claude/skills/<skill-name>/ containing three things:

  1. workflow.py — a Temporal workflow class (the durable orchestration)
  2. __init__.py — a register() function that wires the workflow into sagaflow
  3. prompts/*.md — the system/user prompts the workflow's activities load

Here is the complete hello-world skill (the one sagaflow launch hello-world --name alice runs).

~/.claude/skills/hello-world/workflow.py — the durable workflow:

from dataclasses import dataclass
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from sagaflow.durable.activities import (
        EmitFindingInput, SpawnSubagentInput, WriteArtifactInput,
    )
    from sagaflow.durable.retry_policies import HAIKU_POLICY


@dataclass(frozen=True)
class HelloWorldInput:
    run_id: str
    name: str
    inbox_path: str
    run_dir: str
    greeter_system_prompt: str
    greeter_user_prompt: str


@workflow.defn(name="HelloWorldWorkflow")
class HelloWorldWorkflow:
    @workflow.run
    async def run(self, inp: HelloWorldInput) -> str:
        prompt_path = f"{inp.run_dir}/prompt.txt"
        await workflow.execute_activity(
            "write_artifact",
            WriteArtifactInput(path=prompt_path, content=inp.greeter_user_prompt),
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=HAIKU_POLICY,
        )
        parsed = await workflow.execute_activity(
            "spawn_subagent",
            SpawnSubagentInput(
                role="greeter", tier_name="HAIKU",
                system_prompt=inp.greeter_system_prompt,
                user_prompt_path=prompt_path,
                max_tokens=64, tools_needed=False,
            ),
            start_to_close_timeout=timedelta(seconds=600),
            heartbeat_timeout=timedelta(seconds=120),
            retry_policy=HAIKU_POLICY,
        )
        greeting = parsed.get("GREETING", "hello")
        await workflow.execute_activity(
            "emit_finding",
            EmitFindingInput(
                inbox_path=inp.inbox_path, run_id=inp.run_id,
                skill="hello-world", status="DONE", summary=greeting,
                timestamp_iso=workflow.now().isoformat(timespec="seconds"),
            ),
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=HAIKU_POLICY,
        )
        return greeting

Each execute_activity call is a checkpoint. If the worker dies between them, replay resumes from the last completed one.

~/.claude/skills/hello-world/__init__.py — registration:

from typing import Any

from sagaflow.durable.activities import emit_finding, spawn_subagent, write_artifact
from sagaflow.prompts import load_prompt
from sagaflow.registry import SkillRegistry, SkillSpec

from skills.hello_world.workflow import HelloWorldInput, HelloWorldWorkflow


def _build_input(*, run_id, run_dir, inbox_path, cli_args: dict[str, Any]) -> HelloWorldInput:
    name = str(cli_args.get("name", "world"))
    return HelloWorldInput(
        run_id=run_id, name=name,
        inbox_path=inbox_path, run_dir=run_dir,
        greeter_system_prompt=load_prompt(__file__, "greeter.system"),
        greeter_user_prompt=load_prompt(__file__, "greeter.user", substitutions={"name": name}),
    )


def register(registry: SkillRegistry) -> None:
    registry.register(SkillSpec(
        name="hello-world",
        workflow_cls=HelloWorldWorkflow,
        activities=[write_artifact, emit_finding, spawn_subagent],
        build_input=_build_input,
    ))

register() is what the worker calls at startup to discover the skill. _build_input translates CLI args (--name alice) into the workflow's input dataclass and loads prompts from disk.

~/.claude/skills/hello-world/prompts/greeter.system.md:

You are a greeter. Output a greeting using the format
STRUCTURED_OUTPUT_START / GREETING|<text> / STRUCTURED_OUTPUT_END.
Do not include any other text.

~/.claude/skills/hello-world/prompts/greeter.user.md:

Greet $name

That's the whole skill. sagaflow launch hello-world --name alice finds the registration, builds the input, hands the workflow to Temporal, and the worker runs it durably.

CLI

sagaflow launch <name> --arg key=value [--await]   # submit a workflow
sagaflow inbox                                     # list unread results
sagaflow dismiss <run-id>                          # mark as read
sagaflow doctor                                    # diagnose temporal/worker/hook

How it works

sagaflow launch
   │
   ▼
preflight (install hook, spawn worker if missing)
   │
   ▼
Temporal (localhost:7233) ── workflow ID ── worker daemon
                                              │
                                              ▼
                                         activities:
                                          • model calls
                                          • file I/O
                                          • inbox emit
                                              │
                                              ▼
                              ~/.sagaflow/INBOX.md  +  desktop notify
                                              │
                                              ▼
                                  next session: SessionStart
                                  hook surfaces unread runs

If the worker crashes mid-run, the next sagaflow launch (or the next worker poll) resumes from the last completed activity. Activities that already succeeded don't re-execute.

Development

git clone https://github.com/npow/sagaflow
cd sagaflow
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"

ruff check sagaflow tests
mypy sagaflow
pytest

# Opt-in end-to-end tests (require live Temporal + real Anthropic access)
SAGAFLOW_E2E=1 pytest

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sagaflow-0.10.13.tar.gz (537.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sagaflow-0.10.13-py3-none-any.whl (260.5 kB view details)

Uploaded Python 3

File details

Details for the file sagaflow-0.10.13.tar.gz.

File metadata

  • Download URL: sagaflow-0.10.13.tar.gz
  • Upload date:
  • Size: 537.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for sagaflow-0.10.13.tar.gz
Algorithm Hash digest
SHA256 3d80060e9e45344108a945beeda70fb5f689a6e9149b8ab39610a62c60eb7cfa
MD5 55d933cb8b95f41263bbe9fd032b7a82
BLAKE2b-256 046baa0809bd32803f119cde09f0dcb87d6ceed7ff863641851c877bcde5a1ac

See more details on using hashes here.

Provenance

The following attestation bundles were made for sagaflow-0.10.13.tar.gz:

Publisher: publish.yml on npow/sagaflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file sagaflow-0.10.13-py3-none-any.whl.

File metadata

  • Download URL: sagaflow-0.10.13-py3-none-any.whl
  • Upload date:
  • Size: 260.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for sagaflow-0.10.13-py3-none-any.whl
Algorithm Hash digest
SHA256 478d13ab0360d418773ce8b84e50ab2bbfd08053a4c47ed52c69d08d7256a76b
MD5 861df57c2b9f3827c16b0636715a98c1
BLAKE2b-256 d7fd7225854b356e0fe9a1bcf8d1274a6b42fadb5728839c2c70e1e49523d893

See more details on using hashes here.

Provenance

The following attestation bundles were made for sagaflow-0.10.13-py3-none-any.whl:

Publisher: publish.yml on npow/sagaflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page