Skip to main content

Mrsal is a simple to use message broker abstraction on top of RabbitMQ and Pika.

Project description

MRSAL 20200907_104224

Release Python 3.10 Documentation

Tests Status Coverage Status Flake8 Status

Intro

Mrsal is a simple to use message broker abstraction on top of RabbitMQ and Pika. The goal is to make Mrsal trivial to re-use in all services of a distributed system and to make the use of advanced message queing protocols easy and safe. No more big chunks of repetive code across your services or bespoke solutions to handle dead letters.

Mrsal is Arabic for a small arrow and is used to describe something that performs a task with lightness and speed.

Quick Start guide

0. Install

First things first:

pip install mrsal

We need to install RabbitMQ to use Mrsal. Head over to install RabbitMQ. Make sure to stick to the configuration that you give the installation throughout this guide. You can also use the Dockerfile and the docker-compose that we are using in the full guide.

Next set the default username, password and servername for your RabbitMQ setup. It's advisable to use a .env script or the rc file for persistence.

[RabbitEnvVars]
RABBITMQ_DEFAULT_USER=******
RABBITMQ_DEFAULT_PASS=******
RABBITMQ_DEFAULT_VHOST=******
RABBITMQ_DOMAIN=******
RABBITMQ_DOMAIN_TLS=******

RABBITMQ_GUI_PORT=******
RABBITMQ_PORT=******
RABBITMQ_PORT_TLS=******

# FOR TLS
RABBITMQ_CAFILE=/path/to/file
RABBITMQ_CERT=/path/to/file
RABBITMQ_KEY=/path/to/file

Please read the full guide to understand what Mrsal currently can and can't do.

Mrsal was first developed by NeoMedSys and the research group CRAI at the univeristy hospital of Oslo.

1. Setup and connect

The first thing we need to do is to setup our rabbit server before we can subscribe and publish to it. Lets set up a server on our localhost with the port and credentials we used when spinning up the docker-compose

import json
import pika
from mrsal.mrsal import Mrsal

# If you want to use SSL for external listening then set it to True
SSL = False

# Note RabbitMQ container is listening on:
# 1. When SSL is False the default port 5672 which is exposed to RABBITMQ_PORT in docker-compose
# 2. When SSL is True the default port 5671 which is exposed to RABBITMQ_PORT_TLS in docker-compose
port = RABBITMQ_PORT_TLS if SSL else RABBITMQ_PORT
host = RABBITMQ_DOMAIN_TLS if SSL else RABBITMQ_DOMAIN

# It should match with the env specifications (RABBITMQ_DEFAULT_USER, RABBITMQ_DEFAULT_PASS)
credentials=(RABBITMQ_DEFAULT_USER, RABBITMQ_DEFAULT_PASS)

# It should match with the env specifications (RABBITMQ_DEFAULT_VHOST)
v_host = RABBITMQ_DEFAULT_VHOST

mrsal = Mrsal(
    host=host,
    port=port,
    credentials=credentials,
    virtual_host=v_host,
    ssl=SSL
)

mrsal.connect_to_server()

2 Publish

Now lets publish our message of friendship on the friendship exchange. Note: When fast_setup=True that means Mrsal will create the specified exchange and queue, then bind them together using routing_key.

# BasicProperties is used to set the message properties
prop = pika.BasicProperties(
        app_id='friendship_app',
        message_id='friendship_msg',
        content_type='text/plain',
        content_encoding='utf-8',
        delivery_mode=pika.DeliveryMode.Persistent,
        headers=None)

message_body = 'Hello'

# Publish the message to the exchange to be routed to queue
mrsal.publish_message(exchange='friendship',
                        exchange_type='direct',
                        queue='friendship_queue',
                        routing_key='friendship_key',
                        message=json.dumps(message_body), 
                        prop=prop,
                        fast_setup=True)

3 Consume

Now lets setup a consumer that will listen to our very important messages. If you are using scripts rather than notebooks then it's advisable to run consume and publish separately. We are going to need a callback function which is triggered upon receiving the message from the queue we subscribe to. You can use the callback function to activate something in your system.

Note:

  • If you start a consumer with callback_with_delivery_info=True then your callback function should have at least these params (method_frame: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, message_param: str).
  • If not, then it should have at least (message_param: str)
import json

def consumer_callback_with_delivery_info(host_param: str, queue_param: str, method_frame: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, message_param: str):
    str_message = json.loads(message_param).replace('"', '')
    if 'Hello' in str_message:
        app_id = properties.app_id
        msg_id = properties.message_id
        print(f'app_id={app_id}, msg_id={msg_id}')
        print('Hola habibi')
        return True  # Consumed message processed correctly
    return False

def consumer_callback(host_param: str, queue_param: str, message_param: str):
    str_message = json.loads(message_param).replace('"', '')
    if 'Hello' in str_message:
        print('Hola habibi')
        return True  # Consumed message processed correctly
    return False

mrsal.start_consumer(
        queue='friendship_queue',
        callback=consumer_callback_with_delivery_info,
        callback_args=(test_config.HOST, 'friendship_queue'),
        inactivity_timeout=1,
        requeue=False,
        fast_setup=True,
        callback_with_delivery_info=True
    )

Done! Your first message of friendship has been sent to the friendship queue on the exchange of friendship.

3 Concurrent Consumers

Sometimes we need to start multiple consumers to listen to the same queue and process received messages concurrently. You can do that by calling start_concurrence_consumer which takes total_threads param in addition to the same parameters used in start_consumer. This method will create a thread pool and spawn new Mrsal object and start new consumer for every thread.

import json
import time

import pika
from pika.exchange_type import ExchangeType

import mrsal.config.config as config
import tests.config as test_config
from mrsal.mrsal import Mrsal


mrsal = Mrsal(host=test_config.HOST,
              port=config.RABBITMQ_PORT,
              credentials=config.RABBITMQ_CREDENTIALS,
              virtual_host=config.V_HOST)
mrsal.connect_to_server()

APP_ID="TEST_CONCURRENT_CONSUMERS"
EXCHANGE="GoodFriends"
EXCHANGE_TYPE='direct'
QUEUE_EMERGENCY="alleSindInkludiert"  # place the excluded (but no fundamentalist danke) in an emergency queue  
NUM_THREADS=3
NUM_MESSAGES=3
INACTIVITY_TIMEOUT=30 # time out after 30 seconds
ROUTING_KEY="bleib-cool"
MESSAGE_ID="Bleib cool und alles wird besser"

def test_concurrent_consumer():
    # Start concurrent consumers
    mrsal.start_concurrence_consumer(total_threads=NUM_THREADS, queue=QUEUE_EMERGENCY,
                                     callback=consumer_callback_with_delivery_info,
                                     callback_args=(test_config.HOST, QUEUE_EMERGENCY),
                                     exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE,
                                     routing_key=ROUTING_KEY,
                                     inactivity_timeout=INACTIVITY_TIMEOUT,
                                     fast_setup=True,
                                     callback_with_delivery_info=True)

    mrsal.close_connection()

def consumer_callback_with_delivery_info(host_param: str, queue_param: str, method_frame: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, message_param: str):
    time.sleep(5)
    return True

That simple! You have now setup a full advanced message queueing protocol that you can use to promote friendship or other necessary communication between your services.

Note! Please refer to the >>>FULL GUIDE<<< on how to use customize Mrsal to meet specific needs. There are many parameters and settings that you can use to set up a more sophisticated communication protocol.

References


Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mrsal-0.7.1a0.tar.gz (28.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mrsal-0.7.1a0-py3-none-any.whl (26.9 kB view details)

Uploaded Python 3

File details

Details for the file mrsal-0.7.1a0.tar.gz.

File metadata

  • Download URL: mrsal-0.7.1a0.tar.gz
  • Upload date:
  • Size: 28.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.0 CPython/3.10.3 Linux/5.15.0-91-generic

File hashes

Hashes for mrsal-0.7.1a0.tar.gz
Algorithm Hash digest
SHA256 c5f34752c025e1b99f262bc8b3428217eff8185dd397bb086fb04b1606625efc
MD5 abbd6061f48bdc123a30fc07586c80e8
BLAKE2b-256 2b153a9495d0ee075541ad96a64b6286cef5cdd731109aee657a190feb0e063b

See more details on using hashes here.

File details

Details for the file mrsal-0.7.1a0-py3-none-any.whl.

File metadata

  • Download URL: mrsal-0.7.1a0-py3-none-any.whl
  • Upload date:
  • Size: 26.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.0 CPython/3.10.3 Linux/5.15.0-91-generic

File hashes

Hashes for mrsal-0.7.1a0-py3-none-any.whl
Algorithm Hash digest
SHA256 3e932f96f4e1d042f65269e26a69a595ec1437722224e82d383fcd03ba4aa178
MD5 1eca794bae15a9b873713c6de547e6e9
BLAKE2b-256 4c3669f9d3b419d7ac5097d03904430aafafd25c1c0b375b3905716c058cd3a8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page