Django NATS Consumer

django-nats-consumer

NATS + Django = ⚡️

Installation

Please note the development status: this is Pre-Alpha software. Expect the API to evolve as I start using it more in production.

I hope you find some value in it - writing a good consumer takes some finesse.

pip install django-nats-consumer

Usage

settings.py

INSTALLED_APPS = [
    ...
    "nats_consumer",
]

NATS_CONSUMER = {
    "connect_args": {
        "servers": ["nats://localhost:4222"],
        "allow_reconnect": True,
        "max_reconnect_attempts": 5,
        "reconnect_time_wait": 1,
        "connect_timeout": 10,
    },
}
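
The connection settings can also be read from the environment instead of being hard-coded. Below is a minimal sketch, assuming two hypothetical environment variables, NATS_SERVERS and NATS_MAX_RECONNECT_ATTEMPTS; django-nats-consumer does not define or read these names itself, and the helper build_connect_args is only an illustration. The resulting keys mirror the connect_args shown above.

```python
import os


def build_connect_args(env=None):
    """Build the "connect_args" dict from environment variables.

    NATS_SERVERS and NATS_MAX_RECONNECT_ATTEMPTS are hypothetical names
    chosen for this sketch; the library does not read them itself.
    """
    env = os.environ if env is None else env
    servers = env.get("NATS_SERVERS", "nats://localhost:4222")
    return {
        # Comma-separated list, e.g. "nats://a:4222,nats://b:4222"
        "servers": [s.strip() for s in servers.split(",") if s.strip()],
        "allow_reconnect": True,
        "max_reconnect_attempts": int(env.get("NATS_MAX_RECONNECT_ATTEMPTS", "5")),
        "reconnect_time_wait": 1,
        "connect_timeout": 10,
    }


NATS_CONSUMER = {"connect_args": build_connect_args()}
```

With neither variable set, this produces the same values as the hard-coded settings above.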

{app_name}/consumers.py

# Consumers must live in the app's consumers module to be loaded automatically;
# otherwise, import them explicitly to force loading.
import logging

from nats_consumer import JetstreamPushConsumer, operations

logger = logging.getLogger(__name__)


class OrderConsumer(JetstreamPushConsumer):
    stream_name = "orders"
    subjects = [
        "orders.created",
    ]

    # You need to set up the streams
    async def setup(self):
        return [
            operations.CreateStream(
                name=self.stream_name,
                subjects=self.subjects,
                storage="file"
            ),
        ]

    async def handle_message(self, message):
        # The message only shows if it's logged as an error
        logger.error(f"Received message: {message.data}")
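
Because message encoding and decoding is still on the roadmap, handle_message receives the raw bytes in message.data. The following is a minimal sketch of a decoding helper, assuming publishers send UTF-8 encoded JSON objects (as publish.py in this README does). The name decode_order is hypothetical and not part of django-nats-consumer.

```python
import json
import logging

logger = logging.getLogger(__name__)


def decode_order(data):
    """Decode a UTF-8 JSON payload into a dict, or return None if malformed."""
    try:
        payload = json.loads(data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        logger.warning("Discarding undecodable payload: %r", data)
        return None
    # Only accept JSON objects; a bare list or number is not an order.
    return payload if isinstance(payload, dict) else None
```

Inside handle_message this could be called as order = decode_order(message.data), with a None result treated as a message to skip. What to do with such a message (acknowledge, retry, or dead-letter it) is left open here.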

publish.py

import asyncio
import json

from nats_consumer import get_nats_client

async def publish_messages():
    ns = await get_nats_client()
    js = ns.jetstream()
    for i in range(5):
        data = {"id": i, "name": f"Order {i}"}
        data_b = json.dumps(data).encode("utf-8")
        print(f"Publishing message {i}...")
        await js.publish("orders.created", data_b)

if __name__ == "__main__":
    asyncio.run(publish_messages())

Running Consumers

To run a single consumer:

python manage.py nats_consumer OrderConsumer --setup

To run multiple consumers:

python manage.py nats_consumer OrderConsumer AnotherConsumer

To run all consumers:

python manage.py nats_consumer

Feature roadmap

  • Encoding/decoding of messages (json, protobuf, etc)
  • Better error handling, configurable retry
  • Better log output from the consumer
  • Configurable DLQ strategies
  • [insert your feature here]
