Saga pattern for Celery with automatic compensation
Project description
celery-saga
Saga pattern for Celery with automatic compensation.
Define distributed transactions as a series of steps, each with a compensation (rollback) function. If any step fails, previously completed steps are automatically compensated in reverse order.
Install
pip install celery-saga
With Redis state backend:
pip install celery-saga[redis]
Quick Start
Define Steps
from celery_saga import saga_step, StepResponse
@saga_step(compensate="myapp.tasks.refund_payment")
@app.task
def charge_payment(**kwargs):
txn = payment_service.charge(kwargs["order_id"], kwargs["amount"])
return StepResponse(
output={"transaction_id": txn.id},
compensation_data={"transaction_id": txn.id, "amount": kwargs["amount"]},
)
@app.task
def refund_payment(compensation_data):
payment_service.refund(
compensation_data["transaction_id"],
compensation_data["amount"],
)
Define and Run a Saga
Builder API:
from celery_saga import Saga
order_saga = (
Saga("order_saga")
.add_step(validate_order, no_compensation=True)
.add_step(charge_payment)
.add_step(reserve_inventory)
.add_step(send_confirmation, no_compensation=True)
)
result = order_saga.run(order_id="abc-123", amount=99.99)
output = result.get(timeout=30)
Functional API:
from celery_saga import saga, step
@saga("order_saga")
def order_saga(input):
order = step(validate_order, input)
payment = step(charge_payment, order)
step(send_confirmation, payment)
return payment
result = order_saga.run(order_id="abc-123", amount=99.99)
Features
Parallel Steps
from celery_saga import Saga, parallelize
order_saga = (
Saga("order_saga")
.add_step(validate_order)
.add_parallel(charge_payment, reserve_inventory)
.add_step(send_confirmation)
)
Data Transforms
order_saga = (
Saga("order_saga")
.add_step(validate_order)
# Global context transform
.add_transform(lambda ctx: {**ctx, "amount_cents": ctx["amount"] * 100})
.add_step(charge_in_cents)
# Per-step input mapper
.add_step(
send_confirmation,
input=lambda ctx: {"order_id": ctx["order_id"], "txn": ctx["transaction_id"]},
)
)
StepResponse
Separates forward output from compensation data:
return StepResponse(
output={"transaction_id": txn.id}, # passed to next steps
compensation_data={"transaction_id": txn.id}, # passed to rollback function
)
Permanent Failure
Skip retries and trigger compensation with partial cleanup data:
@saga_step(compensate="cleanup_records")
@app.task
def process_batch(**kwargs):
processed = []
for item in kwargs["items"]:
if item.invalid:
raise StepResponse.permanent_failure(
"Invalid item found",
compensation_data={"processed_ids": processed},
)
processed.append(do_work(item))
return StepResponse(output={"processed": processed})
Idempotency
result = order_saga.run(
order_id="abc-123",
idempotency_key="order-abc-123",
)
State Backends
from celery_saga import set_default_backend
from celery_saga.backends import RedisSagaBackend, MemorySagaBackend
# Redis (production)
set_default_backend(RedisSagaBackend(url="redis://localhost:6379/0"))
# Memory (testing)
set_default_backend(MemorySagaBackend())
Saga Lifecycle
PENDING → RUNNING → COMPLETED (happy path)
↘ COMPENSATING → COMPENSATED (step failed, rollback succeeded)
↘ FAILED (rollback also failed)
How It Works
Under the hood, saga.run():
- Creates a
SagaExecutionrecord in the state backend - Builds a Celery
chainwith orchestrator tasks between each step - Each orchestrator task records step results and merges output into context
- On failure, reads completed steps and dispatches a reverse compensation chain
- Compensation tasks run in reverse order of completion
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file celery_saga-0.1.1.tar.gz.
File metadata
- Download URL: celery_saga-0.1.1.tar.gz
- Upload date:
- Size: 19.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8b20cf4dfe525cfcf06c49c927a25efd82754c5fed19322c45d7fa8201cd6188
|
|
| MD5 |
a4b596811b460942e5a7f6914dbb6b7b
|
|
| BLAKE2b-256 |
053dc9aa57dae925a65407e9fc0f88f3452eda575f400140480ea67492774436
|
File details
Details for the file celery_saga-0.1.1-py3-none-any.whl.
File metadata
- Download URL: celery_saga-0.1.1-py3-none-any.whl
- Upload date:
- Size: 17.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
25235f02560008fe36c9b0e910606e62b104f6643c04f729554b51c52184298d
|
|
| MD5 |
72fc5d76e1bf3afe379da33a54d920f1
|
|
| BLAKE2b-256 |
a09baee744c3361504923136fb77e8e58a4233854a81400524b3337d06e1261d
|