Skip to main content

Qx worker runtime: NATS consumer with retry/DLQ/tracing

Project description

qx-worker

NATS JetStream consumer runtime for Qx integration-event handlers. Pulls messages in configurable batches, deserialises them via EventRegistry, dispatches to handlers through the Mediator, and acks/naks based on the handler result.

What lives here

  • qx.worker.WorkerRuntime — the main consumer loop. Fetches batches concurrently, opens a per-message DI scope, sets RequestContext from the event envelope (correlation_id, tenant_id, trace context), calls mediator.consume_integration(), and acks on success or naks on failure (letting JetStream handle retry and DLQ semantics via max_deliver).

Usage

from qx.events import EventRegistry, NatsConsumer, NatsSettings, create_nats_connection
from qx.worker import WorkerRuntime
import asyncio, signal

async def main() -> None:
    nc = await create_nats_connection(settings.nats.url)
    consumer = NatsConsumer(nc, settings.nats)

    registry = EventRegistry()
    registry.register(UserRegisteredIntegration)

    worker = WorkerRuntime(
        container=container,
        consumer=consumer,
        registry=registry,
        mediator=mediator,
        concurrency=8,
    )

    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.create_task(worker.stop()))

    await worker.run()

Integration event handler (registered via Mediator)

from qx.cqrs import integration_event_handler

@integration_event_handler(UserRegisteredIntegration)
class SendWelcomeEmailHandler:
    async def handle(self, event: UserRegisteredIntegration) -> Result[None]:
        await self._mailer.send_welcome(event.email, event.name)
        return Result.success(None)

Design rules

  • No in-process retry loop — the worker only decides ack vs nak. JetStream redelivers nak'd messages with exponential backoff up to max_deliver; after that, the message lands in the DLQ stream.
  • Concurrent processingconcurrency controls how many messages are processed in parallel per fetch() batch using asyncio.gather. Each message gets its own DI scope so shared state (sessions, connections) does not leak between concurrent handlers.
  • Observability — each message processing opens an OTel span linked to the incoming trace context extracted from NATS headers, so the full trace chain (HTTP → outbox relay → worker) is visible in your tracing backend.
  • Graceful shutdownworker.stop() sets an internal asyncio.Event; the loop drains the current batch before exiting.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

qx_worker-1.0.0.tar.gz (12.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

qx_worker-1.0.0-py3-none-any.whl (10.6 kB view details)

Uploaded Python 3

File details

Details for the file qx_worker-1.0.0.tar.gz.

File metadata

  • Download URL: qx_worker-1.0.0.tar.gz
  • Upload date:
  • Size: 12.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for qx_worker-1.0.0.tar.gz
Algorithm Hash digest
SHA256 f13f04f6d0c2ab9b4af905fe4b78fe73b1fbe176067fb87ba3a5734200387dda
MD5 7541ab312bbd94c07e29fe62ada9d83d
BLAKE2b-256 7448904941cffd12e4134eec6eb70aaa620a392e630908d59fda80f30f08a68b

See more details on using hashes here.

File details

Details for the file qx_worker-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: qx_worker-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 10.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for qx_worker-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 8f999e6aa2176473fc5a8b6c428342eaa66d2e9d1ace2ad029325a3be5ed56cd
MD5 185ad30da18e40b96866cebcc3e42200
BLAKE2b-256 eafb0c1cfb50bc8ae994db2004c9b6b0f34534d2456482a9d779d1b967203b79

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page