Skip to main content

rabbit message queue client

Project description

Build Status Maintainability codecov PyPI version

rabbit-client

asyncio rabbit client powered by aioamqp.

rabbit-client provides a simple and automatic configuration to work with pub/sub and Dead Letter Exchanges with rabbitmq.

The image below exemplifies the workflow implemented by the client.

rabbit-client-workflow

Installing

Install and update using pip:

pip install -U rabbit-client

Dependencies

Setup

All values expected to configure rabbitmq can be set via environment variables.

Default values

# publish/producer
PUBLISH_EXCHANGE=default.out.exchange
PUBLISH_EXCHANGE_TYPE=topic
PUBLISH_TOPIC=#
PUBLISH_QUEUE=default.publish.queue

# subscribe/consumer
SUBSCRIBE_EXCHANGE=default.in.exchange
SUBSCRIBE_EXCHANGE_TYPE=topic
SUBSCRIBE_TOPIC=#
SUBSCRIBE_QUEUE=default.subscribe.queue

# dlx
DLX_EXCHANGE=DLX
DLX_TYPE=direct
DQL_QUEUE=default.subscribe.queue.dlq

Usage example

To run the example locally clone the project repo, install your dependencies and run:

consumer:

python test_client.py

producer:

python test_producer.py

Consumer code

import asyncio
import logging

from aiohttp import web

from rabbit.client import AioRabbitClient
from rabbit.publish import Publish
from rabbit.subscribe import Subscribe


async def handle_info(request):
    return web.json_response({'app': 'aio-rabbit-client'})


async def handle_status(request):
    return web.json_response({'status': 'UP'})


def configure_default_client(app):
    client = AioRabbitClient(app=app.loop)
    consumer = Subscribe(client, publish=Publish())
    app.loop.create_task(consumer.configure())
    app['rabbit_client'] = client


logging.getLogger().setLevel(logging.DEBUG)
loop = asyncio.get_event_loop()
app = web.Application(loop=loop)
app.add_routes([
    web.get('/manage/health', handle_status),
    web.get('/manage/info', handle_info)
])
configure_default_client(app)
web.run_app(app, host='0.0.0.0', port=5000)

Producer code

import asyncio
import json
import os

from rabbit.client import AioRabbitClient
from rabbit.exchange import Exchange
from rabbit.publish import Publish
from rabbit.queue import Queue


loop = asyncio.get_event_loop()

publish = Publish(
    AioRabbitClient(loop),
    exchange=Exchange(
        name=os.getenv('SUBSCRIBE_EXCHANGE', 'default.in.exchange'),
        exchange_type=os.getenv('SUBSCRIBE_EXCHANGE_TYPE', 'topic'),
        topic=os.getenv('SUBSCRIBE_TOPIC', '#')
    ),
    queue=Queue(
        name=os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue')
    )
)
loop.run_until_complete(publish.configure())
print(
    "[>] Event sent to: "
    f"[exchange: {os.getenv('SUBSCRIBE_EXCHANGE', 'default.in.exchange')}"
    f" | topic: {os.getenv('SUBSCRIBE_TOPIC', '#')} | "
    f"subscribe: {os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue')}]"
)

payload = {
    'document': 1,
    'description': '123',
    'documentSearchable': None,
    'pages': [
        {
            'body': 'abc 123',
            'number': 1
        },
        {
            'body': 'def 456',
            'number': 2
        },
        {
            'body': 'ghi 789',
            'number': 3
        }
    ]
}

loop.run_until_complete(
    publish.send_event(
        bytes(json.dumps(payload), 'utf-8')
        # properties={'headers': {'x-delay': 5000}}
    )
)

Development

Install development dependencies.

make install-deps

To execute lint:

make lint

To execute tests just run:

make tests

Links

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for rabbit-client, version 0.1.0
Filename, size File type Python version Upload date Hashes
Filename, size rabbit_client-0.1.0-py3-none-any.whl (17.4 kB) File type Wheel Python version py3 Upload date Hashes View
Filename, size rabbit-client-0.1.0.tar.gz (8.9 kB) File type Source Python version None Upload date Hashes View

Supported by

Pingdom Pingdom Monitoring Google Google Object Storage and Download Analytics Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN DigiCert DigiCert EV certificate StatusPage StatusPage Status page