varco-nats
NATS JetStream event bus backend for varco —
NatsEventBus built on nats-py.
varco_nats implements varco_core's AbstractEventBus, ChannelManager,
AbstractDeadLetterQueue and HealthCheck contracts on top of NATS JetStream —
the persistent, at-least-once layer of NATS (its analogue of Apache Kafka).
JetStream only. Core NATS at-most-once pub/sub is intentionally not exposed. If you need fire-and-forget delivery, use
varco_redis's Pub/Sub bus.
Installation
uv add varco-nats # or: pip install varco-nats
Requires a running NATS server with JetStream enabled (nats-server -js).
Quick start
import asyncio

from varco_nats import NatsEventBus, NatsEventBusSettings
# Event is assumed to be exported from varco_core.event alongside the other names.
from varco_core.event import BusEventProducer, Event, EventConsumer, listen

class OrderPlacedEvent(Event):
    __event_type__ = "order.placed"
    order_id: str

config = NatsEventBusSettings(
    servers="nats://localhost:4222",
    durable_name="order-service",  # the JetStream analogue of a Kafka group_id
)

class OrderConsumer(EventConsumer):
    @listen(OrderPlacedEvent, channel="orders")
    async def on_placed(self, event: OrderPlacedEvent) -> None:
        print(f"Order placed: {event.order_id}")

async def main() -> None:
    # The bus connects on entry and is stopped on exit.
    async with NatsEventBus(config) as bus:
        OrderConsumer().register_to(bus)
        producer = BusEventProducer(bus)
        await producer._produce(OrderPlacedEvent(order_id="abc"), channel="orders")

asyncio.run(main())
How channels map to NATS
Unlike Kafka — where each channel is a topic — NATS channels are subjects under a single JetStream stream's wildcard:
| varco concept | NATS concept |
|---|---|
| stream (`stream_name`) | one JetStream stream capturing `{subject_prefix}.>` |
| channel `"orders"` | subject `{subject_prefix}.{channel_prefix}orders` |
| `durable_name` | base name for durable consumers (≈ Kafka consumer group) |
| `CHANNEL_ALL` | local-only filter; opens no consumer |
The bus creates the backing stream automatically on start() when
auto_create_stream=True (the default).
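The mapping in the table can be sketched in a few lines of plain Python. This is only an illustration of the naming rule, not varco_nats code: `subject_for`, `stream_wildcard` and `matches_wildcard` are hypothetical helpers, and the matcher covers only the trailing `>` wildcard (which in NATS matches one or more trailing tokens).

```python
def subject_for(channel: str, subject_prefix: str, channel_prefix: str = "") -> str:
    """Build a subject following {subject_prefix}.{channel_prefix}{channel}."""
    return f"{subject_prefix}.{channel_prefix}{channel}"


def stream_wildcard(subject_prefix: str) -> str:
    """Subject filter the single backing stream would capture."""
    return f"{subject_prefix}.>"


def matches_wildcard(subject: str, wildcard: str) -> bool:
    """Check a subject against a filter ending in '.>' (one or more trailing tokens)."""
    if not wildcard.endswith(".>"):
        return subject == wildcard
    prefix = wildcard[:-1]  # keep the trailing dot, e.g. "orders."
    return subject.startswith(prefix) and len(subject) > len(prefix)


subject = subject_for("orders", subject_prefix="orders", channel_prefix="prod.")
print(subject, matches_wildcard(subject, stream_wildcard("orders")))
```

Because every channel lands under the same prefix, adding a channel adds a subject, not a stream.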
Delivery semantics
NatsEventBus mirrors KafkaEventBus: JetStream redelivery is the
broker-level safety net, while handler-level retries are the job of
varco's @listen(retry_policy=..., dlq=...) machinery.
| `delivery_semantics` | Behaviour |
|---|---|
| `at_most_once` | Message acked before dispatch. Crash → message lost. No duplicates. |
| `at_least_once` (default) | Message acked after dispatch. Crash before ack → JetStream redelivers. |
| `exactly_once` | As `at_least_once`, plus every publish carries `Nats-Msg-Id = event.event_id`, so JetStream drops producer-retry duplicates within `duplicate_window`. |
from varco_nats import NatsDeliverySemantics, NatsEventBusSettings

config = NatsEventBusSettings(
    servers="nats://localhost:4222",
    delivery_semantics=NatsDeliverySemantics.EXACTLY_ONCE,
)
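To make the `exactly_once` deduplication concrete, here is a minimal in-memory sketch of a duplicate window keyed by message ID. It is a simulation of the idea only: `DedupWindow` is a hypothetical class, the 120-second window is an illustrative value, and the real bookkeeping happens inside the JetStream server, not in varco_nats.

```python
from dataclasses import dataclass, field


@dataclass
class DedupWindow:
    """Toy model of a broker-side duplicate window keyed by message ID."""

    window_seconds: float
    seen: dict[str, float] = field(default_factory=dict)

    def accept(self, msg_id: str, now: float) -> bool:
        """Return True if the publish is stored, False if dropped as a duplicate."""
        # Forget IDs whose window has elapsed.
        self.seen = {
            mid: ts for mid, ts in self.seen.items() if now - ts < self.window_seconds
        }
        if msg_id in self.seen:
            return False
        self.seen[msg_id] = now
        return True


window = DedupWindow(window_seconds=120.0)
first = window.accept("evt-1", now=0.0)    # original publish
retry = window.accept("evt-1", now=10.0)   # producer retry inside the window
late = window.accept("evt-1", now=200.0)   # same ID after the window elapsed
print(first, retry, late)
```

The last call shows the limit of the guarantee: a retry that arrives after the window has elapsed is stored again, so consumers that need stronger guarantees should still be idempotent.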
Configuration
All settings are read from environment variables with the VARCO_NATS_ prefix:
VARCO_NATS_SERVERS=nats://nats.internal:4222
VARCO_NATS_STREAM_NAME=orders-events
VARCO_NATS_SUBJECT_PREFIX=orders
VARCO_NATS_DURABLE_NAME=order-service
VARCO_NATS_DELIVERY_SEMANTICS=at_least_once
VARCO_NATS_CHANNEL_PREFIX=prod.
config = NatsEventBusSettings.from_env()
For structured connection/security config (TLS, user/password, token), use
NatsConnectionSettings with the NATS_ prefix:
NATS_SERVERS=nats://nats1:4222,nats://nats2:4222
NATS_SSL__CA_CERT=/etc/ssl/nats-ca.pem
NATS_AUTH__TYPE=basic
NATS_AUTH__USERNAME=alice
NATS_AUTH__PASSWORD=secret
from varco_nats import NatsConnectionSettings
conn = NatsConnectionSettings.from_env()
config = NatsEventBusSettings(connect_kwargs=conn.to_nats_kwargs())
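The double underscore in names such as `NATS_AUTH__TYPE` suggests nested settings. The sketch below shows one way such variables could be folded into a nested dict; `read_prefixed` is a hypothetical helper written for illustration, and the actual parsing done by `from_env()` may differ.

```python
def read_prefixed(environ: dict[str, str], prefix: str, delimiter: str = "__") -> dict:
    """Collect variables starting with `prefix` into a nested dict.

    Keys are lower-cased and split on `delimiter`, so NATS_AUTH__TYPE
    becomes {"auth": {"type": ...}}.
    """
    result: dict = {}
    for key, value in environ.items():
        if not key.startswith(prefix):
            continue
        path = key[len(prefix):].lower().split(delimiter)
        node = result
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return result


env = {
    "NATS_SERVERS": "nats://nats1:4222,nats://nats2:4222",
    "NATS_AUTH__TYPE": "basic",
    "NATS_AUTH__USERNAME": "alice",
    "VARCO_NATS_STREAM_NAME": "orders-events",  # different prefix, ignored below
}
print(read_prefixed(env, "NATS_"))
```

Note that the two prefixes do not collide: `VARCO_NATS_STREAM_NAME` does not start with `NATS_`, so each settings class sees only its own variables.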
Stream management
NatsStreamManager administers the backing JetStream stream:
from varco_nats import NatsStreamManager, NatsChannelManagerSettings

settings = NatsChannelManagerSettings(servers="nats://localhost:4222")

async with NatsStreamManager(settings) as manager:
    await manager.declare_channel("orders")           # ensures the backing stream
    exists = await manager.channel_exists("orders")   # has the subject any message?
    channels = await manager.list_channels()          # channels carrying messages
    await manager.delete_channel("orders")            # purge that channel's messages
Dead letter queue
NatsDLQ stores exhausted events in a dedicated WorkQueue-retention
JetStream stream — so count() returns the exact pending-entry count:
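from varco_nats import NatsDLQ, NatsEventBusSettings
from varco_core.event import EventConsumer, listen
# RetryPolicy is assumed to come from varco_core; its import path is not shown here.
# alert_ops is a placeholder for your own async alerting function.

# Run inside an async function:
async with NatsDLQ(settings=NatsEventBusSettings()) as dlq:
    # Usually wired automatically via @listen(dlq=dlq):
    class OrderConsumer(EventConsumer):
        @listen(
            OrderPlacedEvent,
            channel="orders",
            retry_policy=RetryPolicy(max_attempts=3),
            dlq=dlq,
        )
        async def on_order(self, event: OrderPlacedEvent) -> None: ...

    # Relay:
    entries = await dlq.pop_batch(limit=10)
    for entry in entries:
        await alert_ops(entry)
        await dlq.ack(entry.entry_id)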
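The relay loop can be exercised without a broker by swapping in an in-memory stand-in. The sketch below assumes only the two calls shown above (`pop_batch(limit=...)` and `ack(entry_id)`); `InMemoryDLQ`, `Entry`, `relay` and the async `count()` are hypothetical names for illustration, not part of varco_nats.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Entry:
    entry_id: str
    payload: str


class InMemoryDLQ:
    """Hypothetical stand-in exposing pop_batch/ack/count for local testing."""

    def __init__(self, entries: list[Entry]) -> None:
        self._pending = {e.entry_id: e for e in entries}
        self._in_flight: set[str] = set()

    async def pop_batch(self, limit: int) -> list[Entry]:
        # Hand out entries that are not already being processed.
        batch = [
            e for e in self._pending.values() if e.entry_id not in self._in_flight
        ][:limit]
        self._in_flight.update(e.entry_id for e in batch)
        return batch

    async def ack(self, entry_id: str) -> None:
        # Only an ack removes the entry for good.
        self._pending.pop(entry_id, None)
        self._in_flight.discard(entry_id)

    async def count(self) -> int:
        return len(self._pending)


async def relay(dlq, handle, limit: int = 10) -> int:
    """Pop one batch, hand each entry to `handle`, ack only after it succeeds."""
    relayed = 0
    for entry in await dlq.pop_batch(limit=limit):
        await handle(entry)
        await dlq.ack(entry.entry_id)
        relayed += 1
    return relayed
```

Acking after the handler returns keeps an entry in the queue if alerting fails, which mirrors the at-least-once behaviour of the bus itself.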
Dependency injection (Providify)
from varco_nats.di import bootstrap
from varco_core.event import AbstractEventBus
container = bootstrap() # scans varco_nats
bus = await container.aget(AbstractEventBus) # NatsEventBus singleton
# ...
await container.ashutdown() # stops the bus via @PreDestroy
Install the DLQ explicitly when needed:
from varco_nats.dlq import NatsDLQConfiguration
from varco_core.event.dlq import AbstractDeadLetterQueue
await container.ainstall(NatsDLQConfiguration)
dlq = await container.aget(AbstractDeadLetterQueue)
Running tests
# Unit tests — no broker required (nats-py is faked)
uv run pytest varco_nats/tests/
# Integration tests — require Docker (a real NATS server is started)
uv run pytest varco_nats/tests/ -m integration
# or
VARCO_RUN_INTEGRATION=1 uv run pytest varco_nats/tests/
Migration 1.x → 2.0
The no-op @Configuration aliases (NatsEventBusConfiguration,
NatsChannelManagerConfiguration) were removed. Register the bus via scan:
# Before (1.x)
await container.ainstall(NatsEventBusConfiguration)
# After (2.0)
from varco_nats.di import bootstrap
bootstrap(container) # or: container.scan("varco_nats", recursive=True)
The opt-in NatsDLQConfiguration is unchanged — still await container.ainstall(...).
License
Apache-2.0