No project description provided
Project description
Django NATS
Features
- Wrapper of NATS's nats-py
- Django management command to listen for incoming NATS messages
- Automatically serialize/deserialize message from/to JSON format
- Easy-to-call method for publishing NATS messages
- Support NATS JetStream pull subscription
Installation
pip install django-nats-client
Setup
-
Add
nats_client
intoINSTALLED_APPS
# settings.py INSTALLED_APPS = [ ... 'nats_client', ]
-
Put NATS connection configuration in settings
# settings.py NATS_SERVERS = 'nats://localhost:4222' NATS_NAMESPACE = 'foo'
Usage
Listen for messages
-
Create a new callback method and register
# common/nats.py import nats_client @nats_client.register def new_message(message: str): print(message) @nats_client.register def get_year_from_date(date: str): return date.year # custom name @nats_client.register('get_current_time') def current_time(): return datetime.datetime.now().strftime('%H:%M') # without decorator def current_time(): return datetime.datetime.now().strftime('%H:%M') nats_client.register('get_current_time', current_time) # JetStream @nats_client.register(js=True) def new_message(message: str): print(message) # JetStream from other namespace @nats_client.register(namespace='bar', js=True) def new_message_from_bar(message: str): print(message)
-
Import previously file in
ready
method of yourapps.py
# common/apps.py class CommonConfig(AppConfig): ... def ready(self): import common.nats
-
Run listener management command
python manage.py nats_listener # or with autoreload enabled (suite for development) python manage.py nats_listener --reload
Publishing message
import nats_client
arg = 'some arg'
await nats_client.publish(
'subject_name',
'method_name',
arg,
keyword_arg=1,
another_keyword_arg=2,
)
Examples
import nats_client
await nats_client.publish('default', 'new_message', 'Hello, world!')
await nats_client.publish('default', 'project_created', 1, name='ACME')
# JetStream
await nats_client.publish('default', 'new_message', 'Hello, world!', _js=True)
await nats_client.js_publish('default', 'new_message', 'Hello, world!')
Request-Reply
import nats_client
arg = 'some arg'
await nats_client.request(
'subject_name',
'method_name',
arg,
keyword_arg=1,
another_keyword_arg=2,
)
Examples
import nats_client
year = await nats_client.request('default', 'get_year_from_date', datetime.date(2022, 1, 1)) # 2022
current_time = await nats_client.request('default', 'get_current_time') # 12:11
Settings
Key | Type | Required | Default | Description |
---|---|---|---|---|
NATS_SERVERS |
str | list[str] |
Yes | NATS server's host(s) | |
NATS_NAMESPACE |
str |
No | 'default' |
Main namespace using for prefixing subject, stream name, and etc. |
NATS_REQUEST_TIMEOUT |
int |
No | 1 |
Timeout when using request() (in seconds) |
NATS_OPTIONS |
dict |
No | {} |
Other configuration to be passed in nats.connect() |
NATS_JETSTREAM_ENABLED |
bool |
No | True |
Enable JetStream |
NATS_JETSTREAM_DURABLE_NAME |
str |
No | settings.NATS_NAMESPACE |
Durable name which is unique across all subscriptions |
NATS_JETSTREAM_CREATE_STREAM |
bool |
No | True |
Automatically create stream named in NATS_NAMESPACE |
NATS_JETSTREAM_CONFIG |
dict |
No | {} |
Extra configuration for JetStream streams |
Development
Requirements
- Docker
- Python
- Poetry
Linting
make lint
Testing
make test
Fix Formatting
make yapf
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for django_nats_client-0.4.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | bde1e951afc36cf276c1f1d0965474c00a3cc384154dee708f58c45d83824740 |
|
MD5 | 298c8ae5ef4926dfb12ae02dd958166e |
|
BLAKE2b-256 | 1642849cf93ac94449909702df23a73b7f96e35110a48c5fc787ce798a705bd7 |