Qx events: NATS JetStream publisher/consumer, outbox relay, mediator dispatcher bridge

qx-events

NATS JetStream publisher/consumer, transactional outbox relay, and mediator-bridge event dispatcher for the Qx framework.

What lives here

  • qx.events.OutboxRelay — polls qx_outbox_events, publishes unpublished events to NATS JetStream, and marks them delivered. Runs as a background task alongside the HTTP server or as a standalone process. Supports optional leader election so only one relay instance publishes at a time.
  • qx.events.NatsPublisher — publishes a single IntegrationEvent to a JetStream subject derived from event_name.
  • qx.events.NatsConsumer — durable pull consumer over a JetStream stream. Used by WorkerRuntime to fetch and ack/nak messages.
  • qx.events.NatsSettings — Pydantic settings for NATS connection (URL, credentials, stream name, consumer name).
  • qx.events.EventRegistry — maps event_name strings to concrete IntegrationEvent subclasses. Required by both the relay (for serialisation) and the worker (for deserialisation).
  • qx.events.MediatorEventDispatcher — EventDispatcher implementation that routes domain events to their in-process handlers via the Mediator after a UnitOfWork commit.
  • qx.events.create_nats_connection — async factory that opens a NATS connection with retry.
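
The role of the registry described above can be sketched in plain Python. This is not the qx.events implementation: SketchEvent, SketchRegistry, the event_name class attribute, and the serialise/deserialise method names are all assumptions made for illustration. It only shows why both the publishing side and the consuming side need the same name-to-class mapping.

```python
import json
from dataclasses import asdict, dataclass
from typing import ClassVar


@dataclass
class SketchEvent:
    """Hypothetical base class; stands in for IntegrationEvent."""
    event_name: ClassVar[str] = ""


@dataclass
class UserRegistered(SketchEvent):
    event_name: ClassVar[str] = "user.registered"
    user_id: str = ""


class SketchRegistry:
    """Maps event_name strings to event classes (illustrative only)."""

    def __init__(self) -> None:
        self._types: dict[str, type[SketchEvent]] = {}

    def register(self, event_type: type[SketchEvent]) -> None:
        self._types[event_type.event_name] = event_type

    def serialise(self, event: SketchEvent) -> tuple[str, str]:
        # Return the lookup name together with the JSON payload.
        return event.event_name, json.dumps(asdict(event))

    def deserialise(self, event_name: str, payload: str) -> SketchEvent:
        # Raises KeyError for names that were never registered.
        return self._types[event_name](**json.loads(payload))


registry = SketchRegistry()
registry.register(UserRegistered)
name, payload = registry.serialise(UserRegistered(user_id="u-1"))
restored = registry.deserialise(name, payload)
```

An unregistered event_name fails loudly at lookup time in this sketch, which is one reasonable way to surface a worker that is missing a registration.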

Usage

Outbox relay (alongside the API server)

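import asyncio

from qx.events import EventRegistry, NatsPublisher, OutboxRelay

# Map event names to their IntegrationEvent subclasses.
registry = EventRegistry()
registry.register(UserRegisteredIntegration)

# nc is an open NATS connection (see create_nats_connection);
# session_factory is the application's database session factory.
publisher = NatsPublisher(nc, registry)
relay = OutboxRelay(session_factory, publisher, registry)

# Run in the background. Call this from inside a running event loop
# and keep a reference so the task is not garbage-collected.
relay_task = asyncio.create_task(relay.run())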
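
What a relay does on each poll can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the OutboxRelay code: the column layout of qx_outbox_events (id, event_name, payload, published) is hypothetical because the real schema is not shown here, sqlite3 stands in for the application database, and a plain callable stands in for the JetStream publisher.

```python
import json
import sqlite3


def create_schema(db: sqlite3.Connection) -> None:
    db.execute("CREATE TABLE users (id TEXT PRIMARY KEY)")
    db.execute(
        "CREATE TABLE qx_outbox_events ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " event_name TEXT NOT NULL,"
        " payload TEXT NOT NULL,"
        " published INTEGER NOT NULL DEFAULT 0)"
    )


def register_user(db: sqlite3.Connection, user_id: str) -> None:
    # The aggregate row and the outbox row commit or roll back together.
    with db:
        db.execute("INSERT INTO users (id) VALUES (?)", (user_id,))
        db.execute(
            "INSERT INTO qx_outbox_events (event_name, payload) VALUES (?, ?)",
            ("user.registered", json.dumps({"user_id": user_id})),
        )


def relay_once(db: sqlite3.Connection, publish) -> int:
    """Publish every unpublished row in insertion order; return rows seen."""
    rows = db.execute(
        "SELECT id, event_name, payload FROM qx_outbox_events"
        " WHERE published = 0 ORDER BY id"
    ).fetchall()
    for event_id, event_name, payload in rows:
        # If publish raises, the row stays unpublished and is retried later.
        publish(event_name, payload)
        with db:
            db.execute(
                "UPDATE qx_outbox_events SET published = 1 WHERE id = ?",
                (event_id,),
            )
    return len(rows)


db = sqlite3.connect(":memory:")
create_schema(db)
register_user(db, "u-1")

sent: list[tuple[str, str]] = []
first_pass = relay_once(db, lambda name, payload: sent.append((name, payload)))
second_pass = relay_once(db, lambda name, payload: sent.append((name, payload)))
```

Because the row is marked only after publish returns, a crash between the two steps leads to a repeat publish rather than a lost event, which is why delivery is at-least-once.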

Consuming events in a worker

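from qx.events import NatsConsumer, NatsSettings

# WorkerRuntime is not listed among the qx.events exports above; import it
# from the Qx package that provides it. nc, container, registry, and mediator
# are assumed to be set up elsewhere.
settings = NatsSettings(url="nats://localhost:4222", stream="events", consumer="identity-worker")
consumer = NatsConsumer(nc, settings)

# WorkerRuntime handles the fetch/ack loop; await it from inside a coroutine.
worker = WorkerRuntime(container, consumer, registry, mediator)
await worker.run()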
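
The fetch/ack loop mentioned above pairs naturally with duplicate detection, since messages may be delivered more than once. The following is a rough sketch, not the WorkerRuntime implementation: FakeMessage, process_batch, and the event_id field are hypothetical, and a plain set stands in for a persistent idempotency store such as the one in qx-cache, whose API is not shown here.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Stand-in for a JetStream message; it only records ack/nak calls."""
    event_id: str
    data: str
    outcome: str = "pending"

    async def ack(self) -> None:
        self.outcome = "ack"

    async def nak(self) -> None:
        self.outcome = "nak"


async def process_batch(messages, handler, seen_ids: set) -> None:
    for msg in messages:
        if msg.event_id in seen_ids:
            # Duplicate delivery: acknowledge it but skip the handler.
            await msg.ack()
            continue
        try:
            await handler(msg.data)
        except Exception:
            # Negative acknowledgement asks the server to redeliver later.
            await msg.nak()
            continue
        seen_ids.add(msg.event_id)
        await msg.ack()


handled: list[str] = []


async def record(data: str) -> None:
    handled.append(data)


batch = [FakeMessage("e-1", "a"), FakeMessage("e-1", "a"), FakeMessage("e-2", "b")]
asyncio.run(process_batch(batch, record, set()))
```

In this sketch the ID is recorded only after the handler succeeds, so a failed message is retried rather than silently dropped.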

Design rules

  • At-least-once delivery — the outbox guarantees every INSERTed event is eventually published. Consumers are expected to be idempotent (use IdempotencyStore from qx-cache).
  • Transactional outbox — UnitOfWork (in qx-db) writes events to qx_outbox_events in the same transaction as the aggregate; the relay reads and publishes asynchronously. No event is lost even if the process crashes between commit and publish.
  • Event envelope — each NATS message carries event_name, event_version, payload JSON, correlation_id, tenant_id, and OTel trace context headers for full observability continuity.
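
The envelope rule above can be illustrated with a small round-trip. This is a sketch under assumptions, not the wire format used by qx-events: the Envelope class, the choice to put the event fields in a JSON body, and the use of a single traceparent header (the W3C Trace Context header name) are all illustrative guesses about a layout the text does not spell out.

```python
import json
import uuid
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class Envelope:
    """Hypothetical envelope holding the fields named in the design rule."""
    event_name: str
    event_version: int
    payload: dict
    correlation_id: str
    tenant_id: str

    def to_message(self, traceparent: str) -> tuple[bytes, dict[str, str]]:
        # Body carries the event fields; headers carry the trace context.
        body = json.dumps(asdict(self), sort_keys=True).encode("utf-8")
        headers = {"traceparent": traceparent}
        return body, headers

    @classmethod
    def from_message(cls, body: bytes) -> "Envelope":
        return cls(**json.loads(body))


envelope = Envelope(
    event_name="user.registered",
    event_version=1,
    payload={"user_id": "u-1"},
    correlation_id=str(uuid.uuid4()),
    tenant_id="tenant-a",
)
body, headers = envelope.to_message(
    "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
)
decoded = Envelope.from_message(body)
```

Keeping the trace context in headers rather than in the body lets a consumer continue the trace without parsing the payload first.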
