A simple rabbitmq client for asyncio applications based on the awesome aio-pika library!
Project description
aio-easy-rabbit
An opinionated way to use RabbitMQ in Asyncio based frameworks.
Installation
to be continued ...
Usage example
Simple consumer example in a FastAPI
from fastapi import FastAPI
from aio_easy_rabbit.connection import (
connect_to_rabbitmq,
start_listening_to_queue,
)
app = FastAPI()
@rabbitmq_consumer("test_queue")
async def test_consumer(message):
print(message)
@app.on_event("startup")
async def startup_event():
registry = ConsumerRegistry()
rabbitmq_connection_string = "amqp://guest:guest@localhost"
app.state.rabbitmq_connection = await connect_to_rabbitmq(
rabbitmq_connection_string
)
for queue_name in registry.get_registered_queues():
asyncio.create_task(
start_listening_to_queue(app.state.rabbitmq_connection, queue_name)
)
Key component is the ConsumerRegistry
that will manage state of all registered queues and their consuming functions.
Note: Each decorated consumer will receive a raw string that needs to be serialized as pleased.
Simple publisher example in FastAPI
from fastapi import FastAPI
from aio_easy_rabbit.connection import (
connect_to_rabbitmq
)
from aio_easy_rabbit.producers import RabbitMQPublisher
app = FastAPI()
class Message(BaseModel):
message: str
@app.post("/test/{queue_name}/")
async def test_endpoint(queue_name: str, message: Message):
await app.state.rabbitmq_publisher.publish_message(queue_name, message)
return {"message": "Message published"}
@app.on_event("startup")
async def startup_event():
rabbitmq_connection_string = "amqp://guest:guest@localhost"
app.state.rabbitmq_connection = await connect_to_rabbitmq(
rabbitmq_connection_string
)
app.state.rabbitmq_publisher = RabbitMQPublisher(rabbitmq_connection_string)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
aio-easy-rabbit-0.0.1.tar.gz
(4.3 kB
view hashes)
Built Distribution
Close
Hashes for aio_easy_rabbit-0.0.1-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 080d12e6a9fa9693703d92c8b719576f3d622a5403d54b445da45548492680f5 |
|
MD5 | 4999bf20cfef87e675f3b4c423f9112f |
|
BLAKE2b-256 | 19b0b46bf2ef70702cc5a2bce7ae5919af3408633757d733dd7c34867ef44fb0 |