Skip to main content

A lightweight implementation of the Saga pattern for managing distributed transactions in Python

Project description

Simple Saga

CI codecov PyPI Version License: MIT

A lightweight implementation of the Saga pattern for managing distributed transactions in Python, inspired by Arrow-kt's functional approach.

Overview

The Saga pattern breaks down distributed transactions into a series of local transactions, each with a compensating transaction that can undo the changes if a later step fails. This library provides a simple, type-safe implementation with Arrow-kt style DSL.

Features

  • โœ… Arrow-kt Style DSL - Intuitive context manager API
  • ๐Ÿ”„ Automatic Compensation - Failed transactions are automatically rolled back
  • ๐Ÿ”— Result Chaining - Use results from previous steps in subsequent steps
  • โšก Sync & Async Support - Separate Saga (async) and SyncSaga (sync) implementations
  • ๐Ÿ”’ Type Safe - Full type hints with mypy support
  • ๐Ÿชถ Lightweight - Zero dependencies (uses only Python standard library)
  • ๐Ÿ“š Well Documented - Comprehensive docstrings and examples

Installation

pip install simple-saga

Or with Poetry:

poetry add simple-saga

Quick Start (Async)

import asyncio
from simple_saga import Saga

# Define your async business logic
async def create_order(order_id: str) -> dict:
    print(f"Creating order: {order_id}")
    return {"order_id": order_id, "status": "created"}

async def cancel_order(order: dict) -> None:
    print(f"Cancelling order: {order['order_id']}")

async def reserve_inventory(product_id: str) -> dict:
    print(f"Reserving inventory for: {product_id}")
    return {"product_id": product_id, "reserved": True}

async def release_inventory(inventory: dict) -> None:
    print(f"Releasing inventory for: {inventory['product_id']}")

async def charge_payment(amount: float) -> dict:
    print(f"Charging payment: ${amount}")
    # Simulating a payment failure
    raise Exception("Payment failed")

async def refund_payment(payment: dict) -> None:
    print("Refunding payment")

# Execute the saga
async def main():
    try:
        async with Saga() as saga:
            # Step 1: Create order
            order = await saga.step(
                action=lambda: create_order("ORDER-123"),
                compensation=lambda order: cancel_order(order)
            )

            # Step 2: Reserve inventory (uses order from step 1)
            inventory = await saga.step(
                action=lambda: reserve_inventory("PRODUCT-456"),
                compensation=lambda inv: release_inventory(inv)
            )

            # Step 3: Charge payment (this will fail)
            payment = await saga.step(
                action=lambda: charge_payment(99.99),
                compensation=lambda pay: refund_payment(pay)
            )

            print("โœ… All steps completed successfully!")
    except Exception as e:
        print(f"โŒ Saga failed: {e}")
        print("โœ… All completed steps have been compensated automatically")

if __name__ == "__main__":
    asyncio.run(main())

Quick Start (Sync)

For synchronous operations, use SyncSaga:

from simple_saga import SyncSaga

# Define your synchronous business logic
def create_order(order_id: str) -> dict:
    print(f"Creating order: {order_id}")
    return {"order_id": order_id, "status": "created"}

def cancel_order(order: dict) -> None:
    print(f"Cancelling order: {order['order_id']}")

def reserve_inventory(product_id: str) -> dict:
    print(f"Reserving inventory for: {product_id}")
    return {"product_id": product_id, "reserved": True}

def release_inventory(inventory: dict) -> None:
    print(f"Releasing inventory for: {inventory['product_id']}")

def charge_payment(amount: float) -> dict:
    print(f"Charging payment: ${amount}")
    # Simulating a payment failure
    raise Exception("Payment failed")

def refund_payment(payment: dict) -> None:
    print("Refunding payment")

# Execute the saga
def main():
    try:
        with SyncSaga() as saga:
            # Step 1: Create order
            order = saga.step(
                action=lambda: create_order("ORDER-123"),
                compensation=lambda order: cancel_order(order)
            )

            # Step 2: Reserve inventory (uses order from step 1)
            inventory = saga.step(
                action=lambda: reserve_inventory("PRODUCT-456"),
                compensation=lambda inv: release_inventory(inv)
            )

            # Step 3: Charge payment (this will fail)
            payment = saga.step(
                action=lambda: charge_payment(99.99),
                compensation=lambda pay: refund_payment(pay)
            )

            print("โœ… All steps completed successfully!")
    except Exception as e:
        print(f"โŒ Saga failed: {e}")
        print("โœ… All completed steps have been compensated automatically")

if __name__ == "__main__":
    main()

