A lightweight implementation of the Saga pattern for managing distributed transactions in Python
Project description
Simple Saga
A lightweight implementation of the Saga pattern for managing distributed transactions in Python, inspired by Arrow-kt's functional approach.
Overview
The Saga pattern breaks down distributed transactions into a series of local transactions, each with a compensating transaction that can undo the changes if a later step fails. This library provides a simple, type-safe implementation with Arrow-kt style DSL.
Features
- โ Arrow-kt Style DSL - Intuitive async context manager API
- ๐ Automatic Compensation - Failed transactions are automatically rolled back
- ๐ Result Chaining - Use results from previous steps in subsequent steps
- โก Sync & Async Support - Works with both synchronous and asynchronous functions
- ๐ Type Safe - Full type hints with mypy support
- ๐ชถ Lightweight - Zero dependencies (uses only Python standard library)
- ๐ Well Documented - Comprehensive docstrings and examples
Installation
pip install simple-saga
Or with Poetry:
poetry add simple-saga
Quick Start
import asyncio
from simple_saga import Saga
# Define your business logic
def create_order(order_id: str) -> dict:
print(f"Creating order: {order_id}")
return {"order_id": order_id, "status": "created"}
def cancel_order(order: dict) -> None:
print(f"Cancelling order: {order['order_id']}")
async def reserve_inventory(product_id: str) -> dict:
print(f"Reserving inventory for: {product_id}")
return {"product_id": product_id, "reserved": True}
async def release_inventory(inventory: dict) -> None:
print(f"Releasing inventory for: {inventory['product_id']}")
def charge_payment(amount: float) -> dict:
print(f"Charging payment: ${amount}")
# Simulating a payment failure
raise Exception("Payment failed")
def refund_payment(payment: dict) -> None:
print("Refunding payment")
# Execute the saga
async def main():
try:
async with Saga() as saga:
# Step 1: Create order
order = await saga.step(
action=lambda: create_order("ORDER-123"),
compensation=lambda order: cancel_order(order)
)
# Step 2: Reserve inventory (uses order from step 1)
inventory = await saga.step(
action=lambda: reserve_inventory("PRODUCT-456"),
compensation=lambda inv: release_inventory(inv)
)
# Step 3: Charge payment (this will fail)
payment = await saga.step(
action=lambda: charge_payment(99.99),
compensation=lambda pay: refund_payment(pay)
)
print("โ
All steps completed successfully!")
except Exception as e:
print(f"โ Saga failed: {e}")
print("โ
All completed steps have been compensated automatically")
if __name__ == "__main__":
asyncio.run(main())
Output
Creating order: ORDER-123
โ Step 1 completed: <lambda>
Reserving inventory for: PRODUCT-456
โ Step 2 completed: <lambda>
Charging payment: $99.99
โ Error at step 3: Payment failed
๐ Starting compensation...
Releasing inventory for: PRODUCT-456
โ Compensated step 2: <lambda>
Cancelling order: ORDER-123
โ Compensated step 1: <lambda>
โ Saga failed: Payment failed
โ
All completed steps have been compensated automatically
Key Features
1. Result Chaining Between Steps
The most powerful feature is the ability to use results from previous steps:
async with Saga() as saga:
# Step 1: Create order
order = await saga.step(
action=lambda: create_order("ORDER-123"),
compensation=lambda order: cancel_order(order)
)
# Step 2: Use order data from step 1
inventory = await saga.step(
action=lambda: reserve_inventory(order["order_id"]), # Uses order
compensation=lambda inv: release_inventory(inv)
)
# Step 3: Use both order and inventory
shipment = await saga.step(
action=lambda: create_shipment(order, inventory), # Uses both
compensation=lambda ship: cancel_shipment(ship)
)
2. Automatic Compensation
Compensations receive the action result automatically:
async with Saga() as saga:
result = await saga.step(
action=lambda: {"id": 123, "status": "created"},
compensation=lambda result: delete_resource(result["id"]) # Gets action result
)
3. Passing Additional Arguments to Compensation
You can pass previous step results to compensations:
async with Saga() as saga:
order = await saga.step(
action=lambda: create_order("ORDER-123"),
compensation=lambda order: cancel_order(order)
)
inventory = await saga.step(
action=lambda: reserve_inventory(order["order_id"]),
compensation=lambda inv, order_ref: release_inventory(inv, order_ref),
compensation_args=(order,) # Pass order to compensation
)
The compensation receives:
- First argument: The action's result (
inv) - Following arguments: Values from
compensation_args(order_ref) - Keyword arguments: Values from
compensation_kwargs
4. Mixed Sync and Async Operations
async with Saga() as saga:
# Synchronous step
order = await saga.step(
action=lambda: create_order("ORDER-123"), # Sync
compensation=lambda order: cancel_order(order)
)
# Asynchronous step
inventory = await saga.step(
action=lambda: reserve_inventory("PRODUCT-456"), # Async
compensation=lambda inv: release_inventory(inv)
)
5. Logging Control
The library uses Python's standard logging module:
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
# Or disable saga logs
logging.getLogger("simple_saga").setLevel(logging.WARNING)
API Reference
Saga
Main class for defining and executing sagas using Arrow-kt style DSL.
async step(action, compensation, *, action_args=(), action_kwargs=None, compensation_args=(), compensation_kwargs=None)
Execute a single step in the saga. Must be called within an async with Saga() context manager.
Parameters:
action: Function to execute (can be sync or async)compensation: Function to compensate if this or later steps fail (can be sync or async)action_args: Positional arguments for the actionaction_kwargs: Keyword arguments for the actioncompensation_args: Additional positional arguments for compensation (after action result)compensation_kwargs: Keyword arguments for the compensation
Returns: The result of the action function
Raises: Any exception raised by the action function (after running compensations)
Example:
async with Saga() as saga:
order = await saga.step(
action=lambda: create_order("ORDER-123"),
compensation=lambda order: cancel_order(order)
)
inventory = await saga.step(
action=lambda: reserve_inventory(order["order_id"]),
compensation=lambda inv, order_ref: release_inventory(inv, order_ref),
compensation_args=(order,) # Pass order to compensation
)
StepResult
Dataclass containing the result of a saga step execution.
Attributes:
step_index: int - The index of the stepstep_name: str - The name of the action functionresult: Any - The result returned by the action
SagaStep
Dataclass representing a single step in the saga with action and compensation.
Attributes:
action: Callable - The action functioncompensation: Callable - The compensation functionaction_args: tuple - Positional arguments for the actionaction_kwargs: dict - Keyword arguments for the actioncompensation_args: tuple - Additional positional arguments for the compensationcompensation_kwargs: dict - Keyword arguments for the compensation
Design Decisions
Why Arrow-kt Style?
The Arrow-kt style DSL with async context managers provides:
- Natural result chaining: Use previous results directly as variables
- Automatic cleanup: Context manager ensures compensations run on failure
- Intuitive flow: Code reads like a sequence of operations
- Type safety: Results are properly typed variables
Compensation Behavior
- Compensations run in reverse order (LIFO)
- Compensation failures are logged but don't stop the chain
- Each compensation receives the action's result as the first argument
- You can provide additional arguments via
compensation_argsandcompensation_kwargs
Why Always Async?
Even though the library supports synchronous functions, the step() method is async to:
- Handle mixed sync/async steps uniformly
- Use async patterns for I/O-bound operations
- Keep the API simple and consistent
Development
Setup
# Clone the repository
git clone https://github.com/yourusername/simple-saga.git
cd simple-saga
# Install dependencies
poetry install
# Run tests
poetry run pytest
# Run type checking
poetry run mypy simple_saga
# Run linting
poetry run ruff check simple_saga
Project Structure
simple-saga/
โโโ simple_saga/
โ โโโ __init__.py # Package exports
โ โโโ saga.py # Main Saga implementation
โ โโโ schema.py # Data classes (StepResult, SagaStep)
โโโ tests/
โ โโโ test_saga.py # Core saga functionality
โ โโโ test_compensation.py # Compensation behavior
โ โโโ test_sync_async.py # Mixed sync/async scenarios
โโโ pyproject.toml # Project configuration
โโโ README.md # This file
โโโ CLAUDE.md # Development guide
License
MIT License - see LICENSE file for details
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Acknowledgments
This library implements the Saga pattern as described in:
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file simple_saga-0.0.6.tar.gz.
File metadata
- Download URL: simple_saga-0.0.6.tar.gz
- Upload date:
- Size: 10.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
03cded2d6992d11327de778ad1da28e5603476575932bb2a15b22fd23926dde0
|
|
| MD5 |
d98fd1d15d67e74f35ff53ea63a796fd
|
|
| BLAKE2b-256 |
de9103801f5680959f96992e7bca033a213dfe72fcea0bcb26db9a90ed677fdf
|
Provenance
The following attestation bundles were made for simple_saga-0.0.6.tar.gz:
Publisher:
publish.yml on wakita181009/simple-saga
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
simple_saga-0.0.6.tar.gz -
Subject digest:
03cded2d6992d11327de778ad1da28e5603476575932bb2a15b22fd23926dde0 - Sigstore transparency entry: 705335439
- Sigstore integration time:
-
Permalink:
wakita181009/simple-saga@82177a4967375b00dcfe8a0fc1b06084b55b365d -
Branch / Tag:
refs/tags/v0.0.6 - Owner: https://github.com/wakita181009
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@82177a4967375b00dcfe8a0fc1b06084b55b365d -
Trigger Event:
push
-
Statement type:
File details
Details for the file simple_saga-0.0.6-py3-none-any.whl.
File metadata
- Download URL: simple_saga-0.0.6-py3-none-any.whl
- Upload date:
- Size: 11.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
298681ca16429cfe5363b29248bb0553febe757ff1eff060156a884fa72d8597
|
|
| MD5 |
f802381a7c02a95b9503c29c12023b3a
|
|
| BLAKE2b-256 |
52719325687f9a75f397a5da6bba79be8701e198887fd69ae2316349183734b7
|
Provenance
The following attestation bundles were made for simple_saga-0.0.6-py3-none-any.whl:
Publisher:
publish.yml on wakita181009/simple-saga
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
simple_saga-0.0.6-py3-none-any.whl -
Subject digest:
298681ca16429cfe5363b29248bb0553febe757ff1eff060156a884fa72d8597 - Sigstore transparency entry: 705335449
- Sigstore integration time:
-
Permalink:
wakita181009/simple-saga@82177a4967375b00dcfe8a0fc1b06084b55b365d -
Branch / Tag:
refs/tags/v0.0.6 - Owner: https://github.com/wakita181009
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@82177a4967375b00dcfe8a0fc1b06084b55b365d -
Trigger Event:
push
-
Statement type: