Qx worker runtime: NATS consumer with retry/DLQ/tracing
qx-worker
NATS JetStream consumer runtime for Qx integration-event handlers. Pulls messages in configurable batches, deserialises them via EventRegistry, dispatches to handlers through the Mediator, and acks/naks based on the handler result.
What lives here
- qx.worker.WorkerRuntime: the main consumer loop. It fetches messages in batches and processes each batch concurrently. For every message it opens a DI scope, sets RequestContext from the event envelope (correlation_id, tenant_id, trace context), and calls mediator.consume_integration(). It then acks on success or naks on failure, leaving retry and DLQ semantics to JetStream via max_deliver.
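The per-message flow described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the WorkerRuntime implementation: FakeMessage, REGISTRY, handle and the envelope field names type and payload are hypothetical stand-ins for the JetStream message, EventRegistry, mediator.consume_integration() and the real envelope layout.

```python
import asyncio
import contextvars
import json
from dataclasses import dataclass

# Hypothetical stand-in for RequestContext: envelope metadata for the
# message currently being processed.
request_context = contextvars.ContextVar("request_context")


@dataclass
class FakeMessage:
    """Stand-in for a JetStream message; records how it was settled."""
    data: bytes
    outcome: str = "pending"

    async def ack(self) -> None:
        self.outcome = "ack"

    async def nak(self) -> None:
        self.outcome = "nak"


@dataclass
class UserRegistered:
    email: str
    name: str


# Stand-in for EventRegistry: maps the envelope's event type to a class.
REGISTRY = {"user.registered": UserRegistered}


async def handle(event: UserRegistered) -> bool:
    """Stand-in for mediator.consume_integration(); True means success."""
    return "@" in event.email


async def process(msg: FakeMessage) -> None:
    try:
        envelope = json.loads(msg.data)
        event = REGISTRY[envelope["type"]](**envelope["payload"])
        token = request_context.set({
            "correlation_id": envelope.get("correlation_id"),
            "tenant_id": envelope.get("tenant_id"),
        })
        try:
            ok = await handle(event)
        finally:
            # Never let one message's context leak into the next.
            request_context.reset(token)
    except Exception:
        ok = False
    # The worker only chooses ack or nak; redelivery is left to the broker.
    if ok:
        await msg.ack()
    else:
        await msg.nak()


def make_message(event_type: str, email: str) -> FakeMessage:
    body = {"type": event_type, "correlation_id": "c-1",
            "payload": {"email": email, "name": "Ada"}}
    return FakeMessage(json.dumps(body).encode())
```

In this sketch a deserialisation error (for example an unknown event type) is treated like a handler failure and nak'd; whether the real runtime does the same is not stated in this README.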
Usage
import asyncio
import signal

from qx.events import EventRegistry, NatsConsumer, NatsSettings, create_nats_connection
from qx.worker import WorkerRuntime

# settings, container, mediator and UserRegisteredIntegration are assumed
# to be provided by your application setup; they are not defined here.


async def main() -> None:
    nc = await create_nats_connection(settings.nats.url)
    consumer = NatsConsumer(nc, settings.nats)

    registry = EventRegistry()
    registry.register(UserRegisteredIntegration)

    worker = WorkerRuntime(
        container=container,
        consumer=consumer,
        registry=registry,
        mediator=mediator,
        concurrency=8,
    )

    # On SIGTERM, schedule worker.stop() on the running loop for a graceful exit.
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.create_task(worker.stop()))
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
Integration event handler (registered via Mediator)
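from qx.cqrs import integration_event_handler

# Result and UserRegisteredIntegration are assumed to be imported from
# your application and Qx packages; their import paths are not shown here.


@integration_event_handler(UserRegisteredIntegration)
class SendWelcomeEmailHandler:
    async def handle(self, event: UserRegisteredIntegration) -> Result[None]:
        # self._mailer is assumed to be injected elsewhere (not shown).
        await self._mailer.send_welcome(event.email, event.name)
        return Result.success(None)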
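The handler above uses self._mailer without showing where it comes from. One plausible arrangement, sketched here as an assumption rather than as the Qx API, is constructor injection, which also makes the handler easy to exercise with a fake. Result, UserRegisteredIntegration and RecordingMailer below are minimal hypothetical stand-ins, and the integration_event_handler decorator is omitted so the sketch runs without Qx installed.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass(frozen=True)
class UserRegisteredIntegration:
    """Stand-in event; the real class comes from your application."""
    email: str
    name: str


@dataclass(frozen=True)
class Result:
    """Minimal stand-in for the framework's Result type."""
    ok: bool
    value: object = None

    @classmethod
    def success(cls, value=None) -> "Result":
        return cls(ok=True, value=value)


@dataclass
class RecordingMailer:
    """Hypothetical mailer that records calls instead of sending email."""
    sent: list = field(default_factory=list)

    async def send_welcome(self, email: str, name: str) -> None:
        self.sent.append((email, name))


class SendWelcomeEmailHandler:
    def __init__(self, mailer) -> None:
        # The collaborator is injected, so a test can pass a fake.
        self._mailer = mailer

    async def handle(self, event: UserRegisteredIntegration) -> Result:
        await self._mailer.send_welcome(event.email, event.name)
        return Result.success(None)


async def demo() -> tuple:
    mailer = RecordingMailer()
    handler = SendWelcomeEmailHandler(mailer)
    event = UserRegisteredIntegration("ada@example.com", "Ada")
    result = await handler.handle(event)
    return result, mailer.sent
```

Calling asyncio.run(demo()) returns the successful Result together with the list of recorded send_welcome calls, which is enough to unit-test the handler without a NATS connection.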
Design rules
- No in-process retry loop: the worker only decides ack vs nak. JetStream redelivers nak'd messages with exponential backoff up to max_deliver; after that, the message lands in the DLQ stream.
- Concurrent processing: concurrency controls how many messages are processed in parallel per fetch() batch using asyncio.gather. Each message gets its own DI scope so shared state (sessions, connections) does not leak between concurrent handlers.
- Observability: processing each message opens an OTel span linked to the incoming trace context extracted from NATS headers, so the full trace chain (HTTP → outbox relay → worker) is visible in your tracing backend.
- Graceful shutdown: worker.stop() sets an internal asyncio.Event; the loop drains the current batch before exiting.
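The concurrency and shutdown rules above can be illustrated with a small stand-alone loop. This is a sketch under assumptions, not the WorkerRuntime source: MiniWorker is hypothetical, an asyncio.Queue stands in for the JetStream consumer, and a semaphore is one possible way to cap parallelism inside asyncio.gather.

```python
import asyncio


class MiniWorker:
    """Hypothetical batch loop: bounded concurrency, ack/nak only, drain on stop."""

    def __init__(self, source, handler, batch_size=4, concurrency=2) -> None:
        self._source = source
        self._handler = handler
        self._batch_size = batch_size
        self._limit = asyncio.Semaphore(concurrency)
        self._stopping = asyncio.Event()
        self.acked = []
        self.nakked = []

    async def _fetch(self) -> list:
        """Take up to batch_size items that are available right now."""
        batch = []
        while len(batch) < self._batch_size and not self._source.empty():
            batch.append(self._source.get_nowait())
        return batch

    async def _process(self, item) -> None:
        async with self._limit:  # at most `concurrency` handlers run at once
            try:
                ok = await self._handler(item)
            except Exception:
                ok = False
        # No retry here: record the decision and move on.
        (self.acked if ok else self.nakked).append(item)

    async def run(self) -> None:
        while not self._stopping.is_set():
            batch = await self._fetch()
            if not batch:
                await asyncio.sleep(0.01)  # nothing to do; poll again shortly
                continue
            # gather returns only after every message in the batch is settled,
            # so a stop() issued mid-batch still lets the batch drain.
            await asyncio.gather(*(self._process(item) for item in batch))

    async def stop(self) -> None:
        self._stopping.set()


async def demo() -> tuple:
    queue = asyncio.Queue()
    for n in range(6):
        queue.put_nowait(n)

    async def handler(n: int) -> bool:
        await asyncio.sleep(0.01)
        if n == 0:
            await worker.stop()  # request shutdown while the batch is in flight
        return n % 2 == 0  # even items succeed, odd items fail

    worker = MiniWorker(queue, handler, batch_size=4, concurrency=2)
    await worker.run()
    return sorted(worker.acked), sorted(worker.nakked), queue.qsize()
```

In demo(), shutdown is requested while the first batch of four is being processed. All four messages are still settled, and the two messages not yet fetched stay in the queue for a later run.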