Skip to main content

Generic outbox worker for FastStream and custom repositories

Project description

outbox-worker

🌀 A generic asynchronous Outbox Worker for publishing events using FastStream and custom repositories. Supports payload validation and handler routing.

Installation

pip install git+https://github.com/your-org/outbox-worker.git

Or add it to your pyproject.toml:

[dependencies]
outbox-worker = { git = "https://github.com/your-org/outbox-worker.git" }

Quick Start

from faststream.rabbit import RabbitBroker
from outbox import OutboxWorker, EventHandlerRouter
from your_project.handlers import YourHandler
from your_project.repos import your_event_repository_factory

router = EventHandlerRouter(
    source="profile",
    handlers={
        "queue_name": YourHandler(),
    }
)

worker = OutboxWorker(
    event_repository_factory=your_event_repository_factory,
    broker=RabbitBroker("amqp://localhost/"),
    handler_router=router,
    batch_size=100,
    poll_interval=1.0,
)

await worker.run_polling()

Interfaces to Implement

HasOutboxPayload: Your event record type. Must have the fields:

  • id: int
  • queue: str
  • created_at: datetime
  • payload: dict
  • is_published: bool
  • retry_count: int
  • is_failed: bool

OutboxEventRepository: Repository that fetches batches of events.

  • Method: fetch_batch(limit: int) -> Sequence[HasOutboxPayload]

  • Property: .session.commit() must be an async method

EventHandler: transforms a HasOutboxPayload record into a publishable dict

  • Method: to_payload(record) -> dict

EventHandlerRouter: routes records to the appropriate handler by queue name.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

outbox_worker-0.2.0.tar.gz (6.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

outbox_worker-0.2.0-py3-none-any.whl (5.7 kB view details)

Uploaded Python 3

File details

Details for the file outbox_worker-0.2.0.tar.gz.

File metadata

  • Download URL: outbox_worker-0.2.0.tar.gz
  • Upload date:
  • Size: 6.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for outbox_worker-0.2.0.tar.gz
Algorithm Hash digest
SHA256 ecc34f4607551e0a0e766baa86e87f4b0e6d9abf0461d68c9a20deef3591cbea
MD5 5f9bd7240b54329d3d421ea87b84733e
BLAKE2b-256 c35cada20a584d6373826c498f1a025e757cbaaa5a663de6feb602a14c114d98

See more details on using hashes here.

File details

Details for the file outbox_worker-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: outbox_worker-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 5.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for outbox_worker-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 4c26e53dbdb96c5d177f684dfc08747fd6585fc22beb767fba067d5c5fb4f689
MD5 24ace21878a17b04f2c1d0bc75272572
BLAKE2b-256 6c6e5fc6c5b98bfea86c4e4188cc633ded6cd10b25c2ff381f1c0f9d0af91519

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page