NATS integration for taskiq
Project description
Taskiq NATS
Taskiq-nats is a plugin for taskiq that adds NATS broker. This package has support for NATS JetStream.
Installation
To use this project you must have installed core taskiq library:
pip install taskiq taskiq-nats
Usage
Here's a minimal setup example with a broker and one task.
Default NATS broker.
import asyncio
from taskiq_nats import NatsBroker, JetStreamBroker
broker = NatsBroker(
[
"nats://nats1:4222",
"nats://nats2:4222",
],
queue="random_queue_name",
)
@broker.task
async def my_lovely_task():
print("I love taskiq")
async def main():
await broker.startup()
await my_lovely_task.kiq()
await broker.shutdown()
if __name__ == "__main__":
asyncio.run(main())
NATS broker based on JetStream
import asyncio
from taskiq_nats import (
PushBasedJetStreamBroker,
PullBasedJetStreamBroker
)
broker = PushBasedJetStreamBroker(
servers=[
"nats://nats1:4222",
"nats://nats2:4222",
],
queue="awesome_queue_name",
)
# Or you can use pull based variant
broker = PullBasedJetStreamBroker(
servers=[
"nats://nats1:4222",
"nats://nats2:4222",
],
durable="awesome_durable_consumer_name",
)
@broker.task
async def my_lovely_task():
print("I love taskiq")
async def main():
await broker.startup()
await my_lovely_task.kiq()
await broker.shutdown()
if __name__ == "__main__":
asyncio.run(main())
NatsBroker configuration
Here's the constructor parameters:
servers- a single string or a list of strings with nats nodes addresses.subject- name of the subect that will be used to exchange tasks between workers and clients.queue- optional name of the queue. By default NatsBroker broadcasts task to all workers, but if you want to handle every task only once, you need to supply this argument.result_backend- custom result backend.task_id_generator- custom function to generate task ids.- Every other keyword argument will be sent to
nats.connectfunction.
JetStreamBroker configuration
Common
servers- a single string or a list of strings with nats nodes addresses.subject- name of the subect that will be used to exchange tasks between workers and clients.stream_name- name of the stream where subjects will be located.queue- a single string or a list of strings with nats nodes addresses.result_backend- custom result backend.task_id_generator- custom function to generate task ids.stream_config- a config for stream.consumer_config- a config for consumer.
PushBasedJetStreamBroker
queue- name of the queue. It's used to share messages between different consumers.
PullBasedJetStreamBroker
durable- durable name of the consumer. It's used to share messages between different consumers.pull_consume_batch- maximum number of message that can be fetched each time.pull_consume_timeout- timeout for messages fetch. If there is no messages, we start fetching messages again.
NATS Result Backend
It's possible to use NATS JetStream to store tasks result.
import asyncio
from taskiq_nats import PullBasedJetStreamBroker
from taskiq_nats.result_backend import NATSObjectStoreResultBackend
result_backend = NATSObjectStoreResultBackend(
servers="localhost",
)
broker = PullBasedJetStreamBroker(
servers="localhost",
).with_result_backend(
result_backend=result_backend,
)
@broker.task
async def awesome_task() -> str:
return "Hello, NATS!"
async def main() -> None:
await broker.startup()
task = await awesome_task.kiq()
res = await task.wait_result()
print(res)
await broker.shutdown()
if __name__ == "__main__":
asyncio.run(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file taskiq_nats-0.6.0.tar.gz.
File metadata
- Download URL: taskiq_nats-0.6.0.tar.gz
- Upload date:
- Size: 7.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.11 {"installer":{"name":"uv","version":"0.9.11"},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3335fc416e9872a6ed3dbb0af3b51d1d61f23aaa9983c91337c16534abe71c72
|
|
| MD5 |
837af1b8a1b29818c90bf66b0d02d8a5
|
|
| BLAKE2b-256 |
db4a35a179f3bb4e41048fe68401cd3fae91c371e9b877f9266b3d8e80fb13e6
|
File details
Details for the file taskiq_nats-0.6.0-py3-none-any.whl.
File metadata
- Download URL: taskiq_nats-0.6.0-py3-none-any.whl
- Upload date:
- Size: 8.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.11 {"installer":{"name":"uv","version":"0.9.11"},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3d08288cbcca1da500663a45a3c33f03160721380e61cb2351e962f4805cadba
|
|
| MD5 |
2b124693728b44498b121a460f8ac34a
|
|
| BLAKE2b-256 |
62b06b564579476b26c1e881e051e256c62ba3eb4a0f522fb536531e35f150d2
|