Skip to main content

Temporal-backed workflow runtime for Claude Code skills

Project description

sagaflow

CI PyPI Python 3.11+ License: MIT

Durable execution for long-running agent workflows, on top of Temporal.

You write a Python workflow that calls models, runs tools, and writes artifacts. sagaflow runs each step as a Temporal activity, so when the worker dies — or a 40-minute fan-out crashes halfway — the next launch resumes from the last completed step instead of starting over. Results land in ~/.sagaflow/INBOX.md whether or not you're still attached to the session that started them.

Quick start

pip install sagaflow
temporal server start-dev &
export ANTHROPIC_API_KEY=sk-ant-...

sagaflow launch hello-world --name alice --await
# → hello, alice

Kill the terminal mid-run and re-launch the same workflow ID: it picks up from where the worker died.

What you get

  • Resumes after crashes. Activity-level checkpointing via Temporal — workers, sessions, and laptops can all die without losing in-flight work.
  • Decoupled from the caller. Fire-and-forget submissions land in an append-only inbox; a session-start hook surfaces unread results next time you open Claude Code.
  • Provider-agnostic transport. Anthropic SDK by default; point ANTHROPIC_BASE_URL at Bedrock, a model gateway, or any compatible proxy.
  • Auto-managed worker. First sagaflow launch spawns a worker daemon; sagaflow doctor reports health.

Install

pip install sagaflow

Requirements:

  • Python 3.11+
  • Temporal CLI running locally: brew install temporal && temporal server start-dev
  • An Anthropic API key (or a compatible proxy via ANTHROPIC_BASE_URL)

Authoring a skill

