Temporal-backed workflow runtime for Claude Code skills
sagaflow
Durable execution for long-running agent workflows, on top of Temporal.
You write a Python workflow that calls models, runs tools, and writes artifacts. sagaflow runs each step as a Temporal activity, so when the worker dies — or a 40-minute fan-out crashes halfway — the next launch resumes from the last completed step instead of starting over. Results land in ~/.sagaflow/INBOX.md whether or not you're still attached to the session that started them.
Quick start
pip install sagaflow
temporal server start-dev &
export ANTHROPIC_API_KEY=sk-ant-...
sagaflow launch hello-world --name alice --await
# → hello, alice
Kill the terminal mid-run and re-launch the same workflow ID: it picks up from where the worker died.
What you get
- Resumes after crashes. Activity-level checkpointing via Temporal — workers, sessions, and laptops can all die without losing in-flight work.
- Decoupled from the caller. Fire-and-forget submissions land in an append-only inbox; a session-start hook surfaces unread results next time you open Claude Code.
- Provider-agnostic transport. Anthropic SDK by default; point ANTHROPIC_BASE_URL at Bedrock, a model gateway, or any compatible proxy.
- Auto-managed worker. First sagaflow launch spawns a worker daemon; sagaflow doctor reports health.
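To make the append-only inbox idea concrete, here is a minimal sketch. The on-disk line format (`FINDING|...` and `DISMISS|...`), the `Finding` record, and all function names are hypothetical; the real layout of ~/.sagaflow/INBOX.md is not documented here. The point is only that both results and dismissals can be recorded by appending, so earlier lines are never rewritten.

```python
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class Finding:
    # Hypothetical record shape, loosely modelled on the fields a skill emits.
    run_id: str
    skill: str
    status: str
    summary: str


def _append(inbox: Path, line: str) -> None:
    """Append a single line; existing content is never modified."""
    inbox.parent.mkdir(parents=True, exist_ok=True)
    with inbox.open("a", encoding="utf-8") as fh:
        fh.write(line + "\n")


def append_finding(inbox: Path, finding: Finding) -> None:
    _append(inbox, f"FINDING|{finding.run_id}|{finding.skill}|{finding.status}|{finding.summary}")


def dismiss(inbox: Path, run_id: str) -> None:
    # Marking a run as read is itself an appended record.
    _append(inbox, f"DISMISS|{run_id}")


def unread_run_ids(inbox: Path) -> list[str]:
    """Return run IDs that have a FINDING line but no DISMISS line, in file order."""
    if not inbox.exists():
        return []
    seen: list[str] = []
    dismissed: set[str] = set()
    for line in inbox.read_text(encoding="utf-8").splitlines():
        kind, _, rest = line.partition("|")
        if kind == "FINDING":
            seen.append(rest.split("|", 1)[0])
        elif kind == "DISMISS":
            dismissed.add(rest)
    return [run_id for run_id in seen if run_id not in dismissed]
```

A session-start hook in this model would only need to call `unread_run_ids` and print the matching lines.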
Install
pip install sagaflow
Requirements:
- Python 3.11+
- Temporal CLI running locally: brew install temporal && temporal server start-dev
- An Anthropic API key (or a compatible proxy via ANTHROPIC_BASE_URL)
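If you want to confirm the Temporal requirement before launching anything, a plain TCP probe is usually enough. The sketch below is an assumption-laden stand-in, not how sagaflow doctor works internally: `temporal_reachable` is a hypothetical helper, and it only checks that something accepts connections on the port (7233, the address shown in the "How it works" diagram), not that it is a healthy Temporal server.

```python
import socket


def temporal_reachable(host: str = "localhost", port: int = 7233, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name-resolution failures.
        return False


if __name__ == "__main__":
    print("temporal reachable:", temporal_reachable())
```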
Authoring a skill
A skill is a directory under ~/.claude/skills/<skill-name>/ containing three things:
- workflow.py: a Temporal workflow class (the durable orchestration)
- __init__.py: a register() function that wires the workflow into sagaflow
- prompts/*.md: the system/user prompts the workflow's activities load
Here is the complete hello-world skill (the one sagaflow launch hello-world --name alice runs).
~/.claude/skills/hello-world/workflow.py — the durable workflow:
from dataclasses import dataclass
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from sagaflow.durable.activities import (
        EmitFindingInput, SpawnSubagentInput, WriteArtifactInput,
    )
    from sagaflow.durable.retry_policies import HAIKU_POLICY


@dataclass(frozen=True)
class HelloWorldInput:
    run_id: str
    name: str
    inbox_path: str
    run_dir: str
    greeter_system_prompt: str
    greeter_user_prompt: str


@workflow.defn(name="HelloWorldWorkflow")
class HelloWorldWorkflow:
    @workflow.run
    async def run(self, inp: HelloWorldInput) -> str:
        prompt_path = f"{inp.run_dir}/prompt.txt"
        await workflow.execute_activity(
            "write_artifact",
            WriteArtifactInput(path=prompt_path, content=inp.greeter_user_prompt),
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=HAIKU_POLICY,
        )
        parsed = await workflow.execute_activity(
            "spawn_subagent",
            SpawnSubagentInput(
                role="greeter", tier_name="HAIKU",
                system_prompt=inp.greeter_system_prompt,
                user_prompt_path=prompt_path,
                max_tokens=64, tools_needed=False,
            ),
            start_to_close_timeout=timedelta(seconds=600),
            heartbeat_timeout=timedelta(seconds=120),
            retry_policy=HAIKU_POLICY,
        )
        greeting = parsed.get("GREETING", "hello")
        await workflow.execute_activity(
            "emit_finding",
            EmitFindingInput(
                inbox_path=inp.inbox_path, run_id=inp.run_id,
                skill="hello-world", status="DONE", summary=greeting,
                timestamp_iso=workflow.now().isoformat(timespec="seconds"),
            ),
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=HAIKU_POLICY,
        )
        return greeting
Each execute_activity call is a checkpoint. If the worker dies between them, replay resumes from the last completed one.
~/.claude/skills/hello-world/__init__.py — registration:
from typing import Any

from sagaflow.durable.activities import emit_finding, spawn_subagent, write_artifact
from sagaflow.prompts import load_prompt
from sagaflow.registry import SkillRegistry, SkillSpec

from skills.hello_world.workflow import HelloWorldInput, HelloWorldWorkflow


def _build_input(*, run_id, run_dir, inbox_path, cli_args: dict[str, Any]) -> HelloWorldInput:
    name = str(cli_args.get("name", "world"))
    return HelloWorldInput(
        run_id=run_id, name=name,
        inbox_path=inbox_path, run_dir=run_dir,
        greeter_system_prompt=load_prompt(__file__, "greeter.system"),
        greeter_user_prompt=load_prompt(__file__, "greeter.user", substitutions={"name": name}),
    )


def register(registry: SkillRegistry) -> None:
    registry.register(SkillSpec(
        name="hello-world",
        workflow_cls=HelloWorldWorkflow,
        activities=[write_artifact, emit_finding, spawn_subagent],
        build_input=_build_input,
    ))
register() is what the worker calls at startup to discover the skill. _build_input translates CLI args (--name alice) into the workflow's input dataclass and loads prompts from disk.
~/.claude/skills/hello-world/prompts/greeter.system.md:
You are a greeter. Output a greeting using the format
STRUCTURED_OUTPUT_START / GREETING|<text> / STRUCTURED_OUTPUT_END.
Do not include any other text.
~/.claude/skills/hello-world/prompts/greeter.user.md:
Greet $name
That's the whole skill. sagaflow launch hello-world --name alice finds the registration, builds the input, hands the workflow to Temporal, and the worker runs it durably.
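The workflow reads the model's reply with `parsed.get("GREETING", "hello")`, which implies the reply is turned into a dict of `KEY|value` fields somewhere between the model call and the workflow. As an illustration only, here is how such a parser might look. `parse_structured_output` is a hypothetical function, and it assumes the ` / ` separators in the system prompt stand for line breaks, so each marker and each field sits on its own line.

```python
def parse_structured_output(text: str) -> dict[str, str]:
    """Collect KEY|value lines found between the START and END markers."""
    fields: dict[str, str] = {}
    inside = False
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if line == "STRUCTURED_OUTPUT_START":
            inside = True
        elif line == "STRUCTURED_OUTPUT_END":
            break
        elif inside and "|" in line:
            # Split on the first pipe only, so values may contain further pipes.
            key, value = line.split("|", 1)
            fields[key.strip()] = value.strip()
    return fields


reply = "STRUCTURED_OUTPUT_START\nGREETING|hello, alice\nSTRUCTURED_OUTPUT_END\n"
greeting = parse_structured_output(reply).get("GREETING", "hello")
```

A reply with no markers yields an empty dict, which is why the workflow's `.get(..., "hello")` fallback matters.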
CLI
sagaflow launch <name> --arg key=value [--await] # submit a workflow
sagaflow inbox # list unread results
sagaflow dismiss <run-id> # mark as read
sagaflow doctor # diagnose temporal/worker/hook
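For readers wondering how `--arg key=value` pairs could end up in the `cli_args` dict that `_build_input` receives, here is a small argparse sketch. It follows only the usage line above (the quick start uses the `--name alice` form instead), and `parse_launch_args` is a hypothetical helper, not sagaflow's actual CLI code.

```python
import argparse


def parse_launch_args(argv: list[str]) -> tuple[str, dict[str, str], bool]:
    """Parse `<name> --arg key=value [--await]` into (name, cli_args, wait)."""
    parser = argparse.ArgumentParser(prog="launch-sketch")
    parser.add_argument("name", help="skill name, e.g. hello-world")
    parser.add_argument("--arg", action="append", default=[], metavar="key=value")
    # `await` is a Python keyword, so store the flag under a different attribute name.
    parser.add_argument("--await", dest="wait", action="store_true")
    ns = parser.parse_args(argv)

    cli_args: dict[str, str] = {}
    for pair in ns.arg:
        key, sep, value = pair.partition("=")
        if not sep or not key:
            parser.error(f"expected key=value, got {pair!r}")
        cli_args[key] = value
    return ns.name, cli_args, ns.wait
```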
How it works
sagaflow launch
│
▼
preflight (install hook, spawn worker if missing)
│
▼
Temporal (localhost:7233) ── workflow ID ── worker daemon
│
▼
activities:
• model calls
• file I/O
• inbox emit
│
▼
~/.sagaflow/INBOX.md + desktop notify
│
▼
next session: SessionStart
hook surfaces unread runs
If the worker crashes mid-run, the next sagaflow launch (or the next worker poll) resumes from the last completed activity. Activities that already succeeded don't re-execute.
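The resume behaviour can be illustrated with a toy model. Temporal records completed activity results in the workflow's event history on the server and replays the workflow code against that history; the sketch below swaps that for a local JSON file so it runs without any dependencies. `Journal` and `run_step` are hypothetical names for illustration and are not part of sagaflow or Temporal.

```python
import json
from pathlib import Path
from typing import Callable


class Journal:
    """Toy stand-in for an event history: a JSON file of completed step results."""

    def __init__(self, path: Path) -> None:
        self.path = path
        # Load results recorded by a previous (possibly crashed) run, if any.
        self.done: dict[str, str] = (
            json.loads(path.read_text(encoding="utf-8")) if path.exists() else {}
        )

    def run_step(self, step_id: str, fn: Callable[[], str]) -> str:
        if step_id in self.done:
            # Step already completed: reuse the recorded result, do not re-execute.
            return self.done[step_id]
        result = fn()
        self.done[step_id] = result
        # Persist before returning so a crash after this point keeps the result.
        self.path.write_text(json.dumps(self.done), encoding="utf-8")
        return result
```

Creating a fresh `Journal` on the same file after a failure and calling the same steps in the same order skips everything that already finished, which is the property the paragraph above describes.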
Development
git clone https://github.com/npow/sagaflow
cd sagaflow
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
ruff check sagaflow tests
mypy sagaflow
pytest
# Opt-in end-to-end tests (require live Temporal + real Anthropic access)
SAGAFLOW_E2E=1 pytest
License
Download files
File details
Details for the file sagaflow-0.10.26.tar.gz.
File metadata
- Download URL: sagaflow-0.10.26.tar.gz
- Upload date:
- Size: 541.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | afadeec1209049e0731f05b14bdec25ab497bff74d85a1dfe67081d9c5c0bb59 |
| MD5 | f14676a99c79626ba797f639704584d1 |
| BLAKE2b-256 | c388bbf8492a4b9a52667879b486bbac4c4860a6ccf427c7f239242248a79415 |
Provenance
The following attestation bundles were made for sagaflow-0.10.26.tar.gz:
Publisher: publish.yml on npow/sagaflow
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: sagaflow-0.10.26.tar.gz
- Subject digest: afadeec1209049e0731f05b14bdec25ab497bff74d85a1dfe67081d9c5c0bb59
- Sigstore transparency entry: 1701923644
- Sigstore integration time:
- Permalink: npow/sagaflow@1989896eebdba61acdf356a9bf727f22ef29c6af
- Branch / Tag: refs/tags/v0.10.26
- Owner: https://github.com/npow
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@1989896eebdba61acdf356a9bf727f22ef29c6af
- Trigger Event: release
File details
Details for the file sagaflow-0.10.26-py3-none-any.whl.
File metadata
- Download URL: sagaflow-0.10.26-py3-none-any.whl
- Upload date:
- Size: 285.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | bf61cd4a44fe16bc8f0f519c2b39bb9c9e40b1b5eadbaa7504f26312e1989aff |
| MD5 | 7e2b5897b8e1a170a167948097b3c4c6 |
| BLAKE2b-256 | 9d188eb8a204d6a546bc466f3a3c5755d41547b59af492e5907f18680d21ded9 |
Provenance
The following attestation bundles were made for sagaflow-0.10.26-py3-none-any.whl:
Publisher: publish.yml on npow/sagaflow
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: sagaflow-0.10.26-py3-none-any.whl
- Subject digest: bf61cd4a44fe16bc8f0f519c2b39bb9c9e40b1b5eadbaa7504f26312e1989aff
- Sigstore transparency entry: 1701923659
- Sigstore integration time:
- Permalink: npow/sagaflow@1989896eebdba61acdf356a9bf727f22ef29c6af
- Branch / Tag: refs/tags/v0.10.26
- Owner: https://github.com/npow
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@1989896eebdba61acdf356a9bf727f22ef29c6af
- Trigger Event: release