Generic outbox worker for FastStream and custom repositories
Project description
outbox-worker
A generic asynchronous Outbox Worker for publishing events using FastStream (RabbitMQ) and custom repositories. Supports payload validation, handler routing, retry policies, and dead-letter queues.
Installation
Install from PyPI:
pip install outbox-worker
Or install the latest development version from GitHub:
pip install git+https://github.com/Redosh/outbox_worker.git
Or add to your pyproject.toml:
[tool.poetry.dependencies]
outbox-worker = "^0.1.0"
Quick Start
import asyncio
from faststream.rabbit import RabbitBroker
from outbox_worker import OutboxWorker, EventHandlerRouter
# Define your event schema
from src.outbox.schemas import BaseEventSchema
class UserCreatedSchema(BaseEventSchema):
user_name: str
# Create a handler via decorator
from outbox_worker import event_handler, PydanticValidatedHandler
@event_handler("user_events")
class UserCreatedHandler(PydanticValidatedHandler):
model = UserCreatedSchema
async def main():
# Build handler router (handlers registered via decorator are auto-discovered)
router = EventHandlerRouter(source="profile_service")
# Configure broker
broker = RabbitBroker("amqp://guest:guest@localhost/")
# Instantiate worker
worker = OutboxWorker(
event_repository_factory=your_event_repository_factory,
broker=broker,
handler_router=router,
batch_size=100,
poll_interval=1.0, # seconds between polls
max_concurrent=5,
dead_letter_queue="dead_letter",
)
await worker.run_polling()
if __name__ == "__main__":
asyncio.run(main())
Decorator Usage
Use the @event_handler(queue_name) decorator to register handlers without manually populating the router:
from outbox_worker import event_handler, PydanticValidatedHandler
from src.outbox.schemas import BaseEventSchema
class OrderShippedSchema(BaseEventSchema):
order_id: int
shipped_at: datetime
@event_handler("order_events")
class OrderShippedHandler(PydanticValidatedHandler):
model = OrderShippedSchema
async def handle(self, payload: dict[str, Any]) -> None:
# process the shipped event
...
Handlers decorated this way are automatically registered in the global registry and picked up by EventHandlerRouter.
Configuration Options
- batch_size (
int): Number of records fetched per batch. - poll_interval (
float): Delay in seconds between batch polls. - max_concurrent (
int, default 5): Maximum number of concurrent batch workers. - dead_letter_queue (
str, default "dead_letter"): Queue name for messages that exceed retry limit.
Core Interfaces
HasOutboxPayload
Your record type must expose:
id: intqueue: strcreated_at: datetimepayload: dict[str, Any]is_published: boolretry_count: intis_failed: bool
OutboxEventRepository
class MyRepo:
async def fetch_batch(self, limit: int) -> Sequence[HasOutboxPayload]:
...
async def mark_published(self, record: HasOutboxPayload) -> None: ...
async def mark_failed(self, record: HasOutboxPayload) -> None: ...
EventHandlerRouter
Routes by record.queue, with optional default handler.
Contributing
- Fork the repo
- Create a feature branch
- Open a PR against
main
License
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file outbox_worker-1.0.0.tar.gz.
File metadata
- Download URL: outbox_worker-1.0.0.tar.gz
- Upload date:
- Size: 11.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ee5ff3f9683f30c8523de32a41731ae3833f7abdd9e3b1ef6d39db07db5ad3f1
|
|
| MD5 |
e56f92a43644945609cba9e8b67860a5
|
|
| BLAKE2b-256 |
ee252704f8a5a0149e90a4b99e1217f48d799c5c8fd81f4f0d7fd1c993fc667f
|
File details
Details for the file outbox_worker-1.0.0-py3-none-any.whl.
File metadata
- Download URL: outbox_worker-1.0.0-py3-none-any.whl
- Upload date:
- Size: 7.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5d338231010aa5d1befabf1fa1e8feb6c755151658d1062a16de996d5b3c6b4f
|
|
| MD5 |
1c96b64ae681a2b8ebbb8aff2c5f39b1
|
|
| BLAKE2b-256 |
e3a42d00d586a40133a46f9cf7e6cda44e029ad4064715e8ba4dc64613e487f2
|