Skip to main content

Temporal-backed workflow runtime for Claude Code skills

Project description

sagaflow

CI PyPI Python 3.11+ License: MIT

Durable execution for long-running agent workflows, on top of Temporal.

You write a Python workflow that calls models, runs tools, and writes artifacts. sagaflow runs each step as a Temporal activity, so when the worker dies — or a 40-minute fan-out crashes halfway — the next launch resumes from the last completed step instead of starting over. Results land in ~/.sagaflow/INBOX.md whether or not you're still attached to the session that started them.

Quick start

pip install sagaflow
temporal server start-dev &
export ANTHROPIC_API_KEY=sk-ant-...

sagaflow launch hello-world --name alice --await
# → hello, alice

Kill the terminal mid-run and re-launch the same workflow ID: it picks up from where the worker died.

What you get

  • Resumes after crashes. Activity-level checkpointing via Temporal — workers, sessions, and laptops can all die without losing in-flight work.
  • Decoupled from the caller. Fire-and-forget submissions land in an append-only inbox; a session-start hook surfaces unread results next time you open Claude Code.
  • Provider-agnostic transport. Anthropic SDK by default; point ANTHROPIC_BASE_URL at Bedrock, a model gateway, or any compatible proxy.
  • Auto-managed worker. First sagaflow launch spawns a worker daemon; sagaflow doctor reports health.

Install

pip install sagaflow

Requirements:

  • Python 3.11+
  • Temporal CLI running locally: brew install temporal && temporal server start-dev
  • An Anthropic API key (or a compatible proxy via ANTHROPIC_BASE_URL)

Authoring a skill

