Distributed transaction saga orchestration for Spakky Framework
Project description
spakky-saga
Spakky Framework를 위한 분산 트랜잭션 사가 오케스트레이션입니다.
설치
pip install spakky-saga
주요 기능
@Sagastereotype: saga orchestrator class를 표시합니다(@Pod확장).AbstractSaga[SagaDataT]:flow()추상 메서드와execute()entry point를 가진 generic base class입니다.@saga_stepdecorator: instance method에서>>,&,|연산자를 활성화하는 typed descriptor입니다.- Flow DSL: 연산자 또는 builder 함수(
saga_flow,step,parallel)로 선언적 구성 - Error strategy:
Compensate(기본값),Skip,ExponentialBackoff을 포함한Retry(max_attempts, backoff, then)을 제공합니다. - Timeout: step별
timeout=과 saga별SagaFlow.timeout()지원 - 병렬 실행:
asyncio.gather기반 group(&operator /parallel()) - 보상: 실패 시 commit된 step을 역순으로 자동 rollback
SagaResult[T]:status,data,failed_step,error,history,elapsed를 담는 non-throwing result- 구조화 로깅:
[saga=... step=... status=... elapsed=...ms]format
빠른 시작
SagaData 정의
from spakky.saga import AbstractSagaData
class OrderSagaData(AbstractSagaData):
order_id: int
customer_id: int
ticket_id: int | None = None
Saga 정의
from spakky.saga import AbstractSaga, Saga, SagaFlow, saga_step
@Saga()
class CreateOrderSaga(AbstractSaga[OrderSagaData]):
@saga_step
async def issue_ticket(self, data: OrderSagaData) -> OrderSagaData:
...
@saga_step
async def cancel_ticket(self, data: OrderSagaData) -> None:
...
@saga_step
async def reserve_stock(self, data: OrderSagaData) -> OrderSagaData:
...
def flow(self) -> SagaFlow[OrderSagaData]:
return SagaFlow(
items=(
self.issue_ticket >> self.cancel_ticket, # Transaction
self.reserve_stock, # SagaStep (no compensation)
)
)
실행
result = await saga.execute(OrderSagaData(order_id=1, customer_id=42))
if result.status is SagaStatus.COMPLETED:
...
Builder 함수 대안
from spakky.saga import Retry, parallel, saga_flow, step
flow = saga_flow(
step(issue_ticket_fn, compensate=cancel_ticket_fn),
parallel(reserve_stock_fn, charge_payment_fn),
step(confirm_order_fn, on_error=Retry(max_attempts=3)),
)
Flow 연산자
| 연산자 | 의미 | 결과 타입 |
|---|---|---|
>> |
compensate 함수 바인딩 | Transaction[T] |
& |
병렬 실행 | Parallel[T] |
| |
에러 전략 부착 | 좌변과 동일한 타입 + on_error |
Error Strategy
| Strategy | Signature | 설명 |
|---|---|---|
Compensate() |
(기본값) | 역순 compensation 실행 |
Skip() |
— | 실패를 무시하고 계속 진행 |
Retry(max_attempts, backoff, then) |
Retry(3, ExponentialBackoff(1.0), Compensate()) |
N회 재시도 후 then 전략 적용 |
ExponentialBackoff(base=1.0) |
— | retry 사이에 base * 2^(attempt-1) delay 적용 |
API 레퍼런스
Stereotype / Base
| 기호 | 설명 |
|---|---|
@Saga() |
saga orchestrator class용 stereotype(@Pod 확장) |
AbstractSaga[SagaDataT] |
flow() 추상 메서드와 execute()를 가진 ABC 기반 클래스 |
@saga_step |
>>, &, ` |
AbstractSagaData |
base data model(@immutable + AbstractDomainModel, saga_id: UUID 자동 생성) |
Flow 타입
| 기호 | 설명 |
|---|---|
SagaFlow[T] |
최상위 flow 정의(items, saga_timeout, compensation_failure_handler) |
SagaStep[T] |
compensation 없는 단일 action |
Transaction[T] |
action + compensate 쌍 |
Parallel[T] |
step/transaction 동시 실행 그룹 |
FlowItem[T] |
flow 구성 가능 item의 union |
ActionFn[T] / CompensateFn[T] |
action / compensate callable용 type alias |
SagaDataT |
AbstractSagaData에 bound된 TypeVar |
Builder
| 함수 | 설명 |
|---|---|
saga_flow(*items) |
순차 item으로 SagaFlow 생성 |
step(action, *, compensate=, on_error=, timeout=) |
SagaStep 또는 Transaction 생성 |
parallel(*items) |
Parallel group 생성(최소 2개 item 필요) |
실행
| 기호 | 설명 |
|---|---|
run_saga_flow(flow, data, *, saga_name=) |
flow 실행 후 SagaResult 반환 |
AbstractSaga.execute(data) |
type(self).__name__을 사용하는 run_saga_flow 얇은 wrapper |
Result 타입
| 기호 | 설명 |
|---|---|
SagaResult[T] |
status, data, failed_step, error, history, elapsed |
StepRecord |
name, status, elapsed — per-step execution record |
StepStatus |
COMMITTED, FAILED, COMPENSATED |
SagaStatus |
STARTED, RUNNING, COMPENSATING, COMPLETED, FAILED, TIMED_OUT |
에러
| 에러 | 설명 |
|---|---|
AbstractSpakkySagaError |
모든 saga error의 ABC 기반 클래스 |
SagaFlowDefinitionError |
유효하지 않은 saga flow 정의(정적 검증) |
SagaCompensationFailedError |
rollback 중 compensation 실패 |
SagaStepTimeoutError |
step timeout 초과 시 내부에서 발생(on_error로 라우팅) |
SagaParallelMergeConflictError |
병렬 step이 data merge 중 같은 필드 변경 |
SagaEngineNotConnectedError |
saga engine 연결 전에 execute()가 호출됨 |
관련 문서
- ADR-0007 — architecture decision record
spakky-domain—AbstractSagaData의 부모인AbstractDomainModel제공
라이선스
MIT License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file spakky_saga-6.5.0.tar.gz.
File metadata
- Download URL: spakky_saga-6.5.0.tar.gz
- Upload date:
- Size: 12.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0bad76535a95fbb689f85eefa23bf969287c8ded6b331743f447d4725df3baf0
|
|
| MD5 |
5115d995b0bb5f10b0a86bf469cc7af4
|
|
| BLAKE2b-256 |
ca7a6a79307b517e9f228a8cc1da7ee7527cd469327053a03df690541edaca5a
|
Provenance
The following attestation bundles were made for spakky_saga-6.5.0.tar.gz:
Publisher:
release.yml on E5presso/spakky-framework
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
spakky_saga-6.5.0.tar.gz -
Subject digest:
0bad76535a95fbb689f85eefa23bf969287c8ded6b331743f447d4725df3baf0 - Sigstore transparency entry: 1437042352
- Sigstore integration time:
-
Permalink:
E5presso/spakky-framework@86c1d43bdc948ac432f27efeeeb2b56692d18ee4 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/E5presso
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@86c1d43bdc948ac432f27efeeeb2b56692d18ee4 -
Trigger Event:
workflow_dispatch
-
Statement type:
File details
Details for the file spakky_saga-6.5.0-py3-none-any.whl.
File metadata
- Download URL: spakky_saga-6.5.0-py3-none-any.whl
- Upload date:
- Size: 17.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
853318b242f74415ad919f017cad911abd6c6457a5d5e76db482cf625ffbdbe5
|
|
| MD5 |
f90a31451c34e6722476eb306bca5680
|
|
| BLAKE2b-256 |
285a95c09bc1977efc54ff817e8b69d06b7428b74a9656ba2ba6e819aadef85b
|
Provenance
The following attestation bundles were made for spakky_saga-6.5.0-py3-none-any.whl:
Publisher:
release.yml on E5presso/spakky-framework
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
spakky_saga-6.5.0-py3-none-any.whl -
Subject digest:
853318b242f74415ad919f017cad911abd6c6457a5d5e76db482cf625ffbdbe5 - Sigstore transparency entry: 1437042355
- Sigstore integration time:
-
Permalink:
E5presso/spakky-framework@86c1d43bdc948ac432f27efeeeb2b56692d18ee4 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/E5presso
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@86c1d43bdc948ac432f27efeeeb2b56692d18ee4 -
Trigger Event:
workflow_dispatch
-
Statement type: