Skip to main content

Provide easy connection to rabbitmq server.

Project description

rabbitmq-utils

This package will provide easy connection to rabbitmq server.

Producer

Following sample code will allow you to send the message to desire queue.

In RabbitMQProducer class Publisher Confirms is implemented, So it will tell you weather the message is send to desire location or not.

Note: In order to use default exchange, use '' as exchange name. When using the default exchange then routing key will be the name of queue.

from rabbitmq_utils import RabbitMQProducer
import json

# DEFINING MESSAGE
message = json.dumps({'hello': 'world'})

# SENDING
rmqp = RabbitMQProducer(
    host='localhost', port=5672, virtual_host='/', 
    username='guest', password='guest', 
    exchange='test_exc', exchange_type='topic'
)
is_sent = rmqp.sendMessage(
    message,
    routing_key='test_key'
)

# RESULT
if is_sent:
    print('INFO: Message sent.')
else:
    print('ERROR: Unable to send on desire routing key.')

Consumer

RabbitMQConsumer class allow you define queue, define exchange and bind queue and exchange using routing key.

Queue and Exchange are consider to be durable. If you are getting some error then remove the existing queue and exchange, before running this code.

Callback function is called when message is received from the rabbitmq server. So define your callback function using following example:

def my_callback_function(ch, method, properties, body):
    # GETTING MESSAGE
    message = body.decode()
    
    # PERFORM YOUR LOGIC HERE
    myLogic()
    
    # ACKNOWLEDGE WORK IS DONE
    ch.basic_ack(delivery_tag=method.delivery_tag)
    return None

Following sample code will allow you to receive message from rabbitmq server.

from rabbitmq_utils import RabbitMQConsumer

# STARTING RABBITMQ CONSUMER
rmqc = RabbitMQConsumer(
        host='localhost', port=5672, virtual_host='/', 
        username='guest', password='guest', 
        queue_name='test_que', routing_key='test_key',
    	exchange='test_exc', exchange_type='topic',
    	my_callback_function
)
rmqc.receiveMessage()

Remote Procedure Call (RPC)

RPC allow to run a function on a remote computer and wait for the result. It is a synchronous call. This package provide simplest implementation of RPC.

RPC consist of two parts. One is the server that will process the request and other is client that will generate the request to server. Following are example of RPC implementation.

Server

from rabbitmq_utils.rpc import RPCServer

# STARTING RPC SERVER
server = RPCServer(
        host='localhost', port=5672, virtual_host='/', 
        username='guest', password='guest', 
        queue_name='test_que', routing_key='test_key',
    	exchange='test_exc', exchange_type='topic',
    	callback_fun=rpc_callback_function
)
server.receiveMessage()

Callback function of RPC is different from consumer callback function. In this callback we will return the result back to client.

Note: result must be string. If it is not then use json.dumps(result) to convert it to string.

def rpc_callback_function(ch, method, properties, body):
    # GETTING MESSAGE
    message = body.decode()
    
    # PERFORM YOUR LOGIC HERE
    result = myLogic()
    
    # RETURING RESPONSE
    ch.basic_publish(
        exchange='', routing_key=properties.reply_to,
        properties=pika.BasicProperties(
            correlation_id = properties.correlation_id
        ),
        body=result
    )
    
    # ACKNOWLEDGE WORK IS DONE
    ch.basic_ack(delivery_tag=method.delivery_tag)
    return None

Client

from rabbitmq_utils.rpc import RPCClient
import json

# DEFINING MESSAGE
message = json.dumps({'hello': 'world'})

# SENDING
client = RPCClient(
    host='localhost', port=5672, virtual_host='/', 
    username='guest', password='guest', 
    exchange='test_exc', exchange_type='topic',
    timeout=3 # wait 3 seconds for response. default is None (infinite wait).
)
is_sent, response = client.sendMessage(
    message,
    routing_key='test_key',
    return_response=True
)

# OUTPUT
print(f'is_sent: {is_sent} \t code: {client.getCode()} \t response: {response}')

Client sendMessage receive return_response argument (default=False). If this is True then client will wait for response for desire timeout period. You can receive response later if you want by using follow sample code:

# SEND REQUEST
is_sent = client.sendMessage(
    message,
    routing_key
)

# PERFORM YOUR LOGIC

# RECEIVE RESPONSE
response = client.receiveResponse()

Always check the validity of response using status code. Following code will help you check it:

status_code = client.getCode()
print(status_code)

Code is integer. Following table shows the meanings:

Code Meaning
200 Response is successfully obtained.
408 Timeout occur.

Author

Tahir Rafique

Releases

Date Version Summary
14-Jul-23 v1.2.1 Correcting documentation.
21-Jun-23 v1.2.0 Adding RPC to module.
27-Apr-23 v1.0.1 Improving default callback function.
27-Apr-23 v1.0.0 Initial build

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rabbitmq-utils-1.2.1.tar.gz (8.1 kB view details)

Uploaded Source

Built Distribution

rabbitmq_utils-1.2.1-py3-none-any.whl (9.1 kB view details)

Uploaded Python 3

File details

Details for the file rabbitmq-utils-1.2.1.tar.gz.

File metadata

  • Download URL: rabbitmq-utils-1.2.1.tar.gz
  • Upload date:
  • Size: 8.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.8.10

File hashes

Hashes for rabbitmq-utils-1.2.1.tar.gz
Algorithm Hash digest
SHA256 7dae8a03d0c35fe2d82c1d560c9b67aaa382f3d14442933f045fed9c6d65c372
MD5 523d0fd45e3ac3085475615cba78d3c5
BLAKE2b-256 ae3996fb902b2a4ba8c5a08ea986ef72d3f4965ec18d1cfc296aa22fe96b3a8d

See more details on using hashes here.

Provenance

File details

Details for the file rabbitmq_utils-1.2.1-py3-none-any.whl.

File metadata

File hashes

Hashes for rabbitmq_utils-1.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 882306ecde85bc0903f5803efca707207b7042b764a437f82e5c07e7cc453e5e
MD5 9a7797c6c474146acdccdb13b8f7d9d6
BLAKE2b-256 d40386f21495607bbd1629c5fddbe478efec9c6c6355ad6b54599b5bd2de608b

See more details on using hashes here.

Provenance

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page