Skip to main content

Saga pattern implementation for Restate durable workflows with automatic compensation

Project description

restate-saga

Saga pattern implementation for Restate durable workflows in Python with automatic compensation.

Features

  • Automatic compensation - When a step fails, all previous steps are rolled back in reverse order
  • Flexible step types - Hybrid (create_saga_step) or strict (create_saga_step_strict) compensation modes
  • Decorator API - @saga_step, @saga_step_strict, @saga_workflow decorators for concise definitions
  • Global error registry - Register error classes that should always trigger compensation
  • Composable workflows - Embed workflows within workflows using run_as_step
  • Virtual Object support - Saga pattern for stateful keyed entities
  • Restate Workflows - Long-running workflows with signals, queries, and saga support
  • Pydantic validation - Automatic input validation when type hints use Pydantic models

Installation

pip install restate-saga

Requires: Python 3.11+, restate_sdk[serde]

Quick Start

import restate
from restate_saga import (
    SagaContext,
    StepResponse,
    saga_step_strict,
    saga_workflow,
)

# Define steps with compensation

async def cancel_order(data):
    await order_service.cancel(data["order_id"])

@saga_step_strict("CreateOrder", compensate=cancel_order)
async def create_order(ctx: restate.Context, input_val):
    order = await order_service.create(input_val["customer_id"])
    return StepResponse(output={"order_id": order.id})

async def refund_payment(data):
    await payment_service.refund(data["payment_id"])

@saga_step_strict("ProcessPayment", compensate=refund_payment)
async def process_payment(ctx: restate.Context, input_val):
    payment = await payment_service.charge(input_val["order_id"])
    return StepResponse(output={"payment_id": payment.id})

# Define the workflow

@saga_workflow("CheckoutWorkflow")
async def checkout_workflow(saga: SagaContext, input_val):
    # If process_payment fails, create_order.compensate() runs automatically
    order = await create_order(saga, input_val)
    payment = await process_payment(saga, {"order_id": order["order_id"]})
    return {"order_id": order["order_id"], "payment_id": payment["payment_id"]}

Core Concepts

Saga Pattern

Each step has a corresponding compensation (undo) action. If a later step fails, all earlier compensations run in reverse order.

Step 1 → Step 2 → Step 3 (fails!)
                    ↓
         Compensate 2 ← Compensate 1

Step Types

Hybrid (create_saga_step / @saga_step)

Registers compensation before execution. Compensation runs even if the step fails partway through. The failed keyword argument indicates whether the forward action completed.

async def cancel_order(data, failed=False):
    # `failed` tells you if the step threw an error
    await order_service.cancel(data.get("order_id"))

@saga_step("CreateOrder", compensate=cancel_order)
async def create_order(ctx: restate.Context, input_val):
    order_id = await order_service.create(input_val)
    return StepResponse(
        output={"order_id": order_id},
        compensation_data={"order_id": order_id},
    )

Strict (create_saga_step_strict / @saga_step_strict)

Registers compensation after success. Use when compensation requires data that only exists after completion.

async def cancel_order(data):
    await order_service.cancel(data["order_id"])

@saga_step_strict("CreateOrder", compensate=cancel_order)
async def create_order(ctx: restate.Context, input_val):
    order = await order_service.create(input_val)
    return StepResponse(
        output={"order_id": order.id},
        compensation_data={"order_id": order.id},
    )

StepResponse

# compensation_data defaults to output when omitted
StepResponse(output={"order_id": "123"})

# explicit compensation_data
StepResponse(output={"order_id": "123"}, compensation_data={"order_id": "123", "extra": True})

# no compensation data (e.g. validation step)
StepResponse(output={"valid": True}, compensation_data=None)

# permanent failure — triggers compensation with provided data
StepResponse.permanent_failure("Payment declined", {"auth_id": "abc"})

Steps Without Compensation

For validation, read-only operations, or idempotent actions, omit the compensate function:

@saga_step("ValidateInput")
async def validate_input(ctx: restate.Context, input_val):
    if not input_val.get("email"):
        return StepResponse.permanent_failure("Email required", None)
    return StepResponse(output={"valid": True}, compensation_data=None)

Function API

Steps and workflows can also be created without decorators:

from restate_saga import create_saga_step, create_saga_step_strict, create_saga_workflow

create_order = create_saga_step(
    name="CreateOrder",
    run=create_order_handler,
    compensate=cancel_order,
)

checkout = create_saga_workflow(
    name="CheckoutWorkflow",
    handler=checkout_handler,
)

Global Error Registry

Register error classes that should always trigger compensation without retrying:

from restate_saga import register_terminal_errors

class ValidationError(Exception): ...
class NotFoundError(Exception): ...

