Skip to main content

Core NATS Extensions is a set of utilities providing additional features to Core NATS component of nats-py client.

Project description

Core NATS Extensions

Core NATS Extensions is a set of utilities providing additional features to Core NATS component of nats-py client.

Installation

uv add natsext

Utilities

see examples.py for a runnable version of all snippets below.

request_many

request_many is a utility that allows you to send a single request and await multiple responses. This allows you to implement various patterns like scatter-gather or streaming responses.

Responses are returned in an async iterator, which you can iterate over to receive messages. When a termination condition is met, the iterator is closed (and no error is returned).

import nats
import natsext

nc = await nats.connect()

# Basic usage
async for msg in natsext.request_many(nc, "subject", b"request data"):
    print(f"Received: {msg.data}")

Alternatively, use request_many_msg to send a Msg request:

import nats
from nats.aio.msg import Msg
import natsext

nc = await nats.connect()

msg = Msg(
    nc,
    subject="subject",
    data=b"request data",
    headers={
        "Key": "Value",
    },
)
async for response in natsext.request_many_msg(nc, msg):
    print(f"Received: {response.data}")

Configuration

You can configure the following options:

  • timeout: Overall timeout for the request operation (float, seconds)
  • stall: Stall timer, useful in scatter-gather scenarios where subsequent responses are expected within a certain timeframe (float, seconds)
  • max_messages: Maximum number of messages to receive (int)
  • sentinel: Function that stops returning responses once it returns True for a message (Callable[[Msg], bool])
import nats
import natsext

nc = await nats.connect()

# With all options
async for msg in natsext.request_many(
    nc,
    "subject",
    b"request data",
    timeout=5.0,
    stall=0.1,
    max_messages=3,
    sentinel=None,  # Don't use sentinel here to show max_messages working
):
    print(f"Received: {msg.data}")

Default Sentinel

The package includes a default_sentinel function that stops receiving messages once a message with an empty payload is received:

import nats
import natsext

nc = await nats.connect()

async for msg in natsext.request_many(
    nc, "subject", b"request", sentinel=natsext.default_sentinel
):
    print(f"Received: {msg.data}")

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

natsext-0.1.2.tar.gz (4.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

natsext-0.1.2-py3-none-any.whl (5.3 kB view details)

Uploaded Python 3

File details

Details for the file natsext-0.1.2.tar.gz.

File metadata

  • Download URL: natsext-0.1.2.tar.gz
  • Upload date:
  • Size: 4.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.8.15

File hashes

Hashes for natsext-0.1.2.tar.gz
Algorithm Hash digest
SHA256 3a4bd66004e03586a3e2ed3de504dda600b188e940b3260e86577dd3aa4c935e
MD5 dc356453fba33b7dedcee7cd8bf3f895
BLAKE2b-256 e61ee8d87a471238d142d2ddff263f8411e169b5afed6a5160caa78fdb5bd51c

See more details on using hashes here.

File details

Details for the file natsext-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: natsext-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 5.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.8.15

File hashes

Hashes for natsext-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 1be1c02242d2a9d237a98926c7a8146b97f1a35f10b28e0d4f8184c2d2e607fa
MD5 f1fc4249d44de1423182d41ca1f93b35
BLAKE2b-256 672451e61fadafc4e6b8fe05a19f4767c747b9174cf664a49516a747aad5bd73

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page