Qx events: NATS JetStream publisher/consumer, outbox relay, mediator dispatcher bridge
qx-events
NATS JetStream publisher/consumer, transactional outbox relay, and mediator-bridge event dispatcher for the Qx framework.
What lives here
- qx.events.OutboxRelay: polls qx_outbox_events, publishes unpublished events to NATS JetStream, and marks them delivered. Runs as a background task alongside the HTTP server or as a standalone process. Supports optional leader election so only one relay instance publishes at a time.
- qx.events.NatsPublisher: publishes a single IntegrationEvent to a JetStream subject derived from event_name.
- qx.events.NatsConsumer: durable pull consumer over a JetStream stream. Used by WorkerRuntime to fetch and ack/nak messages.
- qx.events.NatsSettings: Pydantic settings for the NATS connection (URL, credentials, stream name, consumer name).
- qx.events.EventRegistry: maps event_name strings to concrete IntegrationEvent subclasses. Required by both the relay (for serialisation) and the worker (for deserialisation).
- qx.events.MediatorEventDispatcher: EventDispatcher implementation that routes domain events to their in-process handlers via the Mediator after a UnitOfWork commit.
- qx.events.create_nats_connection: async factory that opens a NATS connection with retry.
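To make the role of the registry concrete, here is a minimal sketch of a name-to-class mapping used for both serialisation and deserialisation. It is not the qx.events implementation: SketchEvent, SketchRegistry, UserRegistered, the dumps/loads methods, and the event name "identity.user_registered" are all hypothetical names chosen for illustration.

```python
import json
from dataclasses import asdict, dataclass
from typing import ClassVar


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for IntegrationEvent."""

    event_name: ClassVar[str] = ""


@dataclass(frozen=True)
class UserRegistered(SketchEvent):
    """Hypothetical concrete event; the name below is an assumption."""

    event_name: ClassVar[str] = "identity.user_registered"
    user_id: str
    email: str


class SketchRegistry:
    """Maps event_name strings to event classes (illustrative only)."""

    def __init__(self) -> None:
        self._types: dict[str, type[SketchEvent]] = {}

    def register(self, event_type: type[SketchEvent]) -> None:
        name = event_type.event_name
        existing = self._types.get(name)
        if existing is not None and existing is not event_type:
            raise ValueError(f"event_name already registered: {name}")
        self._types[name] = event_type

    def dumps(self, event: SketchEvent) -> tuple[str, str]:
        # Publishing side: the name selects the subject, the JSON is the payload.
        return event.event_name, json.dumps(asdict(event))

    def loads(self, event_name: str, payload: str) -> SketchEvent:
        # Consuming side: the name selects the class to rebuild.
        event_type = self._types[event_name]
        return event_type(**json.loads(payload))
```

Because both sides resolve classes through the same name, a worker can only deserialise events whose classes were registered before it starts consuming.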
Usage
Outbox relay (alongside the API server)
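import asyncio

from qx.events import EventRegistry, NatsPublisher, OutboxRelay

# Assumed to exist already: nc (an open NATS connection, e.g. from
# create_nats_connection), session_factory, and UserRegisteredIntegration.
registry = EventRegistry()
registry.register(UserRegisteredIntegration)
publisher = NatsPublisher(nc, registry)
relay = OutboxRelay(session_factory, publisher, registry)
# Run in the background. Call this from inside a running event loop and keep
# a reference so the task is not garbage-collected.
relay_task = asyncio.create_task(relay.run())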
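For readers who want to see what one relay pass involves, the following is a minimal sketch of the poll, publish, mark-delivered cycle using the standard-library sqlite3 module. It is not the OutboxRelay source. Only the table name qx_outbox_events comes from this README; the column names, the relay_once function, and the publish callback are assumptions.

```python
import sqlite3
from typing import Callable

# Hypothetical schema: only the table name comes from the README.
SCHEMA = """
CREATE TABLE IF NOT EXISTS qx_outbox_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    event_name TEXT NOT NULL,
    payload TEXT NOT NULL,
    published_at TEXT
)
"""


def relay_once(
    conn: sqlite3.Connection,
    publish: Callable[[str, str], None],
    batch_size: int = 100,
) -> int:
    """Publish one batch of unpublished rows; return how many were delivered."""
    rows = conn.execute(
        "SELECT id, event_name, payload FROM qx_outbox_events "
        "WHERE published_at IS NULL ORDER BY id LIMIT ?",
        (batch_size,),
    ).fetchall()
    delivered = 0
    for event_id, event_name, payload in rows:
        # If publish raises, the row stays unpublished and is retried on the
        # next poll. A crash after publish but before the UPDATE leads to a
        # duplicate publish, which is why delivery is at-least-once.
        publish(event_name, payload)
        conn.execute(
            "UPDATE qx_outbox_events SET published_at = CURRENT_TIMESTAMP "
            "WHERE id = ?",
            (event_id,),
        )
        conn.commit()
        delivered += 1
    return delivered
```

A long-running relay would call a function like this in a loop with a sleep between passes. When several instances run, some form of leader election or row locking is needed so that two instances do not publish the same batch.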
Consuming events in a worker
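from qx.events import NatsConsumer, NatsSettings

# Assumed to exist already: nc (an open NATS connection), container, registry,
# and mediator. WorkerRuntime is not listed among the qx.events exports above,
# so import it from the Qx package that provides it.
settings = NatsSettings(url="nats://localhost:4222", stream="events", consumer="identity-worker")
consumer = NatsConsumer(nc, settings)
# WorkerRuntime handles the fetch/ack loop
worker = WorkerRuntime(container, consumer, registry, mediator)
# Must be awaited from inside a coroutine
await worker.run()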
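The fetch/ack loop can be pictured with a small self-contained sketch that combines ack/nak handling with duplicate detection. This is not WorkerRuntime or the qx-cache IdempotencyStore: FakeMessage, InMemoryIdempotencyStore, and process_batch are hypothetical stand-ins, and the message_id field is an assumed deduplication key.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a JetStream message."""

    message_id: str
    data: bytes
    acked: bool = False
    nacked: bool = False

    async def ack(self) -> None:
        self.acked = True

    async def nak(self) -> None:
        self.nacked = True


class InMemoryIdempotencyStore:
    """Hypothetical store; a real one would be shared and persistent."""

    def __init__(self) -> None:
        self._seen: set[str] = set()

    def seen(self, key: str) -> bool:
        return key in self._seen

    def mark(self, key: str) -> None:
        self._seen.add(key)


async def process_batch(
    messages: list[FakeMessage],
    handler: Callable[[bytes], Awaitable[None]],
    store: InMemoryIdempotencyStore,
) -> int:
    """Handle each message at most once per id; return the number handled."""
    handled = 0
    for msg in messages:
        if store.seen(msg.message_id):
            # Redelivery of something already processed: ack and move on.
            await msg.ack()
            continue
        try:
            await handler(msg.data)
        except Exception:
            # Ask the broker to redeliver later; do not mark as seen.
            await msg.nak()
            continue
        store.mark(msg.message_id)
        await msg.ack()
        handled += 1
    return handled
```

Marking the id only after the handler succeeds means a failed message is retried, while a redelivered success is acknowledged without running the handler again.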
Design rules
- At-least-once delivery: the outbox guarantees every INSERTed event is eventually published. Consumers are expected to be idempotent (use IdempotencyStore from qx-cache).
- Transactional outbox: UnitOfWork (in qx-db) writes events to qx_outbox_events in the same transaction as the aggregate; the relay reads and publishes asynchronously. No event is lost even if the process crashes between commit and publish.
- Event envelope: each NATS message carries event_name, event_version, payload JSON, correlation_id, tenant_id, and OTel trace context headers for full observability continuity.
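The write side of these rules can be sketched with the standard-library sqlite3 module. This is an illustration under stated assumptions, not the qx-db UnitOfWork: the users table, the column names, and the functions build_envelope and save_user_with_event are hypothetical. The envelope field names are the ones listed above; the OTel trace context headers are left out because they travel as message headers rather than in the body.

```python
import json
import sqlite3

# Hypothetical schema: only the table name qx_outbox_events is from the README.
SCHEMA = """
CREATE TABLE users (id TEXT PRIMARY KEY, email TEXT NOT NULL);
CREATE TABLE qx_outbox_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    event_name TEXT NOT NULL,
    payload TEXT NOT NULL,
    published_at TEXT
);
"""


def build_envelope(
    event_name: str,
    event_version: int,
    payload: dict,
    correlation_id: str,
    tenant_id: str,
) -> str:
    """Serialise the envelope fields named in the design rules as JSON."""
    return json.dumps(
        {
            "event_name": event_name,
            "event_version": event_version,
            "payload": payload,
            "correlation_id": correlation_id,
            "tenant_id": tenant_id,
        },
        sort_keys=True,
    )


def save_user_with_event(
    conn: sqlite3.Connection,
    user_id: str,
    email: str,
    event_name: str,
    envelope: str,
) -> None:
    """Write the aggregate row and its outbox row in one transaction."""
    # "with conn" commits if the block succeeds and rolls back if it raises,
    # so the user row and the event row are stored together or not at all.
    with conn:
        conn.execute(
            "INSERT INTO users (id, email) VALUES (?, ?)", (user_id, email)
        )
        conn.execute(
            "INSERT INTO qx_outbox_events (event_name, payload) VALUES (?, ?)",
            (event_name, envelope),
        )
```

Since the event row commits atomically with the aggregate, a crash after the commit leaves an unpublished row that the relay picks up on its next poll.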