
Distributed transaction saga orchestration for Spakky Framework

spakky-saga

Distributed transaction saga orchestration for the Spakky Framework.

Installation

pip install spakky-saga

Key features

  • @Saga stereotype: marks a saga orchestrator class (extends @Pod).
  • AbstractSaga[SagaDataT]: generic base class with an abstract flow() method and an execute() entry point.
  • @saga_step decorator: typed descriptor that enables the >>, &, and | operators on instance methods.
  • Flow DSL: declarative composition with operators or builder functions (saga_flow, step, parallel).
  • Error strategy: Compensate (default), Skip, and Retry(max_attempts, backoff, then), with ExponentialBackoff available as the backoff.
  • Timeout: per-step timeout= and per-saga SagaFlow.timeout().
  • Parallel execution: asyncio.gather-based groups (& operator / parallel()).
  • Compensation: on failure, committed steps are rolled back automatically in reverse order.
  • SagaResult[T]: non-throwing result carrying status, data, failed_step, error, history, and elapsed.
  • Structured logging: [saga=... step=... status=... elapsed=...ms] format.
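The reverse-order compensation described in the feature list can be illustrated without the library. The following is a minimal sketch, assuming plain dict data and hypothetical helper names (`run_with_compensation`, `release_stock`, `charge_payment`); it is not spakky-saga's implementation, only the general pattern of undoing committed steps newest first.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Data = dict
Action = Callable[[Data], Awaitable[Data]]
Compensation = Callable[[Data], Awaitable[None]]
Step = tuple[str, Action, Optional[Compensation]]


async def run_with_compensation(steps: list[Step], data: Data, log: list[str]) -> str:
    """Run steps in order; on failure, undo committed steps in reverse order."""
    committed: list[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            data = await action(data)
        except Exception as exc:
            log.append(f"{name}: failed ({exc})")
            # Undo the steps that already committed, newest first.
            for done_name, _, compensate in reversed(committed):
                if compensate is not None:
                    await compensate(data)
                    log.append(f"{done_name}: compensated")
            return "FAILED"
        committed.append(step)
        log.append(f"{name}: committed")
    return "COMPLETED"


# Hypothetical actions and compensations used only for this illustration.
async def issue_ticket(data: Data) -> Data:
    return {**data, "ticket_id": 7}


async def cancel_ticket(data: Data) -> None:
    return None


async def reserve_stock(data: Data) -> Data:
    return {**data, "reserved": True}


async def release_stock(data: Data) -> None:
    return None


async def charge_payment(data: Data) -> Data:
    raise RuntimeError("card declined")


def demo() -> tuple[str, list[str]]:
    log: list[str] = []
    steps: list[Step] = [
        ("issue_ticket", issue_ticket, cancel_ticket),
        ("reserve_stock", reserve_stock, release_stock),
        ("charge_payment", charge_payment, None),
    ]
    status = asyncio.run(run_with_compensation(steps, {"order_id": 1}, log))
    return status, log
```

Calling `demo()` returns the final status together with the log, which shows `reserve_stock` being compensated before `issue_ticket`.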

Quick start

Defining SagaData

from spakky.saga import AbstractSagaData


class OrderSagaData(AbstractSagaData):
    order_id: int
    customer_id: int
    ticket_id: int | None = None

Defining a Saga

from spakky.saga import AbstractSaga, Saga, SagaFlow, saga_step


@Saga()
class CreateOrderSaga(AbstractSaga[OrderSagaData]):
    @saga_step
    async def issue_ticket(self, data: OrderSagaData) -> OrderSagaData:
        ...

    @saga_step
    async def cancel_ticket(self, data: OrderSagaData) -> None:
        ...

    @saga_step
    async def reserve_stock(self, data: OrderSagaData) -> OrderSagaData:
        ...

    def flow(self) -> SagaFlow[OrderSagaData]:
        return SagaFlow(
            items=(
                self.issue_ticket >> self.cancel_ticket,  # Transaction
                self.reserve_stock,                        # SagaStep (no compensation)
            )
        )

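Execution

from spakky.saga import SagaStatus  # assumed to be exported like the names above

# Inside an async function; `saga` is a CreateOrderSaga instance.
result = await saga.execute(OrderSagaData(order_id=1, customer_id=42))
if result.status is SagaStatus.COMPLETED:
    ...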

Builder function alternative

from spakky.saga import Retry, parallel, saga_flow, step


flow = saga_flow(
    step(issue_ticket_fn, compensate=cancel_ticket_fn),
    parallel(reserve_stock_fn, charge_payment_fn),
    step(confirm_order_fn, on_error=Retry(max_attempts=3)),
)

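Flow operators

| Operator | Meaning | Result type |
|---|---|---|
| `>>` | Binds a compensate function | `Transaction[T]` |
| `&` | Parallel execution | `Parallel[T]` |
| `\|` | Attaches an error strategy | Same type as the left operand, with `on_error` set |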
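To make the operator semantics concrete, here is a minimal sketch of how such a DSL can be built with Python operator overloading. The classes below are stand-ins that only borrow the names from the table; the string-valued `on_error` field and the `_Operators` mixin are assumptions made for illustration, not the library's types.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Any, Callable


class _Operators:
    """Shared `&` and `|` behaviour for every flow item in this sketch."""

    def __and__(self, other: Any) -> Parallel:
        # Flatten nested groups so that a & b & c yields one three-item group.
        left = self.items if isinstance(self, Parallel) else (self,)
        return Parallel(items=left + (other,))

    def __or__(self, strategy: str) -> Any:
        # Return a copy of the same type with only the error strategy changed.
        return replace(self, on_error=strategy)


@dataclass(frozen=True)
class Step(_Operators):
    action: Callable[..., Any]
    on_error: str = "compensate"

    def __rshift__(self, compensate: Step) -> Transaction:
        # `action >> compensate` pairs an action with its undo function.
        return Transaction(self.action, compensate.action, self.on_error)


@dataclass(frozen=True)
class Transaction(_Operators):
    action: Callable[..., Any]
    compensate: Callable[..., Any]
    on_error: str = "compensate"


@dataclass(frozen=True)
class Parallel(_Operators):
    items: tuple
    on_error: str = "compensate"


# Hypothetical step functions used only to build a flow item.
def issue_ticket(data: dict) -> dict:
    return data


def cancel_ticket(data: dict) -> None:
    return None


def reserve_stock(data: dict) -> dict:
    return data


# Python evaluates >> before &, and & before |, so this reads as
# ((issue_ticket >> cancel_ticket) & reserve_stock) | "skip".
combined = Step(issue_ticket) >> Step(cancel_ticket) & Step(reserve_stock) | "skip"
```

Because Python's precedence rules place `>>` above `&` and `&` above `|`, expressions in this style compose without parentheses in the common case; parentheses are still needed to attach a strategy to a single item inside a parallel group.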

Error Strategy

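| Strategy | Signature | Description |
|---|---|---|
| `Compensate()` | | (Default) Runs compensations in reverse order |
| `Skip()` | | Ignores the failure and continues |
| `Retry(max_attempts, backoff, then)` | `Retry(3, ExponentialBackoff(1.0), Compensate())` | Retries N times, then applies the `then` strategy |
| `ExponentialBackoff(base=1.0)` | | Applies a delay of base * 2^(attempt-1) between retries |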
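The delay formula base * 2^(attempt-1) can be worked through with a small standalone sketch. The functions `backoff_delay` and `retry` below are hypothetical helpers, not spakky-saga APIs, and the assumption that attempts are numbered from 1 is mine; the injectable `sleep` parameter exists only so the schedule can be observed without waiting.

```python
import asyncio
from typing import Any, Awaitable, Callable


def backoff_delay(base: float, attempt: int) -> float:
    """Delay after the given failed attempt (1-based): base * 2^(attempt-1)."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return base * 2 ** (attempt - 1)


async def retry(
    action: Callable[[], Awaitable[Any]],
    max_attempts: int,
    base: float,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Any:
    """Retry an action, waiting with exponential backoff between attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await action()
        except Exception:
            if attempt == max_attempts:
                # Attempts are exhausted; a saga engine would now apply
                # its follow-up strategy. This sketch simply re-raises.
                raise
            await sleep(backoff_delay(base, attempt))
```

With base=1.0, the waits after the first three failed attempts are 1.0, 2.0, and 4.0 seconds.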

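API reference

Stereotype / Base

| Symbol | Description |
|---|---|
| `@Saga()` | Stereotype for saga orchestrator classes (extends `@Pod`) |
| `AbstractSaga[SagaDataT]` | ABC base class with an abstract `flow()` method and `execute()` |
| `@saga_step` | Typed descriptor that enables the `>>`, `&`, and `\|` operators on instance methods |
| `AbstractSagaData` | Base data model (`@immutable` + `AbstractDomainModel`; `saga_id: UUID` is generated automatically) |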

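Flow types

| Symbol | Description |
|---|---|
| `SagaFlow[T]` | Top-level flow definition (`items`, `saga_timeout`, `compensation_failure_handler`) |
| `SagaStep[T]` | Single action without compensation |
| `Transaction[T]` | Pair of an action and its compensate function |
| `Parallel[T]` | Group of steps/transactions executed concurrently |
| `FlowItem[T]` | Union of the items that can make up a flow |
| `ActionFn[T]` / `CompensateFn[T]` | Type aliases for action / compensate callables |
| `SagaDataT` | TypeVar bound to `AbstractSagaData` |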

Builder

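| Function | Description |
|---|---|
| `saga_flow(*items)` | Creates a `SagaFlow` from sequential items |
| `step(action, *, compensate=, on_error=, timeout=)` | Creates a `SagaStep` or a `Transaction` |
| `parallel(*items)` | Creates a `Parallel` group (requires at least 2 items) |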
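The `timeout=` parameter of `step()` bounds a single action. How the library enforces it is not documented here, so the following is only a sketch of one common approach, `asyncio.wait_for`, with a stand-in `StepTimeout` exception and hypothetical helpers (`run_step`, `run_step_or_skip`) that route the timeout to a skip-style strategy.

```python
import asyncio
from typing import Awaitable, Callable

Data = dict
Action = Callable[[Data], Awaitable[Data]]


class StepTimeout(Exception):
    """Stand-in for a step-timeout error; not the library's class."""


async def run_step(action: Action, data: Data, timeout: float) -> Data:
    """Run one action, converting an asyncio timeout into StepTimeout."""
    try:
        return await asyncio.wait_for(action(data), timeout=timeout)
    except asyncio.TimeoutError as exc:
        raise StepTimeout(f"{action.__name__} exceeded {timeout}s") from exc


async def run_step_or_skip(action: Action, data: Data, timeout: float) -> Data:
    """Treat a timeout like a skipped step: keep the input data and continue."""
    try:
        return await run_step(action, data, timeout)
    except StepTimeout:
        return data


# Hypothetical actions used only for this illustration.
async def slow_action(data: Data) -> Data:
    await asyncio.sleep(0.2)
    return {**data, "slow": True}


async def fast_action(data: Data) -> Data:
    return {**data, "fast": True}
```

Raising a dedicated exception lets the same error-strategy code path handle timeouts and ordinary failures alike.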

Execution

| Symbol | Description |
|---|---|
| `run_saga_flow(flow, data, *, saga_name=)` | Runs the flow and returns a `SagaResult` |
| `AbstractSaga.execute(data)` | Thin wrapper around `run_saga_flow` that uses `type(self).__name__` |

Result types

| Symbol | Description |
|---|---|
| `SagaResult[T]` | `status`, `data`, `failed_step`, `error`, `history`, `elapsed` |
| `StepRecord` | `name`, `status`, `elapsed`: per-step execution record |
| `StepStatus` | `COMMITTED`, `FAILED`, `COMPENSATED` |
| `SagaStatus` | `STARTED`, `RUNNING`, `COMPENSATING`, `COMPLETED`, `FAILED`, `TIMED_OUT` |
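Because the result is non-throwing, callers branch on `status` instead of catching exceptions. The sketch below models the documented fields with stand-in dataclasses and enums so it runs without the library; the field types (for example `failed_step` as a string and `history` as a tuple) and the `summarize` helper are assumptions, not the real definitions.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional


class SagaStatus(Enum):
    STARTED = auto()
    RUNNING = auto()
    COMPENSATING = auto()
    COMPLETED = auto()
    FAILED = auto()
    TIMED_OUT = auto()


class StepStatus(Enum):
    COMMITTED = auto()
    FAILED = auto()
    COMPENSATED = auto()


@dataclass(frozen=True)
class StepRecord:
    name: str
    status: StepStatus
    elapsed: float


@dataclass(frozen=True)
class SagaResult:
    status: SagaStatus
    data: dict
    failed_step: Optional[str] = None
    error: Optional[BaseException] = None
    history: tuple = ()
    elapsed: float = 0.0


def summarize(result: SagaResult) -> str:
    """Turn a result into a one-line summary without raising."""
    if result.status is SagaStatus.COMPLETED:
        return f"completed {len(result.history)} step(s)"
    rolled_back = [
        record.name
        for record in result.history
        if record.status is StepStatus.COMPENSATED
    ]
    return (
        f"{result.status.name} at {result.failed_step}: {result.error}; "
        f"rolled back: {', '.join(rolled_back) or 'nothing'}"
    )
```

A caller can pass any finished result to `summarize` and log the string, using `history` to see which steps were compensated.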

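Errors

| Error | Description |
|---|---|
| `AbstractSpakkySagaError` | ABC base class for all saga errors |
| `SagaFlowDefinitionError` | Invalid saga flow definition (static validation) |
| `SagaCompensationFailedError` | A compensation failed during rollback |
| `SagaStepTimeoutError` | Raised internally when a step exceeds its timeout (routed to `on_error`) |
| `SagaParallelMergeConflictError` | Parallel steps changed the same field during data merge |
| `SagaEngineNotConnectedError` | `execute()` was called before the saga engine was connected |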
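The merge-conflict error arises when parallel steps write the same field. The library's merge algorithm is not described here, so the following is a sketch of one plausible approach, assuming dict data: each branch receives its own copy of the input, the changed fields are diffed against the original, and a second writer to the same field raises a stand-in `MergeConflict`. All names in the block are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable

Data = dict
Action = Callable[[Data], Awaitable[Data]]


class MergeConflict(Exception):
    """Stand-in for a parallel merge conflict; not the library's class."""


def changed_fields(before: Data, after: Data) -> Data:
    """Fields whose value is new or differs from the original snapshot."""
    return {
        key: value
        for key, value in after.items()
        if key not in before or before[key] != value
    }


async def run_parallel(actions: list[Action], data: Data) -> Data:
    """Run actions concurrently on copies of data, then merge their changes."""
    results = await asyncio.gather(*(action(dict(data)) for action in actions))
    merged = dict(data)
    owners: dict[str, str] = {}
    for action, result in zip(actions, results):
        for key, value in changed_fields(data, result).items():
            if key in owners:
                raise MergeConflict(
                    f"{key!r} changed by both {owners[key]} and {action.__name__}"
                )
            owners[key] = action.__name__
            merged[key] = value
    return merged


# Hypothetical branches used only for this illustration.
async def reserve_stock(data: Data) -> Data:
    return {**data, "reserved": True}


async def charge_payment(data: Data) -> Data:
    return {**data, "payment_id": 99}


async def overwrite_payment(data: Data) -> Data:
    return {**data, "payment_id": 100}
```

Branches that touch disjoint fields merge cleanly, while `charge_payment` and `overwrite_payment` run together would raise because both set `payment_id`.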

Related documents

  • ADR-0007: architecture decision record
  • spakky-domain: provides AbstractDomainModel, the parent of AbstractSagaData

License

MIT License
