Skip to main content

amqp-worker is a Python-based multi-threaded RabbitMQ consumer framework

Project description

🐰amqp-worker

English | 简体中文

amqp-worker is a Python-based multi-threaded RabbitMQ consumer framework. It allows you to consume messages more efficiently and stably.

Features

  • Batch consumption: process messages in batches, improve consumption efficiency.
  • Automatic reconnection: when RabbitMQ service disconnects, amqp-worker will automatically reconnect, ensuring uninterrupted consumption.
  • Customizable consumption mode: freely decide to use multi-threading and coroutines in the consumption function.
  • Configurable message acknowledgment mode: support automatic acknowledgment and manual acknowledgment modes, configure according to your consumption needs.
  • Configurable exception handling: support global configuration of message exception consumption mode, re-enter queue, re-insert, consume message.

Installation

You can use pip tool to install amqp-worker:

pip install amqp-workers

Usage

First, you need to import the amqp_worker module in your Python code:

from amqpworker.app import App

Then, you need to instantiate an App object, and the App object depends on the AMQPConnection object:

from amqpworker.connections import AMQPConnection
amqp_conn = AMQPConnection(hostname='127.0.0.1', username='guest', password='guest', port=5672)

app = App(connections=[amqp_conn])

Next, you need to define the consumption function:

@app.amqp.consume(
    ['test'],
    options=AMQPRouteOptions(bulk_size=1024 * 8, bulk_flush_interval=2)
)
def _handler(msgs: List[RabbitMQMessage]):
    print(f"Recv {len(msgs)} {datetime.now().isoformat()}")

In the above code we give the consumption function a decorator, giving the consumption queue, the number of consumption per batch, it is worth noting that the parameter type of the consumption function is List[RabbitMQMessage]

Finally, just call the run method to start consuming:

app.run()

Example code

Below is a simple example code that will consume messages from a queue named test:

from datetime import datetime
from typing import List

from amqpworker.app import App
from amqpworker.connections import AMQPConnection
from amqpworker.rabbitmq import RabbitMQMessage
from amqpworker.routes import AMQPRouteOptions

amqp_conn = AMQPConnection(hostname='127.0.0.1', username='guest', password='guest', port=5672)
app = App(connections=[amqp_conn])

@app.amqp.consume(
    ['test'],
    options=AMQPRouteOptions(bulk_size=1024 * 8, bulk_flush_interval=2)
)
def _handler(msgs: List[RabbitMQMessage]):
    print(f"Recv {len(msgs)} {datetime.now().isoformat()}")

app.run()

Contributors

License

amqp-worker uses MIT license. Please refer to LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

amqp_workers-0.1.5.tar.gz (20.2 kB view details)

Uploaded Source

Built Distribution

amqp_workers-0.1.5-py3-none-any.whl (28.5 kB view details)

Uploaded Python 3

File details

Details for the file amqp_workers-0.1.5.tar.gz.

File metadata

  • Download URL: amqp_workers-0.1.5.tar.gz
  • Upload date:
  • Size: 20.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.4.2 CPython/3.9.5 Windows/10

File hashes

Hashes for amqp_workers-0.1.5.tar.gz
Algorithm Hash digest
SHA256 3260555d1d851a4b04b2e70603824f5a0266ed13bd8f30e6a0516bb9be5edeef
MD5 6e1ebc67c3af2458a8387306d4cb2b79
BLAKE2b-256 69cfa8ded93b7a1b5a6695b6022ecbe3b3990a7bf87102e4f7b7809c3b347373

See more details on using hashes here.

File details

Details for the file amqp_workers-0.1.5-py3-none-any.whl.

File metadata

  • Download URL: amqp_workers-0.1.5-py3-none-any.whl
  • Upload date:
  • Size: 28.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.4.2 CPython/3.9.5 Windows/10

File hashes

Hashes for amqp_workers-0.1.5-py3-none-any.whl
Algorithm Hash digest
SHA256 e48b241eee0f7e929c6e36922c88a89b0f2df4c5fcd44bfe6164410df4f23054
MD5 7bd01483cb349f9fda6cc31853414547
BLAKE2b-256 80b912eb990fb98fb76ea143047c779b6ef9db8d4a10d6e01213553eb024ce94

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page