spakky-outbox

Transactional Outbox pattern plugin for Spakky Framework.

Installation

pip install spakky-outbox spakky-sqlalchemy

Note: spakky-outbox provides the core abstractions; spakky-sqlalchemy automatically detects and registers the Outbox storage implementation when both packages are installed.

Features

  • Transactional Outbox: Events are saved atomically with business data
  • Automatic relay: Background relay publishes events to external transports (Kafka, RabbitMQ)
  • Retry support: Failed messages are retried with configurable limits
  • Multi-instance safe: Atomic claim prevents duplicate publishing
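
The atomic-save guarantee in the first bullet can be illustrated without the framework. The following is a minimal sketch using only the standard library's sqlite3 module; the table names, columns, and the `create_order` helper are assumptions made for illustration and are not the schema used by spakky-sqlalchemy.

```python
import json
import sqlite3


def create_schema(conn: sqlite3.Connection) -> None:
    # Hypothetical tables: one for business data, one for the outbox.
    conn.execute(
        "CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER NOT NULL)"
    )
    conn.execute(
        "CREATE TABLE outbox ("
        "id INTEGER PRIMARY KEY, event_type TEXT NOT NULL, "
        "payload TEXT NOT NULL, published INTEGER NOT NULL DEFAULT 0)"
    )


def create_order(conn: sqlite3.Connection, customer_id: int, fail: bool = False) -> int:
    # "with conn" commits on success and rolls back if an exception propagates,
    # so the order row and the outbox row are written together or not at all.
    with conn:
        cursor = conn.execute(
            "INSERT INTO orders (customer_id) VALUES (?)", (customer_id,)
        )
        order_id = cursor.lastrowid
        payload = json.dumps({"order_id": order_id, "customer_id": customer_id})
        conn.execute(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            ("OrderCreatedEvent", payload),
        )
        if fail:
            raise RuntimeError("simulated failure before commit")
    return order_id


def count_rows(conn: sqlite3.Connection, table: str) -> int:
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
```

A failed call leaves neither an order nor an event behind, which is the property that lets a relay publish from the outbox table without ever announcing an order that was not committed.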

Usage

1. Load plugins in your application

from spakky.core.application.application import SpakkyApplication
from spakky.core.application.application_context import ApplicationContext

app = (
    SpakkyApplication(ApplicationContext())
    .load_plugins()  # Loads outbox and sqlalchemy plugins automatically
    .scan()
    .start()
)

2. Publish events from use cases

Events published via IAsyncEventPublisher are automatically routed:

  • AbstractDomainEvent → in-process dispatch
  • AbstractIntegrationEvent → IEventBus (Outbox intercepts via @Primary)

from spakky.core.common.mutability import immutable
from spakky.core.stereotype.usecase import UseCase
from spakky.data.aspects.transactional import Transactional
from spakky.domain.models.event import AbstractIntegrationEvent
from spakky.event.event_publisher import IAsyncEventPublisher


@immutable
class OrderCreatedEvent(AbstractIntegrationEvent):
    order_id: int
    customer_id: int


@UseCase()
class CreateOrderUseCase:
    def __init__(self, event_publisher: IAsyncEventPublisher) -> None:
        self._event_publisher = event_publisher

    @Transactional()
    async def execute(self, command: CreateOrderCommand) -> Order:
        # CreateOrderCommand and Order are your application's own types (not shown)
        order = Order.create(...)
        # Event is saved in the same transaction as the order
        await self._event_publisher.publish(
            OrderCreatedEvent(order_id=order.id, customer_id=command.customer_id)
        )
        return order
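
The type-based routing described at the start of this step can be pictured with a small stand-alone sketch. Every name below (`DomainEvent`, `IntegrationEvent`, `RoutingPublisher`) is a hypothetical stand-in defined for illustration; this is not how Spakky implements `IAsyncEventPublisher`, only the general idea of dispatching on the event's base class.

```python
import asyncio
from dataclasses import dataclass


class DomainEvent:
    """Hypothetical stand-in for AbstractDomainEvent."""


class IntegrationEvent:
    """Hypothetical stand-in for AbstractIntegrationEvent."""


@dataclass(frozen=True)
class StockReserved(DomainEvent):
    sku: str


@dataclass(frozen=True)
class OrderCreated(IntegrationEvent):
    order_id: int


class RoutingPublisher:
    """Routes each event by its base class (illustrative only)."""

    def __init__(self) -> None:
        self.dispatched_in_process: list = []
        self.sent_to_bus: list = []

    async def publish(self, event: object) -> None:
        if isinstance(event, IntegrationEvent):
            # With the outbox plugin loaded, this branch would end in a
            # stored outbox row rather than a direct send.
            self.sent_to_bus.append(event)
        elif isinstance(event, DomainEvent):
            self.dispatched_in_process.append(event)
        else:
            raise TypeError(f"unsupported event type: {type(event).__name__}")
```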

3. Configure via environment variables

| Variable | Default | Description |
|---|---|---|
| SPAKKY_OUTBOX__POLLING_INTERVAL_SECONDS | 1.0 | Relay polling interval |
| SPAKKY_OUTBOX__BATCH_SIZE | 100 | Messages per batch |
| SPAKKY_OUTBOX__MAX_RETRY_COUNT | 5 | Max retries before giving up |
| SPAKKY_OUTBOX__CLAIM_TIMEOUT_SECONDS | 300.0 | Claim expiry for crash recovery |
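
To make the variable names and defaults concrete, here is a rough sketch of how such settings could be read from the environment. `OutboxSettings` and `load_settings` are hypothetical names invented for this example; the plugin's own `OutboxConfig` may parse and validate values differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping

PREFIX = "SPAKKY_OUTBOX__"


@dataclass(frozen=True)
class OutboxSettings:
    # Defaults copied from the table above.
    polling_interval_seconds: float = 1.0
    batch_size: int = 100
    max_retry_count: int = 5
    claim_timeout_seconds: float = 300.0


def load_settings(env: Mapping[str, str] = os.environ) -> OutboxSettings:
    """Build settings from a mapping, falling back to the documented defaults."""
    d = OutboxSettings()
    return OutboxSettings(
        polling_interval_seconds=float(
            env.get(PREFIX + "POLLING_INTERVAL_SECONDS", d.polling_interval_seconds)
        ),
        batch_size=int(env.get(PREFIX + "BATCH_SIZE", d.batch_size)),
        max_retry_count=int(env.get(PREFIX + "MAX_RETRY_COUNT", d.max_retry_count)),
        claim_timeout_seconds=float(
            env.get(PREFIX + "CLAIM_TIMEOUT_SECONDS", d.claim_timeout_seconds)
        ),
    )
```

In a shell, the equivalent override would be an export such as `SPAKKY_OUTBOX__BATCH_SIZE=20` before starting the application.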

Components

| Component | Description |
|---|---|
| IOutboxStorage / IAsyncOutboxStorage | Outbox message storage port |
| OutboxEventBus / AsyncOutboxEventBus | Event bus seam for Outbox pattern (@Primary replaces DirectEventBus) |
| OutboxRelayBackgroundService / AsyncOutboxRelayBackgroundService | Background relay service (polls & sends) |
| OutboxConfig | Configuration via environment variables |
| OutboxMessage | Outbox message model |
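
How the relay and the storage port fit together can be sketched as a single polling pass. This is an illustration under stated assumptions, not the code of `AsyncOutboxRelayBackgroundService`: `Message`, `FakeStorage`, and `relay_once` are hypothetical, and only the method names `fetch_pending`, `mark_published`, and `increment_retry` are taken from the storage port shown in this README.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable
from uuid import UUID, uuid4


@dataclass
class Message:
    """Hypothetical stand-in for OutboxMessage."""
    payload: str
    id: UUID = field(default_factory=uuid4)


class FakeStorage:
    """In-memory storage used only to exercise the relay pass."""

    def __init__(self, messages: list) -> None:
        self.pending = {m.id: m for m in messages}
        self.retries = {m.id: 0 for m in messages}
        self.published: list = []

    async def fetch_pending(self, limit: int, max_retry: int) -> list:
        eligible = [m for m in self.pending.values() if self.retries[m.id] < max_retry]
        return eligible[:limit]

    async def mark_published(self, message_id: UUID) -> None:
        self.published.append(self.pending.pop(message_id).payload)

    async def increment_retry(self, message_id: UUID) -> None:
        self.retries[message_id] += 1


async def relay_once(
    storage: FakeStorage,
    send: Callable[[Message], Awaitable[None]],
    batch_size: int = 100,
    max_retry: int = 5,
) -> int:
    """Publish one batch; return how many messages were sent successfully."""
    published = 0
    for message in await storage.fetch_pending(batch_size, max_retry):
        try:
            await send(message)
        except Exception:
            # Leave the message pending and count the failed attempt.
            await storage.increment_retry(message.id)
        else:
            await storage.mark_published(message.id)
            published += 1
    return published
```

A background service would call something like `relay_once` in a loop, sleeping for the polling interval between passes. Once a message reaches the retry limit, `fetch_pending` stops returning it.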

Custom Storage Implementation

To implement a custom storage backend:

from uuid import UUID

from spakky.outbox.ports.storage import IAsyncOutboxStorage
from spakky.outbox.common.message import OutboxMessage

class MyCustomStorage(IAsyncOutboxStorage):
    async def save(self, message: OutboxMessage) -> None:
        # Save within the current transaction
        ...

    async def fetch_pending(self, limit: int, max_retry: int) -> list[OutboxMessage]:
        # Atomic claim and return pending messages
        ...

    async def mark_published(self, message_id: UUID) -> None:
        ...

    async def increment_retry(self, message_id: UUID) -> None:
        ...
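
As a rough picture of what the four methods need to do, the sketch below fills them in with an in-memory dictionary. It is self-contained and therefore does not subclass `IAsyncOutboxStorage`; `StoredMessage`, its fields, and the injected `clock` are assumptions made for illustration. A real backend must also write `save` inside the caller's database transaction, which an in-memory store cannot do.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Callable, Optional
from uuid import UUID, uuid4


@dataclass
class StoredMessage:
    """Hypothetical message record; the real OutboxMessage fields may differ."""
    payload: str
    id: UUID = field(default_factory=uuid4)
    retry_count: int = 0
    published: bool = False
    claimed_at: Optional[float] = None


class InMemoryOutboxStorage:
    def __init__(
        self,
        claim_timeout_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._messages: dict = {}
        self._lock = asyncio.Lock()
        self._claim_timeout = claim_timeout_seconds
        self._clock = clock

    async def save(self, message: StoredMessage) -> None:
        async with self._lock:
            self._messages[message.id] = message

    async def fetch_pending(self, limit: int, max_retry: int) -> list:
        # Selecting and claiming happen under one lock, so two concurrent
        # callers cannot receive the same message.
        async with self._lock:
            now = self._clock()
            claimed = []
            for m in self._messages.values():
                if len(claimed) >= limit:
                    break
                # Unclaimed, or claimed by a worker that presumably crashed.
                claimable = m.claimed_at is None or now - m.claimed_at >= self._claim_timeout
                if not m.published and m.retry_count < max_retry and claimable:
                    m.claimed_at = now
                    claimed.append(m)
            return claimed

    async def mark_published(self, message_id: UUID) -> None:
        async with self._lock:
            self._messages[message_id].published = True

    async def increment_retry(self, message_id: UUID) -> None:
        async with self._lock:
            m = self._messages[message_id]
            m.retry_count += 1
            m.claimed_at = None  # release the claim so the message can be retried
```

In a SQL backend, the lock would typically be replaced by a single statement or a row-locking query that selects and claims rows in one step.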

License

MIT License
