rabbit message queue client
rabbit-client
asyncio rabbit client powered by aioamqp.
rabbit-client provides simple, automatic configuration for working with pub/sub and Dead Letter Exchanges (DLX) on RabbitMQ.
The image below exemplifies the workflow implemented by the client.
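To make the Dead Letter Exchange idea concrete: in RabbitMQ a queue is tied to a DLX through the `x-dead-letter-exchange` queue argument, and rejected or expired messages are re-routed to that exchange. The sketch below only builds the declaration data and does not talk to a broker. The `dlx_topology` helper and its return shape are hypothetical, and the way rabbit-client wires this internally may differ.

```python
def dlx_topology(queue, dlx_exchange="DLX", dlq=None):
    """Describe a work queue, its dead letter exchange and its dead letter queue.

    Hypothetical helper for illustration; not part of rabbit-client.
    """
    # Assumption: the dead letter queue is named after the work queue.
    dlq = dlq or f"{queue}.dlq"
    return {
        # The work queue points at the DLX through standard RabbitMQ arguments.
        "queue": {
            "name": queue,
            "arguments": {
                "x-dead-letter-exchange": dlx_exchange,
                "x-dead-letter-routing-key": dlq,
            },
        },
        # A direct exchange routes on an exact routing-key match.
        "dlx": {"name": dlx_exchange, "type": "direct"},
        # The dead letter queue is bound to the DLX under its own name.
        "dlq": {"name": dlq, "bind_to": dlx_exchange, "routing_key": dlq},
    }


if __name__ == "__main__":
    for part, spec in dlx_topology("default.subscribe.queue").items():
        print(part, spec)
```

Keeping the topology as plain data makes it easy to inspect before handing the names and arguments to whichever AMQP library performs the actual declarations.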
Installing
Install and update using pip:
pip install -U rabbit-client
Dependencies
Setup
All values needed to configure RabbitMQ can be set through environment variables.
Default values
# publish/producer
PUBLISH_EXCHANGE=default.out.exchange
PUBLISH_EXCHANGE_TYPE=topic
PUBLISH_TOPIC=#
PUBLISH_QUEUE=default.publish.queue
# subscribe/consumer
SUBSCRIBE_EXCHANGE=default.in.exchange
SUBSCRIBE_EXCHANGE_TYPE=topic
SUBSCRIBE_TOPIC=#
SUBSCRIBE_QUEUE=default.subscribe.queue
# dlx
DLX_EXCHANGE=DLX
DLX_TYPE=direct
DQL_QUEUE=default.subscribe.queue.dlq
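As a minimal sketch of how these defaults could be resolved, the snippet below reads each variable from the environment and falls back to the value listed above. The `DEFAULTS` mapping and the `load_settings` helper are hypothetical names used for illustration; they are not part of rabbit-client, and the variable names are copied exactly as they appear in the list.

```python
import os

# Defaults copied from the list above; this mapping is illustrative only.
DEFAULTS = {
    # publish/producer
    "PUBLISH_EXCHANGE": "default.out.exchange",
    "PUBLISH_EXCHANGE_TYPE": "topic",
    "PUBLISH_TOPIC": "#",
    "PUBLISH_QUEUE": "default.publish.queue",
    # subscribe/consumer
    "SUBSCRIBE_EXCHANGE": "default.in.exchange",
    "SUBSCRIBE_EXCHANGE_TYPE": "topic",
    "SUBSCRIBE_TOPIC": "#",
    "SUBSCRIBE_QUEUE": "default.subscribe.queue",
    # dlx
    "DLX_EXCHANGE": "DLX",
    "DLX_TYPE": "direct",
    "DQL_QUEUE": "default.subscribe.queue.dlq",
}


def load_settings(environ=None):
    """Return every setting, preferring the environment over the default."""
    env = os.environ if environ is None else environ
    return {name: env.get(name, default) for name, default in DEFAULTS.items()}


if __name__ == "__main__":
    for name, value in load_settings().items():
        print(f"{name}={value}")
```

Passing a plain dictionary as `environ` makes the lookup easy to exercise without touching the real process environment.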
Usage example
To run the example locally, clone the project repository, install its dependencies and run:
consumer:
python test_client.py
producer:
python test_producer.py
Consumer code
import asyncio
import logging

from aiohttp import web

from rabbit.client import AioRabbitClient
from rabbit.publish import Publish
from rabbit.subscribe import Subscribe


async def handle_info(request):
    return web.json_response({'app': 'aio-rabbit-client'})


async def handle_status(request):
    return web.json_response({'status': 'UP'})


def configure_default_client(app):
    client = AioRabbitClient(app=app.loop)
    consumer = Subscribe(client, publish=Publish())
    app.loop.create_task(consumer.configure())
    app['rabbit_client'] = client


logging.getLogger().setLevel(logging.DEBUG)
loop = asyncio.get_event_loop()
app = web.Application(loop=loop)
app.add_routes([
    web.get('/manage/health', handle_status),
    web.get('/manage/info', handle_info)
])
configure_default_client(app)
web.run_app(app, host='0.0.0.0', port=5000)
Producer code
import asyncio
import json
import os

from rabbit.client import AioRabbitClient
from rabbit.exchange import Exchange
from rabbit.publish import Publish
from rabbit.queue import Queue

loop = asyncio.get_event_loop()
publish = Publish(
    AioRabbitClient(loop),
    exchange=Exchange(
        name=os.getenv('SUBSCRIBE_EXCHANGE', 'default.in.exchange'),
        exchange_type=os.getenv('SUBSCRIBE_EXCHANGE_TYPE', 'topic'),
        topic=os.getenv('SUBSCRIBE_TOPIC', '#')
    ),
    queue=Queue(
        name=os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue')
    )
)
loop.run_until_complete(publish.configure())

print(
    "[>] Event sent to: "
    f"[exchange: {os.getenv('SUBSCRIBE_EXCHANGE', 'default.in.exchange')}"
    f" | topic: {os.getenv('SUBSCRIBE_TOPIC', '#')} | "
    f"subscribe: {os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue')}]"
)

payload = {
    'document': 1,
    'description': '123',
    'documentSearchable': None,
    'pages': [
        {
            'body': 'abc 123',
            'number': 1
        },
        {
            'body': 'def 456',
            'number': 2
        },
        {
            'body': 'ghi 789',
            'number': 3
        }
    ]
}

loop.run_until_complete(
    publish.send_event(
        bytes(json.dumps(payload), 'utf-8')
        # properties={'headers': {'x-delay': 5000}}
    )
)
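The producer sends the payload as UTF-8 encoded JSON bytes, so whatever consumes the message has to reverse that step. A minimal sketch of the round trip follows; `encode_event` and `decode_event` are hypothetical helper names and are not part of rabbit-client.

```python
import json


def encode_event(payload):
    """Serialize a dict to UTF-8 JSON bytes, as the producer example does."""
    return json.dumps(payload).encode("utf-8")


def decode_event(body):
    """Turn a raw message body back into a Python object."""
    return json.loads(body.decode("utf-8"))


if __name__ == "__main__":
    body = encode_event({"document": 1, "documentSearchable": None})
    print(body)
    print(decode_event(body))
```

Note that Python's `None` is written as JSON `null` and comes back as `None`, so fields such as `documentSearchable` survive the round trip unchanged.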
Development
Install development dependencies.
make install-deps
To execute lint:
make lint
To execute tests just run:
make tests
Links
- License: Apache License
- Code: https://github.com/amenezes/rabbit-client
- Issue Tracker: https://github.com/amenezes/rabbit-client/issues