Distributed transaction saga orchestration for Spakky Framework
Project description
spakky-saga
Distributed transaction saga orchestration for Spakky Framework.
Installation
pip install spakky-saga
Features
@Sagastereotype: Marks saga orchestrator classes (extends@Pod)AbstractSaga[SagaDataT]: Generic base class withflow()abstract method andexecute()entry point@saga_stepdecorator: Typed descriptor that enables>>,&,|operators on instance methods- Flow DSL: Declarative composition via operators or builder functions (
saga_flow,step,parallel) - Error strategies:
Compensate(default),Skip,Retry(max_attempts, backoff, then)withExponentialBackoff - Timeouts: Per-step (
timeout=) and per-saga (SagaFlow.timeout()) - Parallel execution:
asyncio.gatherbased groups (&operator /parallel()) - Compensation: Automatic reverse-order rollback of committed steps on failure
SagaResult[T]: Non-throwing result withstatus,data,failed_step,error,history,elapsed- Structured logging:
[saga=... step=... status=... elapsed=...ms]format
Quick Start
Define Saga Data
from spakky.saga import AbstractSagaData
class OrderSagaData(AbstractSagaData):
order_id: int
customer_id: int
ticket_id: int | None = None
Define a Saga
from spakky.saga import AbstractSaga, Saga, SagaFlow, saga_step
@Saga()
class CreateOrderSaga(AbstractSaga[OrderSagaData]):
@saga_step
async def issue_ticket(self, data: OrderSagaData) -> OrderSagaData:
...
@saga_step
async def cancel_ticket(self, data: OrderSagaData) -> None:
...
@saga_step
async def reserve_stock(self, data: OrderSagaData) -> OrderSagaData:
...
def flow(self) -> SagaFlow[OrderSagaData]:
return SagaFlow(
items=(
self.issue_ticket >> self.cancel_ticket, # Transaction
self.reserve_stock, # SagaStep (no compensation)
)
)
Execute
result = await saga.execute(OrderSagaData(order_id=1, customer_id=42))
if result.status is SagaStatus.COMPLETED:
...
Builder Function Alternative
from spakky.saga import Retry, parallel, saga_flow, step
flow = saga_flow(
step(issue_ticket_fn, compensate=cancel_ticket_fn),
parallel(reserve_stock_fn, charge_payment_fn),
step(confirm_order_fn, on_error=Retry(max_attempts=3)),
)
Flow Operators
| Operator | Meaning | Result Type |
|---|---|---|
>> |
Bind compensate function | Transaction[T] |
& |
Parallel execution | Parallel[T] |
| |
Attach error strategy | Same as LHS + on_error |
Error Strategies
| Strategy | Signature | Description |
|---|---|---|
Compensate() |
(default) | Trigger reverse-order compensation |
Skip() |
— | Ignore failure and continue |
Retry(max_attempts, backoff, then) |
Retry(3, ExponentialBackoff(1.0), Compensate()) |
Retry N times, then apply then strategy |
ExponentialBackoff(base=1.0) |
— | base * 2^(attempt-1) delay between retries |
API Reference
Stereotype / Base
| Symbol | Description |
|---|---|
@Saga() |
Stereotype for saga orchestrator classes (extends @Pod) |
AbstractSaga[SagaDataT] |
ABC base with flow() abstract method and execute() |
@saga_step |
Descriptor decorator enabling >>, &, | operators |
AbstractSagaData |
Base data model (@immutable + AbstractDomainModel, auto-generates saga_id: UUID) |
Flow Types
| Symbol | Description |
|---|---|
SagaFlow[T] |
Top-level flow definition (items, saga_timeout, compensation_failure_handler) |
SagaStep[T] |
Single action without compensation |
Transaction[T] |
Action + compensate pair |
Parallel[T] |
Concurrent group of steps/transactions |
FlowItem[T] |
Union of flow-composable items |
ActionFn[T] / CompensateFn[T] |
Type aliases for action / compensate callables |
SagaDataT |
TypeVar bound to AbstractSagaData |
Builders
| Function | Description |
|---|---|
saga_flow(*items) |
Construct a SagaFlow from sequential items |
step(action, *, compensate=, on_error=, timeout=) |
Build SagaStep or Transaction |
parallel(*items) |
Build a Parallel group (requires ≥ 2 items) |
Execution
| Symbol | Description |
|---|---|
run_saga_flow(flow, data, *, saga_name=) |
Execute a flow; returns SagaResult |
AbstractSaga.execute(data) |
Thin wrapper over run_saga_flow using type(self).__name__ |
Result Types
| Symbol | Description |
|---|---|
SagaResult[T] |
status, data, failed_step, error, history, elapsed |
StepRecord |
name, status, elapsed — per-step execution record |
StepStatus |
COMMITTED, FAILED, COMPENSATED |
SagaStatus |
STARTED, RUNNING, COMPENSATING, COMPLETED, FAILED, TIMED_OUT |
Errors
| Error | Description |
|---|---|
AbstractSpakkySagaError |
ABC base for all saga errors |
SagaFlowDefinitionError |
Invalid saga flow definition (static validation) |
SagaCompensationFailedError |
Compensation failed during rollback |
SagaStepTimeoutError |
Raised internally when a step exceeds its timeout (routed through on_error) |
SagaParallelMergeConflictError |
Parallel steps modified the same field during data merge |
SagaEngineNotConnectedError |
execute() called before the saga engine is connected |
Related
- ADR-0007 — Architecture decision record
spakky-domain— providesAbstractDomainModel(parent ofAbstractSagaData)
License
MIT License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file spakky_saga-6.3.1.tar.gz.
File metadata
- Download URL: spakky_saga-6.3.1.tar.gz
- Upload date:
- Size: 12.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ed2a5ce2660e25402fb4eb9345077476c605bf9cd4a102709d55e302e6272bfb
|
|
| MD5 |
0ed01ebd940d78a87635e4ce6c3a27e7
|
|
| BLAKE2b-256 |
f0d58346b542e61c3f82741f4d0062e42116d74f12b46448e078bca982fbdfe8
|
Provenance
The following attestation bundles were made for spakky_saga-6.3.1.tar.gz:
Publisher:
publish-package.yml on E5presso/spakky-framework
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
spakky_saga-6.3.1.tar.gz -
Subject digest:
ed2a5ce2660e25402fb4eb9345077476c605bf9cd4a102709d55e302e6272bfb - Sigstore transparency entry: 1281010625
- Sigstore integration time:
-
Permalink:
E5presso/spakky-framework@19f6ea7b0409c33282354d1dce313e0b7399f9f1 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/E5presso
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish-package.yml@19f6ea7b0409c33282354d1dce313e0b7399f9f1 -
Trigger Event:
workflow_dispatch
-
Statement type:
File details
Details for the file spakky_saga-6.3.1-py3-none-any.whl.
File metadata
- Download URL: spakky_saga-6.3.1-py3-none-any.whl
- Upload date:
- Size: 17.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
376917b3ce1564d043cccb05c3833eab1f1e40d13578bb0f51a601a65a577e99
|
|
| MD5 |
f4bed6ad4df25afb07506b9c8c324978
|
|
| BLAKE2b-256 |
7a0143aa8ac828c9aa854ca1917cab693c49bd404b5f09a16e42cb2e0926c969
|
Provenance
The following attestation bundles were made for spakky_saga-6.3.1-py3-none-any.whl:
Publisher:
publish-package.yml on E5presso/spakky-framework
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
spakky_saga-6.3.1-py3-none-any.whl -
Subject digest:
376917b3ce1564d043cccb05c3833eab1f1e40d13578bb0f51a601a65a577e99 - Sigstore transparency entry: 1281010637
- Sigstore integration time:
-
Permalink:
E5presso/spakky-framework@19f6ea7b0409c33282354d1dce313e0b7399f9f1 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/E5presso
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish-package.yml@19f6ea7b0409c33282354d1dce313e0b7399f9f1 -
Trigger Event:
workflow_dispatch
-
Statement type: