Generic outbox worker for FastStream and custom repositories

outbox-worker

A generic asynchronous Outbox Worker for publishing events using FastStream (RabbitMQ) and custom repositories. Supports payload validation, handler routing, retry policies, and dead-letter queues.

Installation

Install from PyPI:

pip install outbox-worker

Or install the latest development version from GitHub:

pip install git+https://github.com/Redosh/outbox_worker.git

Or add to your pyproject.toml:

[tool.poetry.dependencies]
outbox-worker = "^1.0.0"

Quick Start

import asyncio

from faststream.rabbit import RabbitBroker
from outbox_worker import (
    EventHandlerRouter,
    OutboxWorker,
    PydanticValidatedHandler,
    event_handler,
)

from src.outbox.schemas import BaseEventSchema


# Define your event schema
class UserCreatedSchema(BaseEventSchema):
    user_name: str


# Create a handler via decorator
@event_handler("user_events")
class UserCreatedHandler(PydanticValidatedHandler):
    model = UserCreatedSchema

async def main():
    # Build handler router (handlers registered via decorator are auto-discovered)
    router = EventHandlerRouter(source="profile_service")

    # Configure broker
    broker = RabbitBroker("amqp://guest:guest@localhost/")

    # Instantiate worker
    worker = OutboxWorker(
        event_repository_factory=your_event_repository_factory,  # placeholder: supply your own factory
        broker=broker,
        handler_router=router,
        batch_size=100,
        poll_interval=1.0,  # seconds between polls
        max_concurrent=5,
        dead_letter_queue="dead_letter",
    )

    await worker.run_polling()

if __name__ == "__main__":
    asyncio.run(main())

Decorator Usage

Use the @event_handler(queue_name) decorator to register handlers without manually populating the router:

from datetime import datetime
from typing import Any

from outbox_worker import event_handler, PydanticValidatedHandler
from src.outbox.schemas import BaseEventSchema

class OrderShippedSchema(BaseEventSchema):
    order_id: int
    shipped_at: datetime

@event_handler("order_events")
class OrderShippedHandler(PydanticValidatedHandler):
    model = OrderShippedSchema
    
    async def handle(self, payload: dict[str, Any]) -> None:
        # process the shipped event
        ...

Handlers decorated this way are automatically registered in the global registry and picked up by EventHandlerRouter.

Configuration Options

  • batch_size (int): Number of records fetched per batch.
  • poll_interval (float): Delay in seconds between batch polls.
  • max_concurrent (int, default 5): Maximum number of concurrent batch workers.
  • dead_letter_queue (str, default "dead_letter"): Queue name for messages that exceed the retry limit.
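
To make the interplay of batch_size, poll_interval, and max_concurrent concrete, here is a minimal asyncio sketch of a polling loop. It is not the library's implementation: `batch_worker`, `run_polling_sketch`, the `fetch` and `publish` callables, and the `max_polls` stop condition are all hypothetical, and it assumes that max_concurrent simply means that many independent batch workers run side by side.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def batch_worker(
    fetch: Callable[[int], Awaitable[list[dict]]],
    publish: Callable[[dict], Awaitable[None]],
    batch_size: int,
    poll_interval: float,
    max_polls: int,
) -> None:
    """Fetch up to `batch_size` records, publish them, then sleep."""
    for _ in range(max_polls):  # hypothetical stop condition for the sketch
        for record in await fetch(batch_size):
            await publish(record)
        await asyncio.sleep(poll_interval)  # delay between batch polls


async def run_polling_sketch(
    fetch: Callable[[int], Awaitable[list[dict]]],
    publish: Callable[[dict], Awaitable[None]],
    *,
    batch_size: int,
    poll_interval: float,
    max_concurrent: int,
    max_polls: int = 1,
) -> None:
    """Run `max_concurrent` batch workers concurrently on one event loop."""
    workers = [
        batch_worker(fetch, publish, batch_size, poll_interval, max_polls)
        for _ in range(max_concurrent)
    ]
    await asyncio.gather(*workers)
```

A real worker would poll until cancelled rather than for a fixed number of rounds; the bound here only keeps the sketch finite.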

Core Interfaces

HasOutboxPayload

Your record type must expose:

  • id: int
  • queue: str
  • created_at: datetime
  • payload: dict[str, Any]
  • is_published: bool
  • retry_count: int
  • is_failed: bool
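
As an illustration of a record type that exposes these attributes, the sketch below pairs a structural type with a plain dataclass. Whether HasOutboxPayload is itself defined as a `typing.Protocol` is an assumption; `OutboxRecordLike` and `OutboxRecord` are hypothetical names that are not part of the package.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class OutboxRecordLike(Protocol):
    """Hypothetical structural type mirroring the attribute list above."""

    id: int
    queue: str
    created_at: datetime
    payload: dict[str, Any]
    is_published: bool
    retry_count: int
    is_failed: bool


@dataclass
class OutboxRecord:
    """Plain in-memory record; an ORM model with the same fields would also fit."""

    id: int
    queue: str
    payload: dict[str, Any]
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    is_published: bool = False
    retry_count: int = 0
    is_failed: bool = False
```

Because the check is structural, any class with these attributes satisfies the type without inheriting from it.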

OutboxEventRepository

from collections.abc import Sequence


class MyRepo:
    async def fetch_batch(self, limit: int) -> Sequence[HasOutboxPayload]:
        # Return up to `limit` records to publish
        ...

    async def mark_published(self, record: HasOutboxPayload) -> None:
        ...

    async def mark_failed(self, record: HasOutboxPayload) -> None:
        ...
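
For local testing, a repository with these three methods can be backed by a plain list. The sketch below is one hypothetical way to do it: `Record` and `InMemoryRepo` are illustrative names, and both the selection rule in `fetch_batch` and the retry bookkeeping in `mark_failed` are assumptions rather than behavior documented by the package.

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass
class Record:
    id: int
    queue: str
    payload: dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    is_published: bool = False
    retry_count: int = 0
    is_failed: bool = False


class InMemoryRepo:
    """Hypothetical repository backed by a list, handy for tests."""

    def __init__(self, records: list[Record]) -> None:
        self._records = records

    async def fetch_batch(self, limit: int) -> list[Record]:
        # Assumption: only records that are neither published nor failed are due.
        due = [r for r in self._records if not r.is_published and not r.is_failed]
        return due[:limit]

    async def mark_published(self, record: Record) -> None:
        record.is_published = True

    async def mark_failed(self, record: Record) -> None:
        # Assumption: a failure bumps the retry counter and flags the record.
        record.retry_count += 1
        record.is_failed = True
```

A database-backed version would follow the same outline, with each method issuing a query instead of mutating objects in memory.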

EventHandlerRouter

Routes each record to a handler based on record.queue, with an optional default handler.
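
The routing rule can be pictured as a dictionary lookup with a fallback. The sketch below is a standalone illustration under that assumption, not EventHandlerRouter's real API: `QueueRouterSketch`, `add`, and `resolve` are hypothetical names, and raising `LookupError` when no handler matches is a design choice made only for this example.

```python
from typing import Callable, Optional

Handler = Callable[[dict], str]


class QueueRouterSketch:
    """Hypothetical router: maps a queue name to a handler, with a fallback."""

    def __init__(self, default: Optional[Handler] = None) -> None:
        self._routes: dict[str, Handler] = {}
        self._default = default

    def add(self, queue: str, handler: Handler) -> None:
        """Register `handler` for records whose queue equals `queue`."""
        self._routes[queue] = handler

    def resolve(self, queue: str) -> Handler:
        """Return the handler for `queue`, or the default if none is registered."""
        handler = self._routes.get(queue, self._default)
        if handler is None:
            raise LookupError(f"no handler registered for queue {queue!r}")
        return handler
```

Failing loudly when neither a route nor a default exists makes a misrouted queue visible early instead of silently dropping its records.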

Contributing

  1. Fork the repo
  2. Create a feature branch
  3. Open a PR against main

License

MIT
