Skip to main content

Qx worker runtime: NATS consumer with retry/DLQ/tracing

Project description

qx-worker

NATS JetStream consumer runtime for Qx integration-event handlers. Pulls messages in configurable batches, deserialises them via EventRegistry, dispatches to handlers through the Mediator, and acks/naks based on the handler result.

What lives here

  • qx.worker.WorkerRuntime — the main consumer loop. Fetches batches concurrently, opens a per-message DI scope, sets RequestContext from the event envelope (correlation_id, tenant_id, trace context), calls mediator.consume_integration(), and acks on success or naks on failure (letting JetStream handle retry and DLQ semantics via max_deliver).

Usage

from qx.events import EventRegistry, NatsConsumer, NatsSettings, create_nats_connection
from qx.worker import WorkerRuntime
import asyncio, signal

async def main() -> None:
    nc = await create_nats_connection(settings.nats.url)
    consumer = NatsConsumer(nc, settings.nats)

    registry = EventRegistry()
    registry.register(UserRegisteredIntegration)

    worker = WorkerRuntime(
        container=container,
        consumer=consumer,
        registry=registry,
        mediator=mediator,
        concurrency=8,
    )

    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.create_task(worker.stop()))

    await worker.run()

Integration event handler (registered via Mediator)

from qx.cqrs import integration_event_handler

@integration_event_handler(UserRegisteredIntegration)
class SendWelcomeEmailHandler:
    async def handle(self, event: UserRegisteredIntegration) -> Result[None]:
        await self._mailer.send_welcome(event.email, event.name)
        return Result.success(None)

Design rules

  • No in-process retry loop — the worker only decides ack vs nak. JetStream redelivers nak'd messages with exponential backoff up to max_deliver; after that, the message lands in the DLQ stream.
  • Concurrent processingconcurrency controls how many messages are processed in parallel per fetch() batch using asyncio.gather. Each message gets its own DI scope so shared state (sessions, connections) does not leak between concurrent handlers.
  • Observability — each message processing opens an OTel span linked to the incoming trace context extracted from NATS headers, so the full trace chain (HTTP → outbox relay → worker) is visible in your tracing backend.
  • Graceful shutdownworker.stop() sets an internal asyncio.Event; the loop drains the current batch before exiting.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

qx_worker-1.1.0.tar.gz (12.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

qx_worker-1.1.0-py3-none-any.whl (10.6 kB view details)

Uploaded Python 3

File details

Details for the file qx_worker-1.1.0.tar.gz.

File metadata

  • Download URL: qx_worker-1.1.0.tar.gz
  • Upload date:
  • Size: 12.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for qx_worker-1.1.0.tar.gz
Algorithm Hash digest
SHA256 54d226331add7ee7e2b34ae03f1091883bdd0df97fdb97b7cfadcecb54e3380e
MD5 5337cf0f3e50aefcea7f2a9289596c81
BLAKE2b-256 e353debe6a4e7d7ef825b20365687931e77abc073e9f896f8d1650581d845d71

See more details on using hashes here.

File details

Details for the file qx_worker-1.1.0-py3-none-any.whl.

File metadata

  • Download URL: qx_worker-1.1.0-py3-none-any.whl
  • Upload date:
  • Size: 10.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for qx_worker-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6390db5361929f574ae148fe5c6a6ce624248e8c6e05b3f83b0181054d9a5da8
MD5 2e09a9c23485c5ed487bad28527c8c81
BLAKE2b-256 588616e5ff46d7755f3850653ba128c93656120c2d0ab227a162f30daf856b0c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page