amqp-worker is a Python-based multi-threaded RabbitMQ consumer framework
🐰amqp-worker
amqp-worker is a Python-based multi-threaded RabbitMQ consumer framework. It is designed to make message consumption more efficient and more stable.
Features
- Batch consumption: messages are processed in batches, which improves consumption efficiency.
- Automatic reconnection: when the connection to the RabbitMQ service is lost, amqp-worker reconnects automatically so that consumption can continue.
- Customizable consumption mode: choose freely whether to use multi-threading or coroutines inside the consumption function.
- Configurable message acknowledgment mode: both automatic and manual acknowledgment are supported, so you can pick the mode that fits your consumption needs.
- Configurable exception handling: a global setting decides what happens to a message whose consumption raises an exception: requeue it, re-insert it, or mark it as consumed.
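To make the automatic reconnection feature more concrete, here is a minimal sketch of a retry loop with exponential backoff. It is a generic illustration of the technique, not amqp-worker's actual implementation; `connect_with_retry` and all of its parameters are hypothetical names chosen for this example.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def connect_with_retry(
    connect: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `connect` until it succeeds, waiting longer after each failure.

    Hypothetical helper: `connect` stands in for whatever opens the
    broker connection and is expected to raise ConnectionError on failure.
    """
    for attempt in range(max_attempts):
        try:
            return connect()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Exponential backoff: base_delay, 2x, 4x, ... capped at max_delay.
            sleep(min(base_delay * 2 ** attempt, max_delay))
    raise ValueError("max_attempts must be at least 1")
```

The `sleep` parameter is injectable so that the backoff schedule can be checked in a test without actually waiting.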
Installation
Install amqp-worker with pip. Note that the distribution name on PyPI is amqp-workers:
pip install amqp-workers
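After installing, a quick way to confirm that the distribution is visible to your interpreter is to query its metadata. The sketch below uses only the standard library; `installed_version` is a hypothetical helper name, and it assumes Python 3.8 or newer.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str = "amqp-workers") -> Optional[str]:
    """Return the installed version of `distribution`, or None if absent."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


# Prints a version string if the package is installed, otherwise None.
print(installed_version())
```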
Usage
First, import the App class from the amqpworker package in your Python code:
from amqpworker.app import App
Then instantiate an App object. The App object needs at least one AMQPConnection object, which describes how to reach the RabbitMQ server:
from amqpworker.connections import AMQPConnection
amqp_conn = AMQPConnection(hostname='127.0.0.1', username='guest', password='guest', port=5672)
app = App(connections=[amqp_conn])
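In practice you may not want to hard-code the connection details. The following sketch reads them from environment variables and falls back to the values used above. The variable names (`AMQP_HOST`, `AMQP_USER`, `AMQP_PASSWORD`, `AMQP_PORT`) and the helper `connection_kwargs` are assumptions made for this example; only the keyword names `hostname`, `username`, `password` and `port` come from the snippet above.

```python
import os
from typing import Any, Dict, Mapping


def connection_kwargs(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build keyword arguments for AMQPConnection from environment variables.

    The environment variable names are hypothetical; adapt them to your setup.
    """
    return {
        "hostname": env.get("AMQP_HOST", "127.0.0.1"),
        "username": env.get("AMQP_USER", "guest"),
        "password": env.get("AMQP_PASSWORD", "guest"),
        "port": int(env.get("AMQP_PORT", "5672")),  # env values are strings
    }


# Usage with amqp-worker (requires the package to be installed):
# amqp_conn = AMQPConnection(**connection_kwargs())
```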
Next, you need to define the consumption function:
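from datetime import datetime
from typing import List
from amqpworker.rabbitmq import RabbitMQMessage
from amqpworker.routes import AMQPRouteOptions

@app.amqp.consume(
    ['test'],
    options=AMQPRouteOptions(bulk_size=1024 * 8, bulk_flush_interval=2)
)
def _handler(msgs: List[RabbitMQMessage]):
    print(f"Recv {len(msgs)} {datetime.now().isoformat()}")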
In the code above, the decorator on the consumption function specifies which queues to consume from and how many messages to consume per batch. Note that the consumption function receives a whole batch at once, so its parameter type is List[RabbitMQMessage].
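The batching behavior can be pictured with a small, self-contained buffer that hands a list to the handler when it is full or when a time limit has passed. This is a conceptual sketch only, not amqp-worker's code. It assumes that `bulk_flush_interval` is a maximum wait in seconds before a partial batch is delivered, which the text above does not state; `BatchBuffer`, `add`, `tick` and `flush` are hypothetical names.

```python
import time
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")


class BatchBuffer(Generic[T]):
    """Collect items and pass them to `handler` as a list (hypothetical)."""

    def __init__(
        self,
        handler: Callable[[List[T]], None],
        bulk_size: int,
        flush_interval: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._handler = handler
        self._bulk_size = bulk_size
        self._flush_interval = flush_interval
        self._clock = clock
        self._items: List[T] = []
        self._last_flush = clock()

    def add(self, item: T) -> None:
        """Buffer one item; flush immediately once the batch is full."""
        self._items.append(item)
        if len(self._items) >= self._bulk_size:
            self.flush()

    def tick(self) -> None:
        """Flush a partial batch once flush_interval seconds have passed."""
        waited = self._clock() - self._last_flush
        if self._items and waited >= self._flush_interval:
            self.flush()

    def flush(self) -> None:
        """Hand the buffered items to the handler and reset the timer."""
        batch, self._items = self._items, []
        self._last_flush = self._clock()
        if batch:
            self._handler(batch)
```

With `bulk_size=3`, adding four items triggers one call with three items; the fourth stays buffered until `tick()` is called after the interval has elapsed.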
Finally, call the run method to start consuming:
app.run()
Example code
Below is a simple, complete example that consumes messages from a queue named test:
from datetime import datetime
from typing import List

from amqpworker.app import App
from amqpworker.connections import AMQPConnection
from amqpworker.rabbitmq import RabbitMQMessage
from amqpworker.routes import AMQPRouteOptions

amqp_conn = AMQPConnection(hostname='127.0.0.1', username='guest', password='guest', port=5672)
app = App(connections=[amqp_conn])

@app.amqp.consume(
    ['test'],
    options=AMQPRouteOptions(bulk_size=1024 * 8, bulk_flush_interval=2)
)
def _handler(msgs: List[RabbitMQMessage]):
    print(f"Recv {len(msgs)} {datetime.now().isoformat()}")

app.run()
License
amqp-worker is released under the MIT license. See the LICENSE file for details.