
Mrsal is a simple-to-use message broker abstraction on top of RabbitMQ and Pika.

Project description

MRSAL AMQP


Intro

Mrsal is a simple-to-use message broker abstraction on top of RabbitMQ, aio-pika, and Pika. The goal is to make Mrsal trivial to reuse across all services of a distributed system and to make the use of advanced message queuing protocols easy and safe. No more big chunks of repetitive code across your services, and no more bespoke solutions to handle dead letters.

Mrsal is Arabic for a small arrow and is used to describe something that performs a task with lightness and speed.

Quick Start guide

0. Requirements

  1. A RabbitMQ server up and running
  2. Python 3.10 or newer
  3. Tested on Linux only

1. Installing

First things first:

poetry add mrsal

Next, set the default username, password, and server name for your RabbitMQ setup. It is advisable to use a .env file or your shell rc file (e.g. .zshrc) for persistence.

[RabbitEnvVars]
RABBITMQ_USER=******
RABBITMQ_PASSWORD=******
RABBITMQ_VHOST=******
RABBITMQ_DOMAIN=******
RABBITMQ_PORT=******

# FOR TLS
RABBITMQ_CAFILE=/path/to/file
RABBITMQ_CERT=/path/to/file
RABBITMQ_KEY=/path/to/file
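
With these variables set, you can read them into Python before connecting. A minimal sketch, assuming the variables are exported in your shell (or loaded from a .env file with, for example, python-dotenv):

import os

# Read the connection settings exported above
RABBITMQ_USER = os.environ['RABBITMQ_USER']
RABBITMQ_PASSWORD = os.environ['RABBITMQ_PASSWORD']
RABBITMQ_VHOST = os.environ['RABBITMQ_VHOST']
RABBITMQ_DOMAIN = os.environ['RABBITMQ_DOMAIN']
RABBITMQ_PORT = os.environ['RABBITMQ_PORT']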

Mrsal was first developed by NeoMedSys and the research group CRAI at the University Hospital of Oslo.

2. Setup and connect

  • Example 1: Let's create a blocking connection on localhost with no TLS encryption
from mrsal.amqp.subclass import MrsalBlockingAMQP

mrsal = MrsalBlockingAMQP(
    host=RABBITMQ_DOMAIN,  # Use a custom domain if you are using SSL e.g. mrsal.on-example.com
    port=int(RABBITMQ_PORT),
    credentials=(RABBITMQ_USER, RABBITMQ_PASSWORD),
    virtual_host=RABBITMQ_VHOST,
    ssl=False # Set this to True for SSL/TLS (you will need to set the cert paths if you do so)
)

# Boom! You are staged for connection. Instantiation only stages the connection; it does not open it yet.
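
For a TLS-encrypted connection, set ssl=True. A minimal sketch, assuming the TLS variables above (RABBITMQ_CAFILE, RABBITMQ_CERT, RABBITMQ_KEY) point at your certificate files so Mrsal can pick them up:

mrsal_tls = MrsalBlockingAMQP(
    host=RABBITMQ_DOMAIN,  # custom domain matching your certificate, e.g. mrsal.on-example.com
    port=int(RABBITMQ_PORT),
    credentials=(RABBITMQ_USER, RABBITMQ_PASSWORD),
    virtual_host=RABBITMQ_VHOST,
    ssl=True  # cert paths are taken from the RABBITMQ_CAFILE/CERT/KEY variables
)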

2.1 Publish

Now let's publish our message of friendship on the friendship exchange. Note: auto_declare=True means that MrsalAMQP will create the specified exchange and queue and then bind them together using routing_key in one go. If you want to customize each step, turn off auto_declare and perform each step yourself with custom arguments.

import pika

# BasicProperties is used to set the message properties
prop = pika.BasicProperties(
        app_id='zoomer_app',
        message_id='zoomer_msg',
        content_type='application/json',
        content_encoding='utf-8',
        delivery_mode=pika.DeliveryMode.Persistent,
        headers=None)

message_body = {'zoomer_message': 'Get it yia bish'}

# Publish the message to the exchange to be routed to queue
mrsal.publish_message(exchange_name='zoomer_x',
                        exchange_type='direct',
                        queue_name='zoomer_q',
                        routing_key='zoomer_key',
                        message=message_body, 
                        prop=prop,
                        auto_declare=True)
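
Since the consumer section below uses pydantic to enforce payload types, you can validate the body on the publishing side as well. A minimal sketch, assuming pydantic v2 and the same ZoomerNRJ model that the consumer below uses:

from pydantic import BaseModel

class ZoomerNRJ(BaseModel):
    zoomer_message: str

# Validate the payload before it leaves your service; raises pydantic.ValidationError on bad data
validated_body = ZoomerNRJ(**message_body).model_dump()

mrsal.publish_message(exchange_name='zoomer_x',
                        exchange_type='direct',
                        queue_name='zoomer_q',
                        routing_key='zoomer_key',
                        message=validated_body,
                        prop=prop,
                        auto_declare=True)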

2.2 Consume

Now let's set up a consumer that will listen to our very important messages. If you are using scripts rather than notebooks, it's advisable to run consume and publish separately. We are going to need a callback function, which is triggered upon receiving a message from the queue we subscribe to. You can use the callback function to activate something in your system.

Note:

  • If you start a consumer with callback_with_delivery_info=True, then your callback function should have at least these params: (method_frame: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, message_param: str).
  • If not, it should have at least (message_param: str); see the sketch after the example below.
  • We can use pydantic BaseModel classes to enforce types in the body
import pika
from pydantic import BaseModel

class ZoomerNRJ(BaseModel):
    zoomer_message: str

class SadZoomerEnergyError(Exception):
    """Raised when the message does not carry zoomer energy."""

def consumer_callback_with_delivery_info(
     method_frame: pika.spec.Basic.Deliver,
     properties: pika.spec.BasicProperties,
     body: str
     ):
    if 'Get it' in body:
        app_id = properties.app_id
        msg_id = properties.message_id
        print(f'app_id={app_id}, msg_id={msg_id}')
        print('Slay with main character vibe')
    else:
        raise SadZoomerEnergyError('Zoomer sad now')

mrsal.start_consumer(
        queue_name='zoomer_q',
        exchange_name='zoomer_x',
        callback_args=None,  # no need to specify if you do not need it
        callback=consumer_callback_with_delivery_info,
        auto_declare=True,
        auto_ack=False
    )
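
As noted above, if the consumer is started without callback_with_delivery_info=True, the callback only needs the message body. A minimal sketch that also lets the ZoomerNRJ model enforce the payload types (model_validate_json assumes pydantic v2):

def consumer_callback(body: str):
    # Raises pydantic.ValidationError if the payload does not match the model
    zoomer_msg = ZoomerNRJ.model_validate_json(body)
    print(f'Received: {zoomer_msg.zoomer_message}')

mrsal.start_consumer(
        queue_name='zoomer_q',
        exchange_name='zoomer_x',
        callback=consumer_callback,
        auto_declare=True,
        auto_ack=False
    )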

Done! Your first message of zoomerism has been sent to the zoomer queue on the zoomer exchange over a blocking connection. Let's see how we can do it async in the next step.

3. Setup and Connect Async

It's usually best practice to use async consumers when high throughput is expected. We can easily do this by adjusting the code a little to fit Python's async connection framework.

from mrsal.amqp.subclass import MrsalAsyncAMQP

mrsal = MrsalAsyncAMQP(
    host=RABBITMQ_DOMAIN,  # Use a custom domain if you are using SSL e.g. mrsal.on-example.com
    port=int(RABBITMQ_PORT),
    credentials=(RABBITMQ_USER, RABBITMQ_PASSWORD),
    virtual_host=RABBITMQ_VHOST,
    ssl=False # Set this to True for SSL/TLS (you will need to set the cert paths if you do so)
)

# Boom! You are staged for async connection.

3.1 Consume

Let's go turbo and set up the consumer in async mode for efficient AMQP handling.

import asyncio
import pika
from pydantic import BaseModel

class ZoomerNRJ(BaseModel):
    zoomer_message: str

class SadZoomerEnergyError(Exception):
    """Raised when the message does not carry zoomer energy."""

async def consumer_callback_with_delivery_info(
     method_frame: pika.spec.Basic.Deliver,
     properties: pika.spec.BasicProperties,
     body: str
     ):
    if 'Get it' in body:
        app_id = properties.app_id
        msg_id = properties.message_id
        print(f'app_id={app_id}, msg_id={msg_id}')
        print('Slay with main character vibe')
    else:
        raise SadZoomerEnergyError('Zoomer sad now')

asyncio.run(mrsal.start_consumer(
        queue_name='zoomer_q',
        exchange_name='zoomer_x',
        callback_args=None,  # no need to specify if you do not need it
        callback=consumer_callback_with_delivery_info,
        auto_declare=True,
        auto_ack=False
    ))
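
Because start_consumer is awaited, you can also run several consumers on one event loop instead of the single asyncio.run call above. A sketch under the assumption that one MrsalAsyncAMQP instance can host more than one consumer; boomer_q is a hypothetical second queue bound to the same exchange:

async def main():
    # Run two consumers concurrently on the same event loop
    await asyncio.gather(
        mrsal.start_consumer(
            queue_name='zoomer_q',
            exchange_name='zoomer_x',
            callback=consumer_callback_with_delivery_info,
            auto_declare=True,
            auto_ack=False
        ),
        mrsal.start_consumer(
            queue_name='boomer_q',  # hypothetical second queue
            exchange_name='zoomer_x',
            callback=consumer_callback_with_delivery_info,
            auto_declare=True,
            auto_ack=False
        ),
    )

asyncio.run(main())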

That simple! You now have setups for full advanced message queuing protocols that you can use to promote friendship, or other necessary communication between your services, over both blocking and async connections.

Note! There are many parameters and settings that you can use to set up a more sophisticated communication protocol, in both blocking and async connections, with pydantic BaseModels to enforce data types in the expected payload.


Download files

Download the file for your platform.

Source Distribution

mrsal-1.3.0b0.tar.gz (26.5 kB)

Uploaded Source

Built Distribution

mrsal-1.3.0b0-py3-none-any.whl (26.0 kB)

Uploaded Python 3

File details

Details for the file mrsal-1.3.0b0.tar.gz.

File metadata

  • Download URL: mrsal-1.3.0b0.tar.gz
  • Upload date:
  • Size: 26.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.0 CPython/3.11.10 Linux/6.5.0-1025-azure

File hashes

Hashes for mrsal-1.3.0b0.tar.gz

  • SHA256: 59a5a4049fc4e2462d5a417d7d2f1d3e210c4e2fe129f9c64772d6e6ba03f56c
  • MD5: 3c9a313f89eaec8cb7304f7c47b4dcdf
  • BLAKE2b-256: fa7557304dabdcddbb66460dfb54443f6d7e1b0cb20c33807bf89157872d9fd3


File details

Details for the file mrsal-1.3.0b0-py3-none-any.whl.

File metadata

  • Download URL: mrsal-1.3.0b0-py3-none-any.whl
  • Upload date:
  • Size: 26.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.0 CPython/3.11.10 Linux/6.5.0-1025-azure

File hashes

Hashes for mrsal-1.3.0b0-py3-none-any.whl

  • SHA256: 1d087af6108a15e746da0fbe8ced7aa3c09ec16464963c524f667faf4483e829
  • MD5: 666b8414e0c185dfd0d986e879dd949a
  • BLAKE2b-256: 6959143942787157c405460d8cf35580432107dbb6cb733a44c46efe5ef4c2ca