register_terminal_errors([ValidationError, NotFoundError])

# Now any step that raises these will trigger compensation
@saga_step("MyStep")
async def my_step(ctx, input_val):
    raise ValidationError("Invalid input")  # → triggers compensation

Custom error mapping:

from restate_saga import set_global_error_mapper
import restate

def my_mapper(err: Exception) -> restate.TerminalError | None:
    if isinstance(err, BusinessError):
        return restate.TerminalError(str(err))
    return None

set_global_error_mapper(my_mapper)

Composing Workflows

Use run_as_step to embed a workflow within another, sharing the compensation context:

@saga_workflow("PaymentWorkflow")
async def payment_workflow(saga, input_val):
    auth = await authorize_payment(saga, input_val)
    capture = await capture_payment(saga, {"auth_id": auth["id"]})
    return {"payment_id": capture["id"]}

@saga_workflow("OrderWorkflow")
async def order_workflow(saga, input_val):
    order = await create_order(saga, input_val)

    # Payment workflow's compensations join this saga
    payment = await payment_workflow.run_as_step(saga, {"amount": order["total"]})

    # If shipping fails, both order AND payment are compensated
    shipment = await create_shipment(saga, {"order_id": order["id"]})

    return {"order_id": order["id"], "payment_id": payment["payment_id"]}

Nested Sagas

For inline nested logic without a full workflow:

from restate_saga import run_nested_saga, create_saga_module

# Inline nested saga
async def handle_payment(saga):
    auth = await authorize(saga, {"amount": 100})
    capture = await capture(saga, {"auth_id": auth["id"]})
    return capture

result = await run_nested_saga(saga, handle_payment)

# Reusable saga module (not a Restate service)
payment_module = create_saga_module(payment_handler)
result = await payment_module(saga, input_val)

Virtual Objects

Stateful entities with saga support:

from restate_saga import create_saga_virtual_object, SagaContext

cart = create_saga_virtual_object("ShoppingCart")

@cart.handler()
async def checkout(saga: SagaContext, ctx: restate.ObjectContext, input_val):
    payment = await charge_payment(saga, {"amount": input_val["total"]})
    ctx.clear("items")
    return {"order_id": payment["order_id"]}

@cart.handler(kind="shared")
async def get_items(ctx: restate.ObjectSharedContext):
    return await ctx.get("items") or []

Restate Workflows (Long-Running)

For workflows with signals and queries:

from restate_saga import create_saga_restate_workflow, SagaContext

wf = create_saga_restate_workflow("ApprovalWorkflow")

@wf.main()
async def run(saga: SagaContext, ctx: restate.WorkflowContext, input_val):
    order = await create_order(saga, input_val)

    # Wait for approval signal (durable promise)
    approved = await ctx.promise("approval").value()

    if not approved:
        raise restate.TerminalError("Order rejected")

    shipment = await create_shipment(saga, {"order_id": order["order_id"]})
    return {"order_id": order["order_id"]}

@wf.handler()
async def approve(ctx: restate.WorkflowSharedContext, input_val):
    await ctx.promise("approval").resolve(input_val["approved"])

Pydantic Validation

When step or workflow handlers use Pydantic model type hints, inputs are automatically validated:

from pydantic import BaseModel

class CheckoutInput(BaseModel):
    customer_id: str

@saga_workflow("CheckoutWorkflow")
async def checkout(saga: SagaContext, input_val: CheckoutInput):
    # input_val is validated and converted to CheckoutInput
    order = await create_order(saga, input_val)
    return {"order_id": order["order_id"]}

Step & Workflow Options

Configure retry policies and service-level options:

from datetime import timedelta
from restate_saga import SagaStepOptions, StepRetryPolicy, SagaWorkflowOptions, WorkflowRetryPolicy

# Step-level retry
step = create_saga_step(
    name="ChargePayment",
    run=charge_handler,
    compensate=refund_handler,
    options=SagaStepOptions(
        retry=StepRetryPolicy(
            max_retry_attempts=3,
            initial_retry_interval=timedelta(seconds=1),
            retry_interval_factor=2.0,
            max_retry_interval=timedelta(seconds=30),
        ),
        compensation_retry=StepRetryPolicy(max_retry_attempts=5),
    ),
)

# Workflow-level options
workflow = create_saga_workflow(
    name="OrderWorkflow",
    handler=order_handler,
    options=SagaWorkflowOptions(
        retry_policy=WorkflowRetryPolicy(max_attempts=3),
        idempotency_retention=timedelta(days=1),
        ingress_private=True,
    ),
)

Project Structure

