flask-redis-stream-pubsub
Project description
flask-redis-stream-pubsub
A message queue implemented based on Redis Stream
Advantages:
- High Speed: Redis stores data in memory, which offers very high read and write performance. This is particularly advantageous for applications that require low latency.
- Data Persistence: Redis can offer data persistence by saving data to disk, meaning that messages will not be lost even if Redis restarts.
- Wide Range of Use Cases: Redis can be used not only as a message queue but also as a cache, database, distributed lock, and more, reducing the complexity of the technology stack.
- Message Traceability: Supports persistent storage of messages, allowing consumers to trace back messages.
- Support for Multiple Consumers: Allows multiple consumers to compete for messages, speeding up the consumption rate.
- Blocking Read: Supports blocking reads, ensuring there is no risk of missing messages.
- Message Acknowledgment Mechanism: Supports consumer acknowledgment mechanisms, ensuring that messages are consumed at least once.
Application Scenarios:
- Asynchronous Processing: Suitable for scenarios that require asynchronous processing, such as order handling and payment notifications.
- Load leveling (Peak Shaving and Valley Filling): During peak traffic periods, using a message queue can smooth out requests, preventing system overload.
- Data Broadcasting: Suitable for scenarios where data needs to be broadcast to multiple consumers.
- Off-peak and Flow Control: When the volume of data is too large, the message queue can allow for a certain amount of message accumulation, achieving off-peak processing.
- Eventual Consistency: In distributed systems, using a message queue can achieve eventual consistency.
- Simple Small to Medium-sized Projects: If the functionality is simple and the volume of access is not high, Redis Stream can be considered as a message queue.
⚙ Installation
pip install flask-redis-stream-pubsub
⚡ Quickstart
发送消息
from flask import Flask
from flask_redis_stream_pubsub.pubsub import Producer
app = Flask(__name__)
app.config.from_object("example.config")
if __name__ == '__main__':
producer = Producer()
producer.init_app(app)
#发送一条消息
msgid = producer.publish("hello_word", {'name': 'dog'})
print(msgid)
# 批量发送
sess = producer.session
sess.add("hello_word", {'name': 'cat1'})
sess.add("hello_word", {'name': 'cat2'})
sess.add("hello_word", {'name': 'cat3'})
sess.add("hello_word", {'name': 'cat4'})
sess.add("hello_word", {'name': 'cat5'})
msgids = sess.commit()
print(msgids)
订阅消息
import logging
from flask import Flask, current_app
from flask_redis_stream_pubsub.pubsub import Consumer, Msg
app = Flask(__name__)
app.config.from_object("example.config")
app.logger.setLevel(logging.INFO)
if __name__ == '__main__':
cs = Consumer(__name__)
@cs.subscribe("hello_word")
def hello_word(msg: Msg):
""" 业务代码没抛出异常, 就代表消费成功 """
current_app.logger.info(msg)
@cs.subscribe("hello_word_retry", retry_count=3, timeout=30)
def hello_word_retry(msg: Msg):
""" 重试3次, 每次间隔30秒 """
current_app.logger.info(msg)
raise RuntimeError("I will retry 3 times, with a 30 second interval between each attempt")
@cs.subscribe("hello_word_cron", cron="*/5 * * * * *", retry_count=0)
def hello_word_cron(msg: Msg):
""" 每5秒执行一次, 不重试 """
current_app.logger.info(msg)
cs.init_app(app)
cs.run("consumer:app")
example.config
PUBSUB_REDIS_URL = 'redis://:password@host:6379/0'
PUBSUB_REDIS_OPTION = {
'group':'pubsub_g1', #消息分组 -> redis XGROUP groupname
'processes': 4, # 消费进程数
'threads': 10, # 一个进程开启多少线程
'retry_count':32, #默认重试次数
'timeout_second':300, #默认超时时间,未来超时时间消费成功,将进行重试
'block_second': 6, #读取消息的阻塞时间 -> xreadgroup block=6
'read_count': 16, #一次最多读取的消息数量 -> xreadgroup read_count=16
}
See examples
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file flask_redis_stream_pubsub-0.1.2.tar.gz.
File metadata
- Download URL: flask_redis_stream_pubsub-0.1.2.tar.gz
- Upload date:
- Size: 13.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.4 CPython/3.9.6 Darwin/24.0.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bd4a59d2a585f318e1933bbf110b5fc8cd362bc7103e13a608683077f6f511be
|
|
| MD5 |
9584bb357ae99bbbff1b54679c50c54a
|
|
| BLAKE2b-256 |
2abe6354c51b8bc38226e6e43cb262b488e631ff3c097356bad0e6279c765144
|
File details
Details for the file flask_redis_stream_pubsub-0.1.2-py3-none-any.whl.
File metadata
- Download URL: flask_redis_stream_pubsub-0.1.2-py3-none-any.whl
- Upload date:
- Size: 13.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.4 CPython/3.9.6 Darwin/24.0.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1f22373606a496f92f990bbeb6f3052ea693e6943ca7aeeb20b42b85965e28eb
|
|
| MD5 |
83ce4c6e2e615fc9bff87af55a6992d7
|
|
| BLAKE2b-256 |
1b46ed9e011af03e522edf890c07e80378c9c3c22bc4a661ea14ac127bd6e43c
|