Django NATS Consumer
Project description
django-nats-consumer
NATS + Django = ⚡️
Installation
Please pay attention to the development status; this is Pre-Alpha software; expect the api to evolve as I start using this more in production.
pip install django-nats-consumer
Usage
settings.py
INSTALLED_APPS = [
...
"nats_consumer",
...
]
NATS_CONSUMER = {
"connect_args": {
"servers": ["nats://localhost:4222"],
"allow_reconnect": True,
"max_reconnect_attempts": 5,
"reconnect_time_wait": 1,
"connect_timeout": 10,
},
}
{app_name}/consumers.py
# Consumers need to be in the consumers module in order to be loaded,
# or you can import them to force them to be loaded.
from nats_consumer import JetstreamPushConsumer
import logging
from nats_consumer import JetstreamPushConsumer, operations
logger = logging.getLogger(__name__)
class OrderConsumer(JetstreamPushConsumer):
stream_name = "orders"
subjects = [
"orders.created",
]
# You need to setup the streams
async def setup(self):
return [
operations.CreateStream(
name=self.stream_name,
subjects=self.subjects,
storage="file"
),
]
async def handle_message(self, message):
# The message only shows if its logged as error
logger.error(f"Received message: {message.data}")
publish.py
import asyncio
from nats_consumer import get_nats_client
async def publish_messages():
ns = await get_nats_client()
js = ns.jetstream()
for i in range(5):
data = {"id": i, "name": f"Order {i}"}
data_b = json.dumps(data).encode("utf-8")
print(f"Publishing message {i}...")
await js.publish("orders.created", data_b)
if __name__ == "__main__":
asyncio.run(publish_messages())
Running Consumers
To run a single consumer:
python manage.py nats_consumer OrderConsumer --setup
To run multiple consumers:
python manage.py nats_consumer OrderConsumer AnotherConsumer
To run all consumers:
python manage.py nats_consumer
Additional Options:
# Enable uvloop for better performance
python manage.py nats_consumer --uvloop
# Enable auto-reload for development (watches for file changes)
python manage.py nats_consumer --reload
# Combine options
python manage.py nats_consumer OrderConsumer --uvloop --reload
Note When running your code in production, uvloop typically provides better performance than the default asyncio event loop, but it's only available on Unix-like systems (Linux, macOS). To use uvloop, please install using:
pip install django-nats-consumer[uvloop]
And add the following to your settings.py:
NATS_CONSUMER = {
"event_loop_policy": "uvloop.EventLoopPolicy",
...
}
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file django_nats_consumer-1.3.0.tar.gz.
File metadata
- Download URL: django_nats_consumer-1.3.0.tar.gz
- Upload date:
- Size: 10.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.0 CPython/3.10.16 Linux/6.8.0-1020-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a8d72b7663400c6061ba2c643d1518591f2a9f112985f8563834234dc47e1d8f
|
|
| MD5 |
b4f410593ffd4ca397f0a2146dd0b77e
|
|
| BLAKE2b-256 |
1bf00f381ba1efb9d01830a0906150f6ca002be45e4f618c08ca6d161e6295d7
|
File details
Details for the file django_nats_consumer-1.3.0-py3-none-any.whl.
File metadata
- Download URL: django_nats_consumer-1.3.0-py3-none-any.whl
- Upload date:
- Size: 11.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.0 CPython/3.10.16 Linux/6.8.0-1020-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ac996f0d06433ae21e5fc93943f7b4d0a855e172d76c70cc805549ad82fe1af5
|
|
| MD5 |
9ac6e0f206573e97ae7b46cd2e37a340
|
|
| BLAKE2b-256 |
e794dd6be61dd3f77af85b41d34a4600d97715ee099ad0f8e4abf6988b3c1c14
|