NATS integration for taskiq

Taskiq NATS

Taskiq-nats is a plugin for taskiq that adds a NATS broker. The package also supports NATS JetStream.

Installation

To use this project, install it together with the core taskiq library:

pip install taskiq taskiq-nats

Usage

Here's a minimal setup example with a broker and one task.

Default NATS broker.

import asyncio
from taskiq_nats import NatsBroker

broker = NatsBroker(
    [
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    queue="random_queue_name",
)


@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()

    await my_lovely_task.kiq()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

NATS broker based on JetStream

import asyncio
from taskiq_nats import (
    PushBasedJetStreamBroker,
    PullBasedJetStreamBroker
)

broker = PushBasedJetStreamBroker(
    servers=[
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    queue="awesome_queue_name",
)

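# Or use the pull-based variant instead. Keep only one of the two
# assignments: this one replaces the push-based broker defined above.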
broker = PullBasedJetStreamBroker(
    servers=[
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    durable="awesome_durable_consumer_name",
)


@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()

    await my_lovely_task.kiq()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

NatsBroker configuration

Here are the constructor parameters:

  • servers - a single string or a list of strings with the addresses of the NATS nodes.
  • subject - name of the subject that will be used to exchange tasks between workers and clients.
  • queue - optional name of the queue. By default, NatsBroker broadcasts each task to all workers; to have every task handled only once, supply this argument.
  • result_backend - custom result backend.
  • task_id_generator - custom function to generate task ids.
  • Every other keyword argument is passed to the nats.connect function.
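
The difference between broadcasting and queue-based delivery described for the queue parameter can be illustrated with a small in-memory model. This is only a sketch of the delivery semantics, not taskiq-nats or NATS code: ToySubject, its methods, and the worker names are hypothetical, and the round-robin choice inside a queue is a simplification made to keep the example deterministic.

```python
from collections import defaultdict


class ToySubject:
    """In-memory stand-in for a NATS subject (illustration only)."""

    def __init__(self):
        self.broadcast_workers = []            # subscribed without a queue
        self.queue_groups = defaultdict(list)  # queue name -> list of workers
        self._cursors = {}                     # queue name -> next worker index

    def subscribe(self, worker, queue=None):
        if queue is None:
            self.broadcast_workers.append(worker)
        else:
            self.queue_groups[queue].append(worker)

    def publish(self, task):
        """Return the (worker, task) pairs that would receive this task."""
        # Subscribers without a queue each get their own copy of the task.
        deliveries = [(worker, task) for worker in self.broadcast_workers]
        # Each queue delivers the task to exactly one of its members.
        for queue, workers in self.queue_groups.items():
            index = self._cursors.get(queue, 0)
            deliveries.append((workers[index % len(workers)], task))
            self._cursors[queue] = index + 1
        return deliveries


# Two workers without a queue: both receive the task.
no_queue = ToySubject()
no_queue.subscribe("worker-1")
no_queue.subscribe("worker-2")
broadcast = no_queue.publish("task-a")

# Two workers sharing a queue: only one receives the task.
with_queue = ToySubject()
with_queue.subscribe("worker-1", queue="random_queue_name")
with_queue.subscribe("worker-2", queue="random_queue_name")
queued = with_queue.publish("task-a")

print(broadcast)
print(queued)
```

In this model, running several workers without a queue executes every task once per worker, which is rarely what a task queue wants; giving all workers the same queue name is what makes each task run once.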

JetStreamBroker configuration

Common

  • servers - a single string or a list of strings with the addresses of the NATS nodes.
  • subject - name of the subject that will be used to exchange tasks between workers and clients.
  • stream_name - name of the stream where subjects will be located.
  • queue - name of the queue (used by the push-based broker to share messages between consumers).
  • result_backend - custom result backend.
  • task_id_generator - custom function to generate task ids.
  • stream_config - configuration for the stream.
  • consumer_config - configuration for the consumer.

PushBasedJetStreamBroker

  • queue - name of the queue. It's used to share messages between different consumers.

PullBasedJetStreamBroker

  • durable - durable name of the consumer. It's used to share messages between different consumers.
  • pull_consume_batch - maximum number of messages that can be fetched each time.
  • pull_consume_timeout - timeout for fetching messages. If no messages arrive before the timeout, fetching starts again.
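
How a batch size and a fetch timeout interact in a pull-based consumer can be sketched with asyncio alone. The code below is a toy model, not the broker's implementation: an asyncio.Queue stands in for the JetStream consumer, and fetch, consume, and max_fetches are hypothetical names, with batch and timeout playing the roles of pull_consume_batch and pull_consume_timeout.

```python
import asyncio


async def fetch(queue, batch, timeout):
    """Return up to `batch` messages.

    Raises asyncio.TimeoutError if no message arrives within `timeout` seconds.
    """
    first = await asyncio.wait_for(queue.get(), timeout)
    messages = [first]
    # Take whatever else is already waiting, up to the batch limit.
    while len(messages) < batch and not queue.empty():
        messages.append(queue.get_nowait())
    return messages


async def consume(queue, batch, timeout, max_fetches):
    """Fetch repeatedly; a timed-out fetch is recorded as an empty batch."""
    batches = []
    for _ in range(max_fetches):
        try:
            batches.append(await fetch(queue, batch, timeout))
        except asyncio.TimeoutError:
            # Nothing arrived in time: record it and start fetching again.
            batches.append([])
    return batches


async def demo():
    queue = asyncio.Queue()
    for n in range(5):
        queue.put_nowait(f"task-{n}")
    return await consume(queue, batch=2, timeout=0.05, max_fetches=4)


batches = asyncio.run(demo())
print(batches)
```

With five queued messages and a batch size of 2, the first three fetches return two, two, and one message, and the fourth times out and returns nothing. A real worker would loop indefinitely rather than stop after max_fetches.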

NATS Result Backend

It's possible to use NATS JetStream to store task results.

import asyncio
from taskiq_nats import PullBasedJetStreamBroker
from taskiq_nats.result_backend import NATSObjectStoreResultBackend


result_backend = NATSObjectStoreResultBackend(
    servers="localhost",
)
broker = PullBasedJetStreamBroker(
    servers="localhost",
).with_result_backend(
    result_backend=result_backend,
)


@broker.task
async def awesome_task() -> str:
    return "Hello, NATS!"


async def main() -> None:
    await broker.startup()
    task = await awesome_task.kiq()
    res = await task.wait_result()
    print(res)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
