Introduction
A library from RCTI+ to handle RabbitMQ tasks (connect, send, receive, etc) in Python.
Requirements
- Python >=3.7.3
- Pika ==1.2.0
- Aio-pika ==6.8.0
Installation
pip install rctiplus-rabbitmq-python-sdk
Getting latest version
pip install rctiplus-rabbitmq-python-sdk --upgrade
Usage
To start using this SDK, follow the instructions below in order.
Payload handler
First, create a payload handler class that implements MessagePayload. For example, here is a class that handles a JSON payload:
import json
from rctiplus_rabbitmq_python_sdk import MessagePayload


class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })

    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])
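A quick way to check a handler like this is a round trip: serialize with `str()` and rebuild with `from_str()`. The sketch below is self-contained, so it defines a stand-in `MessagePayload` base class instead of importing the SDK; that stand-in is an assumption made only so the example runs without the package installed.

```python
import json


class MessagePayload:
    """Stand-in for the SDK base class (assumption: used so the sketch runs without the SDK)."""

    @classmethod
    def from_str(cls, message: str) -> 'MessagePayload':
        raise NotImplementedError()

    def __str__(self) -> str:
        raise NotImplementedError()


class JSONPayload(MessagePayload):
    """Same JSON handler as in the example above."""

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        return json.dumps({'firstname': self.firstname, 'lastname': self.lastname})

    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


original = JSONPayload('John', 'Doe')
encoded = str(original)                    # JSON text that would be placed on the queue
restored = JSONPayload.from_str(encoded)   # object rebuilt on the consumer side

print(encoded)
print(restored.firstname, restored.lastname)
```

If the restored object carries the same fields as the original, the two methods agree on the message format.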
The MessagePayload class from the SDK's core defines the following methods, which every subclass must implement:
class MessagePayload:
    """Python RabbitMQ message payload
    """

    @classmethod
    def from_str(cls, message: str) -> 'MessagePayload':
        """Generate data from specified string payload message format

        Raises:
            NotImplementedError: Raise an error if not implemented
        """
        raise NotImplementedError()

    def __str__(self) -> str:
        """Convert specified data format to string payload message

        Raises:
            NotImplementedError: Raise an error if not implemented

        Returns:
            str: String payload message
        """
        raise NotImplementedError()
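To illustrate the contract, here is a minimal sketch of a second handler and of what happens when a method is left out. `TextPayload` and `IncompletePayload` are hypothetical names introduced for this example, and the base class is copied locally as a stand-in so the code runs without the SDK.

```python
class MessagePayload:
    """Local stand-in mirroring the base class shown above (assumption for a runnable sketch)."""

    @classmethod
    def from_str(cls, message: str) -> 'MessagePayload':
        raise NotImplementedError()

    def __str__(self) -> str:
        raise NotImplementedError()


class TextPayload(MessagePayload):
    """Hypothetical handler that carries a plain text message unchanged."""

    def __init__(self, text: str) -> None:
        self.text = text

    def __str__(self) -> str:
        return self.text

    @classmethod
    def from_str(cls, message: str) -> 'TextPayload':
        return cls(message)


class IncompletePayload(MessagePayload):
    """Hypothetical handler that forgets to override from_str."""

    def __str__(self) -> str:
        return 'incomplete'


greeting = TextPayload.from_str('hello')
print(str(greeting))

# Calling the method that was not overridden falls through to the base class.
try:
    IncompletePayload.from_str('anything')
    missing_method_detected = False
except NotImplementedError:
    missing_method_detected = True

print(missing_method_detected)
```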
Connect to RabbitMQ
To connect to a RabbitMQ server, create a RabbitMQ object and call connect:
from rctiplus_rabbitmq_python_sdk import RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')
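In a deployed app you may prefer not to hard-code the connection values. The following is a sketch, not part of the SDK: the `connection_settings` helper, the `RABBITMQ_*` variable names, and the `mq.internal` host are all hypothetical. Only the four keyword names (`host`, `port`, `username`, `password`) and the default values come from the example above.

```python
import os


def connection_settings(env=None) -> dict:
    """Build keyword arguments for conn.connect() from environment variables.

    The RABBITMQ_* names are hypothetical; the defaults are the values
    used in the connection example above.
    """
    env = os.environ if env is None else env
    return {
        'host': env.get('RABBITMQ_HOST', 'localhost'),
        'port': int(env.get('RABBITMQ_PORT', '5672')),  # environment values are strings
        'username': env.get('RABBITMQ_USERNAME', 'guest'),
        'password': env.get('RABBITMQ_PASSWORD', 'guest'),
    }


# A plain dict is passed here instead of os.environ to keep the demo deterministic.
settings = connection_settings({'RABBITMQ_HOST': 'mq.internal', 'RABBITMQ_PORT': '5673'})
print(settings['host'], settings['port'])

# With the SDK installed, the settings can be passed straight through:
# conn = RabbitMQ()
# conn.connect(**settings)
```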
Sending message
Once you have a payload handler class and a connection to the RabbitMQ server, you can send a message to a queue. For example, this sends a JSON payload message to the test queue:
payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)
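Sender code can be exercised without a running broker by swapping in an object that has the same `send(queue, payload)` shape. The sketch below makes two assumptions: `RecordingConnection` and `send_all` are hypothetical helpers, not SDK classes, and the payload is assumed to travel as `str(payload)`, which is what the `__str__` docstring suggests but is not confirmed here.

```python
import json


class JSONPayload:
    """Trimmed copy of the JSON handler, without the SDK base class."""

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        return json.dumps({'firstname': self.firstname, 'lastname': self.lastname})


class RecordingConnection:
    """Hypothetical stand-in for a connected RabbitMQ object; it only records what was sent."""

    def __init__(self) -> None:
        self.sent = []

    def send(self, queue: str, payload) -> None:
        # Assumption: the message body is the string form of the payload.
        self.sent.append((queue, str(payload)))


def send_all(conn, queue: str, payloads) -> int:
    """Send each payload to the same queue and return how many were sent."""
    count = 0
    for payload in payloads:
        conn.send(queue, payload)
        count += 1
    return count


conn = RecordingConnection()
sent_count = send_all(conn, 'test', [JSONPayload('John', 'Doe'), JSONPayload('Jane', 'Roe')])
print(sent_count, conn.sent[0])
```

Because `send_all` only relies on the `send` method, the same function should work unchanged when given the real connection object.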
Receiving message
Now, in the consumer app, we want to listen for that message, receive it, and then process it:
def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')


conn.receive('test', callback)
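A consumer usually has to cope with messages that are not valid JSON or that lack a field. The sketch below shows one way to guard the callback, assuming the body arrives as `bytes` (as it does with Pika, which the SDK depends on) or as `str`. The `parse_person` helper is a hypothetical name introduced here, and the callback is invoked directly rather than through `conn.receive` so the example runs without a broker.

```python
import json


def parse_person(body):
    """Decode a message body into (firstname, lastname), or return None if it is malformed."""
    try:
        text = body.decode('utf-8') if isinstance(body, bytes) else body
        data = json.loads(text)
        return data['firstname'], data['lastname']
    except (UnicodeDecodeError, json.JSONDecodeError, KeyError, TypeError):
        # Invalid bytes, invalid JSON, a missing field, or JSON that is not an object.
        return None


def callback(ch, method, properties, body):
    """Same signature as the callback above; malformed messages are skipped, not raised."""
    person = parse_person(body)
    if person is None:
        print('[!] Skipping malformed message: %r' % body)
        return
    print('data: firstname=%s, lastname=%s' % person)


# Direct calls with placeholder channel arguments, for illustration only.
callback(None, None, None, b'{"firstname": "John", "lastname": "Doe"}')
callback(None, None, None, b'not json')
```

Keeping the parsing in its own function makes the failure cases easy to test separately from the queue.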
Putting it all together
Here is the complete example from the code above:
import json
from rctiplus_rabbitmq_python_sdk import RabbitMQ, MessagePayload
# Create a payload handler class that implements `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })

    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])

# Connect to RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')
# Example sender app
# Send payload to queue
payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)
# Example consumer app
# Create a callback to be executed immediately after a message is received
def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    # Generate data from string payload message
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

# Listen for and receive messages from the queue
conn.receive('test', callback)
License
GNU General Public License v3