NATS JetStream event bus backend for varco — NatsEventBus built on nats-py

varco-nats

NATS JetStream event bus backend for varco: NatsEventBus built on nats-py.

varco_nats implements varco_core's AbstractEventBus, ChannelManager, AbstractDeadLetterQueue and HealthCheck contracts on top of NATS JetStream — the persistent, at-least-once layer of NATS (its analogue of Apache Kafka).

JetStream only. Core NATS at-most-once pub/sub is intentionally not exposed. If you need fire-and-forget delivery, use varco_redis's Pub/Sub bus.


Installation

uv add varco-nats          # or: pip install varco-nats

Requires a running NATS server with JetStream enabled (nats-server -js).


Quick start

from varco_nats import NatsEventBus, NatsEventBusSettings
# Event is assumed to be exported by varco_core.event alongside the other names
from varco_core.event import BusEventProducer, Event, EventConsumer, listen


class OrderPlacedEvent(Event):
    __event_type__ = "order.placed"
    order_id: str


config = NatsEventBusSettings(
    servers="nats://localhost:4222",
    durable_name="order-service",   # the JetStream analogue of a Kafka group_id
)

async with NatsEventBus(config) as bus:
    class OrderConsumer(EventConsumer):
        @listen(OrderPlacedEvent, channel="orders")
        async def on_placed(self, event: OrderPlacedEvent) -> None:
            print(f"Order placed: {event.order_id}")

    OrderConsumer().register_to(bus)

    producer = BusEventProducer(bus)
    await producer._produce(OrderPlacedEvent(order_id="abc"), channel="orders")

How channels map to NATS

Unlike Kafka — where each channel is a topic — NATS channels are subjects under a single JetStream stream's wildcard:

| varco concept | NATS concept |
| --- | --- |
| stream (stream_name) | one JetStream stream capturing {subject_prefix}.> |
| channel "orders" | subject {subject_prefix}.{channel_prefix}orders |
| durable_name | base name for durable consumers (≈ Kafka consumer group) |
| CHANNEL_ALL | local-only filter; opens no consumer |

The bus creates the backing stream automatically on start() when auto_create_stream=True (the default).
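The mapping in the table can be sketched in a few lines of plain Python. This is only an illustration of the naming scheme described above, not varco_nats's internal code; the helper names channel_to_subject, stream_wildcard and matches_wildcard are hypothetical.

```python
def channel_to_subject(channel: str, subject_prefix: str, channel_prefix: str = "") -> str:
    """Build the subject for a channel: {subject_prefix}.{channel_prefix}{channel}."""
    return f"{subject_prefix}.{channel_prefix}{channel}"


def stream_wildcard(subject_prefix: str) -> str:
    """Subject filter captured by the single backing stream: {subject_prefix}.>"""
    return f"{subject_prefix}.>"


def matches_wildcard(subject: str, wildcard: str) -> bool:
    """Minimal NATS-style match for filters ending in '>' (one or more trailing tokens)."""
    if not wildcard.endswith(".>"):
        return subject == wildcard
    head = wildcard[:-1]  # keep the trailing dot, e.g. "orders."
    return subject.startswith(head) and len(subject) > len(head)


# Values taken from the environment example in the Configuration section.
subject = channel_to_subject("orders", subject_prefix="orders", channel_prefix="prod.")
print(subject)                                             # orders.prod.orders
print(matches_wildcard(subject, stream_wildcard("orders")))  # True
```

Because every channel shares one subject prefix, a single stream with a trailing `>` filter captures all of them, which is why no per-channel stream is needed.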


Delivery semantics

NatsEventBus mirrors KafkaEventBus: JetStream redelivery is the broker-level safety net, while handler-level retries are the job of varco's @listen(retry_policy=..., dlq=...) machinery.

| delivery_semantics | Behaviour |
| --- | --- |
| at_most_once | Message acked before dispatch. Crash → message lost. No duplicates. |
| at_least_once (default) | Message acked after dispatch. Crash before ack → JetStream redelivers. |
| exactly_once | As at_least_once, plus every publish carries Nats-Msg-Id = event.event_id, so JetStream drops producer-retry duplicates within duplicate_window. |

from varco_nats import NatsDeliverySemantics, NatsEventBusSettings

config = NatsEventBusSettings(
    servers="nats://localhost:4222",
    delivery_semantics=NatsDeliverySemantics.EXACTLY_ONCE,
)
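The difference between the three modes comes down to when the ack happens and whether publishes are deduplicated by id. The following toy model makes that concrete; it is a sketch under stated assumptions and uses no varco or nats-py API. FakeMessage, deliver and DedupWindow are hypothetical names, and the window is modelled as a simple timestamp check.

```python
from dataclasses import dataclass, field


@dataclass
class FakeMessage:
    """Stand-in for a broker message; records whether it was acked."""
    payload: str
    acked: bool = False

    def ack(self) -> None:
        self.acked = True


def deliver(msg: FakeMessage, handler, semantics: str) -> None:
    """Ack before dispatch (at_most_once) or after it (at_least_once)."""
    if semantics == "at_most_once":
        msg.ack()              # acked first: a crash in the handler loses the message
        handler(msg.payload)
    else:
        handler(msg.payload)   # a crash here leaves the message unacked
        msg.ack()              # so the broker would redeliver it


@dataclass
class DedupWindow:
    """Toy model of a duplicate window keyed by message id."""
    window_seconds: float
    _seen: dict[str, float] = field(default_factory=dict)

    def accept(self, msg_id: str, now: float) -> bool:
        # Forget ids whose first sighting is older than the window.
        self._seen = {k: t for k, t in self._seen.items() if now - t < self.window_seconds}
        if msg_id in self._seen:
            return False       # duplicate publish inside the window is dropped
        self._seen[msg_id] = now
        return True


def crashing_handler(_payload: str) -> None:
    raise RuntimeError("simulated crash")


for semantics in ("at_most_once", "at_least_once"):
    msg = FakeMessage("order abc")
    try:
        deliver(msg, crashing_handler, semantics)
    except RuntimeError:
        pass
    print(semantics, "acked:", msg.acked)
```

In this model an acked message after a crash means the event is gone, while an unacked one is eligible for redelivery. The dedup window only guards against repeated publishes of the same id; handlers still need to tolerate redelivery.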

Configuration

All settings are read from environment variables with the VARCO_NATS_ prefix:

VARCO_NATS_SERVERS=nats://nats.internal:4222
VARCO_NATS_STREAM_NAME=orders-events
VARCO_NATS_SUBJECT_PREFIX=orders
VARCO_NATS_DURABLE_NAME=order-service
VARCO_NATS_DELIVERY_SEMANTICS=at_least_once
VARCO_NATS_CHANNEL_PREFIX=prod.

config = NatsEventBusSettings.from_env()
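To illustrate how prefixed variables line up with setting names, here is a minimal stand-in written with a plain dataclass. It is not NatsEventBusSettings: the class DemoSettings, its field subset and its default values are assumptions made for the example, and the real class may validate or coerce values differently.

```python
import os
from dataclasses import dataclass, fields
from typing import Mapping, Optional


@dataclass(frozen=True)
class DemoSettings:
    # Hypothetical subset of fields; the defaults are illustrative only.
    servers: str = "nats://localhost:4222"
    stream_name: str = "events"
    subject_prefix: str = "events"
    channel_prefix: str = ""

    @classmethod
    def from_env(
        cls,
        env: Optional[Mapping[str, str]] = None,
        prefix: str = "VARCO_NATS_",
    ) -> "DemoSettings":
        """Read each field from PREFIX + FIELD_NAME, falling back to the default."""
        env = os.environ if env is None else env
        overrides = {}
        for f in fields(cls):
            key = prefix + f.name.upper()
            if key in env:
                overrides[f.name] = env[key]
        return cls(**overrides)


demo = DemoSettings.from_env(
    {
        "VARCO_NATS_STREAM_NAME": "orders-events",
        "VARCO_NATS_CHANNEL_PREFIX": "prod.",
    }
)
print(demo.stream_name, demo.channel_prefix, demo.servers)
```

Passing the mapping explicitly, as above, keeps the lookup easy to exercise in tests without mutating the process environment.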

For structured connection/security config (TLS, user/password, token), use NatsConnectionSettings with the NATS_ prefix:

NATS_SERVERS=nats://nats1:4222,nats://nats2:4222
NATS_SSL__CA_CERT=/etc/ssl/nats-ca.pem
NATS_AUTH__TYPE=basic
NATS_AUTH__USERNAME=alice
NATS_AUTH__PASSWORD=secret

from varco_nats import NatsConnectionSettings, NatsEventBusSettings

conn = NatsConnectionSettings.from_env()
config = NatsEventBusSettings(connect_kwargs=conn.to_nats_kwargs())
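The double underscore in names such as NATS_AUTH__USERNAME reads as a nesting delimiter. A small sketch of that convention follows, assuming `__` separates nesting levels and keys are lower-cased; nest_env is a hypothetical helper, not part of varco_nats, and the real parser may behave differently.

```python
from typing import Any, Mapping


def nest_env(env: Mapping[str, str], prefix: str = "NATS_", delimiter: str = "__") -> dict[str, Any]:
    """Turn PREFIX_A__B=value entries into {"a": {"b": value}}."""
    nested: dict[str, Any] = {}
    for key, value in env.items():
        if not key.startswith(prefix):
            continue  # ignore unrelated variables
        path = key[len(prefix):].lower().split(delimiter)
        node = nested
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return nested


env = {
    "NATS_SERVERS": "nats://nats1:4222,nats://nats2:4222",
    "NATS_SSL__CA_CERT": "/etc/ssl/nats-ca.pem",
    "NATS_AUTH__TYPE": "basic",
    "NATS_AUTH__USERNAME": "alice",
    "NATS_AUTH__PASSWORD": "secret",
}
settings = nest_env(env)
print(settings["auth"]["username"])    # alice
print(settings["servers"].split(","))  # ['nats://nats1:4222', 'nats://nats2:4222']
```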

Stream management

NatsStreamManager administers the backing JetStream stream:

from varco_nats import NatsStreamManager, NatsChannelManagerSettings

settings = NatsChannelManagerSettings(servers="nats://localhost:4222")
async with NatsStreamManager(settings) as manager:
    await manager.declare_channel("orders")    # ensures the backing stream
    exists = await manager.channel_exists("orders")  # has the subject any message?
    channels = await manager.list_channels()   # channels carrying messages
    await manager.delete_channel("orders")     # purge that channel's messages

Dead letter queue

NatsDLQ stores exhausted events in a dedicated WorkQueue-retention JetStream stream — so count() returns the exact pending-entry count:

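from varco_nats import NatsDLQ, NatsEventBusSettings
from varco_core.event import EventConsumer, listen
# RetryPolicy, OrderPlacedEvent, and alert_ops are assumed to be imported or defined elsewhere.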

async with NatsDLQ(settings=NatsEventBusSettings()) as dlq:
    # Usually wired automatically via @listen(dlq=dlq):
    class OrderConsumer(EventConsumer):
        @listen(
            OrderPlacedEvent,
            channel="orders",
            retry_policy=RetryPolicy(max_attempts=3),
            dlq=dlq,
        )
        async def on_order(self, event: OrderPlacedEvent) -> None: ...

    # Relay:
    entries = await dlq.pop_batch(limit=10)
    for entry in entries:
        await alert_ops(entry)
        await dlq.ack(entry.entry_id)
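The retry-then-dead-letter flow can be modelled in memory to show the order of events: the handler is retried up to max_attempts, the exhausted event is pushed to the DLQ, and a relay later pops and acks it. This is a sketch only. InMemoryDLQ, DeadLetter and dispatch_with_retry are hypothetical stand-ins that borrow the pop_batch, ack and count names from the example above; they are not the NatsDLQ implementation, and max_attempts is assumed to be at least 1.

```python
import asyncio
import itertools
from dataclasses import dataclass


@dataclass
class DeadLetter:
    entry_id: int
    event: dict
    error: str


class InMemoryDLQ:
    """Dictionary-backed stand-in for a dead letter queue."""

    def __init__(self) -> None:
        self._entries: dict[int, DeadLetter] = {}
        self._ids = itertools.count(1)

    async def push(self, event: dict, error: Exception) -> None:
        entry = DeadLetter(next(self._ids), event, repr(error))
        self._entries[entry.entry_id] = entry

    async def pop_batch(self, limit: int) -> list[DeadLetter]:
        # Entries stay pending until they are acked.
        return list(self._entries.values())[:limit]

    async def ack(self, entry_id: int) -> None:
        self._entries.pop(entry_id, None)

    async def count(self) -> int:
        return len(self._entries)


async def dispatch_with_retry(handler, event: dict, max_attempts: int, dlq: InMemoryDLQ) -> bool:
    """Call the handler up to max_attempts times; dead-letter the event if all fail."""
    last_error = None
    for _attempt in range(max_attempts):
        try:
            await handler(event)
            return True
        except Exception as exc:  # broad on purpose: any handler failure counts
            last_error = exc
    await dlq.push(event, last_error)
    return False


async def demo() -> tuple[int, int, int]:
    dlq = InMemoryDLQ()
    calls = 0

    async def always_fails(event: dict) -> None:
        nonlocal calls
        calls += 1
        raise ValueError("boom")

    await dispatch_with_retry(always_fails, {"order_id": "abc"}, max_attempts=3, dlq=dlq)
    pending_before = await dlq.count()
    for entry in await dlq.pop_batch(limit=10):
        await dlq.ack(entry.entry_id)  # relay done, entry leaves the queue
    return calls, pending_before, await dlq.count()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Acking only after the relay step succeeds mirrors the at-least-once behaviour of the main bus: an entry that is popped but never acked remains pending.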

Dependency injection (Providify)

from varco_nats.di import bootstrap
from varco_core.event import AbstractEventBus

container = bootstrap()                       # scans varco_nats
bus = await container.aget(AbstractEventBus)  # NatsEventBus singleton
# ...
await container.ashutdown()                   # stops the bus via @PreDestroy

Install the DLQ explicitly when needed:

from varco_nats.dlq import NatsDLQConfiguration
from varco_core.event.dlq import AbstractDeadLetterQueue

await container.ainstall(NatsDLQConfiguration)
dlq = await container.aget(AbstractDeadLetterQueue)

Running tests

# Unit tests — no broker required (nats-py is faked)
uv run pytest varco_nats/tests/

# Integration tests — require Docker (a real NATS server is started)
uv run pytest varco_nats/tests/ -m integration
# or
VARCO_RUN_INTEGRATION=1 uv run pytest varco_nats/tests/

Migration 1.x → 2.0

The no-op @Configuration aliases (NatsEventBusConfiguration, NatsChannelManagerConfiguration) were removed. Register the bus via scan:

# Before (1.x)
await container.ainstall(NatsEventBusConfiguration)

# After (2.0)
from varco_nats.di import bootstrap
bootstrap(container)            # or: container.scan("varco_nats", recursive=True)

The opt-in NatsDLQConfiguration is unchanged — still await container.ainstall(...).


License

Apache-2.0
