Distributed transaction saga orchestration for Spakky Framework

spakky-saga

Distributed transaction saga orchestration for Spakky Framework. With @Saga and @saga_step, you declare compensation-based workflows as DI-managed application components.

Installation

pip install spakky-saga

Key features

  • @Saga stereotype: marks a saga orchestrator class (extends @Pod).
  • AbstractSaga[SagaDataT]: a generic base class with an abstract flow() method and an execute() entry point.
  • @saga_step decorator: a typed descriptor that enables the >>, &, and | operators on instance methods.
  • Flow DSL: declarative composition with operators or builder functions (saga_flow, step, parallel).
  • Error strategy: Compensate (default), Skip, and Retry(max_attempts, backoff, then), with ExponentialBackoff available as the backoff.
  • Timeout: per-step timeout= and per-saga SagaFlow.timeout().
  • Parallel execution: groups based on asyncio.gather (& operator / parallel()).
  • Compensation: on failure, committed steps are rolled back automatically in reverse order.
  • AuthContextSnapshot propagation: the signed envelope in AbstractSagaData.auth_context_snapshot is kept across execution, retry, parallel, and compensation, and protected saga steps are enforced against it.
  • SagaResult[T]: a non-throwing result carrying status, data, failed_step, error, history, and elapsed.
  • Structured logging: [saga=... step=... status=... elapsed=...ms] format.
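The compensation feature (committed steps rolled back in reverse order, with the outcome reported as a value rather than an exception) can be illustrated with a small standalone sketch. It does not import spakky-saga: MiniResult, run_with_compensation, and the step tuples are hypothetical names for illustration only, and the real engine may behave differently in its details.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, List, Optional, Tuple

# Hypothetical types for this sketch only; they are not spakky-saga classes.
Action = Callable[[dict], Awaitable[dict]]
Undo = Optional[Callable[[dict], Awaitable[None]]]


@dataclass
class MiniResult:
    status: str
    data: dict
    failed_step: Optional[str] = None
    history: List[Tuple[str, str]] = field(default_factory=list)


async def run_with_compensation(
    steps: List[Tuple[str, Action, Undo]], data: dict
) -> MiniResult:
    committed: List[Tuple[str, Undo]] = []
    history: List[Tuple[str, str]] = []
    for name, action, undo in steps:
        try:
            data = await action(data)
        except Exception:
            history.append((name, "FAILED"))
            # Roll back what already committed, newest first.
            for done_name, done_undo in reversed(committed):
                if done_undo is not None:
                    await done_undo(data)
                    history.append((done_name, "COMPENSATED"))
            # Report the failure as a value instead of raising.
            return MiniResult("FAILED", data, failed_step=name, history=history)
        committed.append((name, undo))
        history.append((name, "COMMITTED"))
    return MiniResult("COMPLETED", data, history=history)


async def issue_ticket(data: dict) -> dict:
    return {**data, "ticket_id": 7}


async def cancel_ticket(data: dict) -> None:
    return None


async def reserve_stock(data: dict) -> dict:
    return {**data, "stock_reserved": True}


async def release_stock(data: dict) -> None:
    return None


async def charge_payment(data: dict) -> dict:
    raise RuntimeError("card declined")


result = asyncio.run(
    run_with_compensation(
        [
            ("issue_ticket", issue_ticket, cancel_ticket),
            ("reserve_stock", reserve_stock, release_stock),
            ("charge_payment", charge_payment, None),
        ],
        {"order_id": 1},
    )
)
print(result.status, result.failed_step)
for entry in result.history:
    print(entry)
```

In this sketch the third step fails, so reserve_stock is compensated before issue_ticket, which is the reverse of the order in which they committed.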

Quick start

Defining SagaData

from spakky.saga import AbstractSagaData


class OrderSagaData(AbstractSagaData):
    order_id: int
    customer_id: int
    ticket_id: int | None = None

Defining a saga

from spakky.saga import AbstractSaga, Saga, SagaFlow, saga_step


@Saga()
class CreateOrderSaga(AbstractSaga[OrderSagaData]):
    @saga_step
    async def issue_ticket(self, data: OrderSagaData) -> OrderSagaData:
        ...

    @saga_step
    async def cancel_ticket(self, data: OrderSagaData) -> None:
        ...

    @saga_step
    async def reserve_stock(self, data: OrderSagaData) -> OrderSagaData:
        ...

    def flow(self) -> SagaFlow[OrderSagaData]:
        return SagaFlow(
            items=(
                self.issue_ticket >> self.cancel_ticket,  # Transaction
                self.reserve_stock,                        # SagaStep (no compensation)
            )
        )

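Execution

from spakky.saga import SagaStatus  # assumed to be exported like the symbols above

# `saga` is a CreateOrderSaga instance, for example one resolved from the DI container.
result = await saga.execute(OrderSagaData(order_id=1, customer_id=42))
if result.status is SagaStatus.COMPLETED:
    ...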

Protected Saga Step

from spakky.auth import protected, require_scope
from spakky.saga import AbstractSaga, Saga, SagaAuthExecutionContext, saga_step


@Saga()
class ProtectedOrderSaga(AbstractSaga[OrderSagaData]):
    @saga_step
    @protected
    @require_scope("orders:write")
    async def issue_ticket(self, data: OrderSagaData) -> OrderSagaData:
        ...


result = await saga.execute(
    OrderSagaData(order_id=1, customer_id=42, auth_context_snapshot="signed-envelope"),
    auth_context=SagaAuthExecutionContext(
        snapshot_verifier=snapshot_verifier,
        scope_checker=scope_checker,
    ),
)

A protected step does not receive a raw bearer token; it verifies a signed AuthContextSnapshot envelope instead. A missing, invalid, or expired snapshot produces a CHALLENGE decision and fails the step. An unavailable provider produces an ERROR decision, which follows the existing saga failure/compensation policy.
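The decision rules above can be sketched with a toy HMAC-signed envelope. Everything in this block is an assumption made for illustration: the envelope layout (base64 payload, a dot, a hex signature), the exp claim, the ALLOW decision, and the names Decision, sign_snapshot, and decide are hypothetical. The actual envelope format and verifier ports of spakky-saga are not described in this document.

```python
import base64
import hashlib
import hmac
import json
import time
from enum import Enum
from typing import Optional


class Decision(Enum):
    ALLOW = "allow"          # assumption: name chosen for this sketch
    CHALLENGE = "challenge"  # missing, invalid, or expired snapshot
    ERROR = "error"          # verifier/provider unavailable


def sign_snapshot(claims: dict, key: bytes) -> str:
    """Build a toy envelope: urlsafe-base64(JSON claims) + '.' + HMAC-SHA256 hex."""
    payload = base64.urlsafe_b64encode(json.dumps(claims, sort_keys=True).encode())
    signature = hmac.new(key, payload, hashlib.sha256).hexdigest()
    return payload.decode() + "." + signature


def decide(envelope: Optional[str], key: Optional[bytes], now: Optional[float] = None) -> Decision:
    if key is None:
        # Stand-in for "provider unavailable": verification cannot run at all.
        return Decision.ERROR
    if not envelope or "." not in envelope:
        return Decision.CHALLENGE  # missing or malformed snapshot
    payload, _, signature = envelope.rpartition(".")
    expected = hmac.new(key, payload.encode(), hashlib.sha256).hexdigest()
    if not hmac.compare_digest(signature.encode(), expected.encode()):
        return Decision.CHALLENGE  # invalid signature
    claims = json.loads(base64.urlsafe_b64decode(payload))
    current = time.time() if now is None else now
    if claims.get("exp", 0) <= current:
        return Decision.CHALLENGE  # expired snapshot
    return Decision.ALLOW


key = b"demo-key"
envelope = sign_snapshot({"sub": "user-42", "exp": 200}, key)
print(decide(envelope, key, now=100))   # valid and not yet expired
print(decide(envelope, key, now=300))   # expired
print(decide(None, key, now=100))       # missing
print(decide(envelope, None, now=100))  # verifier unavailable
```

The point of the sketch is the split between the two failure kinds: problems with the snapshot itself map to CHALLENGE, while the inability to verify at all maps to ERROR.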

Builder function alternative

from spakky.saga import Retry, parallel, saga_flow, step


flow = saga_flow(
    step(issue_ticket_fn, compensate=cancel_ticket_fn),
    parallel(reserve_stock_fn, charge_payment_fn),
    step(confirm_order_fn, on_error=Retry(max_attempts=3)),
)

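Flow operators

| Operator | Meaning | Result type |
|---|---|---|
| >> | Binds a compensate function | Transaction[T] |
| & | Parallel execution | Parallel[T] |
| \| | Attaches an error strategy | Same type as the left operand + on_error |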
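How the three operators can compose is easiest to see in a toy model. The sketch below is not the library's implementation: Step and Group are hypothetical classes, error strategies are plain strings, and the distinct Transaction[T] and Parallel[T] result types from the table are collapsed into these two classes for brevity.

```python
from dataclasses import dataclass, replace
from typing import Callable, Optional, Tuple


@dataclass(frozen=True)
class Step:
    action: Callable
    compensate: Optional[Callable] = None
    on_error: str = "Compensate"  # placeholder for a strategy object

    def __rshift__(self, compensate: Callable) -> "Step":
        # step >> undo: bind a compensate function.
        return replace(self, compensate=compensate)

    def __and__(self, other: "Step") -> "Group":
        # step & step: build a parallel group.
        return Group((self, other))

    def __or__(self, strategy: str) -> "Step":
        # step | strategy: attach an error strategy, keeping the same type.
        return replace(self, on_error=strategy)


@dataclass(frozen=True)
class Group:
    items: Tuple[Step, ...]
    on_error: str = "Compensate"

    def __and__(self, other: Step) -> "Group":
        return Group(self.items + (other,), self.on_error)

    def __or__(self, strategy: str) -> "Group":
        return replace(self, on_error=strategy)


def issue_ticket(data): ...
def cancel_ticket(data): ...
def reserve_stock(data): ...


# Python evaluates >> before &, and & before |, so this reads as
# ((issue_ticket >> cancel_ticket) & reserve_stock) | "Skip".
flow = Step(issue_ticket) >> cancel_ticket & Step(reserve_stock) | "Skip"
print(type(flow).__name__, flow.on_error, len(flow.items))
```

Because of Python's operator precedence, a strategy attached with | at the end of an expression applies to the whole group on its left. Use parentheses when the strategy should apply to a single step.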

Error Strategy

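| Strategy | Signature | Description |
|---|---|---|
| Compensate() | (default) | Runs compensation in reverse order |
| Skip() | | Ignores the failure and continues |
| Retry(max_attempts, backoff, then) | Retry(3, ExponentialBackoff(1.0), Compensate()) | Retries N times, then applies the then strategy |
| ExponentialBackoff(base=1.0) | | Applies a delay of base * 2^(attempt-1) between retries |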
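The delay formula base * 2^(attempt-1) and the retry-then-fall-back behavior can be worked through in a short standalone sketch. It assumes attempt is 1-indexed and that the fallback is applied by the caller once attempts run out. backoff_delay and retry_with_backoff are hypothetical helpers, not spakky-saga functions, and the sleep parameter exists only so the example runs without waiting.

```python
import asyncio
from typing import Awaitable, Callable, List


def backoff_delay(base: float, attempt: int) -> float:
    """Delay after the given 1-indexed failed attempt: base * 2^(attempt-1)."""
    return base * 2 ** (attempt - 1)


async def retry_with_backoff(
    action: Callable[[], Awaitable[str]],
    max_attempts: int,
    base: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> str:
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await action()
        except Exception:
            if attempt == max_attempts:
                # Out of attempts: re-raise so the caller can apply its
                # fallback strategy (the `then` role in the table above).
                raise
            await sleep(backoff_delay(base, attempt))
    raise AssertionError("unreachable")


delays: List[float] = []
calls = {"count": 0}


async def record_delay(seconds: float) -> None:
    delays.append(seconds)  # record instead of actually sleeping


async def flaky_confirm() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "confirmed"


outcome = asyncio.run(
    retry_with_backoff(flaky_confirm, max_attempts=3, base=1.0, sleep=record_delay)
)
print(outcome, delays)  # prints: confirmed [1.0, 2.0]
```

With base=1.0 the waits after the first and second failures are 1.0 and 2.0 seconds; a third failure would exhaust max_attempts=3 and hand control to the fallback.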

API reference

Stereotype / Base

| Symbol | Description |
|---|---|
| @Saga() | Stereotype for saga orchestrator classes (extends @Pod) |
| AbstractSaga[SagaDataT] | ABC base class with an abstract flow() method and execute() |
| @saga_step | Typed descriptor that enables the >>, &, and \| operators on instance methods |
| AbstractSagaData | Base data model (@immutable + AbstractDomainModel, auto-generated saga_id: UUID, auth_context_snapshot envelope propagation) |
| SagaAuthExecutionContext | Bundle of snapshot verifier/checker ports used to enforce protected saga steps |

Flow types

| Symbol | Description |
|---|---|
| SagaFlow[T] | Top-level flow definition (items, saga_timeout, compensation_failure_handler) |
| SagaStep[T] | Single action without compensation |
| Transaction[T] | Action + compensate pair |
| Parallel[T] | Group of steps/transactions executed concurrently |
| FlowItem[T] | Union of the items a flow can be composed of |
| ActionFn[T] / CompensateFn[T] | Type aliases for action / compensate callables |
| SagaDataT | TypeVar bound to AbstractSagaData |

Builder

| Function | Description |
|---|---|
| saga_flow(*items) | Creates a SagaFlow from sequential items |
| step(action, *, compensate=, on_error=, timeout=) | Creates a SagaStep or a Transaction |
| parallel(*items) | Creates a Parallel group (requires at least 2 items) |

Execution

| Symbol | Description |
|---|---|
| run_saga_flow(flow, data, *, saga_name=) | Runs the flow and returns a SagaResult |
| AbstractSaga.execute(data) | Thin wrapper around run_saga_flow that uses type(self).__name__ |

Result types

| Symbol | Description |
|---|---|
| SagaResult[T] | status, data, failed_step, error, history, elapsed |
| StepRecord | Per-step execution record: name, status, elapsed |
| StepStatus | COMMITTED, FAILED, COMPENSATED |
| SagaStatus | STARTED, RUNNING, COMPENSATING, COMPLETED, FAILED, TIMED_OUT |
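The TIMED_OUT status and the elapsed field suggest a pattern in which a timeout is reported as a result value rather than raised. The following standalone sketch shows that pattern with asyncio.wait_for. TimedResult and run_with_timeout are hypothetical names, and the status strings merely mirror three of the SagaStatus members listed above; the library's own timeout handling is not shown in this document.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class TimedResult:
    status: str                      # "COMPLETED", "FAILED", or "TIMED_OUT"
    error: Optional[BaseException]
    elapsed: float                   # seconds


async def run_with_timeout(
    work: Callable[[], Awaitable[None]], timeout: float
) -> TimedResult:
    started = time.perf_counter()
    try:
        # wait_for cancels the work and raises once the timeout passes.
        await asyncio.wait_for(work(), timeout=timeout)
    except asyncio.TimeoutError as exc:
        return TimedResult("TIMED_OUT", exc, time.perf_counter() - started)
    except Exception as exc:
        return TimedResult("FAILED", exc, time.perf_counter() - started)
    return TimedResult("COMPLETED", None, time.perf_counter() - started)


async def quick() -> None:
    await asyncio.sleep(0)


async def slow() -> None:
    await asyncio.sleep(1.0)


async def broken() -> None:
    raise ValueError("bad input")


for work in (quick, slow, broken):
    outcome = asyncio.run(run_with_timeout(work, timeout=0.05))
    print(work.__name__, outcome.status)
```

Callers of such a function branch on status instead of wrapping the call in try/except, which is the usage style the quick start shows for SagaResult.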

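Errors

| Error | Description |
|---|---|
| AbstractSpakkySagaError | Abstract base class (ABC) for all saga errors |
| SagaFlowDefinitionError | Invalid saga flow definition (static validation) |
| SagaCompensationFailedError | Compensation failed during rollback |
| SagaStepTimeoutError | Raised internally when a step exceeds its timeout (routed to on_error) |
| SagaParallelMergeConflictError | Parallel steps changed the same field during data merge |
| SagaEngineNotConnectedError | execute() was called before the saga engine was connected |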
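The merge-conflict error is easier to picture with a concrete merge rule. The sketch below is one possible rule, stated as an assumption: each parallel step receives the same immutable input, returns an updated copy, and the merge rejects any field changed by more than one step. OrderData, MergeConflict, merge, and run_parallel are hypothetical names; the library's actual merge semantics may differ.

```python
import asyncio
from dataclasses import dataclass, fields, replace
from typing import Awaitable, Callable, Optional, Sequence


class MergeConflict(Exception):
    """Raised in this sketch when two parallel steps change the same field."""


@dataclass(frozen=True)
class OrderData:
    order_id: int
    ticket_id: Optional[int] = None
    stock_reserved: bool = False


def merge(original: OrderData, results: Sequence[OrderData]) -> OrderData:
    changes = {}
    for result in results:
        for f in fields(original):
            value = getattr(result, f.name)
            if value != getattr(original, f.name):
                if f.name in changes:
                    raise MergeConflict(f.name)
                changes[f.name] = value
    return replace(original, **changes)


async def run_parallel(
    data: OrderData, *actions: Callable[[OrderData], Awaitable[OrderData]]
) -> OrderData:
    # All actions start from the same snapshot and run concurrently.
    results = await asyncio.gather(*(action(data) for action in actions))
    return merge(data, results)


async def issue_ticket(data: OrderData) -> OrderData:
    return replace(data, ticket_id=7)


async def reserve_stock(data: OrderData) -> OrderData:
    return replace(data, stock_reserved=True)


async def issue_other_ticket(data: OrderData) -> OrderData:
    return replace(data, ticket_id=8)


merged = asyncio.run(run_parallel(OrderData(order_id=1), issue_ticket, reserve_stock))
print(merged)

try:
    asyncio.run(run_parallel(OrderData(order_id=1), issue_ticket, issue_other_ticket))
except MergeConflict as conflict:
    print("conflict on field:", conflict)
```

Steps that touch disjoint fields merge cleanly, while two steps writing ticket_id trigger the conflict. Keeping parallel steps to separate fields avoids the error under this rule.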

Related documents

  • ADR-0007: architecture decision record
  • spakky-domain: provides AbstractDomainModel, the parent of AbstractSagaData

License

MIT License
