Saga pattern implementation for Restate durable workflows with automatic compensation
Project description
restate-saga
Saga pattern implementation for Restate durable workflows in Python with automatic compensation.
Features
- Automatic compensation - When a step fails, all previous steps are rolled back in reverse order
- Flexible step types - Hybrid (
create_saga_step) or strict (create_saga_step_strict) compensation modes - Decorator API -
@saga_step,@saga_step_strict,@saga_workflowdecorators for concise definitions - Global error registry - Register error classes that should always trigger compensation
- Composable workflows - Embed workflows within workflows using
run_as_step - Virtual Object support - Saga pattern for stateful keyed entities
- Restate Workflows - Long-running workflows with signals, queries, and saga support
- Pydantic validation - Automatic input validation when type hints use Pydantic models
Installation
pip install restate-saga
Requires: Python 3.11+, restate_sdk[serde]
Quick Start
import restate
from restate_saga import (
SagaContext,
StepResponse,
saga_step_strict,
saga_workflow,
)
# Define steps with compensation
async def cancel_order(data):
await order_service.cancel(data["order_id"])
@saga_step_strict("CreateOrder", compensate=cancel_order)
async def create_order(ctx: restate.Context, input_val):
order = await order_service.create(input_val["customer_id"])
return StepResponse(output={"order_id": order.id})
async def refund_payment(data):
await payment_service.refund(data["payment_id"])
@saga_step_strict("ProcessPayment", compensate=refund_payment)
async def process_payment(ctx: restate.Context, input_val):
payment = await payment_service.charge(input_val["order_id"])
return StepResponse(output={"payment_id": payment.id})
# Define the workflow
@saga_workflow("CheckoutWorkflow")
async def checkout_workflow(saga: SagaContext, input_val):
# If process_payment fails, create_order.compensate() runs automatically
order = await create_order(saga, input_val)
payment = await process_payment(saga, {"order_id": order["order_id"]})
return {"order_id": order["order_id"], "payment_id": payment["payment_id"]}
Core Concepts
Saga Pattern
Each step has a corresponding compensation (undo) action. If a later step fails, all earlier compensations run in reverse order.
Step 1 → Step 2 → Step 3 (fails!)
↓
Compensate 2 ← Compensate 1
Step Types
Hybrid (create_saga_step / @saga_step)
Registers compensation before execution. Compensation runs even if the step fails partway through. The failed keyword argument indicates whether the forward action completed.
async def cancel_order(data, failed=False):
# `failed` tells you if the step threw an error
await order_service.cancel(data.get("order_id"))
@saga_step("CreateOrder", compensate=cancel_order)
async def create_order(ctx: restate.Context, input_val):
order_id = await order_service.create(input_val)
return StepResponse(
output={"order_id": order_id},
compensation_data={"order_id": order_id},
)
Strict (create_saga_step_strict / @saga_step_strict)
Registers compensation after success. Use when compensation requires data that only exists after completion.
async def cancel_order(data):
await order_service.cancel(data["order_id"])
@saga_step_strict("CreateOrder", compensate=cancel_order)
async def create_order(ctx: restate.Context, input_val):
order = await order_service.create(input_val)
return StepResponse(
output={"order_id": order.id},
compensation_data={"order_id": order.id},
)
StepResponse
# compensation_data defaults to output when omitted
StepResponse(output={"order_id": "123"})
# explicit compensation_data
StepResponse(output={"order_id": "123"}, compensation_data={"order_id": "123", "extra": True})
# no compensation data (e.g. validation step)
StepResponse(output={"valid": True}, compensation_data=None)
# permanent failure — triggers compensation with provided data
StepResponse.permanent_failure("Payment declined", {"auth_id": "abc"})
Steps Without Compensation
For validation, read-only operations, or idempotent actions, omit the compensate function:
@saga_step("ValidateInput")
async def validate_input(ctx: restate.Context, input_val):
if not input_val.get("email"):
return StepResponse.permanent_failure("Email required", None)
return StepResponse(output={"valid": True}, compensation_data=None)
Function API
Steps and workflows can also be created without decorators:
from restate_saga import create_saga_step, create_saga_step_strict, create_saga_workflow
create_order = create_saga_step(
name="CreateOrder",
run=create_order_handler,
compensate=cancel_order,
)
checkout = create_saga_workflow(
name="CheckoutWorkflow",
handler=checkout_handler,
)
Global Error Registry
Register error classes that should always trigger compensation without retrying:
from restate_saga import register_terminal_errors
class ValidationError(Exception): ...
class NotFoundError(Exception): ...
register_terminal_errors([ValidationError, NotFoundError])
# Now any step that raises these will trigger compensation
@saga_step("MyStep")
async def my_step(ctx, input_val):
raise ValidationError("Invalid input") # → triggers compensation
Custom error mapping:
from restate_saga import set_global_error_mapper
import restate
def my_mapper(err: Exception) -> restate.TerminalError | None:
if isinstance(err, BusinessError):
return restate.TerminalError(str(err))
return None
set_global_error_mapper(my_mapper)
Composing Workflows
Use run_as_step to embed a workflow within another, sharing the compensation context:
@saga_workflow("PaymentWorkflow")
async def payment_workflow(saga, input_val):
auth = await authorize_payment(saga, input_val)
capture = await capture_payment(saga, {"auth_id": auth["id"]})
return {"payment_id": capture["id"]}
@saga_workflow("OrderWorkflow")
async def order_workflow(saga, input_val):
order = await create_order(saga, input_val)
# Payment workflow's compensations join this saga
payment = await payment_workflow.run_as_step(saga, {"amount": order["total"]})
# If shipping fails, both order AND payment are compensated
shipment = await create_shipment(saga, {"order_id": order["id"]})
return {"order_id": order["id"], "payment_id": payment["payment_id"]}
Nested Sagas
For inline nested logic without a full workflow:
from restate_saga import run_nested_saga, create_saga_module
# Inline nested saga
async def handle_payment(saga):
auth = await authorize(saga, {"amount": 100})
capture = await capture(saga, {"auth_id": auth["id"]})
return capture
result = await run_nested_saga(saga, handle_payment)
# Reusable saga module (not a Restate service)
payment_module = create_saga_module(payment_handler)
result = await payment_module(saga, input_val)
Virtual Objects
Stateful entities with saga support:
from restate_saga import create_saga_virtual_object, SagaContext
cart = create_saga_virtual_object("ShoppingCart")
@cart.handler()
async def checkout(saga: SagaContext, ctx: restate.ObjectContext, input_val):
payment = await charge_payment(saga, {"amount": input_val["total"]})
ctx.clear("items")
return {"order_id": payment["order_id"]}
@cart.handler(kind="shared")
async def get_items(ctx: restate.ObjectSharedContext):
return await ctx.get("items") or []
Restate Workflows (Long-Running)
For workflows with signals and queries:
from restate_saga import create_saga_restate_workflow, SagaContext
wf = create_saga_restate_workflow("ApprovalWorkflow")
@wf.main()
async def run(saga: SagaContext, ctx: restate.WorkflowContext, input_val):
order = await create_order(saga, input_val)
# Wait for approval signal (durable promise)
approved = await ctx.promise("approval").value()
if not approved:
raise restate.TerminalError("Order rejected")
shipment = await create_shipment(saga, {"order_id": order["order_id"]})
return {"order_id": order["order_id"]}
@wf.handler()
async def approve(ctx: restate.WorkflowSharedContext, input_val):
await ctx.promise("approval").resolve(input_val["approved"])
Pydantic Validation
When step or workflow handlers use Pydantic model type hints, inputs are automatically validated:
from pydantic import BaseModel
class CheckoutInput(BaseModel):
customer_id: str
@saga_workflow("CheckoutWorkflow")
async def checkout(saga: SagaContext, input_val: CheckoutInput):
# input_val is validated and converted to CheckoutInput
order = await create_order(saga, input_val)
return {"order_id": order["order_id"]}
Step & Workflow Options
Configure retry policies and service-level options:
from datetime import timedelta
from restate_saga import SagaStepOptions, StepRetryPolicy, SagaWorkflowOptions, WorkflowRetryPolicy
# Step-level retry
step = create_saga_step(
name="ChargePayment",
run=charge_handler,
compensate=refund_handler,
options=SagaStepOptions(
retry=StepRetryPolicy(
max_retry_attempts=3,
initial_retry_interval=timedelta(seconds=1),
retry_interval_factor=2.0,
max_retry_interval=timedelta(seconds=30),
),
compensation_retry=StepRetryPolicy(max_retry_attempts=5),
),
)
# Workflow-level options
workflow = create_saga_workflow(
name="OrderWorkflow",
handler=order_handler,
options=SagaWorkflowOptions(
retry_policy=WorkflowRetryPolicy(max_attempts=3),
idempotency_retention=timedelta(days=1),
ingress_private=True,
),
)
Project Structure
restate_saga/
├── __init__.py # Public API exports
├── steps.py # create_saga_step, create_saga_step_strict, decorators
├── workflows.py # create_saga_workflow, SagaWorkflowService
├── restate_workflows.py # create_saga_restate_workflow (long-running)
├── virtual_objects.py # create_saga_virtual_object
├── step_response.py # StepResponse class
├── error_registry.py # Terminal error registration
├── nested.py # run_nested_saga, create_saga_module
├── types.py # SagaContext, options, policies
└── _validation.py # Pydantic input validation
app/
├── main.py # FastAPI app with Restate mounted
└── services/ # Example service handlers
tests/ # Test suite
Running the Example App
Prerequisites
-
Install Restate Server:
# macOS brew install restatedev/tap/restate-server # Or Docker docker run -d --name restate -p 8080:8080 -p 9070:9070 docker.io/restatedev/restate:latest
-
Start Restate Server:
restate-server
Start the Service
# Install dependencies
pip install -e ".[dev]"
# Run the server
hypercorn app.main:api --bind 0.0.0.0:9080
Register with Restate
restate deployments register http://localhost:9080/restate/v1
Invoke a Workflow
curl -X POST http://localhost:8080/CheckoutWorkflow/run \
-H "Content-Type: application/json" \
-d '{"customer_id": "cust_123"}'
API Reference
Steps
create_saga_step(name, run, compensate?, options?)- Hybrid compensation (registered before execution)create_saga_step_strict(name, run, compensate?, options?)- Strict compensation (registered after success)@saga_step(name, compensate?, options?)- Decorator for hybrid steps@saga_step_strict(name, compensate?, options?)- Decorator for strict stepsStepResponse(output, compensation_data?)- Step result with optional compensation dataStepResponse.permanent_failure(message, compensation_data)- Signal permanent failure
Workflows
create_saga_workflow(name, handler, options?)- Create a saga workflow service@saga_workflow(name, options?)- Decorator for workflowsSagaWorkflowService.run_as_step(parent_saga, input)- Embed in parent workflow
Restate Workflows
create_saga_restate_workflow(name, options?)- Create a long-running workflowSagaRestateWorkflow.main()- Register main handler with saga supportSagaRestateWorkflow.handler()- Register shared signal/query handlersSagaRestateWorkflow.run_as_step(parent_saga, input)- Embed in parent workflow
Virtual Objects
create_saga_virtual_object(name, options?)- Create a Virtual ObjectSagaVirtualObject.handler(kind="exclusive")- Exclusive handler with sagaSagaVirtualObject.handler(kind="shared")- Shared handler without saga
Error Registry
register_terminal_errors(error_classes)- Register error classes as terminalunregister_terminal_errors(error_classes)- Unregister error classesclear_terminal_errors()- Clear all registered errorsset_global_error_mapper(mapper)- Set a custom error mapperresolve_terminal_error(err, step_mapper?)- Resolve error to terminal
Nested Sagas
run_nested_saga(saga, handler)- Run inline saga with shared compensationcreate_saga_module(handler)- Create a reusable saga module
Types
SagaContext- Context with RestatectxandcompensationsstackStepRetryPolicy- Retry config for step-level operationsSagaStepOptions- Step options (retry, compensation_retry, as_terminal_error)WorkflowRetryPolicy- Retry config for service/workflow levelSagaWorkflowOptions- Workflow options (retry, retention, timeouts)
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file restate_saga-0.1.0.tar.gz.
File metadata
- Download URL: restate_saga-0.1.0.tar.gz
- Upload date:
- Size: 19.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7476b9c1ffe90d23ce23a4422713651ba3b8c2d260f4ce40114d3a0a15548d06
|
|
| MD5 |
74f6885fee0bd3b4dbf47e359cd70c67
|
|
| BLAKE2b-256 |
8ea80e16681c90a75d2068166082f7ff5e6c808e15f0bb0ee1652d2fa4700de7
|
File details
Details for the file restate_saga-0.1.0-py3-none-any.whl.
File metadata
- Download URL: restate_saga-0.1.0-py3-none-any.whl
- Upload date:
- Size: 18.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1a01842b62e0af8dbcb825824a2b5fbb386ece767fb4bf9c1c14169772e74573
|
|
| MD5 |
f1117b4f23236f5dd9d55cd0a25db0e9
|
|
| BLAKE2b-256 |
0abcbfd361ef6cde09d7267ef50348e47b327de78a9fbe1acd31c491da9d83f0
|