Generic outbox worker for FastStream and custom repositories

outbox-worker

🌀 A generic asynchronous Outbox Worker for publishing events using FastStream and custom repositories. Supports payload validation and handler routing.

Installation

pip install git+https://github.com/your-org/outbox-worker.git

Or, if you use Poetry, add it to your pyproject.toml:

[tool.poetry.dependencies]
outbox-worker = { git = "https://github.com/your-org/outbox-worker.git" }

Quick Start

import asyncio

from faststream.rabbit import RabbitBroker
from outbox import OutboxWorker, EventHandlerRouter
from your_project.handlers import YourHandler
from your_project.repos import your_event_repository_factory

# Map each queue name to the handler that builds its payload
router = EventHandlerRouter(
    source="profile",
    handlers={
        "queue_name": YourHandler(),
    },
)

worker = OutboxWorker(
    event_repository_factory=your_event_repository_factory,
    broker=RabbitBroker("amqp://localhost/"),
    handler_router=router,
    batch_size=100,
    poll_interval=1.0,
)


async def main() -> None:
    # run_polling() is awaited, so it has to run inside an event loop
    await worker.run_polling()


if __name__ == "__main__":
    asyncio.run(main())

Interfaces to Implement

HasOutboxPayload: Your event record type. Must have the fields:

  • id: int
  • queue: str
  • created_at: datetime
  • payload: dict
  • is_published: bool
  • retry_count: int
  • is_failed: bool
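
A minimal sketch of a record type that exposes these fields, written as a plain dataclass. The class name OutboxEvent and the default values are assumptions made for illustration; in a real project this would more likely be an ORM model mapped to your outbox table.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


def _utc_now() -> datetime:
    """Timezone-aware current time, used as the default for created_at."""
    return datetime.now(timezone.utc)


@dataclass
class OutboxEvent:
    """Hypothetical record type with the fields listed above."""

    id: int
    queue: str                       # queue name this event is addressed to
    payload: dict                    # raw event data
    created_at: datetime = field(default_factory=_utc_now)
    is_published: bool = False       # assumed default: new events are unpublished
    retry_count: int = 0             # assumed default: no attempts yet
    is_failed: bool = False          # assumed default: not marked as failed


event = OutboxEvent(id=1, queue="queue_name", payload={"user_id": 42})
```

Only the field names and types come from the list above; how the worker updates is_published, retry_count, and is_failed is not shown here.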

OutboxEventRepository: Repository that fetches batches of events.

  • Method: fetch_batch(limit: int) -> Sequence[HasOutboxPayload]

  • Property: session, whose commit() must be an async method
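
To make the shape of this interface concrete, here is a small in-memory sketch. Everything in it is hypothetical (Event, FakeSession, InMemoryEventRepository), and it assumes fetch_batch is a coroutine, which the description above does not state. A real repository would query the outbox table through your database session instead of filtering a list.

```python
import asyncio
from dataclasses import dataclass
from typing import Sequence


@dataclass
class Event:
    """Hypothetical stand-in for a HasOutboxPayload record (fields trimmed)."""

    id: int
    queue: str
    payload: dict
    is_published: bool = False


class FakeSession:
    """Hypothetical session object whose commit() is a coroutine."""

    def __init__(self) -> None:
        self.commits = 0

    async def commit(self) -> None:
        # A real session would persist pending changes here
        self.commits += 1


class InMemoryEventRepository:
    """Hypothetical repository backed by a list instead of a database."""

    def __init__(self, events: list[Event]) -> None:
        self._events = events
        self.session = FakeSession()

    async def fetch_batch(self, limit: int) -> Sequence[Event]:
        # ASSUMPTION: async method. Returns at most `limit` unpublished
        # events, in the order they were inserted into the list.
        pending = [e for e in self._events if not e.is_published]
        return pending[:limit]


async def demo() -> tuple[list[int], int]:
    repo = InMemoryEventRepository(
        [
            Event(1, "a", {}),
            Event(2, "a", {}, is_published=True),
            Event(3, "b", {}),
        ]
    )
    batch = await repo.fetch_batch(limit=1)
    await repo.session.commit()
    return [e.id for e in batch], repo.session.commits


batch_ids, commits = asyncio.run(demo())
```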

EventHandler: Transforms a HasOutboxPayload record into a publishable dict.

  • Method: to_payload(record) -> dict
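
A handler can be as small as a class with a to_payload method. The sketch below is an illustration only: the names Event and ProfileUpdatedHandler, the required user_id key, and the event_id field in the output are all assumptions, not part of the library.

```python
from dataclasses import dataclass


@dataclass
class Event:
    """Hypothetical stand-in for a HasOutboxPayload record (fields trimmed)."""

    id: int
    queue: str
    payload: dict


class ProfileUpdatedHandler:
    """Hypothetical handler that validates and reshapes a record's payload."""

    def to_payload(self, record: Event) -> dict:
        # Basic validation: refuse records that lack the key consumers need
        if "user_id" not in record.payload:
            raise ValueError(f"event {record.id} has no 'user_id' in its payload")
        # Build a new dict so the stored record is left untouched
        return {"event_id": record.id, **record.payload}


message = ProfileUpdatedHandler().to_payload(
    Event(id=7, queue="queue_name", payload={"user_id": 42})
)
```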

EventHandlerRouter: Routes records to the appropriate handler by queue name.
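
Routing by queue name amounts to a dictionary lookup. The following standalone sketch shows that idea; it is not the library's EventHandlerRouter. SimpleRouter, get_handler, and EchoHandler are hypothetical names, and the source argument from the Quick Start is left out because its role is not described here.

```python
class EchoHandler:
    """Hypothetical handler that returns the payload unchanged."""

    def to_payload(self, record: dict) -> dict:
        return dict(record["payload"])


class SimpleRouter:
    """Hypothetical router: maps a queue name to its handler."""

    def __init__(self, handlers: dict) -> None:
        # Copy so later changes to the caller's dict do not affect routing
        self._handlers = dict(handlers)

    def get_handler(self, queue: str):
        handler = self._handlers.get(queue)
        if handler is None:
            raise LookupError(f"no handler registered for queue {queue!r}")
        return handler


router = SimpleRouter({"queue_name": EchoHandler()})
record = {"queue": "queue_name", "payload": {"user_id": 42}}
routed = router.get_handler(record["queue"]).to_payload(record)
```

Raising on an unknown queue name is one possible design choice; it surfaces misconfigured routes early instead of silently dropping events.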
