NATS integration for taskiq
Taskiq NATS
Taskiq-nats is a plugin for taskiq that adds a NATS broker. The package also supports NATS JetStream.
Installation
This project requires the core taskiq library. Install both packages with:
pip install taskiq taskiq-nats
Usage
Here's a minimal setup example with a broker and one task.
Default NATS broker.
import asyncio

from taskiq_nats import NatsBroker

# Connect to one or more NATS nodes. Supplying a queue name means
# every task is handled by only one worker.
broker = NatsBroker(
    [
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    queue="random_queue_name",
)


@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()
    # Send the task to the workers.
    await my_lovely_task.kiq()
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
NATS broker based on JetStream
import asyncio

from taskiq_nats import (
    PushBasedJetStreamBroker,
    PullBasedJetStreamBroker,
)

broker = PushBasedJetStreamBroker(
    servers=[
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    queue="awesome_queue_name",
)

# Or you can use the pull-based variant instead
broker = PullBasedJetStreamBroker(
    servers=[
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    durable="awesome_durable_consumer_name",
)


@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()
    await my_lovely_task.kiq()
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
NatsBroker configuration
Here are the constructor parameters:
- servers - a single string or a list of strings with NATS node addresses.
- subject - name of the subject that will be used to exchange tasks between workers and clients.
- queue - optional name of the queue. By default NatsBroker broadcasts each task to all workers. To have every task handled only once, supply this argument.
- result_backend - custom result backend.
- task_id_generator - custom function to generate task ids.
- Every other keyword argument is passed to the nats.connect function.
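As a rough illustration of how these parameters fit together, the sketch below assembles the keyword arguments in a small helper and only then creates the broker. The helper names (`nats_broker_options`, `make_broker`), the subject and queue values, and the choice of `connect_timeout` and `max_reconnect_attempts` as pass-through options are assumptions made for this example, not part of taskiq-nats.

```python
from typing import Any, Dict, List, Optional, Union


def nats_broker_options(
    servers: Union[str, List[str]],
    subject: str,
    queue: Optional[str] = None,
    **connect_options: Any,
) -> Dict[str, Any]:
    """Assemble keyword arguments for NatsBroker (hypothetical helper)."""
    options: Dict[str, Any] = {"servers": servers, "subject": subject}
    if queue is not None:
        # With a queue name, each task is handled by a single worker.
        # Without it, every worker receives every task.
        options["queue"] = queue
    # Anything else is not interpreted here; it is meant to be
    # forwarded by the broker to nats.connect.
    options.update(connect_options)
    return options


def make_broker(options: Dict[str, Any]):
    # Imported lazily so the helper above works without taskiq-nats installed.
    from taskiq_nats import NatsBroker

    return NatsBroker(**options)


options = nats_broker_options(
    ["nats://nats1:4222", "nats://nats2:4222"],
    subject="my_tasks",          # hypothetical subject name
    queue="workers",             # hypothetical queue name
    connect_timeout=5,           # passed through to nats.connect
    max_reconnect_attempts=10,   # passed through to nats.connect
)
print(sorted(options))
```

Calling `make_broker(options)` would then create the broker. No connection is expected to be opened until `broker.startup()` is awaited.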
JetStreamBroker configuration
Common
- servers - a single string or a list of strings with NATS node addresses.
- subject - name of the subject that will be used to exchange tasks between workers and clients.
- stream_name - name of the stream where subjects will be located.
- queue - optional name of the queue.
- result_backend - custom result backend.
- task_id_generator - custom function to generate task ids.
- stream_config - a config for the stream.
- consumer_config - a config for the consumer.
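The stream and consumer configs can be sketched as follows. This assumes that `stream_config` and `consumer_config` accept the `StreamConfig` and `ConsumerConfig` dataclasses from `nats.js.api` in nats-py; the helper names, the stream name, and the limit values are hypothetical and chosen only for illustration.

```python
from typing import Any, Dict, List


def jetstream_settings() -> Dict[str, Dict[str, Any]]:
    """Hypothetical stream and consumer limits, kept as plain data."""
    return {
        # Keep messages for at most one hour and at most 10,000 messages.
        "stream": {"max_age": 3600.0, "max_msgs": 10_000},
        # Wait 30 seconds for an ack and deliver a message at most 3 times.
        "consumer": {"ack_wait": 30.0, "max_deliver": 3},
    }


def make_broker(servers: List[str], settings: Dict[str, Dict[str, Any]]):
    # Imported lazily: these lines need nats-py and taskiq-nats installed.
    from nats.js.api import ConsumerConfig, StreamConfig
    from taskiq_nats import PullBasedJetStreamBroker

    return PullBasedJetStreamBroker(
        servers=servers,
        stream_name="taskiq_stream",  # hypothetical stream name
        durable="awesome_durable_consumer_name",
        # Assumption: the broker takes nats-py config dataclasses here.
        stream_config=StreamConfig(**settings["stream"]),
        consumer_config=ConsumerConfig(**settings["consumer"]),
    )


settings = jetstream_settings()
print(settings)
```

Keeping the limits as plain dictionaries makes it easy to load them from a settings file before converting them into config objects.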
PushBasedJetStreamBroker
- queue - name of the queue. It's used to share messages between different consumers.
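To make the effect of a shared queue name concrete, here is a toy in-memory model of delivery on a single subject. It is not the NATS client and does not talk to a server; the `ToySubject` class and the worker names are invented for this sketch, and round-robin selection is a simplification (a real NATS server chooses the group member itself).

```python
from collections import defaultdict
from typing import Dict, List, Optional


class ToySubject:
    """Toy model of message delivery on one subject (illustration only)."""

    def __init__(self) -> None:
        self._plain: List[str] = []  # subscribers without a queue
        self._groups: Dict[str, List[str]] = defaultdict(list)  # queue -> members
        self._next: Dict[str, int] = defaultdict(int)  # round-robin position
        self.delivered: Dict[str, List[str]] = defaultdict(list)

    def subscribe(self, worker: str, queue: Optional[str] = None) -> None:
        if queue is None:
            self._plain.append(worker)
        else:
            self._groups[queue].append(worker)

    def publish(self, message: str) -> None:
        # Every subscriber without a queue gets its own copy.
        for worker in self._plain:
            self.delivered[worker].append(message)
        # Each queue gets one copy, handed to exactly one of its members.
        for queue, members in self._groups.items():
            worker = members[self._next[queue] % len(members)]
            self._next[queue] += 1
            self.delivered[worker].append(message)


subject = ToySubject()
subject.subscribe("worker-1", queue="awesome_queue_name")
subject.subscribe("worker-2", queue="awesome_queue_name")
subject.subscribe("observer")  # no queue: receives every message

for i in range(4):
    subject.publish(f"task-{i}")

print(dict(subject.delivered))
```

In this model the two workers that share a queue name split the four tasks between them, while the subscriber without a queue sees all of them.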
PullBasedJetStreamBroker
- durable - durable name of the consumer. It's used to share messages between different consumers.
- pull_consume_batch - maximum number of messages that can be fetched each time.
- pull_consume_timeout - timeout for fetching messages. If there are no messages, fetching starts again.
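The interplay of the batch size and the fetch timeout can be sketched with a small asyncio loop. The `FakePullSubscription` class below is a stand-in written for this example, not the real JetStream subscription, and the loop is only an approximation of what a pull-based consumer might do: fetch up to a batch of messages, and start fetching again if the timeout expires with nothing received.

```python
import asyncio
from typing import List, Tuple


class FakePullSubscription:
    """Stand-in for a pull subscription (hypothetical, for illustration)."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def publish(self, message: str) -> None:
        self._queue.put_nowait(message)

    async def fetch(self, batch: int, timeout: float) -> List[str]:
        """Return up to `batch` messages; raise asyncio.TimeoutError if none arrive."""
        first = await asyncio.wait_for(self._queue.get(), timeout)
        messages = [first]
        while len(messages) < batch and not self._queue.empty():
            messages.append(self._queue.get_nowait())
        return messages


async def consume(
    sub: FakePullSubscription,
    pull_consume_batch: int,
    pull_consume_timeout: float,
    expected: int,
) -> Tuple[List[List[str]], int]:
    batches: List[List[str]] = []
    timeouts = 0
    received = 0
    while received < expected:
        try:
            batch = await sub.fetch(pull_consume_batch, pull_consume_timeout)
        except asyncio.TimeoutError:
            # No messages within the timeout: start fetching again.
            timeouts += 1
            continue
        batches.append(batch)
        received += len(batch)
    return batches, timeouts


async def demo() -> Tuple[List[List[str]], int]:
    sub = FakePullSubscription()
    for i in range(3):
        sub.publish(f"task-{i}")

    async def late_publisher() -> None:
        # Arrives after several fetch timeouts have already expired.
        await asyncio.sleep(0.2)
        sub.publish("task-3")

    publisher = asyncio.create_task(late_publisher())
    result = await consume(
        sub, pull_consume_batch=2, pull_consume_timeout=0.05, expected=4
    )
    await publisher
    return result


batches, timeouts = asyncio.run(demo())
print(batches, timeouts)
```

A larger batch size means fewer round trips per message, while a shorter timeout makes the loop notice an empty queue sooner at the cost of more retries.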