A skill is a directory under ~/.claude/skills/<skill-name>/ containing three things:

  1. workflow.py — a Temporal workflow class (the durable orchestration)
  2. __init__.py — a register() function that wires the workflow into sagaflow
  3. prompts/*.md — the system/user prompts the workflow's activities load

Here is the complete hello-world skill (the one sagaflow launch hello-world --name alice runs).

~/.claude/skills/hello-world/workflow.py — the durable workflow:

from dataclasses import dataclass
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from sagaflow.durable.activities import (
        EmitFindingInput, SpawnSubagentInput, WriteArtifactInput,
    )
    from sagaflow.durable.retry_policies import HAIKU_POLICY


@dataclass(frozen=True)
class HelloWorldInput:
    run_id: str
    name: str
    inbox_path: str
    run_dir: str
    greeter_system_prompt: str
    greeter_user_prompt: str


@workflow.defn(name="HelloWorldWorkflow")
class HelloWorldWorkflow:
    @workflow.run
    async def run(self, inp: HelloWorldInput) -> str:
        prompt_path = f"{inp.run_dir}/prompt.txt"
        await workflow.execute_activity(
            "write_artifact",
            WriteArtifactInput(path=prompt_path, content=inp.greeter_user_prompt),
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=HAIKU_POLICY,
        )
        parsed = await workflow.execute_activity(
            "spawn_subagent",
            SpawnSubagentInput(
                role="greeter", tier_name="HAIKU",
                system_prompt=inp.greeter_system_prompt,
                user_prompt_path=prompt_path,
                max_tokens=64, tools_needed=False,
            ),
            start_to_close_timeout=timedelta(seconds=600),
            heartbeat_timeout=timedelta(seconds=120),
            retry_policy=HAIKU_POLICY,
        )
        greeting = parsed.get("GREETING", "hello")
        await workflow.execute_activity(
            "emit_finding",
            EmitFindingInput(
                inbox_path=inp.inbox_path, run_id=inp.run_id,
                skill="hello-world", status="DONE", summary=greeting,
                timestamp_iso=workflow.now().isoformat(timespec="seconds"),
            ),
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=HAIKU_POLICY,
        )
        return greeting

Each execute_activity call is a checkpoint. If the worker dies between them, replay resumes from the last completed one.

~/.claude/skills/hello-world/__init__.py — registration:

from typing import Any

from sagaflow.durable.activities import emit_finding, spawn_subagent, write_artifact
from sagaflow.prompts import load_prompt
from sagaflow.registry import SkillRegistry, SkillSpec

from skills.hello_world.workflow import HelloWorldInput, HelloWorldWorkflow


def _build_input(*, run_id, run_dir, inbox_path, cli_args: dict[str, Any]) -> HelloWorldInput:
    name = str(cli_args.get("name", "world"))
    return HelloWorldInput(
        run_id=run_id, name=name,
        inbox_path=inbox_path, run_dir=run_dir,
        greeter_system_prompt=load_prompt(__file__, "greeter.system"),
        greeter_user_prompt=load_prompt(__file__, "greeter.user", substitutions={"name": name}),
    )


def register(registry: SkillRegistry) -> None:
    registry.register(SkillSpec(
        name="hello-world",
        workflow_cls=HelloWorldWorkflow,
        activities=[write_artifact, emit_finding, spawn_subagent],
        build_input=_build_input,
    ))

register() is what the worker calls at startup to discover the skill. _build_input translates CLI args (--name alice) into the workflow's input dataclass and loads prompts from disk.

~/.claude/skills/hello-world/prompts/greeter.system.md:

You are a greeter. Output a greeting using the format
STRUCTURED_OUTPUT_START / GREETING|<text> / STRUCTURED_OUTPUT_END.
Do not include any other text.

~/.claude/skills/hello-world/prompts/greeter.user.md:

Greet $name

That's the whole skill. sagaflow launch hello-world --name alice finds the registration, builds the input, hands the workflow to Temporal, and the worker runs it durably.

CLI

sagaflow launch <name> --arg key=value [--await]   # submit a workflow
sagaflow inbox                                     # list unread results
sagaflow dismiss <run-id>                          # mark as read
sagaflow doctor                                    # diagnose temporal/worker/hook

How it works

sagaflow launch
   │
   ▼
preflight (install hook, spawn worker if missing)
   │
   ▼
Temporal (localhost:7233) ── workflow ID ── worker daemon
                                              │
                                              ▼
                                         activities:
                                          • model calls
                                          • file I/O
                                          • inbox emit
                                              │
                                              ▼
                              ~/.sagaflow/INBOX.md  +  desktop notify
                                              │
                                              ▼
                                  next session: SessionStart
                                  hook surfaces unread runs

If the worker crashes mid-run, the next sagaflow launch (or the next worker poll) resumes from the last completed activity. Activities that already succeeded don't re-execute.

Development

git clone https://github.com/npow/sagaflow
cd sagaflow
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"

ruff check sagaflow tests
mypy sagaflow
pytest

# Opt-in end-to-end tests (require live Temporal + real Anthropic access)
SAGAFLOW_E2E=1 pytest

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sagaflow-0.10.23.tar.gz (540.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sagaflow-0.10.23-py3-none-any.whl (329.3 kB view details)

Uploaded Python 3

File details

Details for the file sagaflow-0.10.23.tar.gz.

File metadata

  • Download URL: sagaflow-0.10.23.tar.gz
  • Upload date:
  • Size: 540.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for sagaflow-0.10.23.tar.gz
Algorithm Hash digest
SHA256 3cb8062c3cbacf97554d15494dc5057691b9cc57d41e1391ba62916cef4edfcd
MD5 f69ffcd9c1a614fcdd82971c33040371
BLAKE2b-256 ab92d6aa1421aa674b2a5952d10f1c3cdb3a551d6bd7d11d26e1fa49631dd4b4

See more details on using hashes here.

Provenance

The following attestation bundles were made for sagaflow-0.10.23.tar.gz:

Publisher: publish.yml on npow/sagaflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file sagaflow-0.10.23-py3-none-any.whl.

File metadata

  • Download URL: sagaflow-0.10.23-py3-none-any.whl
  • Upload date:
  • Size: 329.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for sagaflow-0.10.23-py3-none-any.whl
Algorithm Hash digest
SHA256 10445a1784dab5a0744e5e77e2f3dff22efa6147a0aa3eedc60faae3b0325536
MD5 1a9c9158fd720424126c619fa32bc119
BLAKE2b-256 dd5f6170b90d061a4226b40973d395a564bc81611fe75237bbbf665d98d7f264

See more details on using hashes here.

Provenance

The following attestation bundles were made for sagaflow-0.10.23-py3-none-any.whl:

Publisher: publish.yml on npow/sagaflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page