Production-ready Saga pattern with DAG support
Sagaz - Production-Ready Saga Pattern for Python
Enterprise-grade distributed transaction orchestration with exactly-once semantics.
96% Test Coverage - exceeds the 95% target with 793 passing tests (including chaos engineering tests)
Features
Core Saga Pattern
- Sequential & Parallel (DAG) execution - Optimize throughput with dependency graphs
- Automatic compensation - Roll back completed steps on failure, with transaction safety
- Three failure strategies - FAIL_FAST, WAIT_ALL, FAIL_FAST_WITH_GRACE
- Retry logic - Exponential backoff with configurable limits
- Timeout protection - Per-step and global timeouts
- Idempotency support - Safe retries and recovery
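The compensation and retry behaviour listed above can be illustrated with a small, library-independent sketch. It does not use sagaz's API: `Step`, `run_with_retry` and `run_saga` are hypothetical names, steps run sequentially, and the backoff values are arbitrary. When a step still fails after its retries, the completed steps are compensated in reverse order, which is the core of the saga pattern.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional

# Hypothetical step model for illustration; this is not sagaz's API.
Action = Callable[[dict], Awaitable[dict]]
Compensation = Callable[[dict], Awaitable[None]]


@dataclass
class Step:
    name: str
    action: Action
    compensation: Optional[Compensation] = None
    max_attempts: int = 3
    base_delay: float = 0.01  # seconds; doubled after every failed attempt


async def run_with_retry(step: Step, ctx: dict) -> dict:
    """Run one step, retrying with exponential backoff: base_delay * 2**attempt."""
    for attempt in range(step.max_attempts):
        try:
            return await step.action(ctx)
        except Exception:
            if attempt == step.max_attempts - 1:
                raise  # retries exhausted
            await asyncio.sleep(step.base_delay * 2 ** attempt)
    raise ValueError("max_attempts must be at least 1")


async def run_saga(steps: list, ctx: dict) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse order."""
    completed = []
    for step in steps:
        try:
            ctx.update(await run_with_retry(step, ctx))
        except Exception:
            for done in reversed(completed):
                if done.compensation is not None:
                    await done.compensation(ctx)
            return False
        completed.append(step)
    return True


# Usage: the payment step always fails, so the reservation is released.
log = []


async def reserve(ctx):
    log.append("reserve")
    return {"inventory_id": "inv-1"}


async def release(ctx):
    log.append(f"release {ctx['inventory_id']}")


async def charge(ctx):
    raise RuntimeError("card declined")


ok = asyncio.run(run_saga([Step("reserve", reserve, release), Step("charge", charge)], {"order_id": "123"}))
print(ok, log)  # False ['reserve', 'release inv-1']
```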
Transactional Outbox Pattern
- Exactly-once delivery - Transactional event publishing
- Optimistic sending - 10x lower publish latency than outbox polling (<10ms vs ~100ms)
- Consumer inbox - Exactly-once processing guarantee
- Multiple brokers - Redis Streams, Kafka, RabbitMQ, or in-memory
- Dead letter queue - Automatic failure handling
- Worker auto-scaling - Kubernetes HPA support
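The transactional outbox idea can be sketched with the standard-library sqlite3 module: the state change and the event row commit in the same transaction, and a separate polling pass publishes pending rows. The table layout and the `complete_saga`/`relay_pending` helpers are hypothetical and are not sagaz's schema or API.

```python
import json
import sqlite3

# Hypothetical schema for illustration; sagaz's real outbox tables may differ.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE sagas (id TEXT PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        event_type TEXT NOT NULL,
        payload TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'pending'
    );
    """
)


def complete_saga(saga_id: str) -> None:
    """Save the saga state and its event atomically: both rows commit or neither does."""
    with conn:
        conn.execute("INSERT INTO sagas (id, status) VALUES (?, ?)", (saga_id, "completed"))
        conn.execute(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            ("SagaCompleted", json.dumps({"saga_id": saga_id})),
        )


def relay_pending(publish) -> int:
    """One polling pass of an outbox worker: publish pending rows, then mark them sent."""
    rows = conn.execute(
        "SELECT id, event_type, payload FROM outbox WHERE status = 'pending' ORDER BY id"
    ).fetchall()
    for row_id, event_type, payload in rows:
        publish(event_type, json.loads(payload))  # if this raises, the row stays pending
        with conn:
            conn.execute("UPDATE outbox SET status = 'sent' WHERE id = ?", (row_id,))
    return len(rows)


published = []
complete_saga("saga-1")
print(relay_pending(lambda event_type, payload: published.append((event_type, payload))))  # 1
print(relay_pending(lambda event_type, payload: published.append((event_type, payload))))  # 0
print(published)  # [('SagaCompleted', {'saga_id': 'saga-1'})]
```

Because a row is marked `sent` only after `publish` returns, a crash between the two steps republishes the event on the next pass. The relay on its own is therefore at-least-once, which is why a consumer-side inbox is needed for exactly-once processing.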
Storage Backends
- PostgreSQL - Production-grade with ACID guarantees
- Redis - High-performance caching layer
- In-Memory - Testing and development
Monitoring & Operations
- Prometheus metrics - 40+ metrics exposed
- OpenTelemetry tracing - Distributed tracing support
- Structured logging - JSON logs with correlation IDs
- Kubernetes manifests - Production-ready deployment
- Health checks - Liveness and readiness probes
- Chaos engineering tests - 12 resilience tests validating production readiness
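JSON logs with correlation IDs can be produced with the standard library alone. This sketch assumes a `contextvars.ContextVar` holds the current saga or request ID; the logger name and JSON field names are illustrative and are not sagaz's log format.

```python
import contextvars
import io
import json
import logging

# Hypothetical holder for the current correlation ID (e.g. the saga ID).
correlation_id = contextvars.ContextVar("correlation_id", default="-")


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object that carries the current correlation ID."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
        })


stream = io.StringIO()  # stands in for stdout or a log shipper
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("saga.demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

correlation_id.set("saga-123")
logger.info("step %s completed", "reserve_inventory")
entry = json.loads(stream.getvalue())
print(stream.getvalue().strip())
# {"level": "INFO", "logger": "saga.demo", "message": "step reserve_inventory completed", "correlation_id": "saga-123"}
```

Because `ContextVar` values are per asyncio task, concurrently running sagas each log their own ID without passing it through every call.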
Installation
# Core library
pip install sagaz
# With PostgreSQL support
pip install sagaz[postgresql]
# With Kafka broker
pip install sagaz[kafka]
# All features
pip install sagaz[all]
Quick Start
Basic Saga (Declarative API)
import asyncio

from sagaz import Saga, action, compensate

# inventory_service and payment_service are your own service clients (not part of sagaz).


class OrderSaga(Saga):
    saga_name = "order-processing"

    @action("reserve_inventory")
    async def reserve_inventory(self, ctx):
        inventory_id = await inventory_service.reserve(ctx["order_id"])
        # Returned values are available to later steps and compensations via ctx
        return {"inventory_id": inventory_id}

    @compensate("reserve_inventory")
    async def release_inventory(self, ctx):
        await inventory_service.release(ctx["inventory_id"])

    @action("charge_payment", depends_on=["reserve_inventory"])
    async def charge_payment(self, ctx):
        return await payment_service.charge(ctx["amount"])


async def main():
    # Execute saga
    saga = OrderSaga()
    return await saga.run({"order_id": "123", "amount": 99.99})

result = asyncio.run(main())
Classic API (Imperative)
import asyncio

from sagaz import ClassicSaga

# check_inventory, validate_address, reserve_items and the compensations
# compensate_inventory and release_items are your own step functions.


async def main():
    saga = ClassicSaga(name="OrderSaga")

    # These run in parallel (no dependencies); validate_address has no compensation
    await saga.add_step("check_inventory", check_inventory, compensate_inventory, dependencies=set())
    await saga.add_step("validate_address", validate_address, None, dependencies=set())

    # This waits for both
    await saga.add_step(
        "reserve_items",
        reserve_items,
        release_items,
        dependencies={"check_inventory", "validate_address"},
    )
    return await saga.execute()

result = asyncio.run(main())
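The dependency sets passed to `add_step` form a DAG. The standard-library `graphlib.TopologicalSorter` (Python 3.9+) shows how such a graph can be split into waves of steps that may run in parallel. This illustrates the scheduling idea only; it is not sagaz's internal scheduler.

```python
from graphlib import TopologicalSorter

# Same dependencies as the Classic API example: step -> steps it waits for.
dependencies = {
    "check_inventory": set(),
    "validate_address": set(),
    "reserve_items": {"check_inventory", "validate_address"},
}


def execution_waves(graph: dict) -> list:
    """Group steps into waves; every step in a wave has all its dependencies done."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(execution_waves(dependencies))
# [['check_inventory', 'validate_address'], ['reserve_items']]
```

Each wave could be awaited with `asyncio.gather`. Compensating in reverse wave order undoes dependent steps before the steps they depend on.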
Transactional Outbox + Optimistic Sending
from sagaz.outbox import OptimisticPublisher, OutboxWorker
from sagaz.outbox.storage import PostgreSQLOutboxStorage
from sagaz.outbox.brokers import KafkaBroker

# Setup
outbox_storage = PostgreSQLOutboxStorage("postgresql://localhost/db")
broker = KafkaBroker(bootstrap_servers="localhost:9092")
publisher = OptimisticPublisher(outbox_storage, broker, enabled=True)


# db and saga_storage are your application's database handle and saga store.
async def save_and_publish(db, saga_storage, saga, event):
    # Write the saga state and the outbox event in one database transaction
    async with db.transaction():
        await saga_storage.save(saga)
        await outbox_storage.insert(event)
    # Transaction committed: publish immediately (< 10ms)
    await publisher.publish_after_commit(event)
    # If the immediate publish fails, the OutboxWorker publishes the event later
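The optimistic-sending flow (publish right after commit, fall back to the worker on failure) reduces to the following sketch. The in-memory `outbox` dict, `broker_send` and the `broker_up` flag are hypothetical stand-ins for the outbox table and a real broker; nothing here is sagaz's `OptimisticPublisher`.

```python
import asyncio

# Hypothetical in-memory stand-ins for the outbox table and the broker.
outbox = {}  # event_id -> "pending" or "sent"
delivered = []


async def broker_send(event_id: str, broker_up: bool) -> None:
    """Hypothetical broker client; raises when the broker is unreachable."""
    if not broker_up:
        raise ConnectionError("broker unavailable")
    delivered.append(event_id)


async def try_publish_now(event_id: str, broker_up: bool) -> bool:
    """Optimistic path: publish right after commit; on failure leave the row pending."""
    try:
        await broker_send(event_id, broker_up)
    except ConnectionError:
        return False  # the polling worker picks it up later
    outbox[event_id] = "sent"
    return True


async def worker_pass() -> None:
    """Fallback path: the outbox worker publishes anything still pending."""
    for event_id, status in list(outbox.items()):
        if status == "pending":
            await broker_send(event_id, broker_up=True)  # broker is reachable again
            outbox[event_id] = "sent"


async def main() -> None:
    for event_id, broker_up in [("evt-1", True), ("evt-2", False)]:
        outbox[event_id] = "pending"  # committed together with the saga state
        await try_publish_now(event_id, broker_up)
    await worker_pass()


asyncio.run(main())
print(delivered)  # ['evt-1', 'evt-2']: evt-1 went out immediately, evt-2 via the worker
```

Both paths can occasionally publish the same event (for example, if the process dies after sending but before marking the row `sent`), so consumers should still deduplicate.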
Consumer Inbox (Exactly-Once)
from sagaz.outbox import ConsumerInbox

# storage is your outbox storage instance (e.g. PostgreSQLOutboxStorage);
# create_order is your own business logic.
inbox = ConsumerInbox(storage, consumer_name="order-service")


async def process_order(payload: dict):
    order = await create_order(payload)
    return {"order_id": order.id}


# msg is a message received from your broker consumer.
async def on_message(msg):
    # Exactly-once processing: duplicates are automatically skipped
    return await inbox.process_idempotent(
        event_id=msg.headers["message_id"],
        source_topic=msg.topic,
        event_type="OrderCreated",
        payload=msg.value,
        handler=process_order,
    )
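Inbox deduplication can be sketched with sqlite3: a primary key on `(consumer_name, event_id)` rejects a second insert of the same event, and because the inbox row and the handler's side effect share one transaction, either both persist or neither does. `handle_once` and the tables are hypothetical; this is not the implementation behind `ConsumerInbox.process_idempotent`.

```python
import sqlite3
from typing import Optional

# Hypothetical schema; the composite primary key is what rejects duplicates.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE inbox (
        consumer_name TEXT NOT NULL,
        event_id TEXT NOT NULL,
        PRIMARY KEY (consumer_name, event_id)
    );
    CREATE TABLE orders (id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL);
    """
)


def handle_once(consumer_name: str, event_id: str, payload: str) -> Optional[dict]:
    """Record the event and apply its side effect in one transaction; skip duplicates."""
    try:
        with conn:  # commits on success, rolls back if anything raises
            conn.execute(
                "INSERT INTO inbox (consumer_name, event_id) VALUES (?, ?)",
                (consumer_name, event_id),
            )
            cur = conn.execute("INSERT INTO orders (payload) VALUES (?)", (payload,))
            return {"order_id": cur.lastrowid}
    except sqlite3.IntegrityError:
        return None  # already processed by this consumer: duplicate delivery


first = handle_once("order-service", "msg-1", '{"sku": "A1"}')
again = handle_once("order-service", "msg-1", '{"sku": "A1"}')
print(first, again)  # {'order_id': 1} None
```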
Kubernetes Deployment
# Create the namespace, then apply all manifests
kubectl create namespace sagaz
kubectl apply -f k8s/
# Deployed components:
# - PostgreSQL StatefulSet (20Gi persistent storage)
# - Outbox Worker Deployment (3-10 replicas with HPA)
# - Prometheus ServiceMonitor + 8 Alert Rules
# - Database Migration Job
Features:
- Auto-scaling based on pending events
- Zero-downtime rolling updates
- Built-in health checks
- Production security (non-root, read-only fs)
- Complete monitoring stack
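Auto-scaling on pending events follows the standard Kubernetes HPA rule `desiredReplicas = ceil(currentReplicas * currentMetricValue / desiredMetricValue)`, clamped to the configured bounds. The sketch below applies it to average pending events per worker pod with the 3-10 replica range listed above. `target_per_pod` is an assumed tuning value, not taken from the sagaz manifests, and the HPA's default tolerance and stabilization window are ignored.

```python
import math

MIN_REPLICAS, MAX_REPLICAS = 3, 10  # the worker Deployment's 3-10 replica range


def desired_replicas(current_replicas: int, pending_events: int, target_per_pod: int) -> int:
    """HPA rule: ceil(current * currentMetric / targetMetric), clamped to the bounds.

    The metric is the average number of pending outbox events per worker pod.
    """
    current_per_pod = pending_events / current_replicas
    raw = math.ceil(current_replicas * current_per_pod / target_per_pod)
    return max(MIN_REPLICAS, min(MAX_REPLICAS, raw))


print(desired_replicas(3, 4_500, 1_000))   # 5
print(desired_replicas(3, 50_000, 1_000))  # 10 (capped at MAX_REPLICAS)
print(desired_replicas(5, 100, 1_000))     # 3 (floored at MIN_REPLICAS)
```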
See k8s/README.md for detailed deployment guide.
Monitoring
Prometheus Metrics
# Saga metrics
saga_execution_total{status}
saga_execution_duration_seconds
saga_step_duration_seconds{step_name}
# Outbox metrics
outbox_pending_events_total
outbox_published_events_total
outbox_optimistic_send_success_total
consumer_inbox_duplicates_total
Grafana Alerts
- OutboxHighLag - >5000 pending events for 10min
- OutboxWorkerDown - No workers running
- OutboxHighErrorRate - >1% publish failures
- OptimisticSendHighFailureRate - >10% optimistic failures
Chaos Engineering
Production readiness validated through deliberate failure injection.
The library includes comprehensive chaos engineering tests that verify system resilience:
Test Categories
- Worker Crash Recovery - Workers recover from crashes with no data loss
- Database Connection Loss - Graceful handling of DB failures with retry
- Broker Downtime - Messages are not lost while the broker is unavailable
- Network Partitions - No duplicate processing under split-brain
- Concurrent Failures - System recovers from multiple simultaneous failures
- Data Consistency - Exactly-once guarantees maintained under chaos
Run Chaos Tests
# Run all chaos engineering tests
pytest tests/test_chaos_engineering.py -v -m chaos
# Test specific failure scenario
pytest tests/test_chaos_engineering.py::TestWorkerCrashRecovery -v
Key Findings:
- No data loss even with a 30% random failure rate
- Exactly-once processing with 5 concurrent workers
- Graceful handling of 50 events under extreme load
- Automatic recovery with exponential backoff
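The first two findings can be illustrated in miniature with a seeded simulation. A hypothetical flaky broker fails 30% of publishes: half before delivery (the message is lost and retried) and half after delivery (the acknowledgement is lost, so the retry is a duplicate). Retrying until acknowledged prevents loss, and an inbox-style `seen` set drops duplicates. This is an illustration only, not the suite in tests/test_chaos_engineering.py.

```python
import random

# Hypothetical simulation, not sagaz's chaos test suite.
FAILURE_RATE = 0.30
rng = random.Random(42)  # seeded so every run behaves the same

pending = {f"evt-{i}" for i in range(50)}  # events still waiting for an acknowledgement
processed = []  # side effects applied by the consumer
seen = set()  # consumer inbox: IDs already processed


def consume(event_id: str) -> None:
    """Consumer with an inbox check: redelivered events are skipped."""
    if event_id in seen:
        return
    seen.add(event_id)
    processed.append(event_id)


def flaky_publish(event_id: str) -> bool:
    """Return True only when the event was delivered and acknowledged."""
    roll = rng.random()
    if roll < FAILURE_RATE / 2:
        return False  # lost before reaching the consumer
    consume(event_id)
    return roll >= FAILURE_RATE  # otherwise delivered, but the ack may be lost


rounds = 0
while pending:  # the publisher keeps retrying unacknowledged events
    rounds += 1
    for event_id in sorted(pending):
        if flaky_publish(event_id):
            pending.discard(event_id)

print(len(processed), len(set(processed)))  # 50 50
```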
See docs/CHAOS_ENGINEERING.md for detailed chaos test documentation.
Documentation
| Topic | Link |
|---|---|
| Documentation Index | docs/DOCUMENTATION_INDEX.md |
| DAG Pattern | docs/feature_compensation_graph.md |
| Optimistic Sending | docs/optimistic-sending.md |
| Consumer Inbox | docs/consumer-inbox.md |
| Kubernetes Deploy | k8s/README.md |
| Chaos Engineering | docs/CHAOS_ENGINEERING.md |
| Implementation Details | docs/IMPLEMENTATION_SUMMARY.md |
| Changelog | docs/CHANGELOG.md |
Performance
| Operation | Latency | Improvement |
|---|---|---|
| Saga execution | ~50ms | Baseline |
| Outbox polling | ~100ms | Baseline |
| Optimistic publish | <10ms | 10x faster than outbox polling |
| Inbox dedup check | <1ms | Sub-millisecond |
Tested on:
- PostgreSQL 16
- Kafka 3.x
- 4 CPU cores, 8GB RAM
Production Stats
- 96% test coverage (793 passing tests)
- Type-safe - Full type hints
- Zero dependencies - Core features work standalone
- Well-documented - Comprehensive examples
- Battle-tested - Production-ready
- Kubernetes-native - Cloud-ready deployment
Development
# Clone the repository into a sagaz/ directory
git clone https://github.com/yourusername/sage.git sagaz
cd sagaz
# Install dependencies
pip install -e ".[dev]"
# Run tests
pytest
# With coverage
pytest --cov=sagaz --cov-report=html
# Current: 96% coverage
License
MIT License - see LICENSE file for details.
Project Status
Current Version: 1.0.0 (December 2024)
Recent Updates (December 2024):
- Optimistic sending pattern (10x latency improvement)
- Consumer inbox pattern (exactly-once processing)
- Kubernetes manifests (production deployment)
- 96% test coverage achieved
- 793 passing tests
See docs/FINAL_STATUS.md for detailed status.
Built with ❤️ for distributed systems
Download files
File details
Details for the file sagaz-1.0.1.tar.gz.
File metadata
- Download URL: sagaz-1.0.1.tar.gz
- Upload date:
- Size: 137.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8fa371c40bee9dda218179ebb5acf47d75e1a763d1ac39381c1affdaaac2752c |
| MD5 | c5a328c26e112692eb9c0a5973abaa6f |
| BLAKE2b-256 | 347257d0e274d2511dbb7b5e32281763cef6cb559d6b2b64d0d8b00f35883cfd |
File details
Details for the file sagaz-1.0.1-py3-none-any.whl.
File metadata
- Download URL: sagaz-1.0.1-py3-none-any.whl
- Upload date:
- Size: 31.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 456dfe24b197afb17a433d358522ba2bd6477410cc5bdcc7dd9620736902e956 |
| MD5 | 6543b335c30036d7cfecf1c453a12941 |
| BLAKE2b-256 | 394f377122fb0b47a6dad400c93101734e43c4425aa2390bd058c2f180792c86 |