Introduction
A library from RCTI+ for handling RabbitMQ tasks (connecting, sending, receiving, etc.) in Python.
Requirements
- Python >=3.7.3
- Pika ==1.2.0
- Aio-pika ==6.8.0
Installation
pip install rctiplus-rabbitmq-python-sdk
Getting latest version
pip install rctiplus-rabbitmq-python-sdk --upgrade
Usage
To start using this SDK, follow the instructions below in order.
Payload handler
First, create a payload handler class that implements MessagePayload. For example, here is a class that handles a JSON payload:
import json

from rctiplus_rabbitmq_python_sdk import MessagePayload


class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message
        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })

    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message
        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])
The MessagePayload class from the SDK's core declares the following methods, which every subclass is required to implement:
class MessagePayload:
    """Python RabbitMQ message payload
    """

    @classmethod
    def from_str(cls, message: str) -> 'MessagePayload':
        """Generate data from specified string payload message format
        Raises:
            NotImplementedError: Raise an error if not implemented
        """
        raise NotImplementedError()

    def __str__(self) -> str:
        """Convert specified data format to string payload message
        Raises:
            NotImplementedError: Raise an error if not implemented
        Returns:
            str: String payload message
        """
        raise NotImplementedError()
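A quick way to check that a payload class honours this contract is a round trip: serialize with str() and rebuild with from_str(). The sketch below is only an illustration. It uses a local stand-in for MessagePayload, copied from the listing above, so that it runs without the SDK or a broker; in real code you would import the class from rctiplus_rabbitmq_python_sdk instead. The round_trips helper is a hypothetical name, not part of the SDK.

```python
import json


class MessagePayload:
    """Local stand-in mirroring the SDK base class shown above.

    Assumption: defined here only so the sketch runs without the SDK installed.
    """

    @classmethod
    def from_str(cls, message: str) -> 'MessagePayload':
        raise NotImplementedError()

    def __str__(self) -> str:
        raise NotImplementedError()


class JSONPayload(MessagePayload):
    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        return json.dumps({'firstname': self.firstname, 'lastname': self.lastname})

    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


def round_trips(payload: MessagePayload) -> bool:
    """Return True if str() followed by from_str() preserves the serialized form."""
    restored = type(payload).from_str(str(payload))
    return str(restored) == str(payload)


original = JSONPayload('John', 'Doe')
print(str(original))
print(round_trips(original))
```

A check like this fits naturally in a unit test for each payload class you add, since a mismatch between __str__ and from_str would otherwise only surface in the consumer.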
Connect to RabbitMQ
To connect to a RabbitMQ server, create a RabbitMQ object and call connect:
from rctiplus_rabbitmq_python_sdk import RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')
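Outside local development you will probably not want the host and credentials hardcoded. As a minimal sketch, the keyword arguments for connect() could be built from environment variables. The RABBITMQ_* variable names and the connection_settings helper are hypothetical, chosen for this example; only the host, port, username and password keywords come from the call above.

```python
import os
from typing import Dict, Mapping, Optional, Union


def connection_settings(
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, Union[str, int]]:
    """Build keyword arguments for conn.connect() from environment variables.

    The RABBITMQ_* names are hypothetical. Defaults match the local example.
    """
    env = os.environ if env is None else env
    return {
        'host': env.get('RABBITMQ_HOST', 'localhost'),
        'port': int(env.get('RABBITMQ_PORT', '5672')),
        'username': env.get('RABBITMQ_USERNAME', 'guest'),
        'password': env.get('RABBITMQ_PASSWORD', 'guest'),
    }


# Passing a mapping explicitly keeps the demo independent of the real environment.
settings = connection_settings({'RABBITMQ_HOST': 'mq.internal', 'RABBITMQ_PORT': '5673'})
print(settings['host'], settings['port'])

# With the SDK installed, the result can be passed straight to connect():
#     conn = RabbitMQ()
#     conn.connect(**settings)
```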
Sending message
Once you have a payload handler class and a connection to the RabbitMQ server, you can send a message to a queue. For example, to send a JSON payload message to the test queue:
payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)
Receiving message
Now, in the consumer app, listen for that message and process it in a callback:
def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

conn.receive('test', callback)
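The callback signature above matches the one Pika uses for consumer callbacks, where body typically arrives as bytes. Assuming the SDK passes the body through unchanged, a consumer may want to decode it and reject malformed messages rather than let an exception escape the callback. The following is a sketch under that assumption; parse_name is a hypothetical helper, not part of the SDK, and it works on plain JSON so that it runs without a broker.

```python
import json
from typing import Optional, Tuple, Union


def parse_name(body: Union[bytes, str]) -> Optional[Tuple[str, str]]:
    """Return (firstname, lastname) from a JSON message body, or None if invalid."""
    try:
        text = body.decode('utf-8') if isinstance(body, bytes) else body
        data = json.loads(text)
        return data['firstname'], data['lastname']
    except (UnicodeDecodeError, json.JSONDecodeError, KeyError, TypeError):
        # Not UTF-8, not JSON, missing a field, or not a JSON object.
        return None


def callback(ch, method, properties, body):
    """Consumer callback that skips malformed messages instead of raising."""
    name = parse_name(body)
    if name is None:
        print('[!] Skipping malformed message: %r' % (body,))
        return
    print('data: firstname=%s, lastname=%s' % name)


# Simulated deliveries; ch, method and properties are unused in this sketch.
callback(None, None, None, b'{"firstname": "John", "lastname": "Doe"}')
callback(None, None, None, b'not json')
```

Whether a skipped message should be logged, dropped, or routed elsewhere depends on your application; the sketch only prints it.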
Putting it all together
Here is the complete example from the code above:
import json

from rctiplus_rabbitmq_python_sdk import RabbitMQ, MessagePayload


# Create a payload handler class that implements `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message
        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })

    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message
        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Connect to RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

# Example sender app
# Send payload to queue
payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)


# Example consumer app
# Create a callback to be executed immediately after a message is received
def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    # Generate data from string payload message
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')


# Listen for and receive messages from the queue
conn.receive('test', callback)
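To exercise sender and consumer code without a running broker, one option is a small in-memory stand-in for the connection. InMemoryConnection and TextPayload below are hypothetical and not part of the SDK. The stand-in mirrors only the send(queue, payload) and receive(queue, callback) call shapes used above, and it assumes that a payload is serialized through str() and delivered to the callback as bytes. Unlike a real consumer, its receive() returns as soon as the queue is empty.

```python
from collections import defaultdict, deque


class InMemoryConnection:
    """Hypothetical test double; not part of the SDK."""

    def __init__(self) -> None:
        self._queues = defaultdict(deque)

    def send(self, queue: str, payload) -> None:
        # Assumption: the payload is serialized through its __str__ method.
        self._queues[queue].append(str(payload).encode('utf-8'))

    def receive(self, queue: str, callback) -> None:
        # Deliver queued messages in FIFO order, then return.
        # ch, method and properties are passed as None in this sketch.
        while self._queues[queue]:
            callback(None, None, None, self._queues[queue].popleft())


class TextPayload:
    """Minimal payload for the demo; a real one would subclass MessagePayload."""

    def __init__(self, text: str) -> None:
        self.text = text

    def __str__(self) -> str:
        return self.text

    @classmethod
    def from_str(cls, message: str) -> 'TextPayload':
        return cls(message)


received = []


def callback(ch, method, properties, body):
    # Decode the bytes body before rebuilding the payload.
    received.append(TextPayload.from_str(body.decode('utf-8')).text)


conn = InMemoryConnection()
conn.send('test', TextPayload('hello'))
conn.send('test', TextPayload('world'))
conn.receive('test', callback)
print(received)
```

Because the stand-in accepts anything with a __str__ method, the same test can be run against the JSONPayload class from the example above.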
License
GNU General Public License v3