
Distributed transaction saga orchestration for Spakky Framework


spakky-saga

Distributed transaction saga orchestration for the Spakky Framework. Use @Saga and @saga_step to declare compensation-based workflows as DI-managed application components.

Installation

pip install spakky-saga

Key features

  • @Saga stereotype: marks a saga orchestrator class (extends @Pod).
  • AbstractSaga[SagaDataT]: generic base class with an abstract flow() method and an execute() entry point.
  • @saga_step decorator: a typed descriptor that enables the >>, &, and | operators on instance methods.
  • Flow DSL: declarative composition with operators or builder functions (saga_flow, step, parallel).
  • Error strategy: Compensate (default), Skip, and Retry(max_attempts, backoff, then), where backoff can be ExponentialBackoff.
  • Timeout: per-step timeout= and per-saga SagaFlow.timeout().
  • Parallel execution: asyncio.gather-based groups (& operator / parallel()).
  • Compensation: on failure, committed steps are rolled back automatically in reverse order.
  • AuthContextSnapshot propagation: the signed envelope in AbstractSagaData.auth_context_snapshot is preserved across execution, retry, parallel, and compensation, and protected saga steps are enforced against it.
  • SagaResult[T]: a non-throwing result carrying status, data, failed_step, error, history, and elapsed.
  • Structured logging: [saga=... step=... status=... elapsed=...ms] format.
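The compensation behaviour listed above (roll back committed steps in reverse order and report the outcome without raising) can be pictured with a small stand-alone model. This is only a sketch in plain asyncio and does not use spakky-saga: ToyResult, run_toy_saga, and the dict-based data are hypothetical names chosen for illustration, not the library's API.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyResult:
    """Hypothetical stand-in for a non-throwing saga result."""
    status: str                      # "COMPLETED" or "FAILED"
    data: dict
    failed_step: Optional[str] = None
    history: list = field(default_factory=list)


async def run_toy_saga(steps, data):
    """Run (name, action, compensation-or-None) triples in order.

    On the first failure, undo the committed steps newest-first and
    return a FAILED result instead of raising.
    """
    committed = []
    history = []
    for name, action, compensate in steps:
        try:
            data = await action(data)
        except Exception:
            history.append((name, "FAILED"))
            for done_name, undo in reversed(committed):
                if undo is not None:
                    await undo(data)
                    history.append((done_name, "COMPENSATED"))
            return ToyResult("FAILED", data, failed_step=name, history=history)
        committed.append((name, compensate))
        history.append((name, "COMMITTED"))
    return ToyResult("COMPLETED", data, history=history)


async def _demo():
    undo_log = []

    async def issue_ticket(d):
        return {**d, "ticket_id": 7}

    async def cancel_ticket(d):
        undo_log.append("cancel_ticket")

    async def reserve_stock(d):
        return {**d, "reserved": True}

    async def release_stock(d):
        undo_log.append("release_stock")

    async def charge_payment(d):
        raise RuntimeError("card declined")

    result = await run_toy_saga(
        [
            ("issue_ticket", issue_ticket, cancel_ticket),
            ("reserve_stock", reserve_stock, release_stock),
            ("charge_payment", charge_payment, None),
        ],
        {"order_id": 1},
    )
    return result, undo_log


toy_result, toy_undo_log = asyncio.run(_demo())
```

In this model the third step fails, so release_stock runs before cancel_ticket, and the caller inspects toy_result.status rather than catching an exception.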

Quick start

Defining SagaData

from spakky.saga import AbstractSagaData


class OrderSagaData(AbstractSagaData):
    order_id: int
    customer_id: int
    ticket_id: int | None = None

Defining a saga

from spakky.saga import AbstractSaga, Saga, SagaFlow, saga_step


@Saga()
class CreateOrderSaga(AbstractSaga[OrderSagaData]):
    @saga_step
    async def issue_ticket(self, data: OrderSagaData) -> OrderSagaData:
        ...

    @saga_step
    async def cancel_ticket(self, data: OrderSagaData) -> None:
        ...

    @saga_step
    async def reserve_stock(self, data: OrderSagaData) -> OrderSagaData:
        ...

    def flow(self) -> SagaFlow[OrderSagaData]:
        return SagaFlow(
            items=(
                self.issue_ticket >> self.cancel_ticket,  # Transaction
                self.reserve_stock,                        # SagaStep (no compensation)
            )
        )

Execution

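from spakky.saga import SagaStatus  # assumed to be exported like the names above

# `saga` is a CreateOrderSaga instance, e.g. one resolved by the DI container.
result = await saga.execute(OrderSagaData(order_id=1, customer_id=42))
if result.status is SagaStatus.COMPLETED:
    ...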

Protected Saga Step

from spakky.auth import protected, require_scope
from spakky.saga import SagaAuthExecutionContext


@Saga()
class ProtectedOrderSaga(AbstractSaga[OrderSagaData]):
    @saga_step
    @protected
    @require_scope("orders:write")
    async def issue_ticket(self, data: OrderSagaData) -> OrderSagaData:
        ...


result = await saga.execute(
    OrderSagaData(order_id=1, customer_id=42, auth_context_snapshot="signed-envelope"),
    auth_context=SagaAuthExecutionContext(
        snapshot_verifier=snapshot_verifier,
        scope_checker=scope_checker,
    ),
)

A protected step does not receive a raw bearer token; it verifies the signed AuthContextSnapshot envelope. A missing, invalid, or expired snapshot yields a CHALLENGE decision and fails the step. An unavailable provider yields an ERROR decision, which follows the existing saga failure/compensation policy.
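The decision rules in the paragraph above can be sketched as a small mapping function. This is an illustration only and does not use spakky-saga: the ALLOW member, the two exception classes, and the verify callable are hypothetical, and the real verifier port may look quite different.

```python
from enum import Enum
from typing import Callable, Optional


class Decision(Enum):
    ALLOW = "ALLOW"          # hypothetical name for the success case
    CHALLENGE = "CHALLENGE"
    ERROR = "ERROR"


class SnapshotRejected(Exception):
    """Hypothetical: the snapshot is invalid or expired."""


class ProviderUnavailable(Exception):
    """Hypothetical: the verification backend could not be reached."""


def decide(snapshot: Optional[str], verify: Callable[[str], None]) -> Decision:
    """Map the outcome of snapshot verification to a decision."""
    if not snapshot:
        return Decision.CHALLENGE      # missing snapshot
    try:
        verify(snapshot)
    except SnapshotRejected:
        return Decision.CHALLENGE      # invalid or expired snapshot
    except ProviderUnavailable:
        return Decision.ERROR          # handled by the saga failure policy
    return Decision.ALLOW


def accept(_snapshot: str) -> None:
    return None


def reject(_snapshot: str) -> None:
    raise SnapshotRejected("expired")


def unavailable(_snapshot: str) -> None:
    raise ProviderUnavailable("verifier down")
```

Keeping CHALLENGE and ERROR apart lets a caller treat "the caller must re-authenticate" differently from "the infrastructure is down".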

Builder function alternative

from spakky.saga import Retry, parallel, saga_flow, step


flow = saga_flow(
    step(issue_ticket_fn, compensate=cancel_ticket_fn),
    parallel(reserve_stock_fn, charge_payment_fn),
    step(confirm_order_fn, on_error=Retry(max_attempts=3)),
)

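Flow operators

| Operator | Meaning | Result type |
| --- | --- | --- |
| >> | Binds a compensate function | Transaction[T] |
| & | Parallel execution | Parallel[T] |
| \| | Attaches an error strategy | Same type as the left operand, with on_error set |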
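To see how three operators can form a small DSL, here is a toy model built on Python's __rshift__, __and__, and __or__ hooks. It is a sketch under assumptions, not the library's implementation: ToyStep, ToyParallel, and the string-valued on_error are invented for illustration.

```python
from dataclasses import dataclass, replace
from typing import Callable, Optional, Tuple


@dataclass(frozen=True)
class ToyStep:
    action: Callable
    compensate: Optional[Callable] = None
    on_error: str = "Compensate"

    def __rshift__(self, other: "ToyStep") -> "ToyStep":
        # step >> undo: use the right-hand step's action as the compensation
        return replace(self, compensate=other.action)

    def __and__(self, other: "ToyStep") -> "ToyParallel":
        # step & step: group both for concurrent execution
        return ToyParallel((self, other))

    def __or__(self, strategy: str) -> "ToyStep":
        # step | strategy: attach an error strategy, keeping the same type
        return replace(self, on_error=strategy)


@dataclass(frozen=True)
class ToyParallel:
    items: Tuple[ToyStep, ...]
    on_error: str = "Compensate"

    def __and__(self, other: ToyStep) -> "ToyParallel":
        return replace(self, items=self.items + (other,))

    def __or__(self, strategy: str) -> "ToyParallel":
        return replace(self, on_error=strategy)


def issue_ticket(data):
    return data


def cancel_ticket(data):
    return None


def reserve_stock(data):
    return data


toy_tx = ToyStep(issue_ticket) >> ToyStep(cancel_ticket)
# Python precedence: >> binds tighter than &, which binds tighter than |.
toy_group = ToyStep(issue_ticket) >> ToyStep(cancel_ticket) & ToyStep(reserve_stock) | "Skip"
```

Because of Python's operator precedence, the last expression parses as ((a >> b) & c) | "Skip", so the strategy attaches to the whole parallel group in this model. Parenthesize when a different grouping is intended.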

Error Strategy

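| Strategy (signature) | Example | Description |
| --- | --- | --- |
| Compensate() | | Default. Runs compensation in reverse order |
| Skip() | | Ignores the failure and continues |
| Retry(max_attempts, backoff, then) | Retry(3, ExponentialBackoff(1.0), Compensate()) | Retries N times, then applies the then strategy |
| ExponentialBackoff(base=1.0) | | Applies a delay of base * 2^(attempt-1) between retries |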
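The delay formula base * 2^(attempt-1) and the retry-then-fallback idea can be sketched without the library. The helper names below (backoff_delay, retry_with_backoff) and the 1-based attempt numbering are assumptions made for this example. Once attempts are exhausted, the sketch re-raises the last error, which is where a then strategy would take over.

```python
import asyncio


def backoff_delay(base: float, attempt: int) -> float:
    """Delay after failed attempt number `attempt` (1-based, assumed)."""
    return base * 2 ** (attempt - 1)


async def retry_with_backoff(action, max_attempts, base=1.0, sleep=asyncio.sleep):
    """Call `action` up to max_attempts times, sleeping between failures."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await action()
        except Exception:
            if attempt == max_attempts:
                raise  # attempts exhausted: a `then` strategy would apply here
            await sleep(backoff_delay(base, attempt))


async def _demo():
    delays = []
    calls = {"count": 0}

    async def fake_sleep(seconds):
        delays.append(seconds)  # record instead of actually waiting

    async def flaky():
        calls["count"] += 1
        if calls["count"] < 3:
            raise RuntimeError("transient failure")
        return "ok"

    value = await retry_with_backoff(flaky, max_attempts=3, base=1.0, sleep=fake_sleep)
    return value, delays


retry_value, retry_delays = asyncio.run(_demo())
```

Injecting sleep keeps the example fast and makes the delay sequence observable in tests.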

API reference

Stereotype / Base

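| Symbol | Description |
| --- | --- |
| @Saga() | Stereotype for saga orchestrator classes (extends @Pod) |
| AbstractSaga[SagaDataT] | ABC-based class with an abstract flow() method and execute() |
| @saga_step | Typed descriptor that enables the >>, &, and \| operators on instance methods |
| AbstractSagaData | Base data model (@immutable + AbstractDomainModel; auto-generated saga_id: UUID; propagates the auth_context_snapshot envelope) |
| SagaAuthExecutionContext | Bundle of the snapshot verifier/checker ports used to enforce protected saga steps |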

Flow types

| Symbol | Description |
| --- | --- |
| SagaFlow[T] | Top-level flow definition (items, saga_timeout, compensation_failure_handler) |
| SagaStep[T] | Single action without compensation |
| Transaction[T] | action + compensate pair |
| Parallel[T] | Group of steps/transactions executed concurrently |
| FlowItem[T] | Union of the items that can compose a flow |
| ActionFn[T] / CompensateFn[T] | Type aliases for action / compensate callables |
| SagaDataT | TypeVar bound to AbstractSagaData |

Builder

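| Function | Description |
| --- | --- |
| saga_flow(*items) | Creates a SagaFlow from sequential items |
| step(action, *, compensate=, on_error=, timeout=) | Creates a SagaStep or a Transaction |
| parallel(*items) | Creates a Parallel group (requires at least 2 items) |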
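One way to picture the timeout= argument is a wrapper that bounds a step with asyncio.wait_for and converts the timeout into a dedicated error that an error strategy could then handle. This is a sketch, not how spakky-saga implements it: ToyStepTimeout and run_with_timeout are hypothetical names.

```python
import asyncio


class ToyStepTimeout(Exception):
    """Hypothetical stand-in for a step-timeout error."""


async def run_with_timeout(action, data, timeout=None):
    """Run one step, optionally bounded by `timeout` seconds."""
    if timeout is None:
        return await action(data)
    try:
        return await asyncio.wait_for(action(data), timeout)
    except asyncio.TimeoutError as exc:
        # Translate the low-level timeout into a step-level error.
        raise ToyStepTimeout(f"step exceeded {timeout}s") from exc


async def _demo():
    async def fast_step(d):
        return {**d, "done": True}

    async def slow_step(d):
        await asyncio.sleep(0.5)
        return d

    ok = await run_with_timeout(fast_step, {"order_id": 1}, timeout=1.0)
    try:
        await run_with_timeout(slow_step, {"order_id": 1}, timeout=0.01)
        timed_out = False
    except ToyStepTimeout:
        timed_out = True
    return ok, timed_out


timeout_ok, timeout_hit = asyncio.run(_demo())
```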

Execution

| Symbol | Description |
| --- | --- |
| run_saga_flow(flow, data, *, saga_name=) | Runs the flow and returns a SagaResult |
| AbstractSaga.execute(data) | Thin wrapper around run_saga_flow that uses type(self).__name__ as the saga name |

Result types

| Symbol | Description |
| --- | --- |
| SagaResult[T] | status, data, failed_step, error, history, elapsed |
| StepRecord | name, status, elapsed: per-step execution record |
| StepStatus | COMMITTED, FAILED, COMPENSATED |
| SagaStatus | STARTED, RUNNING, COMPENSATING, COMPLETED, FAILED, TIMED_OUT |
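As a rough illustration of how a per-step record could map onto the structured log format [saga=... step=... status=... elapsed=...ms], the sketch below formats a stand-in record. ToyStepRecord, format_log_line, the choice of seconds for elapsed, and the one-decimal rounding are all assumptions. The README does not specify these details.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyStepRecord:
    """Hypothetical stand-in for a per-step execution record."""
    name: str
    status: str
    elapsed: float  # seconds (assumed unit)


def format_log_line(saga_name: str, record: ToyStepRecord) -> str:
    """Render one record in the bracketed key=value style."""
    elapsed_ms = record.elapsed * 1000
    return (
        f"[saga={saga_name} step={record.name} "
        f"status={record.status} elapsed={elapsed_ms:.1f}ms]"
    )


toy_line = format_log_line(
    "CreateOrderSaga", ToyStepRecord("issue_ticket", "COMMITTED", 0.25)
)
```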

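Errors

| Error | Description |
| --- | --- |
| AbstractSpakkySagaError | ABC-based base class for all saga errors |
| SagaFlowDefinitionError | Invalid saga flow definition (static validation) |
| SagaCompensationFailedError | Compensation failed during rollback |
| SagaStepTimeoutError | Raised internally when a step exceeds its timeout (routed to on_error) |
| SagaParallelMergeConflictError | Parallel steps changed the same field during data merge |
| SagaEngineNotConnectedError | execute() was called before the saga engine was connected |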
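The merge-conflict case can be made concrete with a toy model: run branches concurrently with asyncio.gather, then fold each branch's changed fields back into the original data and fail if two branches touched the same field. This sketch does not use spakky-saga. ToyMergeConflict, ToyOrderData, and merge_parallel_results are hypothetical, and the library's actual merge rules may differ (for example, in how identical values are treated).

```python
import asyncio
from dataclasses import dataclass, fields, replace
from typing import Optional


class ToyMergeConflict(Exception):
    """Hypothetical stand-in for a parallel merge conflict error."""


@dataclass(frozen=True)
class ToyOrderData:
    order_id: int
    ticket_id: Optional[int] = None
    payment_id: Optional[int] = None


def merge_parallel_results(original, results):
    """Collect each branch's changed fields; reject overlapping changes."""
    changes = {}
    for result in results:
        for f in fields(original):
            new_value = getattr(result, f.name)
            if new_value == getattr(original, f.name):
                continue  # this branch left the field untouched
            if f.name in changes:
                raise ToyMergeConflict(f.name)
            changes[f.name] = new_value
    return replace(original, **changes)


async def run_parallel(data, *actions):
    # Every branch starts from the same immutable input.
    results = await asyncio.gather(*(action(data) for action in actions))
    return merge_parallel_results(data, results)


async def set_ticket(d):
    return replace(d, ticket_id=7)


async def set_payment(d):
    return replace(d, payment_id=99)


async def set_other_ticket(d):
    return replace(d, ticket_id=8)


toy_merged = asyncio.run(run_parallel(ToyOrderData(order_id=1), set_ticket, set_payment))
```

Branches that write disjoint fields merge cleanly. Running set_ticket and set_other_ticket together raises ToyMergeConflict in this model.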

Related documents

  • ADR-0007: architecture decision record
  • spakky-domain: provides AbstractDomainModel, the parent of AbstractSagaData

License

MIT License
