celery-saga

Saga pattern for Celery with automatic compensation.

Define distributed transactions as a series of steps, each with a compensation (rollback) function. If any step fails, previously completed steps are automatically compensated in reverse order.

Install

pip install celery-saga

With Redis state backend:

pip install "celery-saga[redis]"

Quick Start

Define Steps

from celery_saga import saga_step, StepResponse

# `app` is your Celery application; `payment_service` is your own payment client.

# Forward step: charge the customer and record what a rollback will need.
@saga_step(compensate="myapp.tasks.refund_payment")
@app.task
def charge_payment(**kwargs):
    txn = payment_service.charge(kwargs["order_id"], kwargs["amount"])
    return StepResponse(
        output={"transaction_id": txn.id},
        compensation_data={"transaction_id": txn.id, "amount": kwargs["amount"]},
    )

# Compensation: receives the compensation_data returned by charge_payment.
@app.task
def refund_payment(compensation_data):
    payment_service.refund(
        compensation_data["transaction_id"],
        compensation_data["amount"],
    )

Define and Run a Saga

Builder API:

from celery_saga import Saga

order_saga = (
    Saga("order_saga")
    .add_step(validate_order, no_compensation=True)
    .add_step(charge_payment)
    .add_step(reserve_inventory)
    .add_step(send_confirmation, no_compensation=True)
)

result = order_saga.run(order_id="abc-123", amount=99.99)
output = result.get(timeout=30)

Functional API:

from celery_saga import saga, step

@saga("order_saga")
def order_saga(input):
    order = step(validate_order, input)
    payment = step(charge_payment, order)
    step(send_confirmation, payment)
    return payment

result = order_saga.run(order_id="abc-123", amount=99.99)

Features

Parallel Steps

from celery_saga import Saga, parallelize

order_saga = (
    Saga("order_saga")
    .add_step(validate_order)
    .add_parallel(charge_payment, reserve_inventory)
    .add_step(send_confirmation)
)
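As a rough illustration of what a parallel stage implies for rollback, the following library-independent sketch runs two branches concurrently with the standard library and works out which ones would need compensating. It assumes that when one parallel branch fails, the branches that did succeed must still be undone; `run_parallel` and the step bodies are hypothetical stand-ins, not celery-saga internals.

```python
from concurrent.futures import ThreadPoolExecutor


def run_parallel(steps, ctx):
    """Run (name, callable) pairs concurrently; return (outputs, errors) keyed by name."""
    outputs, errors = {}, {}
    with ThreadPoolExecutor(max_workers=len(steps)) as pool:
        futures = {name: pool.submit(fn, ctx) for name, fn in steps}
    # Leaving the `with` block waits for every branch to finish.
    for name, future in futures.items():
        exc = future.exception()
        if exc is None:
            outputs[name] = future.result()
        else:
            errors[name] = exc
    return outputs, errors


def charge_payment(ctx):
    # Hypothetical branch that succeeds.
    return {"transaction_id": "txn-1"}


def reserve_inventory(ctx):
    # Hypothetical branch that fails.
    raise RuntimeError("out of stock")


outputs, errors = run_parallel(
    [("charge_payment", charge_payment), ("reserve_inventory", reserve_inventory)],
    {"order_id": "abc-123"},
)
# Branches that succeeded are the ones a rollback would need to undo.
to_compensate = sorted(outputs) if errors else []
```

In this sketch the payment went through while the reservation failed, so only `charge_payment` ends up in `to_compensate`.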

Data Transforms

order_saga = (
    Saga("order_saga")
    .add_step(validate_order)
    # Global context transform (round() avoids float artifacts and yields an int)
    .add_transform(lambda ctx: {**ctx, "amount_cents": round(ctx["amount"] * 100)})
    .add_step(charge_in_cents)
    # Per-step input mapper
    .add_step(
        send_confirmation,
        input=lambda ctx: {"order_id": ctx["order_id"], "txn": ctx["transaction_id"]},
    )
)
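The difference between the two hooks can be sketched in plain Python. This assumes, as the comments above suggest, that a global transform replaces the shared context seen by all later steps, while a per-step input mapper only shapes what a single step receives. `apply_transform` and `call_step` are hypothetical helpers, not part of celery-saga.

```python
from typing import Any, Callable, Dict, Optional

Context = Dict[str, Any]


def apply_transform(ctx: Context, transform: Callable[[Context], Context]) -> Context:
    """Global transform: the returned dict becomes the new shared context."""
    return transform(ctx)


def call_step(
    ctx: Context,
    step: Callable[..., Any],
    input_mapper: Optional[Callable[[Context], Context]] = None,
) -> Any:
    """Per-step mapper: only this step sees the mapped kwargs; ctx is untouched."""
    kwargs = input_mapper(ctx) if input_mapper else ctx
    return step(**kwargs)


def send_confirmation(order_id: str, txn: str) -> str:
    # Hypothetical step body for illustration.
    return f"order {order_id} confirmed (txn {txn})"


ctx = {"order_id": "abc-123", "amount": 99.99, "transaction_id": "txn-1"}
ctx = apply_transform(ctx, lambda c: {**c, "amount_cents": round(c["amount"] * 100)})
message = call_step(
    ctx,
    send_confirmation,
    input_mapper=lambda c: {"order_id": c["order_id"], "txn": c["transaction_id"]},
)
```

After the transform, `amount_cents` stays in the context for every later step, whereas the renamed `txn` key exists only in the arguments passed to `send_confirmation`.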

StepResponse

Separates forward output from compensation data:

return StepResponse(
    output={"transaction_id": txn.id},           # passed to next steps
    compensation_data={"transaction_id": txn.id}, # passed to rollback function
)

Permanent Failure

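Raise a permanent failure to skip retries and trigger compensation immediately. Attach compensation data describing the work already done so the rollback can clean up partial results: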

@saga_step(compensate="cleanup_records")
@app.task
def process_batch(**kwargs):
    processed = []
    for item in kwargs["items"]:
        if item.invalid:
            raise StepResponse.permanent_failure(
                "Invalid item found",
                compensation_data={"processed_ids": processed},
            )
        processed.append(do_work(item))
    return StepResponse(output={"processed": processed})
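To show why a permanent failure is treated differently from an ordinary error, here is a minimal sketch of a retry loop in plain Python. It assumes transient errors are retried up to a limit while a permanent failure goes straight to compensation with whatever data it carries. `PermanentFailure` and `run_with_retries` are hypothetical names for illustration; the library's own exception type and retry handling may differ.

```python
class PermanentFailure(Exception):
    """Hypothetical stand-in for the error created by StepResponse.permanent_failure."""

    def __init__(self, message, compensation_data=None):
        super().__init__(message)
        self.compensation_data = compensation_data or {}


def run_with_retries(step, max_retries=3):
    """Return (status, payload, attempts); status is "ok" or "compensate"."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return "ok", step(), attempts
        except PermanentFailure as exc:
            # No point retrying: hand the partial-cleanup data to compensation.
            return "compensate", exc.compensation_data, attempts
        except Exception:
            # Transient error: retry until the budget is exhausted.
            if attempts > max_retries:
                return "compensate", {}, attempts


def bad_batch():
    processed = [1, 2]
    raise PermanentFailure("Invalid item found", {"processed_ids": processed})


calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary outage")
    return "done"


permanent_result = run_with_retries(bad_batch)
transient_result = run_with_retries(flaky)
```

The permanent failure stops after a single attempt and keeps the IDs processed so far, while the flaky step succeeds on its third attempt.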

Idempotency

result = order_saga.run(
    order_id="abc-123",
    idempotency_key="order-abc-123",
)
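The exact semantics of `idempotency_key` are not spelled out here. A common interpretation, assumed in the sketch below, is that a second run with the same key returns the existing execution instead of starting a new one. `start_saga` and the in-memory `_executions` map are hypothetical and only illustrate the idea.

```python
import uuid

# Hypothetical in-memory index: idempotency_key -> execution id.
_executions = {}


def start_saga(name, idempotency_key=None, **kwargs):
    """Return (execution_id, created). Reuses the execution for a known key."""
    if idempotency_key is not None and idempotency_key in _executions:
        return _executions[idempotency_key], False
    execution_id = str(uuid.uuid4())
    if idempotency_key is not None:
        _executions[idempotency_key] = execution_id
    return execution_id, True


first_id, first_created = start_saga(
    "order_saga", idempotency_key="order-abc-123", order_id="abc-123"
)
second_id, second_created = start_saga(
    "order_saga", idempotency_key="order-abc-123", order_id="abc-123"
)
```

Under this assumption a retried request, for example after a client timeout, would not charge the customer twice, because the second call maps to the first execution.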

State Backends

from celery_saga import set_default_backend
from celery_saga.backends import RedisSagaBackend, MemorySagaBackend

# Redis (production)
set_default_backend(RedisSagaBackend(url="redis://localhost:6379/0"))

# Memory (testing)
set_default_backend(MemorySagaBackend())

Saga Lifecycle

PENDING → RUNNING → COMPLETED                    (happy path)
                  ↘ COMPENSATING → COMPENSATED    (step failed, rollback succeeded)
                                 ↘ FAILED         (rollback also failed)
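The diagram can be read as a small state machine. The sketch below encodes exactly the transitions drawn above and rejects anything else; the `SagaState` enum and `transition` helper are illustrative names, not necessarily the library's own types.

```python
from enum import Enum


class SagaState(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    COMPENSATING = "COMPENSATING"
    COMPENSATED = "COMPENSATED"
    FAILED = "FAILED"


# Allowed transitions, taken from the lifecycle diagram.
TRANSITIONS = {
    SagaState.PENDING: {SagaState.RUNNING},
    SagaState.RUNNING: {SagaState.COMPLETED, SagaState.COMPENSATING},
    SagaState.COMPENSATING: {SagaState.COMPENSATED, SagaState.FAILED},
    SagaState.COMPLETED: set(),    # terminal
    SagaState.COMPENSATED: set(),  # terminal
    SagaState.FAILED: set(),       # terminal
}


def transition(current: SagaState, target: SagaState) -> SagaState:
    """Return the new state, or raise ValueError for a transition not in the diagram."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target


# Rollback path: a step failed and compensation succeeded.
state = SagaState.PENDING
for nxt in (SagaState.RUNNING, SagaState.COMPENSATING, SagaState.COMPENSATED):
    state = transition(state, nxt)
```

Note that COMPLETED, COMPENSATED, and FAILED are terminal: once a saga reaches one of them, no further transition is allowed.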

How It Works

Under the hood, saga.run():

  1. Creates a SagaExecution record in the state backend
  2. Builds a Celery chain with an orchestrator task between consecutive steps
  3. Each orchestrator task records step results and merges output into context
  4. On failure, reads completed steps and dispatches a reverse compensation chain
  5. Compensation tasks run in reverse order of completion
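The five steps above can be approximated by a synchronous, single-process sketch. It is not the library's implementation: there is no Celery chain or state backend here, the execution record is a plain dict, and `run_saga` plus the step functions are hypothetical. Each forward function returns an `(output, compensation_data)` pair, mirroring the two fields of `StepResponse`.

```python
def run_saga(steps, context):
    """steps: list of (name, forward, compensate_or_None). Returns (execution, ctx)."""
    execution = {"state": "RUNNING", "completed": []}  # 1. execution record
    ctx = dict(context)
    try:
        for name, forward, compensate in steps:  # 2. steps run in order
            output, comp_data = forward(ctx)
            # 3. record the result, then merge the output into the context
            execution["completed"].append((name, compensate, comp_data))
            ctx.update(output)
    except Exception as exc:
        execution["state"] = "COMPENSATING"
        execution["error"] = str(exc)
        try:
            # 4 and 5. undo completed steps in reverse order of completion
            for name, compensate, comp_data in reversed(execution["completed"]):
                if compensate is not None:
                    compensate(comp_data)
            execution["state"] = "COMPENSATED"
        except Exception:
            execution["state"] = "FAILED"  # the rollback itself failed
        return execution, ctx
    execution["state"] = "COMPLETED"
    return execution, ctx


log = []


def validate(ctx):
    return {"valid": True}, None


def charge(ctx):
    return {"transaction_id": "txn-1"}, {"transaction_id": "txn-1", "amount": ctx["amount"]}


def refund(data):
    log.append(("refund", data["transaction_id"]))


def reserve(ctx):
    return {"reservation_id": "res-1"}, {"reservation_id": "res-1"}


def release(data):
    log.append(("release", data["reservation_id"]))


def confirm(ctx):
    raise RuntimeError("mail server down")


steps = [
    ("validate", validate, None),  # no compensation needed
    ("charge", charge, refund),
    ("reserve", reserve, release),
    ("confirm", confirm, None),
]
execution, final_ctx = run_saga(steps, {"order_id": "abc-123", "amount": 99.99})
```

Because the confirmation step fails, the reservation is released first and the payment is refunded second, which is the reverse of the order in which they completed.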

License

MIT
