Skip to main content

flask-redis-stream-pubsub

Project description

flask-redis-stream-pubsub

A message queue implemented based on Redis Stream

Advantages:

  1. High Speed: Redis stores data in memory, which offers very high read and write performance. This is particularly advantageous for applications that require low latency.
  2. Data Persistence: Redis can offer data persistence by saving data to disk, meaning that messages will not be lost even if Redis restarts.
  3. Wide Range of Use Cases: Redis can be used not only as a message queue but also as a cache, database, distributed lock, and more, reducing the complexity of the technology stack.
  4. Message Traceability: Supports persistent storage of messages, allowing consumers to trace back messages.
  5. Support for Multiple Consumers: Allows multiple consumers to compete for messages, speeding up the consumption rate.
  6. Blocking Read: Supports blocking reads, ensuring there is no risk of missing messages.
  7. Message Acknowledgment Mechanism: Supports consumer acknowledgment mechanisms, ensuring that messages are consumed at least once.

Application Scenarios:

  1. Asynchronous Processing: Suitable for scenarios that require asynchronous processing, such as order handling and payment notifications.
  2. Load leveling (Peak Shaving and Valley Filling): During peak traffic periods, using a message queue can smooth out requests, preventing system overload.
  3. Data Broadcasting: Suitable for scenarios where data needs to be broadcast to multiple consumers.
  4. Off-peak and Flow Control: When the volume of data is too large, the message queue can allow for a certain amount of message accumulation, achieving off-peak processing.
  5. Eventual Consistency: In distributed systems, using a message queue can achieve eventual consistency.
  6. Simple Small to Medium-sized Projects: If the functionality is simple and the volume of access is not high, Redis Stream can be considered as a message queue.

⚙ Installation

pip install flask-redis-stream-pubsub

⚡ Quickstart

发送消息

from flask import Flask

from flask_redis_stream_pubsub.pubsub import Producer

app = Flask(__name__)
app.config.from_object("example.config")

if __name__ == '__main__':
    producer = Producer()

    producer.init_app(app)
    
    #发送一条消息
    msgid = producer.publish("hello_word", {'name': 'dog'})
    print(msgid)


    # 批量发送
    sess = producer.session
    sess.add("hello_word", {'name': 'cat1'})
    sess.add("hello_word", {'name': 'cat2'})
    sess.add("hello_word", {'name': 'cat3'})
    sess.add("hello_word", {'name': 'cat4'})
    sess.add("hello_word", {'name': 'cat5'})

    msgids = sess.commit()
    print(msgids)

订阅消息

import logging

from flask import Flask, current_app
from flask_redis_stream_pubsub.pubsub import Consumer, Msg

app = Flask(__name__)
app.config.from_object("example.config")
app.logger.setLevel(logging.INFO)

if __name__ == '__main__':
    cs = Consumer(__name__)


    @cs.subscribe("hello_word")
    def hello_word(msg: Msg):
        """ 业务代码没抛出异常, 就代表消费成功 """
        current_app.logger.info(msg)


    @cs.subscribe("hello_word_retry", retry_count=3, timeout=30)
    def hello_word_retry(msg: Msg):
        """ 重试3次, 每次间隔30秒 """
        current_app.logger.info(msg)
        raise RuntimeError("I will retry 3 times, with a 30 second interval between each attempt")


    @cs.subscribe("hello_word_cron", cron="*/5 * * * * *", retry_count=0)
    def hello_word_cron(msg: Msg):
        """ 每5秒执行一次, 不重试 """
        current_app.logger.info(msg)


    cs.init_app(app)
    cs.run("consumer:app")

example.config

PUBSUB_REDIS_URL = 'redis://:password@host:6379/0'
PUBSUB_REDIS_OPTION = {
    'group':'pubsub_g1', #消息分组 -> redis XGROUP groupname
    'processes': 4,  # 消费进程数
    'threads': 10,  # 一个进程开启多少线程
    'retry_count':32, #默认重试次数
    'timeout_second':300, #默认超时时间,未来超时时间消费成功,将进行重试
    'block_second': 6, #读取消息的阻塞时间 -> xreadgroup block=6
    'read_count': 16, #一次最多读取的消息数量 -> xreadgroup read_count=16
}

See examples

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flask_redis_stream_pubsub-0.1.2.tar.gz (13.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

flask_redis_stream_pubsub-0.1.2-py3-none-any.whl (13.7 kB view details)

Uploaded Python 3

File details

Details for the file flask_redis_stream_pubsub-0.1.2.tar.gz.

File metadata

  • Download URL: flask_redis_stream_pubsub-0.1.2.tar.gz
  • Upload date:
  • Size: 13.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.4 CPython/3.9.6 Darwin/24.0.0

File hashes

Hashes for flask_redis_stream_pubsub-0.1.2.tar.gz
Algorithm Hash digest
SHA256 bd4a59d2a585f318e1933bbf110b5fc8cd362bc7103e13a608683077f6f511be
MD5 9584bb357ae99bbbff1b54679c50c54a
BLAKE2b-256 2abe6354c51b8bc38226e6e43cb262b488e631ff3c097356bad0e6279c765144

See more details on using hashes here.

File details

Details for the file flask_redis_stream_pubsub-0.1.2-py3-none-any.whl.

File metadata

File hashes

Hashes for flask_redis_stream_pubsub-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 1f22373606a496f92f990bbeb6f3052ea693e6943ca7aeeb20b42b85965e28eb
MD5 83ce4c6e2e615fc9bff87af55a6992d7
BLAKE2b-256 1b46ed9e011af03e522edf890c07e80378c9c3c22bc4a661ea14ac127bd6e43c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page