amqpstorm library for Flask
AMQP Service for Flask Apps
A Flask extension for AMQPStorm that provides easy integration with RabbitMQ.
Features
- Automatic Reconnection: Uses APScheduler to monitor and reconnect to RabbitMQ if the connection drops.
- Easy Publishing: Simple method to send JSON messages with retry logic.
- Decorator-based Consumers: Define message consumers using simple decorators.
- Health Checks: Built-in health check functionality.
- Configuration: Configure via Flask app config or environment variables.
Installation
    pip install amqpstorm-flask
Configuration
The extension can be configured via Flask's app.config or environment variables.
| Config Key | Environment Variable | Default | Description |
|---|---|---|---|
| MQ_URL | MQ_URL | None | AMQP connection URL (e.g. amqp://guest:guest@localhost:5672/%2f) |
| MQ_EXCHANGE | MQ_EXCHANGE | None | Default exchange name |
| AMQP_STORM_APSCHEDULER | | 1 | Use APScheduler for health checks and consumers (recommended) |
| FILTER_LOGS | | 1 | Filter out noisy connection logs |
| MQ_MAX_CONSUMER_IDLE_TIME | | 300 | Max idle time in seconds before reconnecting |
| MQ_DELIMITER | | . | Delimiter used to generate queue names from function names |
| MQ_QUEUES | | None | Comma-separated list of enabled queues (if set, only these will start) |
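As a rough illustration of the two configuration sources, the sketch below resolves a setting from a Flask-style config mapping first and then from the environment. The helper name `resolve_setting` and the lookup order are assumptions made for this example and are not documented behaviour of the extension; only the key names and defaults are taken from the table.

```python
import os

# Defaults copied from the configuration table.
# The lookup order implemented below is an assumption, not documented behaviour.
DEFAULTS = {
    "MQ_URL": None,
    "MQ_EXCHANGE": None,
    "MQ_MAX_CONSUMER_IDLE_TIME": 300,
    "MQ_DELIMITER": ".",
    "MQ_QUEUES": None,
}


def resolve_setting(config, key, environ=None):
    """Return config[key] if set, else the environment value, else the default."""
    environ = os.environ if environ is None else environ
    value = config.get(key)
    if value is None:
        value = environ.get(key)
    if value is None:
        value = DEFAULTS.get(key)
    return value


if __name__ == "__main__":
    app_config = {"MQ_EXCHANGE": "my_exchange"}
    fake_env = {"MQ_URL": "amqp://guest:guest@localhost:5672/%2f"}
    print(resolve_setting(app_config, "MQ_EXCHANGE", fake_env))
    print(resolve_setting(app_config, "MQ_URL", fake_env))
    print(resolve_setting(app_config, "MQ_DELIMITER", fake_env))
```

Note that values read from the environment are always strings, so numeric settings would need converting before use.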
Usage
Initialization
    from flask import Flask
    from amqpstorm_flask import RabbitMQ

    app = Flask(__name__)
    app.config["MQ_URL"] = "amqp://guest:guest@localhost:5672/%2f"
    app.config["MQ_EXCHANGE"] = "my_exchange"

    mq = RabbitMQ(app)
Alternatively, use the factory pattern:
    mq = RabbitMQ()
    # ... later ...
    mq.init_app(app)
Standalone Usage (Without Flask)
If you are not using Flask, or want to initialize the extension manually without the init_app helper:
    from amqpstorm_flask import RabbitMQ

    mq = RabbitMQ(
        mq_url="amqp://guest:guest@localhost:5672/%2f",
        mq_exchange="my_exchange",
    )

    # Manual start is required if init_app is not used
    mq.start()
Sending Messages
    mq.send(
        body={"hello": "world"},
        routing_key="events.user.created",
        exchange_type="topic",
    )
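The feature list mentions retry logic around publishing. How the extension implements it is not shown here, so the following is only a generic sketch of retry with exponential backoff around any publish callable. `publish_with_retry`, `max_attempts`, and `base_delay` are hypothetical names introduced for this example; in an application the callable could be a small wrapper around `mq.send(...)`.

```python
import time


def publish_with_retry(publish, max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call publish() until it succeeds or max_attempts is reached.

    Waits base_delay, 2 * base_delay, 4 * base_delay, ... between attempts
    and re-raises the last error if every attempt fails.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return publish()
        except Exception:
            if attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))


if __name__ == "__main__":
    calls = {"count": 0}

    def flaky_publish():
        # Fails twice, then succeeds, to exercise the retry path.
        calls["count"] += 1
        if calls["count"] < 3:
            raise ConnectionError("broker unavailable")
        return "published"

    # The sleep function is replaced so the demo does not actually wait.
    print(publish_with_retry(flaky_publish, sleep=lambda seconds: None))
```

Passing `sleep` as a parameter keeps the helper easy to test without real delays.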
Consuming Messages
By default, the decorated function receives routing_key, body, and message_id.
    @mq.queue(routing_key="events.user.#")
    def handle_user_events(routing_key, body, message_id):
        print(f"Received event {routing_key}: {body}")
To receive the full AMQPStorm message object:
    @mq.queue(routing_key="events.user.#", full_message_object=True)
    def handle_user_events(message):
        print(f"Received body: {message.body}")
        # Manual acknowledgment if auto_ack is False
        # message.ack()
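The consumer examples bind with the pattern `events.user.#`. On an AMQP topic exchange, `*` stands for exactly one dot-separated word and `#` for zero or more words. The matching itself is done by the broker, not by this extension; the sketch below is a simplified re-implementation for illustration only, and `topic_matches` is a hypothetical helper name.

```python
def topic_matches(pattern, routing_key):
    """Return True if routing_key matches an AMQP topic binding pattern."""
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i, j):
        # Pattern exhausted: match only if the routing key is exhausted too.
        if i == len(p):
            return j == len(k)
        if p[i] == "#":
            # '#' may consume zero or more of the remaining words.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            # '*' or a literal word consumes exactly one word.
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


if __name__ == "__main__":
    for key in ("events.user.created", "events.user", "events.order.created"):
        print(key, topic_matches("events.user.#", key))
```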
Health Check
You can use the check_health method to implement a health check endpoint:
    @app.route("/health")
    def health_check():
        is_ok, message = mq.check_health()
        if is_ok:
            return {"status": "ok"}, 200
        return {"status": "error", "message": message}, 503
Advanced Queue Configuration
The @mq.queue decorator supports several parameters:
- `routing_key`: String or list of strings.
- `queue_name`: Custom queue name (defaults to the function name with `_` replaced by `MQ_DELIMITER`).
- `exchange_type`: Default is `"topic"`.
- `auto_ack`: Whether to automatically acknowledge messages.
- `prefetch_count`: Number of messages to prefetch (default `1`).
- `queue_arguments`: Dictionary of arguments for queue declaration (default `{"x-queue-type": "quorum"}`).
- `full_message_object`: If `True`, the decorated function receives the message object instead of unpacked values.
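To make the default queue naming and the `MQ_QUEUES` allow-list concrete, here is a minimal sketch based only on the descriptions in this document. `default_queue_name` and `is_queue_enabled` are hypothetical helpers, not part of the extension's API. Trimming whitespace around names and comparing against the derived queue name are assumptions.

```python
def default_queue_name(func, delimiter="."):
    """Derive a queue name from a function name by replacing '_' with the delimiter."""
    return func.__name__.replace("_", delimiter)


def is_queue_enabled(queue_name, mq_queues=None):
    """Return True if no allow-list is set or queue_name appears in it."""
    if not mq_queues:
        return True
    # Assumption: entries are comma-separated and surrounding whitespace is ignored.
    enabled = {name.strip() for name in mq_queues.split(",") if name.strip()}
    return queue_name in enabled


if __name__ == "__main__":
    def handle_user_events(routing_key, body, message_id):
        pass

    name = default_queue_name(handle_user_events)
    print(name)
    print(is_queue_enabled(name, "handle.user.events,other.queue"))
    print(is_queue_enabled(name, "other.queue"))
```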
Project details
- Package: amqpstorm_flask 0.6.0.dev0, published as a source distribution (amqpstorm_flask-0.6.0.dev0.tar.gz, 26.5 kB) and a Python 3 wheel (amqpstorm_flask-0.6.0.dev0-py3-none-any.whl, 21.5 kB).
- Source repository: inuits/amqpstorm-flask (owner: https://github.com/inuits), tag v0.6.0-dev.