Qx worker runtime: NATS consumer with retry/DLQ/tracing
qx-worker
NATS JetStream consumer runtime for Qx integration-event handlers. Pulls messages in configurable batches, deserialises them via EventRegistry, dispatches to handlers through the Mediator, and acks/naks based on the handler result.
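The ack/nak decision described above can be sketched in a few lines. This is a minimal illustration, not the qx-worker implementation: `FakeMessage`, `Outcome`, and `process` are hypothetical stand-ins for the JetStream message, the handler's `Result`, and the runtime's per-message step, and treating an unhandled exception as a failure is an assumption.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a JetStream message; records ack/nak calls."""
    data: bytes
    calls: list = field(default_factory=list)

    async def ack(self) -> None:
        self.calls.append("ack")

    async def nak(self) -> None:
        self.calls.append("nak")


@dataclass
class Outcome:
    """Hypothetical stand-in for the handler's Result type."""
    is_success: bool


async def process(msg, handle) -> None:
    """Run the handler exactly once, then ack on success or nak on failure."""
    try:
        outcome = await handle(msg.data)
    except Exception:
        # Assumption: an unhandled exception counts as a failed result.
        await msg.nak()
        return
    if outcome.is_success:
        await msg.ack()
    else:
        await msg.nak()


async def ok_handler(data: bytes) -> Outcome:
    return Outcome(is_success=True)


async def failing_handler(data: bytes) -> Outcome:
    raise RuntimeError("downstream unavailable")


def demo() -> tuple:
    good, bad = FakeMessage(b"{}"), FakeMessage(b"{}")
    asyncio.run(process(good, ok_handler))
    asyncio.run(process(bad, failing_handler))
    return good.calls, bad.calls
```

Note that `process` never retries: a failed message is nak'd once and redelivery is left to the broker.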
What lives here
qx.worker.WorkerRuntime: the main consumer loop. It fetches messages in batches and processes each batch concurrently. For every message it opens a DI scope, sets RequestContext from the event envelope (correlation_id, tenant_id, trace context), and calls mediator.consume_integration(). It then acks on success or naks on failure, leaving retry and DLQ semantics to JetStream via max_deliver.
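To illustrate how a per-message context could be derived from the envelope, here is a small sketch built on the standard library's `contextvars`. The `Context` class, the header names (`correlation_id`, `tenant_id`, `traceparent`), and the helper functions are assumptions made for the example; the real RequestContext and envelope layout may differ. The trace fields follow the W3C `traceparent` format (`version-traceid-parentid-flags`).

```python
import contextvars
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Context:
    """Hypothetical stand-in for RequestContext."""
    correlation_id: Optional[str]
    tenant_id: Optional[str]
    trace_id: Optional[str]
    parent_span_id: Optional[str]


# Holds the context of the message currently being processed.
current_context = contextvars.ContextVar("current_context", default=None)


def parse_traceparent(value: str):
    """Split a W3C traceparent value into (trace_id, parent_span_id)."""
    parts = value.split("-")
    if len(parts) != 4 or len(parts[1]) != 32 or len(parts[2]) != 16:
        return None, None  # missing or malformed header
    return parts[1], parts[2]


def context_from_headers(headers: Mapping[str, str]) -> Context:
    """Build a Context from message headers (header names are assumed)."""
    trace_id, span_id = parse_traceparent(headers.get("traceparent", ""))
    return Context(
        correlation_id=headers.get("correlation_id"),
        tenant_id=headers.get("tenant_id"),
        trace_id=trace_id,
        parent_span_id=span_id,
    )


def run_in_scope(headers: Mapping[str, str], fn):
    """Set the context for one message, call fn, then restore the old value."""
    token = current_context.set(context_from_headers(headers))
    try:
        return fn()
    finally:
        current_context.reset(token)
```

Resetting the token in `finally` keeps one message's context from leaking into the next, which mirrors the per-message scoping described above.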
Usage
    import asyncio
    import signal

    from qx.events import EventRegistry, NatsConsumer, NatsSettings, create_nats_connection
    from qx.worker import WorkerRuntime

    # settings, container, mediator and UserRegisteredIntegration are assumed to
    # come from your application's own bootstrap code.

    async def main() -> None:
        nc = await create_nats_connection(settings.nats.url)
        consumer = NatsConsumer(nc, settings.nats)

        # Register every integration event type the worker should deserialise.
        registry = EventRegistry()
        registry.register(UserRegisteredIntegration)

        worker = WorkerRuntime(
            container=container,
            consumer=consumer,
            registry=registry,
            mediator=mediator,
            concurrency=8,
        )

        # Stop gracefully on SIGTERM; run() returns once the current batch drains.
        loop = asyncio.get_running_loop()
        loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.create_task(worker.stop()))
        await worker.run()
Integration event handler (registered via Mediator)
    from qx.cqrs import integration_event_handler

    # Result, UserRegisteredIntegration and self._mailer are assumed to be
    # provided by your application (the mailer typically via DI).

    @integration_event_handler(UserRegisteredIntegration)
    class SendWelcomeEmailHandler:
        async def handle(self, event: UserRegisteredIntegration) -> Result[None]:
            await self._mailer.send_welcome(event.email, event.name)
            return Result.success(None)
Design rules
- No in-process retry loop: the worker only decides ack vs nak. JetStream redelivers nak'd messages with exponential backoff up to max_deliver; after that, the message lands in the DLQ stream.
- Concurrent processing: concurrency controls how many messages are processed in parallel per fetch() batch using asyncio.gather. Each message gets its own DI scope so shared state (sessions, connections) does not leak between concurrent handlers.
- Observability: each message processing opens an OTel span linked to the incoming trace context extracted from NATS headers, so the full trace chain (HTTP → outbox relay → worker) is visible in your tracing backend.
- Graceful shutdown: worker.stop() sets an internal asyncio.Event; the loop drains the current batch before exiting.
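The concurrency and shutdown rules can be sketched together as a small loop. This is a simplified illustration under stated assumptions, not the actual WorkerRuntime: `MiniWorker`, its `fetch` and `handle` callables, and the use of `concurrency` as the batch size are all hypothetical.

```python
import asyncio


class MiniWorker:
    """Hypothetical, simplified consumer loop; not the actual WorkerRuntime."""

    def __init__(self, fetch, handle, concurrency: int = 8) -> None:
        self._fetch = fetch              # async callable: batch size -> list of messages
        self._handle = handle            # async callable: one message -> None
        self._concurrency = concurrency  # assumption: also used as the batch size
        self._stopping = asyncio.Event()

    async def stop(self) -> None:
        # Only flips the flag; the in-flight batch is allowed to finish.
        self._stopping.set()

    async def run(self) -> None:
        while not self._stopping.is_set():
            batch = await self._fetch(self._concurrency)
            if not batch:
                await asyncio.sleep(0)  # yield to the event loop and poll again
                continue
            # Process the whole batch in parallel; one failing handler does not
            # cancel the others because exceptions are collected, not raised.
            await asyncio.gather(
                *(self._handle(msg) for msg in batch),
                return_exceptions=True,
            )


async def demo() -> list:
    queue = list(range(5))
    processed = []

    async def fetch(n: int) -> list:
        batch = queue[:n]
        del queue[:n]
        return batch

    async def handle(item: int) -> None:
        await asyncio.sleep(0)
        processed.append(item)
        if item == 2:
            await worker.stop()  # request shutdown in the middle of a batch

    worker = MiniWorker(fetch, handle, concurrency=2)
    await worker.run()
    return processed
```

In the demo, stop is requested while the second batch is in flight: that batch still completes, and the remaining item is never fetched.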