Pika bus wrapper for amqp messaging with RabbitMq

Project description

The PikaBus library is a wrapper around pika that makes it easy to implement the messages, events and commands pattern, as described in detail in the project's documentation.

Features

  • Secure messaging with amqp enabled by default, which includes:
    • Durable and mirrored queues on all nodes.
    • Persistent messages, meaning no messages are lost after a node restart.
    • Delivery confirms with RabbitMq publisher confirms.
    • Mandatory delivery turned on by default to guarantee at-least-once delivery.
  • Object oriented API with short and easy-to-use interface.
  • Fault-tolerant with auto-reconnect retry logic and state recovery.
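To make the auto-reconnect idea concrete, here is a minimal sketch of a retry loop with exponential backoff. It is not PikaBus's actual implementation; the function name connect_with_retry and all of its parameters are hypothetical, chosen only for illustration. A real pika connection raises its own exception types (for example pika.exceptions.AMQPConnectionError), which would be passed in through retry_on.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def connect_with_retry(connect: Callable[[], T],
                       max_attempts: int = 5,
                       base_delay: float = 0.5,
                       max_delay: float = 8.0,
                       retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Hypothetical helper: call `connect` until it succeeds.

    The wait doubles after every failed attempt, capped at `max_delay`.
    The last error is re-raised once `max_attempts` is exhausted.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delay = base_delay
    attempt = 1
    while True:
        try:
            return connect()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            sleep(delay)
            delay = min(delay * 2, max_delay)
            attempt += 1
```

The sleep function is injectable so the loop can be exercised in a test without actually waiting.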

Installation

pip install PikaBus

Example

import asyncio
import pika
import datetime
from PikaBus.abstractions.AbstractPikaBus import AbstractPikaBus
from PikaBus.PikaBusSetup import PikaBusSetup


def MessageHandlerMethod(**kwargs):
    """
    A message handler method may simply be a method with some **kwargs.
    The **kwargs will be given all incoming pipeline data, the bus and the incoming payload.
    """
    data: dict = kwargs['data']
    bus: AbstractPikaBus = kwargs['bus']
    payload: dict = kwargs['payload']
    print(payload)
    if payload.get('reply'):
        payload['reply'] = False
        bus.Reply(payload=payload)


# Use pika connection params to set connection details
credentials = pika.PlainCredentials('amqp', 'amqp')
connParams = pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials)

# Create a PikaBusSetup instance with a listener queue, and add the message handler method.
pikaBusSetup = PikaBusSetup(connParams,
                            defaultListenerQueue='myQueue',
                            defaultSubscriptions='myTopic')
pikaBusSetup.AddMessageHandler(MessageHandlerMethod)

# Start consuming messages from the queue.
consumingTasks = pikaBusSetup.StartAsync()

# Create a temporary bus to subscribe on topics and send, defer or publish messages.
bus = pikaBusSetup.CreateBus()
bus.Subscribe('myTopic')
payload = {'hello': 'world!', 'reply': True}

# To send a message means sending a message explicitly to one receiver.
bus.Send(payload=payload, queue='myQueue')

# To defer a message means sending a message explicitly to one receiver with some delay before it is processed.
bus.Defer(payload=payload, delay=datetime.timedelta(seconds=1), queue='myQueue')

# To publish a message means publishing a message on a topic received by any subscribers of the topic.
bus.Publish(payload=payload, topic='myTopic')

input('Hit enter to stop all consuming channels \n\n')
pikaBusSetup.Stop()

# Wait for the consuming tasks to complete safely.
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*consumingTasks))
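The handler logic can be tried without a running broker. The sketch below repeats the handler from the example and calls it with a stand-in bus. StubBus is a hypothetical class written only for this dry run, not part of PikaBus, and it assumes the handler receives just the three keyword arguments named in the example (data, bus, payload); the real pipeline may pass more.

```python
def message_handler(**kwargs):
    """Same logic as MessageHandlerMethod in the example above.

    payload.get() is used so that a payload without a 'reply' key is tolerated.
    """
    bus = kwargs['bus']
    payload = kwargs['payload']
    print(payload)
    if payload.get('reply'):
        payload['reply'] = False
        bus.Reply(payload=payload)


class StubBus:
    """Hypothetical stand-in for the bus; it only records Reply calls."""

    def __init__(self):
        self.replies = []

    def Reply(self, payload):
        # Store a copy so later changes to the payload do not alter the record.
        self.replies.append(dict(payload))


stub = StubBus()
# A payload asking for a reply triggers exactly one Reply call.
message_handler(data={}, bus=stub, payload={'hello': 'world!', 'reply': True})
# A payload without the 'reply' key is printed and otherwise ignored.
message_handler(data={}, bus=stub, payload={'hello': 'again'})
```

Because the handler clears the reply flag before replying, the reply itself does not trigger a further reply when it is consumed by the same handler.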

Quick Start

Clone PikaBus repo:

git clone https://github.com/hansehe/PikaBus.git

Start local RabbitMq instance with Docker:

docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=amqp -e RABBITMQ_DEFAULT_PASS=amqp -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Open RabbitMq admin (user=amqp, password=amqp) at:

http://localhost:15672/

Then, run the example:

pip install PikaBus
python ./Examples/basic_example.py

Try restarting RabbitMq to notice how PikaBus tolerates downtime:

docker stop rabbit
docker start rabbit

Send or publish more messages to the running PikaBus consumer with:

python ./Examples/send_example.py
python ./Examples/publish_example.py

License

The project is licensed under the MIT license.

Versioning

This software follows Semantic Versioning.

Download files

Files for PikaBus, version 1.2.1:

PikaBus-1.2.1-py3-none-any.whl (21.5 kB), file type Wheel, Python version py3
