Skip to main content

Qx events: NATS JetStream publisher/consumer, outbox relay, mediator dispatcher bridge

Project description

qx-events

NATS JetStream publisher/consumer, transactional outbox relay, and mediator-bridge event dispatcher for the Qx framework.

What lives here

  • qx.events.OutboxRelay — polls qx_outbox_events, publishes unpublished events to NATS JetStream, and marks them delivered. Runs as a background task alongside the HTTP server or as a standalone process. Supports optional leader election so only one relay instance publishes at a time.
  • qx.events.NatsPublisher — publishes a single IntegrationEvent to a JetStream subject derived from event_name.
  • qx.events.NatsConsumer — durable pull consumer over a JetStream stream. Used by WorkerRuntime to fetch and ack/nak messages.
  • qx.events.NatsSettings — Pydantic settings for NATS connection (URL, credentials, stream name, consumer name).
  • qx.events.EventRegistry — maps event_name strings to concrete IntegrationEvent subclasses. Required by both the relay (for serialisation) and the worker (for deserialisation).
  • qx.events.MediatorEventDispatcherEventDispatcher implementation that routes domain events to their in-process handlers via the Mediator after a UnitOfWork commit.
  • qx.events.create_nats_connection — async factory that opens a NATS connection with retry.

Usage

Outbox relay (alongside the API server)

from qx.events import OutboxRelay, NatsPublisher, NatsSettings, EventRegistry

registry = EventRegistry()
registry.register(UserRegisteredIntegration)

publisher = NatsPublisher(nc, registry)
relay = OutboxRelay(session_factory, publisher, registry)

# Run in background
asyncio.create_task(relay.run())

Consuming events in a worker

from qx.events import NatsConsumer, NatsSettings

settings = NatsSettings(url="nats://localhost:4222", stream="events", consumer="identity-worker")
consumer = NatsConsumer(nc, settings)

# WorkerRuntime handles the fetch/ack loop
worker = WorkerRuntime(container, consumer, registry, mediator)
await worker.run()

Design rules

  • At-least-once delivery — the outbox guarantees every INSERTed event is eventually published. Consumers are expected to be idempotent (use IdempotencyStore from qx-cache).
  • Transactional outboxUnitOfWork (in qx-db) writes events to qx_outbox_events in the same transaction as the aggregate; the relay reads and publishes asynchronously. No event is lost even if the process crashes between commit and publish.
  • Event envelope — each NATS message carries event_name, event_version, payload JSON, correlation_id, tenant_id, and OTel trace context headers for full observability continuity.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

qx_events-0.2.0.tar.gz (9.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

qx_events-0.2.0-py3-none-any.whl (10.1 kB view details)

Uploaded Python 3

File details

Details for the file qx_events-0.2.0.tar.gz.

File metadata

  • Download URL: qx_events-0.2.0.tar.gz
  • Upload date:
  • Size: 9.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.0

File hashes

Hashes for qx_events-0.2.0.tar.gz
Algorithm Hash digest
SHA256 e9aae82a170d27dce7979f4590948c3fcb81532f7cbc3b157cb81ee9ebdb2dd1
MD5 4414e45195d37b3530a12c264b41196a
BLAKE2b-256 2bb79bbff3558bbc0c9aa21ea1661c4e5851d6d4458b87685fbe76ed3428a3c4

See more details on using hashes here.

File details

Details for the file qx_events-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: qx_events-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 10.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.0

File hashes

Hashes for qx_events-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 d0a562f8ef82547def07fd27013fa1f3fc2bc78e8caad7beab5256f919753335
MD5 944efc533dabe5218c90d9753f471e17
BLAKE2b-256 06c33490567ca6da4288a197136d3c0c4c16f91816ad0d44b16701ba9eaa41b1

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page