Skip to main content

Transactional Outbox pattern plugin for Spakky Framework

Project description

spakky-outbox

Transactional Outbox pattern plugin for Spakky Framework.

Installation

pip install spakky-outbox spakky-outbox-sqlalchemy

Note: spakky-outbox provides the core abstractions; you must also install a storage implementation like spakky-outbox-sqlalchemy.

Features

  • Transactional Outbox: Events are saved atomically with business data
  • Automatic relay: Background relay publishes events to external transports (Kafka, RabbitMQ)
  • Retry support: Failed messages are retried with configurable limits
  • Multi-instance safe: Atomic claim prevents duplicate publishing

Usage

1. Load plugins in your application

from spakky.core.application.application import SpakkyApplication
from spakky.core.application.application_context import ApplicationContext

app = (
    SpakkyApplication(ApplicationContext())
    .load_plugins()  # Loads outbox and outbox-sqlalchemy plugins automatically
    .scan()
    .start()
)

2. Publish events from use cases

Events published via IAsyncEventPublisher are automatically routed:

  • AbstractDomainEvent → in-process dispatch
  • AbstractIntegrationEventIEventBus (Outbox intercepts via @Primary)
from spakky.core.common.mutability import immutable
from spakky.core.stereotype.usecase import UseCase
from spakky.data.aspects.transactional import Transactional
from spakky.domain.models.event import AbstractIntegrationEvent
from spakky.event.event_publisher import IAsyncEventPublisher


@immutable
class OrderCreatedEvent(AbstractIntegrationEvent):
    order_id: int
    customer_id: int


@UseCase()
class CreateOrderUseCase:
    def __init__(self, event_publisher: IAsyncEventPublisher) -> None:
        self._event_publisher = event_publisher

    @Transactional()
    async def execute(self, command: CreateOrderCommand) -> Order:
        order = Order.create(...)
        # Event is saved in the same transaction as the order
        await self._event_publisher.publish(
            OrderCreatedEvent(order_id=order.id, customer_id=command.customer_id)
        )
        return order

3. Configure via environment variables

Variable Default Description
SPAKKY_OUTBOX__POLLING_INTERVAL_SECONDS 1.0 Relay polling interval
SPAKKY_OUTBOX__BATCH_SIZE 100 Messages per batch
SPAKKY_OUTBOX__MAX_RETRY_COUNT 5 Max retries before giving up
SPAKKY_OUTBOX__CLAIM_TIMEOUT_SECONDS 300.0 Claim expiry for crash recovery

Components

Component Description
IOutboxStorage / IAsyncOutboxStorage Outbox message storage port
OutboxEventBus / AsyncOutboxEventBus Event bus seam for Outbox pattern (@Primary replaces DirectEventBus)
OutboxRelayBackgroundService / AsyncOutboxRelayBackgroundService Background relay service (polls & sends)
OutboxConfig Configuration via environment variables
OutboxMessage Outbox message model

Custom Storage Implementation

To implement a custom storage backend:

from spakky.plugins.outbox.ports.storage import IAsyncOutboxStorage
from spakky.plugins.outbox.common.message import OutboxMessage

class MyCustomStorage(IAsyncOutboxStorage):
    async def save(self, message: OutboxMessage) -> None:
        # Save within the current transaction
        ...

    async def fetch_pending(self, limit: int, max_retry: int) -> list[OutboxMessage]:
        # Atomic claim and return pending messages
        ...

    async def mark_published(self, message_id: UUID) -> None:
        ...

    async def increment_retry(self, message_id: UUID) -> None:
        ...

License

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

spakky_outbox-6.0.0.tar.gz (4.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

spakky_outbox-6.0.0-py3-none-any.whl (9.2 kB view details)

Uploaded Python 3

File details

Details for the file spakky_outbox-6.0.0.tar.gz.

File metadata

  • Download URL: spakky_outbox-6.0.0.tar.gz
  • Upload date:
  • Size: 4.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for spakky_outbox-6.0.0.tar.gz
Algorithm Hash digest
SHA256 da812de43211cda93fef8d9114ee51974f14dc276b303c34aae2602ad938542b
MD5 b04ce2e6a80bb3933e703657a0c482c8
BLAKE2b-256 8f8e95eb67b51adde5328d1e3a8d3af6e812db85cfb6d53850641542485e9853

See more details on using hashes here.

Provenance

The following attestation bundles were made for spakky_outbox-6.0.0.tar.gz:

Publisher: release.yml on E5presso/spakky-framework

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file spakky_outbox-6.0.0-py3-none-any.whl.

File metadata

  • Download URL: spakky_outbox-6.0.0-py3-none-any.whl
  • Upload date:
  • Size: 9.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for spakky_outbox-6.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 68f92240726b28cfc553484cdd974fac3c428415ef05d31b046f187018697186
MD5 0324b8a1c52d33162f161b4a64c0cccf
BLAKE2b-256 66b18db0f6e6f3dbc849274eaaef284bc467477efffeac431c8b27d2c523d448

See more details on using hashes here.

Provenance

The following attestation bundles were made for spakky_outbox-6.0.0-py3-none-any.whl:

Publisher: release.yml on E5presso/spakky-framework

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page