Pika bus wrapper for amqp messaging with RabbitMq

Project description

The PikaBus library is a wrapper around pika that makes it easy to implement the message, event and command patterns, as described in detail in the project documentation.

Features

  • Secure messaging with amqp enabled by default, which includes:
    • Durable and mirrored queues on all nodes.

    • Persistent messages, meaning no messages are lost after a node restart.

    • Delivery confirms with RabbitMq publisher confirms.

    • Mandatory delivery turned on by default to guarantee at-least-once delivery.

  • Object-oriented API with a short and easy-to-use interface.

  • Fault-tolerant with auto-reconnect retry logic and state recovery.
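
The auto-reconnect feature listed above can be pictured as a retry loop with a growing delay between attempts. The following is a generic sketch of that idea using only the standard library. It is not PikaBus's actual implementation, and the names call_with_retry and flaky_connect are hypothetical, introduced here purely for illustration.

```python
import time


def call_with_retry(action, attempts=5, base_delay=0.5, sleep=time.sleep):
    """Call `action` until it succeeds or `attempts` tries have failed.

    Waits base_delay, 2 * base_delay, 4 * base_delay, ... between failed tries.
    """
    for attempt in range(attempts):
        try:
            return action()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)


# Hypothetical stand-in for a broker connection that fails twice, then succeeds.
calls = {'count': 0}


def flaky_connect():
    calls['count'] += 1
    if calls['count'] < 3:
        raise ConnectionError('broker unavailable')
    return 'connected'


# Record the delays instead of really sleeping, so the sketch runs instantly.
delays = []
result = call_with_retry(flaky_connect, sleep=delays.append)
print(result, delays)
```

Passing the sleep function as a parameter keeps the sketch easy to test; a real client would simply leave the default time.sleep in place.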

Installation

pip install PikaBus

Example

import asyncio
import pika
import datetime
from PikaBus.abstractions.AbstractPikaBus import AbstractPikaBus
from PikaBus.PikaBusSetup import PikaBusSetup


def MessageHandlerMethod(**kwargs):
    """
    A message handler method may simply be a method with some **kwargs.
    The **kwargs will be given all incoming pipeline data, the bus and the incoming payload.
    """
    data: dict = kwargs['data']
    bus: AbstractPikaBus = kwargs['bus']
    payload: dict = kwargs['payload']
    print(payload)
    if payload['reply']:
        payload['reply'] = False
        bus.Reply(payload=payload)


# Use pika connection params to set connection details
credentials = pika.PlainCredentials('amqp', 'amqp')
connParams = pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials)

# Create a PikaBusSetup instance with a listener queue, and add the message handler method.
pikaBusSetup = PikaBusSetup(connParams,
                            defaultListenerQueue='myQueue',
                            defaultSubscriptions='myTopic')
pikaBusSetup.AddMessageHandler(MessageHandlerMethod)

# Start consuming messages from the queue.
consumingTasks = pikaBusSetup.StartAsync()

# Create a temporary bus to subscribe on topics and send, defer or publish messages.
bus = pikaBusSetup.CreateBus()
bus.Subscribe('myTopic')
payload = {'hello': 'world!', 'reply': True}

# To send a message means sending a message explicitly to one receiver.
bus.Send(payload=payload, queue='myQueue')

# To defer a message means sending a message explicitly to one receiver with some delay before it is processed.
bus.Defer(payload=payload, delay=datetime.timedelta(seconds=1), queue='myQueue')

# To publish a message means publishing a message on a topic received by any subscribers of the topic.
bus.Publish(payload=payload, topic='myTopic')

input('Hit enter to stop all consuming channels \n\n')
pikaBusSetup.Stop()

# Wait for the consuming tasks to complete safely.
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*consumingTasks))
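
Because a message handler is a plain function that receives the bus through **kwargs, its logic can be exercised without a running broker. The sketch below assumes a hypothetical FakeBus test double that only records calls to Reply; it is not part of PikaBus, and the empty data dict is a placeholder for the pipeline data that the real bus would supply.

```python
class FakeBus:
    """Hypothetical test double: records the payloads passed to Reply."""

    def __init__(self):
        self.replies = []

    def Reply(self, payload):
        # Store a copy so later mutation of the payload does not affect the record.
        self.replies.append(dict(payload))


def MessageHandlerMethod(**kwargs):
    # Same logic as the handler in the example above, minus the print and type hints.
    bus = kwargs['bus']
    payload = kwargs['payload']
    if payload['reply']:
        payload['reply'] = False
        bus.Reply(payload=payload)


fake_bus = FakeBus()

# First delivery: 'reply' is True, so the handler replies once with it set to False.
MessageHandlerMethod(data={}, bus=fake_bus, payload={'hello': 'world!', 'reply': True})

# Simulate the reply arriving back at the same handler: no second reply is sent.
MessageHandlerMethod(data={}, bus=fake_bus, payload=fake_bus.replies[0])

print(fake_bus.replies)
```

Flipping the reply flag before replying is what keeps the handler from answering its own reply in an endless loop.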

Quick Start

Clone PikaBus repo:

git clone https://github.com/hansehe/PikaBus.git

Start local RabbitMq instance with Docker:

docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=amqp -e RABBITMQ_DEFAULT_PASS=amqp -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Open RabbitMq admin (user=amqp, password=amqp) at:

http://localhost:15672/

Then, run the example:

pip install PikaBus
python ./Examples/basic_example.py

Try restarting RabbitMq to notice how PikaBus tolerates downtime:

docker stop rabbit
docker start rabbit

Send or publish more messages to the running PikaBus consumer with:

python ./Examples/send_example.py
python ./Examples/publish_example.py

Contribute

License

The project is licensed under the MIT license.

Versioning

This software follows Semantic Versioning.
