Skip to main content

amqpstorm library for Flask

Project description

AMQP Service for Flask Apps

A Flask extension for AMQPStorm that provides easy integration with RabbitMQ.

Features

  • Automatic Reconnection: Uses APScheduler to monitor and reconnect to RabbitMQ if the connection drops.
  • Easy Publishing: Simple method to send JSON messages with retry logic.
  • Decorator-based Consumers: Define message consumers using simple decorators.
  • Health Checks: Built-in health check functionality.
  • Configuration: Configure via Flask app config or environment variables.

Installation

pip install amqpstorm-flask

Configuration

The extension can be configured via Flask's app.config or environment variables.

Config Key Environment Variable Default Description
MQ_URL MQ_URL None AMQP connection URL (e.g. amqp://guest:guest@localhost:5672/%2f)
MQ_EXCHANGE MQ_EXCHANGE None Default exchange name
AMQP_STORM_APSCHEDULER 1 Use APScheduler for health checks and consumers (recommended)
FILTER_LOGS 1 Filter out noisy connection logs
MQ_MAX_CONSUMER_IDLE_TIME 300 Max idle time in seconds before reconnecting
MQ_DELIMITER . Delimiter used to generate queue names from function names
MQ_QUEUES None Comma-separated list of enabled queues (if set, only these will start)

Usage

Initialization

from flask import Flask
from amqpstorm_flask import RabbitMQ

app = Flask(__name__)
app.config["MQ_URL"] = "amqp://guest:guest@localhost:5672/%2f"
app.config["MQ_EXCHANGE"] = "my_exchange"

mq = RabbitMQ(app)

Alternatively, use the factory pattern:

mq = RabbitMQ()
# ... later ...
mq.init_app(app)

Standalone Usage (Without Flask)

If you are not using Flask or want to initialize it manually without the init_app helper:

from amqpstorm_flask import RabbitMQ

mq = RabbitMQ(
    mq_url="amqp://guest:guest@localhost:5672/%2f",
    mq_exchange="my_exchange"
)
# Manual start is required if init_app is not used
mq.start()

Sending Messages

mq.send(
    body={"hello": "world"},
    routing_key="events.user.created",
    exchange_type="topic"
)

Consuming Messages

By default, the decorated function receives routing_key, body, and message_id.

@mq.queue(routing_key="events.user.#")
def handle_user_events(routing_key, body, message_id):
    print(f"Received event {routing_key}: {body}")

To receive the full AMQPStorm message object:

@mq.queue(routing_key="events.user.#", full_message_object=True)
def handle_user_events(message):
    print(f"Received body: {message.body}")
    # Manual acknowledgment if auto_ack is False
    # message.ack()

Health Check

You can use the check_health method to implement a health check endpoint:

@app.route("/health")
def health_check():
    is_ok, message = mq.check_health()
    if is_ok:
        return {"status": "ok"}, 200
    return {"status": "error", "message": message}, 503

Advanced Queue Configuration

The @mq.queue decorator supports several parameters:

  • routing_key: String or list of strings.
  • queue_name: Custom queue name (defaults to function name with _ replaced by MQ_DELIMITER).
  • exchange_type: Default is "topic".
  • auto_ack: Whether to automatically acknowledge messages.
  • prefetch_count: Number of messages to prefetch (default 1).
  • queue_arguments: Dictionary of arguments for queue declaration (default {"x-queue-type": "quorum"}).
  • full_message_object: If True, the decorated function receives the message object instead of unpacked values.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

amqpstorm_flask-0.6.0.dev0.tar.gz (26.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

amqpstorm_flask-0.6.0.dev0-py3-none-any.whl (21.5 kB view details)

Uploaded Python 3

File details

Details for the file amqpstorm_flask-0.6.0.dev0.tar.gz.

File metadata

  • Download URL: amqpstorm_flask-0.6.0.dev0.tar.gz
  • Upload date:
  • Size: 26.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for amqpstorm_flask-0.6.0.dev0.tar.gz
Algorithm Hash digest
SHA256 71f14237b14176698abdca29a808e7ef15dbc2b908cc7bfdaab90a6551ac975b
MD5 5538d31073704cce67966743544850d0
BLAKE2b-256 6835097c720d296248dfffdf2e21067911487699068ccf61da6abdd80ce7c07c

See more details on using hashes here.

Provenance

The following attestation bundles were made for amqpstorm_flask-0.6.0.dev0.tar.gz:

Publisher: python-publish.yml on inuits/amqpstorm-flask

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file amqpstorm_flask-0.6.0.dev0-py3-none-any.whl.

File metadata

File hashes

Hashes for amqpstorm_flask-0.6.0.dev0-py3-none-any.whl
Algorithm Hash digest
SHA256 0612fededee3dfad77bb13196ba55d21a198083b68fc75453e187c177c50ec6e
MD5 eb5cdbf660eef4eab48c3f0b22525aa3
BLAKE2b-256 78216067d60a91a8dfdf3b3421a859abbab22d3345fae35f554d0d103a5461e0

See more details on using hashes here.

Provenance

The following attestation bundles were made for amqpstorm_flask-0.6.0.dev0-py3-none-any.whl:

Publisher: python-publish.yml on inuits/amqpstorm-flask

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page