Simple Saga
A lightweight implementation of the Saga pattern for managing distributed transactions in Python, inspired by Arrow-kt's functional approach.
Overview
The Saga pattern breaks down distributed transactions into a series of local transactions, each with a compensating transaction that can undo the changes if a later step fails. This library provides a simple, type-safe implementation with Arrow-kt style DSL.
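The pattern itself can be sketched in a few lines of plain Python. The `run_saga` helper and the `Step` alias below are hypothetical names used only for illustration; they are not part of this library's API, and the library's own implementation may differ.

```python
from typing import Any, Callable

# Hypothetical alias: an action paired with the compensation that undoes it.
Step = tuple[Callable[[], Any], Callable[[Any], None]]


def run_saga(steps: list[Step]) -> list[Any]:
    """Run each action in order; on failure, undo completed steps in reverse."""
    completed: list[tuple[Any, Callable[[Any], None]]] = []
    for action, compensation in steps:
        try:
            result = action()
        except Exception:
            # Roll back everything that already succeeded, newest first.
            for done_result, undo in reversed(completed):
                undo(done_result)
            raise
        completed.append((result, compensation))
    return [result for result, _ in completed]


log: list[str] = []


def fail() -> None:
    raise RuntimeError("payment failed")


try:
    run_saga([
        (lambda: "order", lambda r: log.append(f"undo {r}")),
        (lambda: "inventory", lambda r: log.append(f"undo {r}")),
        (fail, lambda r: log.append("undo payment")),
    ])
except RuntimeError as exc:
    log.append(f"failed: {exc}")

print(log)  # ['undo inventory', 'undo order', 'failed: payment failed']
```

The failing step's own compensation never runs, because its action did not complete.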
Features
- Arrow-kt Style DSL - Intuitive context manager API
- Automatic Compensation - Failed transactions are automatically rolled back
- Result Chaining - Use results from previous steps in subsequent steps
- Sync & Async Support - Separate Saga (async) and SyncSaga (sync) implementations
- Type Safe - Full type hints with mypy support
- Lightweight - Zero dependencies (uses only Python standard library)
- Well Documented - Comprehensive docstrings and examples
Installation
pip install simple-saga
Or with Poetry:
poetry add simple-saga
Quick Start (Async)
import asyncio

from simple_saga import Saga


# Define your async business logic
async def create_order(order_id: str) -> dict:
    print(f"Creating order: {order_id}")
    return {"order_id": order_id, "status": "created"}


async def cancel_order(order: dict) -> None:
    print(f"Cancelling order: {order['order_id']}")


async def reserve_inventory(product_id: str) -> dict:
    print(f"Reserving inventory for: {product_id}")
    return {"product_id": product_id, "reserved": True}


async def release_inventory(inventory: dict) -> None:
    print(f"Releasing inventory for: {inventory['product_id']}")


async def charge_payment(amount: float) -> dict:
    print(f"Charging payment: ${amount}")
    # Simulating a payment failure
    raise Exception("Payment failed")


async def refund_payment(payment: dict) -> None:
    print("Refunding payment")


# Execute the saga
async def main():
    try:
        async with Saga() as saga:
            # Step 1: Create order
            order = await saga.step(
                action=lambda: create_order("ORDER-123"),
                compensation=lambda order: cancel_order(order)
            )
            # Step 2: Reserve inventory
            inventory = await saga.step(
                action=lambda: reserve_inventory("PRODUCT-456"),
                compensation=lambda inv: release_inventory(inv)
            )
            # Step 3: Charge payment (this will fail)
            payment = await saga.step(
                action=lambda: charge_payment(99.99),
                compensation=lambda pay: refund_payment(pay)
            )
            print("All steps completed successfully!")
    except Exception as e:
        print(f"Saga failed: {e}")
        print("All completed steps have been compensated automatically")


if __name__ == "__main__":
    asyncio.run(main())
Quick Start (Sync)
For synchronous operations, use SyncSaga:
from simple_saga import SyncSaga


# Define your synchronous business logic
def create_order(order_id: str) -> dict:
    print(f"Creating order: {order_id}")
    return {"order_id": order_id, "status": "created"}


def cancel_order(order: dict) -> None:
    print(f"Cancelling order: {order['order_id']}")


def reserve_inventory(product_id: str) -> dict:
    print(f"Reserving inventory for: {product_id}")
    return {"product_id": product_id, "reserved": True}


def release_inventory(inventory: dict) -> None:
    print(f"Releasing inventory for: {inventory['product_id']}")


def charge_payment(amount: float) -> dict:
    print(f"Charging payment: ${amount}")
    # Simulating a payment failure
    raise Exception("Payment failed")


def refund_payment(payment: dict) -> None:
    print("Refunding payment")


# Execute the saga
def main():
    try:
        with SyncSaga() as saga:
            # Step 1: Create order
            order = saga.step(
                action=lambda: create_order("ORDER-123"),
                compensation=lambda order: cancel_order(order)
            )
            # Step 2: Reserve inventory
            inventory = saga.step(
                action=lambda: reserve_inventory("PRODUCT-456"),
                compensation=lambda inv: release_inventory(inv)
            )
            # Step 3: Charge payment (this will fail)
            payment = saga.step(
                action=lambda: charge_payment(99.99),
                compensation=lambda pay: refund_payment(pay)
            )
            print("All steps completed successfully!")
    except Exception as e:
        print(f"Saga failed: {e}")
        print("All completed steps have been compensated automatically")


if __name__ == "__main__":
    main()
Output
Creating order: ORDER-123
Step 1 completed: <lambda>
Reserving inventory for: PRODUCT-456
Step 2 completed: <lambda>
Charging payment: $99.99
Error at step 3: Payment failed
Starting compensation...
Releasing inventory for: PRODUCT-456
Compensated step 2: <lambda>
Cancelling order: ORDER-123
Compensated step 1: <lambda>
Saga failed: Payment failed
All completed steps have been compensated automatically
Key Features
1. Result Chaining Between Steps
The most powerful feature is the ability to use results from previous steps:
async with Saga() as saga:
    # Step 1: Create order
    order = await saga.step(
        action=lambda: create_order("ORDER-123"),
        compensation=lambda order: cancel_order(order)
    )
    # Step 2: Use order data from step 1
    inventory = await saga.step(
        action=lambda: reserve_inventory(order["order_id"]),  # Uses order
        compensation=lambda inv: release_inventory(inv)
    )
    # Step 3: Use both order and inventory
    shipment = await saga.step(
        action=lambda: create_shipment(order, inventory),  # Uses both
        compensation=lambda ship: cancel_shipment(ship)
    )
2. Automatic Compensation
Compensations receive the action result automatically:
async with Saga() as saga:
    result = await saga.step(
        action=lambda: {"id": 123, "status": "created"},
        compensation=lambda result: delete_resource(result["id"])  # Gets action result
    )
3. Passing Additional Arguments to Compensation
You can pass previous step results to compensations:
async with Saga() as saga:
    order = await saga.step(
        action=lambda: create_order("ORDER-123"),
        compensation=lambda order: cancel_order(order)
    )
    inventory = await saga.step(
        action=lambda: reserve_inventory(order["order_id"]),
        compensation=lambda inv, order_ref: release_inventory(inv, order_ref),
        compensation_args=(order,)  # Pass order to compensation
    )
The compensation receives:
- First argument: The action's result (inv)
- Following arguments: Values from compensation_args (order_ref)
- Keyword arguments: Values from compensation_kwargs
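The argument order described above amounts to a single call expression. The sketch below uses a hypothetical helper, `call_compensation`, and a `reason` keyword invented for the example, to show how the pieces line up; it illustrates the documented calling convention rather than the library's internal code.

```python
from typing import Any, Callable, Optional


def call_compensation(
    compensation: Callable[..., Any],
    result: Any,
    args: tuple = (),
    kwargs: Optional[dict] = None,
) -> Any:
    """Hypothetical helper: action result first, then extra args, then kwargs."""
    return compensation(result, *args, **(kwargs or {}))


def release_inventory(inv: dict, order_ref: dict, *, reason: str = "rollback") -> str:
    # `reason` is an illustrative keyword argument, not a library parameter.
    return f"release {inv['product_id']} for {order_ref['order_id']} ({reason})"


message = call_compensation(
    release_inventory,
    {"product_id": "PRODUCT-456"},          # the action's result
    args=({"order_id": "ORDER-123"},),      # plays the role of compensation_args
    kwargs={"reason": "payment failed"},    # plays the role of compensation_kwargs
)
print(message)  # release PRODUCT-456 for ORDER-123 (payment failed)
```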
4. Choosing Between Saga and SyncSaga
Use Saga (async) when:
- Working with async I/O operations (database, network, file I/O)
- Building async web applications (FastAPI, aiohttp)
- Handling multiple concurrent operations
- Using the asyncio ecosystem
Use SyncSaga (sync) when:
- Working with synchronous libraries
- Building traditional web applications (Flask, Django)
- Preferring simpler code without async/await complexity
- Running workloads that are not I/O bound
5. Logging Control
The library uses Python's standard logging module:
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
# Or hide saga logs below WARNING level
logging.getLogger("simple_saga").setLevel(logging.WARNING)
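Because the logger is a standard one, its records can also be routed to a custom handler, for example to collect them in tests. The sketch below assumes the library logs through loggers in the "simple_saga" namespace; the child logger name `simple_saga.saga` and the `ListHandler` class are assumptions made for illustration, and the two log calls stand in for messages the library would emit.

```python
import logging


class ListHandler(logging.Handler):
    """Collects formatted log messages in memory."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


saga_logger = logging.getLogger("simple_saga")
handler = ListHandler()
saga_logger.addHandler(handler)
saga_logger.setLevel(logging.WARNING)

# Assumed child logger; it inherits the WARNING level from "simple_saga".
child = logging.getLogger("simple_saga.saga")
child.info("Step 1 completed")                   # filtered out
child.warning("Compensation failed for step 2")  # kept

print(handler.messages)  # ['Compensation failed for step 2']
```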
API Reference
Saga (Async)
Asynchronous implementation for async/await operations.
async step(action, compensation, *, action_args=(), action_kwargs=None, compensation_args=(), compensation_kwargs=None)
Execute a single asynchronous step in the saga. Must be called within an async with Saga() context manager.
Parameters:
- action: Async function to execute
- compensation: Async function to compensate if this or later steps fail
- action_args: Positional arguments for the action
- action_kwargs: Keyword arguments for the action
- compensation_args: Additional positional arguments for compensation (after action result)
- compensation_kwargs: Keyword arguments for the compensation
Returns: The result of the action function
Raises: Any exception raised by the action function (after running compensations)
Example:
async with Saga() as saga:
    order = await saga.step(
        action=create_order,
        compensation=cancel_order,
        action_args=("ORDER-123",),
    )
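To make the `step()` contract more concrete, here is a minimal stand-in written from the description above. `MiniSaga` is a hypothetical class, not the library's `Saga`: it supports only `action_args` and `action_kwargs`, and it runs compensations when the context manager exits, which may differ from where the real implementation runs them.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class MiniSaga:
    """Illustrative stand-in for an async saga; not simple_saga.Saga."""

    def __init__(self) -> None:
        self._undo: list[Callable[[], Awaitable[Any]]] = []

    async def __aenter__(self) -> "MiniSaga":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        if exc_type is not None:
            # Undo completed steps, newest first.
            for undo in reversed(self._undo):
                await undo()
        return False  # let the original exception propagate

    async def step(
        self,
        action: Callable[..., Awaitable[Any]],
        compensation: Callable[..., Awaitable[Any]],
        *,
        action_args: tuple = (),
        action_kwargs: Optional[dict] = None,
    ) -> Any:
        result = await action(*action_args, **(action_kwargs or {}))
        # Register the compensation only after the action succeeded.
        self._undo.append(lambda: compensation(result))
        return result


events: list[str] = []


async def create_order(order_id: str) -> dict:
    events.append(f"create {order_id}")
    return {"order_id": order_id}


async def cancel_order(order: dict) -> None:
    events.append(f"cancel {order['order_id']}")


async def charge_payment(amount: float) -> dict:
    raise RuntimeError("payment failed")


async def refund_payment(payment: dict) -> None:
    events.append("refund")


async def main() -> None:
    try:
        async with MiniSaga() as saga:
            await saga.step(create_order, cancel_order, action_args=("ORDER-123",))
            await saga.step(charge_payment, refund_payment, action_args=(99.99,))
    except RuntimeError as exc:
        events.append(f"failed: {exc}")


asyncio.run(main())
print(events)  # ['create ORDER-123', 'cancel ORDER-123', 'failed: payment failed']
```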
SyncSaga (Sync)
Synchronous implementation for traditional blocking operations.
step(action, compensation, *, action_args=(), action_kwargs=None, compensation_args=(), compensation_kwargs=None)
Execute a single synchronous step in the saga. Must be called within a with SyncSaga() context manager.
Parameters:
- action: Synchronous function to execute
- compensation: Synchronous function to compensate if this or later steps fail
- action_args: Positional arguments for the action
- action_kwargs: Keyword arguments for the action
- compensation_args: Additional positional arguments for compensation (after action result)
- compensation_kwargs: Keyword arguments for the compensation
Returns: The result of the action function
Raises: Any exception raised by the action function (after running compensations)
Example:
with SyncSaga() as saga:
    order = saga.step(
        action=create_order,
        compensation=cancel_order,
        action_args=("ORDER-123",),
    )
StepResult
Dataclass containing the result of a saga step execution.
Attributes:
- step_index: int - The index of the step
- step_name: str - The name of the action function
- result: Any - The result returned by the action
SagaStep / SyncSagaStep
Dataclass representing a single step in the saga with action and compensation.
Attributes:
- action: Callable - The action function
- compensation: Callable - The compensation function
- action_args: tuple - Positional arguments for the action
- action_kwargs: dict - Keyword arguments for the action
- compensation_args: tuple - Additional positional arguments for the compensation
- compensation_kwargs: dict - Keyword arguments for the compensation
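As a rough picture of how these two dataclasses relate, the sketch below rebuilds them from the attribute lists above. The class names carry a `Sketch` suffix to mark them as hypothetical; the defaults, the zero-based index, and the use of `__name__` for `step_name` are assumptions, not confirmed details of the library.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class StepResultSketch:
    step_index: int
    step_name: str
    result: Any


@dataclass
class SagaStepSketch:
    action: Callable[..., Any]
    compensation: Callable[..., Any]
    action_args: tuple = ()
    action_kwargs: dict = field(default_factory=dict)
    compensation_args: tuple = ()
    compensation_kwargs: dict = field(default_factory=dict)


def create_order(order_id: str) -> dict:
    return {"order_id": order_id}


def cancel_order(order: dict) -> None:
    return None


step = SagaStepSketch(create_order, cancel_order, action_args=("ORDER-123",))
outcome = StepResultSketch(
    step_index=0,  # assumed zero-based here
    step_name=step.action.__name__,
    result=step.action(*step.action_args, **step.action_kwargs),
)
print(outcome.step_name, outcome.result)  # create_order {'order_id': 'ORDER-123'}

# A lambda has no descriptive name, which would explain "<lambda>" in log output.
print((lambda: None).__name__)  # <lambda>
```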
Design Decisions
Why Separate Saga and SyncSaga?
Version 0.0.6 introduced separate classes for async and sync operations:
- Type Safety: Clearer type hints and better IDE support
- Performance: Optimized implementations for each use case
- Clarity: Explicit choice between async and sync patterns
- Maintenance: Easier to maintain and extend independently
Compensation Behavior
- Compensations run in reverse order (LIFO)
- Compensation failures are logged but don't stop the chain
- Each compensation receives the action's result as the first argument
- You can provide additional arguments via compensation_args and compensation_kwargs
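The first two rules (reverse order, and failures that are logged without stopping the chain) can be sketched as one loop. `compensate_all` and the `saga_sketch` logger name are hypothetical and exist only for this illustration; returning the failed step numbers is a choice made for the example, not documented library behavior.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("saga_sketch")  # hypothetical logger name
logger.addHandler(logging.NullHandler())


def compensate_all(completed: list[tuple[Any, Callable[[Any], None]]]) -> list[int]:
    """Undo steps in LIFO order; return 1-based numbers of steps whose undo failed."""
    failed: list[int] = []
    for index in range(len(completed) - 1, -1, -1):
        result, compensation = completed[index]
        try:
            compensation(result)
        except Exception:
            # Log and keep going so earlier steps still get compensated.
            logger.exception("Compensation failed for step %d", index + 1)
            failed.append(index + 1)
    return failed


undone: list[str] = []


def broken(_: Any) -> None:
    raise RuntimeError("inventory service down")


completed = [
    ("order", undone.append),
    ("inventory", broken),
    ("shipment", undone.append),
]
failed = compensate_all(completed)
print(undone, failed)  # ['shipment', 'order'] [2]
```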
Development
Setup
# Clone the repository
git clone https://github.com/wakita181009/simple-saga.git
cd simple-saga
# Install dependencies
poetry install
# Run tests
poetry run pytest
# Run type checking
poetry run mypy simple_saga
# Run linting
poetry run ruff check simple_saga
Project Structure
simple-saga/
├── simple_saga/
│   ├── __init__.py              # Package exports
│   ├── schema.py                # Data classes (StepResult, SagaStep, SyncSagaStep)
│   └── saga/
│       ├── __init__.py          # Saga package exports
│       ├── base.py              # Base class with shared logic (_SagaBase)
│       ├── saga.py              # Saga (async implementation)
│       └── sync_saga.py         # SyncSaga (sync implementation)
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   └── saga/
│       ├── __init__.py
│       ├── test_saga.py         # Core async saga functionality
│       ├── test_sync_saga.py    # Sync saga functionality
│       └── test_compensation.py # Compensation behavior
├── pyproject.toml               # Project configuration
├── README.md                    # This file
└── CLAUDE.md                    # Development guide
License
MIT License - see LICENSE file for details
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Acknowledgments
This library implements the Saga pattern as described in: