Core NATS Extensions
Core NATS Extensions is a set of utilities that add features to the Core NATS component of the nats-py client.
Installation
uv add natsext
Utilities
See examples.py for a runnable version of all snippets below.
request_many
request_many is a utility that allows you to send a single request and await multiple responses.
This allows you to implement various patterns like scatter-gather or streaming responses.
Responses are delivered through an async iterator, which you can iterate over to receive messages. When a termination condition is met, the iterator simply ends; no error is raised.
import nats
import natsext
nc = await nats.connect()
# Basic usage
async for msg in natsext.request_many(nc, "subject", b"request data"):
    print(f"Received: {msg.data}")
Alternatively, use request_many_msg to send a Msg request:
import nats
from nats.aio.msg import Msg
import natsext
nc = await nats.connect()
msg = Msg(
    nc,
    subject="subject",
    data=b"request data",
    headers={
        "Key": "Value",
    },
)
async for response in natsext.request_many_msg(nc, msg):
    print(f"Received: {response.data}")
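Because both functions deliver responses through an async iterator, gathering every reply into a list (the "gather" half of scatter-gather) takes only a short helper. The sketch below is illustrative: `collect_payloads`, `FakeMsg`, and `fake_responses` are hypothetical names that are not part of natsext, and the fake generator stands in for a live NATS connection so the snippet runs on its own.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List


@dataclass
class FakeMsg:
    """Hypothetical stand-in for nats.aio.msg.Msg; only `data` is modelled."""
    data: bytes


async def fake_responses() -> AsyncIterator[FakeMsg]:
    """Simulates the kind of iterator request_many returns (assumption)."""
    for payload in (b"worker-1", b"worker-2", b"worker-3"):
        await asyncio.sleep(0)  # yield control, as a network read would
        yield FakeMsg(payload)


async def collect_payloads(responses) -> List[str]:
    """Drain an async iterator of messages into a list of decoded payloads."""
    return [msg.data.decode() async for msg in responses]


def run_demo() -> List[str]:
    """Run the helper against the simulated responses."""
    return asyncio.run(collect_payloads(fake_responses()))
```

With a real connection, the iterator returned by `natsext.request_many(nc, "subject", b"request data")` would take the place of `fake_responses()`.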
Configuration
You can configure the following options:
- timeout: Overall timeout for the request operation (float, seconds)
- stall: Stall timer, useful in scatter-gather scenarios where subsequent responses are expected within a certain timeframe (float, seconds)
- max_messages: Maximum number of messages to receive (int)
- sentinel: Function that stops returning responses once it returns True for a message (Callable[[Msg], bool])
import nats
import natsext
nc = await nats.connect()
# With all options
async for msg in natsext.request_many(
    nc,
    "subject",
    b"request data",
    timeout=5.0,
    stall=0.1,
    max_messages=3,
    sentinel=None,  # Don't use sentinel here to show max_messages working
):
    print(f"Received: {msg.data}")
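To make the interplay of the four options concrete, here is a minimal sketch of how such termination conditions could be combined. It is not natsext's implementation: `drain`, `FakeMsg`, and `filled` are hypothetical names, an `asyncio.Queue` replaces the NATS subscription, and two behaviours are assumptions rather than documented facts, namely that the stall timer only starts after the first response and that the message matching the sentinel is not itself yielded.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Callable, Optional


@dataclass
class FakeMsg:
    """Hypothetical stand-in for nats.aio.msg.Msg; only `data` is modelled."""
    data: bytes


async def drain(
    queue: "asyncio.Queue[FakeMsg]",
    timeout: float = 1.0,
    stall: Optional[float] = None,
    max_messages: Optional[int] = None,
    sentinel: Optional[Callable[[FakeMsg], bool]] = None,
) -> AsyncIterator[FakeMsg]:
    """Yield queued messages until any termination condition is met."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout  # overall budget for the whole operation
    received = 0
    while max_messages is None or received < max_messages:
        remaining = deadline - loop.time()
        if remaining <= 0:
            return  # overall timeout reached: end quietly
        # Assumption: the stall timer applies only after the first response.
        if stall is None or received == 0:
            wait = remaining
        else:
            wait = min(stall, remaining)
        try:
            msg = await asyncio.wait_for(queue.get(), wait)
        except asyncio.TimeoutError:
            return  # no message arrived in time: end quietly
        if sentinel is not None and sentinel(msg):
            return  # assumption: the sentinel message is not yielded
        received += 1
        yield msg


def filled(*payloads: bytes) -> "asyncio.Queue[FakeMsg]":
    """Build a queue pre-loaded with simulated responses."""
    q: "asyncio.Queue[FakeMsg]" = asyncio.Queue()
    for payload in payloads:
        q.put_nowait(FakeMsg(payload))
    return q


async def demo() -> dict:
    """Exercise each stopping rule against simulated responses."""
    capped = [m.data async for m in drain(filled(b"a", b"b", b"c", b"d"), max_messages=3)]
    flagged = [
        m.data
        async for m in drain(filled(b"a", b"b", b"", b"c"), sentinel=lambda m: not m.data)
    ]
    stalled = [m.data async for m in drain(filled(b"a"), timeout=1.0, stall=0.05)]
    return {"capped": capped, "flagged": flagged, "stalled": stalled}
```

Running `asyncio.run(demo())` exercises each stopping rule in turn: the message cap, the sentinel, and the stall timer cutting the wait short well before the overall timeout.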
Default Sentinel
The package includes a default_sentinel function that stops receiving messages once a message with an empty payload is received:
import nats
import natsext
nc = await nats.connect()
async for msg in natsext.request_many(
    nc, "subject", b"request", sentinel=natsext.default_sentinel
):
    print(f"Received: {msg.data}")
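A sentinel is just a predicate over a message, so writing a custom one is straightforward. The sketch below shows two such predicates: one that mirrors the empty-payload rule described for default_sentinel, and one that checks a header. All names here (`FakeMsg`, `empty_payload_sentinel`, `done_header_sentinel`) are hypothetical, and the `Status: done` header is an invented convention for illustration, not something natsext or NATS defines.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class FakeMsg:
    """Hypothetical stand-in for nats.aio.msg.Msg with `data` and `headers`."""
    data: bytes = b""
    headers: Optional[Dict[str, str]] = None


def empty_payload_sentinel(msg) -> bool:
    """Return True for an empty payload, as described for default_sentinel."""
    return len(msg.data) == 0


def done_header_sentinel(msg) -> bool:
    """Return True when the responder sets the (invented) 'Status: done' header."""
    headers = msg.headers or {}
    return headers.get("Status") == "done"
```

Either function matches the `Callable[[Msg], bool]` shape listed under Configuration, so it could be passed as the `sentinel` argument in place of `natsext.default_sentinel`.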
File details
Details for the file natsext-0.4.0.tar.gz.
File metadata
- Download URL: natsext-0.4.0.tar.gz
- Upload date:
- Size: 4.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.8.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1116d013101300df4fc6bd58b6886728d8ad011296f3d5366266aa4f89e3113b |
| MD5 | 4ba2ee189c73101fed1412a429514d80 |
| BLAKE2b-256 | 417b8883387a71f37a9b4e06f0507934dd011e4c6bc469a0ae5dfcc36b0c9922 |
File details
Details for the file natsext-0.4.0-py3-none-any.whl.
File metadata
- Download URL: natsext-0.4.0-py3-none-any.whl
- Upload date:
- Size: 5.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.8.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0aeb2c0d16cea8e52d07c1d81cecb085124065c2bf7f22b578793782a9854745 |
| MD5 | a882d21182254a38aba2d810261d2cfd |
| BLAKE2b-256 | a1fd945f2b2389868e06e17348435758eaac302e910bef8b22cbfe1452a7bb1e |