restate_saga/
├── __init__.py            # Public API exports
├── steps.py               # create_saga_step, create_saga_step_strict, decorators
├── workflows.py           # create_saga_workflow, SagaWorkflowService
├── restate_workflows.py   # create_saga_restate_workflow (long-running)
├── virtual_objects.py     # create_saga_virtual_object
├── step_response.py       # StepResponse class
├── error_registry.py      # Terminal error registration
├── nested.py              # run_nested_saga, create_saga_module
├── types.py               # SagaContext, options, policies
└── _validation.py         # Pydantic input validation
app/
├── main.py                # FastAPI app with Restate mounted
└── services/              # Example service handlers
tests/                     # Test suite

Running the Example App

Prerequisites

  1. Install Restate Server:

    # macOS
    brew install restatedev/tap/restate-server
    
    # Or Docker
    docker run -d --name restate -p 8080:8080 -p 9070:9070 docker.io/restatedev/restate:latest
    
  2. Start Restate Server:

    restate-server
    

Start the Service

# Install dependencies
pip install -e ".[dev]"

# Run the server
hypercorn app.main:api --bind 0.0.0.0:9080

Register with Restate

restate deployments register http://localhost:9080/restate/v1

Invoke a Workflow

curl -X POST http://localhost:8080/CheckoutWorkflow/run \
  -H "Content-Type: application/json" \
  -d '{"customer_id": "cust_123"}'

API Reference

Steps

  • create_saga_step(name, run, compensate?, options?) - Hybrid compensation (registered before execution)
  • create_saga_step_strict(name, run, compensate?, options?) - Strict compensation (registered after success)
  • @saga_step(name, compensate?, options?) - Decorator for hybrid steps
  • @saga_step_strict(name, compensate?, options?) - Decorator for strict steps
  • StepResponse(output, compensation_data?) - Step result with optional compensation data
  • StepResponse.permanent_failure(message, compensation_data) - Signal permanent failure

Workflows

  • create_saga_workflow(name, handler, options?) - Create a saga workflow service
  • @saga_workflow(name, options?) - Decorator for workflows
  • SagaWorkflowService.run_as_step(parent_saga, input) - Embed in parent workflow

Restate Workflows

  • create_saga_restate_workflow(name, options?) - Create a long-running workflow
  • SagaRestateWorkflow.main() - Register main handler with saga support
  • SagaRestateWorkflow.handler() - Register shared signal/query handlers
  • SagaRestateWorkflow.run_as_step(parent_saga, input) - Embed in parent workflow

Virtual Objects

  • create_saga_virtual_object(name, options?) - Create a Virtual Object
  • SagaVirtualObject.handler(kind="exclusive") - Exclusive handler with saga
  • SagaVirtualObject.handler(kind="shared") - Shared handler without saga

Error Registry

  • register_terminal_errors(error_classes) - Register error classes as terminal
  • unregister_terminal_errors(error_classes) - Unregister error classes
  • clear_terminal_errors() - Clear all registered errors
  • set_global_error_mapper(mapper) - Set a custom error mapper
  • resolve_terminal_error(err, step_mapper?) - Resolve error to terminal

Nested Sagas

  • run_nested_saga(saga, handler) - Run inline saga with shared compensation
  • create_saga_module(handler) - Create a reusable saga module

Types

  • SagaContext - Context with Restate ctx and compensations stack
  • StepRetryPolicy - Retry config for step-level operations
  • SagaStepOptions - Step options (retry, compensation_retry, as_terminal_error)
  • WorkflowRetryPolicy - Retry config for service/workflow level
  • SagaWorkflowOptions - Workflow options (retry, retention, timeouts)

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

restate_saga-0.1.0.tar.gz (19.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

restate_saga-0.1.0-py3-none-any.whl (18.4 kB view details)

Uploaded Python 3

File details

Details for the file restate_saga-0.1.0.tar.gz.

File metadata

  • Download URL: restate_saga-0.1.0.tar.gz
  • Upload date:
  • Size: 19.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.9

File hashes

Hashes for restate_saga-0.1.0.tar.gz
Algorithm Hash digest
SHA256 7476b9c1ffe90d23ce23a4422713651ba3b8c2d260f4ce40114d3a0a15548d06
MD5 74f6885fee0bd3b4dbf47e359cd70c67
BLAKE2b-256 8ea80e16681c90a75d2068166082f7ff5e6c808e15f0bb0ee1652d2fa4700de7

See more details on using hashes here.

File details

Details for the file restate_saga-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: restate_saga-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 18.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.9

File hashes

Hashes for restate_saga-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 1a01842b62e0af8dbcb825824a2b5fbb386ece767fb4bf9c1c14169772e74573
MD5 f1117b4f23236f5dd9d55cd0a25db0e9
BLAKE2b-256 0abcbfd361ef6cde09d7267ef50348e47b327de78a9fbe1acd31c491da9d83f0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page