Introduction
A library from RCTI+ to handle RabbitMQ tasks (connect, send, receive, etc) in Python.
Requirements
- Python >=3.7.3
- Pika ==1.2.0
Installation
pip install rctiplus-rabbitmq-python-sdk
Getting latest version
pip install rctiplus-rabbitmq-python-sdk --upgrade
Usage
To start using this SDK, follow the instructions below in order.
Payload handler
First, create a payload handler class that implements MessagePayload. For example, a class that handles a JSON payload:
import json
from rctiplus_rabbitmq_python_sdk import MessagePayload

class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message
        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })

    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message
        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])
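As a quick check of the handler above, the following sketch round-trips a payload through `str()` and `from_str()`. So that it runs without the SDK installed, it defines a local stand-in for `MessagePayload`; that stand-in is an assumption for illustration only, and real code should import the class from the SDK as shown above.

```python
import json


class MessagePayload:
    """Local stand-in for the SDK's base class (assumption, for illustration only)."""

    @classmethod
    def from_str(cls, message: str) -> 'MessagePayload':
        raise NotImplementedError()

    def __str__(self) -> str:
        raise NotImplementedError()


class JSONPayload(MessagePayload):
    """Same handler as in the example above."""

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        return json.dumps({'firstname': self.firstname, 'lastname': self.lastname})

    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Serialize a payload, then rebuild it from the serialized string
original = JSONPayload('John', 'Doe')
wire = str(original)
restored = JSONPayload.from_str(wire)

print(wire)
print(restored.firstname, restored.lastname)
```

If the restored object carries the same fields as the original, the two methods are consistent with each other, which is what the sender and the consumer rely on.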
The MessagePayload class from the SDK's core declares the following methods, which every payload handler must implement:
class MessagePayload:
    """Python RabbitMQ message payload
    """

    @classmethod
    def from_str(cls, message: str) -> 'MessagePayload':
        """Generate data from specified string payload message format
        Raises:
            NotImplementedError: Raise an error if not implemented
        """
        raise NotImplementedError()

    def __str__(self) -> str:
        """Convert specified data format to string payload message
        Raises:
            NotImplementedError: Raise an error if not implemented
        Returns:
            str: String payload message
        """
        raise NotImplementedError()
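To illustrate what the base class enforces, here is a minimal sketch of a handler that overrides only one of the two methods. `TextPayload` is a hypothetical class invented for this example, and `MessagePayload` is copied locally from the listing above so the snippet runs on its own.

```python
class MessagePayload:
    """Local copy of the interface listed above, so the sketch runs on its own."""

    @classmethod
    def from_str(cls, message: str) -> 'MessagePayload':
        raise NotImplementedError()

    def __str__(self) -> str:
        raise NotImplementedError()


class TextPayload(MessagePayload):
    """Hypothetical handler that overrides __str__ but forgets from_str."""

    def __init__(self, text: str) -> None:
        self.text = text

    def __str__(self) -> str:
        return self.text


payload = TextPayload('hello')
serialized = str(payload)  # works, because __str__ is overridden

try:
    TextPayload.from_str('hello')  # falls through to the base class
    from_str_failed = False
except NotImplementedError:
    from_str_failed = True

print(serialized, from_str_failed)
```

A handler like this could serialize outgoing messages but would fail as soon as a consumer tried to parse one, so both methods should be implemented together.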
Connect to RabbitMQ
To connect to a RabbitMQ server, create a RabbitMQ object and call connect with the server's host, port, and credentials:
from rctiplus_rabbitmq_python_sdk import RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')
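In a deployed application you may prefer not to hard-code the host and credentials. The sketch below builds the keyword arguments for `connect` from environment variables. The helper `connection_settings` and the `RABBITMQ_*` variable names are hypothetical, chosen for this example; only the four parameter names come from the call above.

```python
import os


def connection_settings(env=os.environ) -> dict:
    """Build keyword arguments for conn.connect() from environment variables.

    The RABBITMQ_* variable names are hypothetical, chosen for this sketch.
    Missing variables fall back to the defaults used in the example above.
    """
    return {
        'host': env.get('RABBITMQ_HOST', 'localhost'),
        'port': int(env.get('RABBITMQ_PORT', '5672')),
        'username': env.get('RABBITMQ_USERNAME', 'guest'),
        'password': env.get('RABBITMQ_PASSWORD', 'guest'),
    }


# A plain dict stands in for os.environ here so the result is predictable
settings = connection_settings({'RABBITMQ_HOST': 'mq.internal', 'RABBITMQ_PORT': '5673'})
print(settings)

# With the SDK installed, the settings can be passed straight through:
# conn = RabbitMQ()
# conn.connect(**settings)
```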
Sending message
Once you have a payload handler class and a connection to the RabbitMQ server, you can send a message to a queue. For example, to send a JSON payload message to the test queue:
payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)
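The SDK's internals are not shown in this document. For orientation only, the sketch below shows how a string payload is commonly sent to a named queue with Pika's channel API (`queue_declare` followed by `basic_publish` on the default exchange). Whether `conn.send` does exactly this is an assumption. `publish` and `RecordingChannel` are hypothetical names, and the recording channel replaces a real Pika channel so the example runs without a broker.

```python
class RecordingChannel:
    """Hypothetical stand-in for a Pika channel; records calls instead of sending."""

    def __init__(self) -> None:
        self.declared = []
        self.published = []

    def queue_declare(self, queue):
        self.declared.append(queue)

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))


def publish(channel, queue: str, payload) -> None:
    """Send str(payload) to the named queue (a sketch, not the SDK's code)."""
    # Make sure the queue exists before publishing to it
    channel.queue_declare(queue=queue)
    # The default exchange ('') routes a message to the queue named by routing_key
    channel.basic_publish(exchange='', routing_key=queue, body=str(payload))


channel = RecordingChannel()
publish(channel, 'test', '{"firstname": "John", "lastname": "Doe"}')
print(channel.published)
```

Note that the payload is converted with `str()` before it is published, which is why a handler's `__str__` method has to return the full message text.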
Receiving message
Now, in the consumer app, listen for that message and process it when it arrives:
def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

conn.receive('test', callback)
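A queue can contain messages that are not valid payloads, and an unhandled exception in the callback may interrupt the consumer. The sketch below shows one defensive variant of the callback, assuming the body arrives as bytes (as Pika delivers it) or as a string. `parse_name` is a hypothetical helper written for this example, not part of the SDK.

```python
import json
from typing import Optional, Tuple


def parse_name(body) -> Optional[Tuple[str, str]]:
    """Return (firstname, lastname), or None if the body is not a valid payload."""
    try:
        text = body.decode('utf-8') if isinstance(body, bytes) else body
        data = json.loads(text)
        return data['firstname'], data['lastname']
    except (UnicodeDecodeError, json.JSONDecodeError, KeyError, TypeError):
        # Not UTF-8, not JSON, missing a field, or not a JSON object
        return None


def callback(ch, method, properties, body):
    name = parse_name(body)
    if name is None:
        print('[!] Skipping malformed message: %r' % body)
        return
    print('data: firstname=%s, lastname=%s' % name)


# Simulate two deliveries without a broker: one valid, one malformed
callback(None, None, None, b'{"firstname": "John", "lastname": "Doe"}')
callback(None, None, None, b'not json')
```

With the SDK installed, this callback can be passed to `conn.receive('test', callback)` in the same way as the simpler one above.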
Putting it all together
Here is the complete example from the code above:
import json
from rctiplus_rabbitmq_python_sdk import RabbitMQ, MessagePayload

# Create a payload handler class that implements `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message
        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })

    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message
        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])

# Connect to RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

# Example sender app
# Send payload to queue
payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)

# Example consumer app
# Create a callback to be executed immediately after a message is received
def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    # Generate data from string payload message
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

# Listen for and receive messages from the queue
conn.receive('test', callback)
License
GNU General Public License v3
Hashes for rctiplus-rabbitmq-python-sdk-0.0.1.tar.gz
Algorithm | Hash digest
---|---
SHA256 | ae7ab1f757d4dba437bd65455c67530d4bb341a9b5496230d298e11026756f97
MD5 | 4ef189c1160b8c14c11e6449d6fd59b5
BLAKE2b-256 | a88135e84411e19e022bc58072825777a093897683863b76db030cbb3cb438ff
Hashes for rctiplus_rabbitmq_python_sdk-0.0.1-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | d43ddd05daa159b8bb86a91dcdc745d6d8bb96c2505a960164c92a4e7e3c7d14
MD5 | beb0752a237a7b9c733b7b095c28749e
BLAKE2b-256 | 803e5f0c4afba6a675464f1a4465f41da1274186eff95f46ae2890870a04f2b1