Output

Creating order: ORDER-123
โœ“ Step 1 completed: <lambda>
Reserving inventory for: PRODUCT-456
โœ“ Step 2 completed: <lambda>
Charging payment: $99.99
โœ— Error at step 3: Payment failed
๐Ÿ”„ Starting compensation...
Releasing inventory for: PRODUCT-456
โœ“ Compensated step 2: <lambda>
Cancelling order: ORDER-123
โœ“ Compensated step 1: <lambda>
โŒ Saga failed: Payment failed
โœ… All completed steps have been compensated automatically

Key Features

1. Result Chaining Between Steps

The most powerful feature is the ability to use results from previous steps:

async with Saga() as saga:
    # Step 1: Create order
    order = await saga.step(
        action=lambda: create_order("ORDER-123"),
        compensation=lambda order: cancel_order(order)
    )

    # Step 2: Use order data from step 1
    inventory = await saga.step(
        action=lambda: reserve_inventory(order["order_id"]),  # Uses order
        compensation=lambda inv: release_inventory(inv)
    )

    # Step 3: Use both order and inventory
    shipment = await saga.step(
        action=lambda: create_shipment(order, inventory),  # Uses both
        compensation=lambda ship: cancel_shipment(ship)
    )

2. Automatic Compensation

Compensations receive the action result automatically:

async with Saga() as saga:
    result = await saga.step(
        action=lambda: {"id": 123, "status": "created"},
        compensation=lambda result: delete_resource(result["id"])  # Gets action result
    )

3. Passing Additional Arguments to Compensation

You can pass previous step results to compensations:

async with Saga() as saga:
    order = await saga.step(
        action=lambda: create_order("ORDER-123"),
        compensation=lambda order: cancel_order(order)
    )

    inventory = await saga.step(
        action=lambda: reserve_inventory(order["order_id"]),
        compensation=lambda inv, order_ref: release_inventory(inv, order_ref),
        compensation_args=(order,)  # Pass order to compensation
    )

The compensation receives:

  1. First argument: The action's result (inv)
  2. Following arguments: Values from compensation_args (order_ref)
  3. Keyword arguments: Values from compensation_kwargs

4. Choosing Between Saga and SyncSaga

Use Saga (async) when:

  • Working with async I/O operations (database, network, file I/O)
  • Building async web applications (FastAPI, aiohttp)
  • Need to handle multiple concurrent operations
  • Using asyncio ecosystem

Use SyncSaga (sync) when:

  • Working with synchronous libraries
  • Building traditional web applications (Flask, Django)
  • Simpler code without async/await complexity
  • Performance is not I/O bound

5. Logging Control

The library uses Python's standard logging module:

import logging

# Configure logging
logging.basicConfig(level=logging.INFO)

# Or disable saga logs
logging.getLogger("simple_saga").setLevel(logging.WARNING)

API Reference

Saga (Async)

Asynchronous implementation for async/await operations.

async step(action, compensation, *, action_args=(), action_kwargs=None, compensation_args=(), compensation_kwargs=None)

Execute a single asynchronous step in the saga. Must be called within an async with Saga() context manager.

Parameters:

  • action: Async function to execute
  • compensation: Async function to compensate if this or later steps fail
  • action_args: Positional arguments for the action
  • action_kwargs: Keyword arguments for the action
  • compensation_args: Additional positional arguments for compensation (after action result)
  • compensation_kwargs: Keyword arguments for the compensation

Returns: The result of the action function

Raises: Any exception raised by the action function (after running compensations)

Example:

async with Saga() as saga:
    order = await saga.step(
        action=create_order,
        compensation=cancel_order
    )

SyncSaga (Sync)

Synchronous implementation for traditional blocking operations.

step(action, compensation, *, action_args=(), action_kwargs=None, compensation_args=(), compensation_kwargs=None)

Execute a single synchronous step in the saga. Must be called within a with SyncSaga() context manager.

Parameters:

  • action: Synchronous function to execute
  • compensation: Synchronous function to compensate if this or later steps fail
  • action_args: Positional arguments for the action
  • action_kwargs: Keyword arguments for the action
  • compensation_args: Additional positional arguments for compensation (after action result)
  • compensation_kwargs: Keyword arguments for the compensation

Returns: The result of the action function

Raises: Any exception raised by the action function (after running compensations)

Example:

with SyncSaga() as saga:
    order = saga.step(
        action=create_order,
        compensation=cancel_order
    )

StepResult

Dataclass containing the result of a saga step execution.

Attributes:

  • step_index: int - The index of the step
  • step_name: str - The name of the action function
  • result: Any - The result returned by the action

