Pub/Sub for microservice on django
Project description
Easy communication for django based microservices
Library provides tools for:
- Publisher/Subscription pattern
- Sending async command
- REST API
Installation
pip install git+https://github.com/ksinn/django-microservices-communication
In requirements.txt file
...
Django==4.2
git+https://github.com/ksinn/django-microservices-communication
django-cors-headers==3.14.0
...
Installation in Docker If pip install execute in docker, you require git in image.
Add 'services_communication' to your INSTALLED_APPS setting.
INSTALLED_APPS = [
...
'services_communication',
]
Any global settings are kept in a single configuration dictionary named MICROSERVICES_COMMUNICATION_SETTINGS. Start off by adding the following to your settings.py module:
MICROSERVICES_COMMUNICATION_SETTINGS = {
'APP_ID': 'my-service',
'BROKER_CONNECTION_URL': 'amqp://guest:guest@localhost:5672',
'QUEUE': 'my-queue',
'EXCHANGES': [
'my-exchange1',
('my-other-exchange', 'fanout'),
'exchange3',
],
'BINDS': [
('my-exchange1', 'event.*'),
'my-other-exchange',
],
'REST_API_HOST': 'http://api.example.com',
'REST_API_USERNAME': 'username',
'REST_API_PASSWORD': 'password',
}
Defaults:
- exchange type - topic
- bind routing key - '#'
Async communication
Consuming
Write logical consuming function in file 'consumers.py' in django app
some_project/
| some_project/
| settings.py
| urls.py
| some_app/
| __init__.py
| admin.py
| apps.py
| consumers.py <----
| models.py
| tests.py
| viwes.py
| some_other_app/
| __init__.py
| admin.py
| apps.py
| consumers.py <----
| models.py
| tests.py
| viwes.py
Consumer function must be registered in message router. Basic consumer function mast accept 2 positional arguments: routing key and message body.
Example consumers.py file:
from services_communication.consumer import message_router
@message_router.consumer('my-exchange1', 'event.update')
@message_router.consumer('my-exchange1', 'event.create')
@message_router.consumer('my-other-exchange') # For get all routing keys
@message_router.consumer() # For get all exchange (default consumer)
def stupid_consume_function(routing_key, body):
print(routing_key, body)
@message_router.default_consumer # For get message not routed to other consumers
def stupid_consume_function(routing_key, body):
print(payload)
If you want to consume aggregate event, use decorator @event_consumer and after then consumer function mast accept only on positional argument event payload and other event data as kwargs Example consumers.py file:
from services_communication.consumer import message_router
@message_router.consumer('my-exchange1', 'event.update')
@message_router.consumer('my-exchange1', 'event.create')
@message_router.consumer('my-ether_exchange') # For get all routing keys
@event_consumer
def stupid_consume_function(payload, **kwargs):
print(payload)
Run consumer
python manage.py runconsumer
Or user devconsumer for auto reloading on change files
Publishing
Publishing in transaction For publish event happened with aggregate in transaction use publish_aggregate_event
from services_communication.publisher import publish_aggregate_event
def update_user_name(user, new_name):
user.name = new_name
user.save()
publish_aggregate_event(
aggregate='user',
event_type='update.base',
payload=build_user_data(user),
)
This function save event data in db table. Then publisher process will read the event from the table and publish it to the broker in exchange same as aggregate name with routing key same as event type, event_type and body:
{
"eventId": "2",
"eventTime": "2023-06-02T10:58:58.340174Z",
"eventType": "update.base",
"aggregate": "user",
"payload": {
...
},
}
Run publisher process
python manage.py runpublisher
Or user devpublisher for auto reloading on change files
Commands
A command is a way of telling remote service to do something without waiting for a response from it.
For send command immediately, without regard to transactionality, use send_command with service name and payloads as arguments.
You can set timeout in seconds for command executing, this means that the executor of the command should not execute it if the specified time has passed since the time it was sent (called send_command function).
from services_communication.call import send_command
send_command(
'sms',
{
'phone': '998990000000',
'text': 'Hello world!',
}
)
If remote service has any commands, you may want to use optional argument command_name.
Sync communication
REST API
For request endpoint use method functions from rest_api package.
from services_communication.rest_api import get, post, head, delete
from services_communication.rest_api.formatter import full_response
first_subject = get('api/v1/subjects/1') # return only response body as dict
first_subject = get(
'api/v1/subjects',
params={
'page': 2,
'size': 20,
},
) # sending query params
response = get('api/v1/subjects/1', response_formatter=full_response) # return response object
new_subject = post(
'api/v1/subjects',
json={
'name': 'My new subject',
'order': 5,
},
) # sending request body
In all methods function you can send additional keyword argument, it was sent to request.
For formatting request and response uoy can send custom function as request_formatter and response_formatter keyword arguments.
request_formatter will be applied to other request arguments(params, json, data).
response_formatter will be applied to response and it result be returned from method.
By default:
- get, post, delete methods return response.json
- head method return full response
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file django-microservices-communication-2.1.7.tar.gz
.
File metadata
- Download URL: django-microservices-communication-2.1.7.tar.gz
- Upload date:
- Size: 16.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.10.15
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 328e79a67431c9a59493a12aea7754a2c11332433bbfc468c0e117d1870e948b |
|
MD5 | a0a0eda6d959bf9afea2e39d281d3285 |
|
BLAKE2b-256 | 124cbc7a5f1e0dbb998e3295cfd68f50391f29d77044dba965997f1fc27a1ba9 |
File details
Details for the file django_microservices_communication-2.1.7-py2-none-any.whl
.
File metadata
- Download URL: django_microservices_communication-2.1.7-py2-none-any.whl
- Upload date:
- Size: 24.1 kB
- Tags: Python 2
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.10.15
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 67ec738145987d48931bfb38940c0e5da89c9f444f03ab1f26aa60e3b05d0bc0 |
|
MD5 | b596f31eb98b5a526aefef6c91e3c9e8 |
|
BLAKE2b-256 | 134303e572f0ec6cb90365e668cf149a0cb1bc172aefde5d18491882967435eb |