A skill is a directory under ~/.claude/skills/<skill-name>/ containing three things:

  1. workflow.py — a Temporal workflow class (the durable orchestration)
  2. __init__.py — a register() function that wires the workflow into sagaflow
  3. prompts/*.md — the system/user prompts the workflow's activities load

Here is the complete hello-world skill (the one sagaflow launch hello-world --name alice runs).

~/.claude/skills/hello-world/workflow.py — the durable workflow:

from dataclasses import dataclass
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from sagaflow.durable.activities import (
        EmitFindingInput, SpawnSubagentInput, WriteArtifactInput,
    )
    from sagaflow.durable.retry_policies import HAIKU_POLICY


@dataclass(frozen=True)
class HelloWorldInput:
    run_id: str
    name: str
    inbox_path: str
    run_dir: str
    greeter_system_prompt: str
    greeter_user_prompt: str


@workflow.defn(name="HelloWorldWorkflow")
class HelloWorldWorkflow:
    @workflow.run
    async def run(self, inp: HelloWorldInput) -> str:
        prompt_path = f"{inp.run_dir}/prompt.txt"
        await workflow.execute_activity(
            "write_artifact",
            WriteArtifactInput(path=prompt_path, content=inp.greeter_user_prompt),
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=HAIKU_POLICY,
        )
        parsed = await workflow.execute_activity(
            "spawn_subagent",
            SpawnSubagentInput(
                role="greeter", tier_name="HAIKU",
                system_prompt=inp.greeter_system_prompt,
                user_prompt_path=prompt_path,
                max_tokens=64, tools_needed=False,
            ),
            start_to_close_timeout=timedelta(seconds=600),
            heartbeat_timeout=timedelta(seconds=120),
            retry_policy=HAIKU_POLICY,
        )
        greeting = parsed.get("GREETING", "hello")
        await workflow.execute_activity(
            "emit_finding",
            EmitFindingInput(
                inbox_path=inp.inbox_path, run_id=inp.run_id,
                skill="hello-world", status="DONE", summary=greeting,
                timestamp_iso=workflow.now().isoformat(timespec="seconds"),
            ),
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=HAIKU_POLICY,
        )
        return greeting

Each execute_activity call is a checkpoint. If the worker dies between them, replay resumes from the last completed one.

~/.claude/skills/hello-world/__init__.py — registration:

from typing import Any

from sagaflow.durable.activities import emit_finding, spawn_subagent, write_artifact
from sagaflow.prompts import load_prompt
from sagaflow.registry import SkillRegistry, SkillSpec

from skills.hello_world.workflow import HelloWorldInput, HelloWorldWorkflow


def _build_input(*, run_id, run_dir, inbox_path, cli_args: dict[str, Any]) -> HelloWorldInput:
    name = str(cli_args.get("name", "world"))
    return HelloWorldInput(
        run_id=run_id, name=name,
        inbox_path=inbox_path, run_dir=run_dir,
        greeter_system_prompt=load_prompt(__file__, "greeter.system"),
        greeter_user_prompt=load_prompt(__file__, "greeter.user", substitutions={"name": name}),
    )


def register(registry: SkillRegistry) -> None:
    registry.register(SkillSpec(
        name="hello-world",
        workflow_cls=HelloWorldWorkflow,
        activities=[write_artifact, emit_finding, spawn_subagent],
        build_input=_build_input,
    ))

register() is what the worker calls at startup to discover the skill. _build_input translates CLI args (--name alice) into the workflow's input dataclass and loads prompts from disk.

~/.claude/skills/hello-world/prompts/greeter.system.md:

You are a greeter. Output a greeting using the format
STRUCTURED_OUTPUT_START / GREETING|<text> / STRUCTURED_OUTPUT_END.
Do not include any other text.

~/.claude/skills/hello-world/prompts/greeter.user.md:

Greet $name

That's the whole skill. sagaflow launch hello-world --name alice finds the registration, builds the input, hands the workflow to Temporal, and the worker runs it durably.

CLI

sagaflow launch <name> --arg key=value [--await]   # submit a workflow
sagaflow inbox                                     # list unread results
sagaflow dismiss <run-id>                          # mark as read
sagaflow doctor                                    # diagnose temporal/worker/hook

How it works

sagaflow launch
   │
   ▼
preflight (install hook, spawn worker if missing)
   │
   ▼
Temporal (localhost:7233) ── workflow ID ── worker daemon
                                              │
                                              ▼
                                         activities:
                                          • model calls
                                          • file I/O
                                          • inbox emit
                                              │
                                              ▼
                              ~/.sagaflow/INBOX.md  +  desktop notify
                                              │
                                              ▼
                                  next session: SessionStart
                                  hook surfaces unread runs

If the worker crashes mid-run, the next sagaflow launch (or the next worker poll) resumes from the last completed activity. Activities that already succeeded don't re-execute.

Development

git clone https://github.com/npow/sagaflow
cd sagaflow
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"

ruff check sagaflow tests
mypy sagaflow
pytest

# Opt-in end-to-end tests (require live Temporal + real Anthropic access)
SAGAFLOW_E2E=1 pytest

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sagaflow-0.10.26.tar.gz (541.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sagaflow-0.10.26-py3-none-any.whl (285.7 kB view details)

Uploaded Python 3

File details

Details for the file sagaflow-0.10.26.tar.gz.

File metadata

  • Download URL: sagaflow-0.10.26.tar.gz
  • Upload date:
  • Size: 541.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for sagaflow-0.10.26.tar.gz
Algorithm Hash digest
SHA256 afadeec1209049e0731f05b14bdec25ab497bff74d85a1dfe67081d9c5c0bb59
MD5 f14676a99c79626ba797f639704584d1
BLAKE2b-256 c388bbf8492a4b9a52667879b486bbac4c4860a6ccf427c7f239242248a79415

See more details on using hashes here.

Provenance

The following attestation bundles were made for sagaflow-0.10.26.tar.gz:

Publisher: publish.yml on npow/sagaflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file sagaflow-0.10.26-py3-none-any.whl.

File metadata

  • Download URL: sagaflow-0.10.26-py3-none-any.whl
  • Upload date:
  • Size: 285.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for sagaflow-0.10.26-py3-none-any.whl
Algorithm Hash digest
SHA256 bf61cd4a44fe16bc8f0f519c2b39bb9c9e40b1b5eadbaa7504f26312e1989aff
MD5 7e2b5897b8e1a170a167948097b3c4c6
BLAKE2b-256 9d188eb8a204d6a546bc466f3a3c5755d41547b59af492e5907f18680d21ded9

See more details on using hashes here.

Provenance

The following attestation bundles were made for sagaflow-0.10.26-py3-none-any.whl:

Publisher: publish.yml on npow/sagaflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page