SagaStep / SyncSagaStep

Dataclass representing a single step in the saga with action and compensation.

Attributes:

  • action: Callable - The action function
  • compensation: Callable - The compensation function
  • action_args: tuple - Positional arguments for the action
  • action_kwargs: dict - Keyword arguments for the action
  • compensation_args: tuple - Additional positional arguments for the compensation
  • compensation_kwargs: dict - Keyword arguments for the compensation

Design Decisions

Why Separate Saga and SyncSaga?

Version 0.0.6 introduced separate classes for async and sync operations:

  1. Type Safety: Clearer type hints and better IDE support
  2. Performance: Optimized implementations for each use case
  3. Clarity: Explicit choice between async and sync patterns
  4. Maintenance: Easier to maintain and extend independently

Compensation Behavior

  • Compensations run in reverse order (LIFO)
  • Compensation failures are logged but don't stop the chain
  • Each compensation receives the action's result as the first argument
  • You can provide additional arguments via compensation_args and compensation_kwargs

Development

Setup

# Clone the repository
git clone https://github.com/wakita181009/simple-saga.git
cd simple-saga

# Install dependencies
poetry install

# Run tests
poetry run pytest

# Run type checking
poetry run mypy simple_saga

# Run linting
poetry run ruff check simple_saga

Project Structure

simple-saga/
โ”œโ”€โ”€ simple_saga/
โ”‚   โ”œโ”€โ”€ __init__.py           # Package exports
โ”‚   โ”œโ”€โ”€ schema.py             # Data classes (StepResult, SagaStep, SyncSagaStep)
โ”‚   โ””โ”€โ”€ saga/
โ”‚       โ”œโ”€โ”€ __init__.py       # Saga package exports
โ”‚       โ”œโ”€โ”€ base.py           # Base class with shared logic (_SagaBase)
โ”‚       โ”œโ”€โ”€ saga.py           # Saga (async implementation)
โ”‚       โ””โ”€โ”€ sync_saga.py      # SyncSaga (sync implementation)
โ”œโ”€โ”€ tests/
โ”‚   โ”œโ”€โ”€ __init__.py
โ”‚   โ”œโ”€โ”€ conftest.py
โ”‚   โ””โ”€โ”€ saga/
โ”‚       โ”œโ”€โ”€ __init__.py
โ”‚       โ”œโ”€โ”€ test_saga.py           # Core async saga functionality
โ”‚       โ”œโ”€โ”€ test_sync_saga.py      # Sync saga functionality
โ”‚       โ””โ”€โ”€ test_compensation.py   # Compensation behavior
โ”œโ”€โ”€ pyproject.toml       # Project configuration
โ”œโ”€โ”€ README.md            # This file
โ””โ”€โ”€ CLAUDE.md            # Development guide

License

MIT License - see LICENSE file for details

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Acknowledgments

This library implements the Saga pattern as described in:

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

simple_saga-0.1.4.tar.gz (11.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

simple_saga-0.1.4-py3-none-any.whl (11.9 kB view details)

Uploaded Python 3

File details

Details for the file simple_saga-0.1.4.tar.gz.

File metadata

  • Download URL: simple_saga-0.1.4.tar.gz
  • Upload date:
  • Size: 11.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for simple_saga-0.1.4.tar.gz
Algorithm Hash digest
SHA256 bbcb477ea7e4b7b4dab28d03a73f7a477bc0ca33f878424eedd9b3327d2c17a6
MD5 3fcbb1c5ca9f8d94e28f3a4f6a59b997
BLAKE2b-256 4d642972fe19cb08e136d4bee5c5aaedf483efa785258c4c695320a55aef8872

See more details on using hashes here.

Provenance

The following attestation bundles were made for simple_saga-0.1.4.tar.gz:

Publisher: publish.yml on wakita181009/simple-saga

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file simple_saga-0.1.4-py3-none-any.whl.

File metadata

  • Download URL: simple_saga-0.1.4-py3-none-any.whl
  • Upload date:
  • Size: 11.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for simple_saga-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 16c67983bb1c1acd498c9aa6c35a6996315b5cc6c95c10aa099030ca040dc41a
MD5 21336076bc2938538f71c445089f3b58
BLAKE2b-256 b2ac2e3b869215327c1408d18fe053ba8d25c0213ab791cf9beca5945dd29042

See more details on using hashes here.

Provenance

The following attestation bundles were made for simple_saga-0.1.4-py3-none-any.whl:

Publisher: publish.yml on wakita181009/simple-saga

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page