flask-redis-stream-pubsub

A message queue for Flask, built on Redis Streams.

Advantages:

  1. High Speed: Redis keeps data in memory, so reads and writes are fast. This suits applications that need low latency.
  2. Data Persistence: Redis can persist data to disk (RDB snapshots or AOF), so messages can survive a Redis restart when persistence is enabled.
  3. Wide Range of Use Cases: Redis also serves as a cache, database, and distributed lock, so one service can cover several roles and keep the technology stack small.
  4. Message Traceability: Messages stay in the stream after delivery, so consumers can re-read earlier messages.
  5. Support for Multiple Consumers: Multiple consumers in a group compete for messages, which raises consumption throughput.
  6. Blocking Read: Consumers can block while waiting for new messages instead of polling, so new messages are picked up as soon as they arrive.
  7. Message Acknowledgment Mechanism: Consumers acknowledge processed messages, and unacknowledged messages can be redelivered, so each message is consumed at least once.
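
Points 4, 5, and 7 can be illustrated with a small in-memory model. This is only a sketch: `ToyStream` and its methods are hypothetical names invented for illustration, not part of Redis or of this library, and the model ignores details such as idle times. The comments name the Redis commands each method loosely imitates.

```python
from collections import OrderedDict


class ToyStream:
    """Hypothetical in-memory stand-in for one Redis stream with one consumer group."""

    def __init__(self):
        self.entries = OrderedDict()  # message id -> payload, kept after delivery
        self.next_index = 0           # position of the next never-delivered entry
        self.pending = {}             # message id -> consumer that has not acked yet
        self._seq = 0

    def add(self, payload):
        """Append a message and return its id (loosely like XADD)."""
        self._seq += 1
        msg_id = f"{self._seq}-0"
        self.entries[msg_id] = payload
        return msg_id

    def read_group(self, consumer, count=1):
        """Hand new messages to one consumer (loosely like XREADGROUP with '>')."""
        ids = list(self.entries)[self.next_index:self.next_index + count]
        self.next_index += len(ids)
        for msg_id in ids:
            self.pending[msg_id] = consumer
        return [(msg_id, self.entries[msg_id]) for msg_id in ids]

    def ack(self, msg_id):
        """Mark a message as processed (loosely like XACK)."""
        self.pending.pop(msg_id, None)

    def claim_pending(self, consumer):
        """Reassign every unacknowledged message (loosely like XAUTOCLAIM, ignoring idle time)."""
        claimed = []
        for msg_id in list(self.pending):
            self.pending[msg_id] = consumer
            claimed.append((msg_id, self.entries[msg_id]))
        return claimed


stream = ToyStream()
first = stream.add({"name": "dog"})
second = stream.add({"name": "cat"})

# Two consumers compete: each message goes to exactly one of them.
got_a = stream.read_group("worker-a")
got_b = stream.read_group("worker-b")

# worker-a acknowledges; worker-b crashes before acknowledging.
stream.ack(got_a[0][0])

# A later pass redelivers the unacknowledged message, so it is seen at least once.
redelivered = stream.claim_pending("worker-a")
```

Because entries are never deleted on delivery, `stream.entries` still holds both messages at the end, which is what makes tracing back possible.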

Application Scenarios:

  1. Asynchronous Processing: Work that need not finish within the request, such as order handling and payment notifications.
  2. Load Leveling (Peak Shaving): During traffic peaks, the queue buffers requests so that downstream services are not overloaded.
  3. Data Broadcasting: The same data needs to reach multiple consumers.
  4. Backlog and Flow Control: When producers outpace consumers, messages accumulate in the queue and are processed later at the consumers' own rate.
  5. Eventual Consistency: In distributed systems, a message queue can propagate changes between services so that they become eventually consistent.
  6. Simple Small to Medium-sized Projects: When the functionality is simple and traffic is modest, Redis Stream is a reasonable choice of message queue.

⚙ Installation

pip install flask-redis-stream-pubsub

⚡ Quickstart

Sending messages

from flask import Flask

from flask_redis_stream_pubsub.pubsub import Producer

app = Flask(__name__)
app.config.from_object("example.config")

if __name__ == '__main__':
    producer = Producer()

    producer.init_app(app)
    
    # Send a single message
    msgid = producer.publish("hello_word", {'name': 'dog'})
    print(msgid)


    # Send a batch of messages
    sess = producer.session
    sess.add("hello_word", {'name': 'cat1'})
    sess.add("hello_word", {'name': 'cat2'})
    sess.add("hello_word", {'name': 'cat3'})
    sess.add("hello_word", {'name': 'cat4'})
    sess.add("hello_word", {'name': 'cat5'})

    msgids = sess.publish()
    print(msgids)

Subscribing to messages

import logging

from flask import Flask, current_app
from flask_redis_stream_pubsub.pubsub import Consumer, Msg

app = Flask(__name__)
app.config.from_object("example.config")
app.logger.setLevel(logging.INFO)

if __name__ == '__main__':
    cs = Consumer(__name__)


    @cs.subscribe("hello_word")
    def hello_word(msg: Msg):
        """If the handler raises no exception, the message counts as consumed."""
        current_app.logger.info(msg)


    @cs.subscribe("hello_word_retry", retry_count=3, timeout=30)
    def hello_word_retry(msg: Msg):
        """Retried 3 times, 30 seconds apart."""
        current_app.logger.info(msg)
        raise RuntimeError("I will retry 3 times, with a 30 second interval between each attempt")


    @cs.subscribe("hello_word_cron", cron="*/5 * * * * *", retry_count=0)
    def hello_word_cron(msg: Msg):
        """Runs every 5 seconds, with no retries."""
        current_app.logger.info(msg)


    cs.init_app(app)
    cs.run("consumer:app")
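
The retry rule used above (no exception means success, an exception triggers a retry after `timeout` seconds, up to `retry_count` times) can be modelled in a few lines. This is a sketch under assumptions, not the library's implementation: `deliver` and its `sleep` parameter are hypothetical, and it assumes the first attempt does not count toward `retry_count`, which the source does not state.

```python
import time


def deliver(handler, msg, retry_count=3, timeout=30, sleep=time.sleep):
    """Call handler(msg); on an exception, wait `timeout` seconds and retry.

    Returns the number of attempts made once the handler succeeds, and
    re-raises the last exception when all retries are used up.
    Hypothetical helper for illustration only.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            handler(msg)
            return attempts  # no exception means the message is consumed
        except Exception:
            if attempts > retry_count:
                raise  # the first attempt plus retry_count retries all failed
            sleep(timeout)


calls = []


def flaky(msg):
    """Fails twice, then succeeds."""
    calls.append(msg)
    if len(calls) < 3:
        raise RuntimeError("not yet")


waits = []  # record the waits instead of actually sleeping
attempts = deliver(flaky, {"name": "dog"}, retry_count=3, timeout=30, sleep=waits.append)
```

Passing `sleep=waits.append` keeps the example instant. How the library itself schedules the wait between attempts is not shown in the source.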

example.config

PUBSUB_REDIS_URL = 'redis://:password@host:6379/0'
PUBSUB_REDIS_OPTION = {
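    'group': 'pubsub_g1',   # consumer group name -> Redis XGROUP groupname
    'workers': 10,          # number of consumer processes
    'retry_count': 32,      # default retry count
    'timeout_second': 300,  # default timeout; a message not consumed successfully within it is retried
    'block_second': 6,      # how long a read blocks, in seconds -> XREADGROUP BLOCK
    'read_count': 16,       # maximum messages per read -> XREADGROUP COUNT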
}
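
To make the last two options concrete, here is how they might translate into a redis-py `xreadgroup` call. This is a sketch, not the library's code: `read_batch` is a hypothetical helper, it assumes the topic name is used directly as the stream key, and it assumes `block_second` is converted to the milliseconds that `XREADGROUP BLOCK` expects.

```python
PUBSUB_REDIS_OPTION = {
    'group': 'pubsub_g1',
    'block_second': 6,
    'read_count': 16,
}


def read_batch(client, stream, consumer, option):
    """Read up to `read_count` new messages, blocking for up to `block_second` seconds.

    `client` is expected to behave like a redis-py `redis.Redis` instance.
    Hypothetical helper for illustration only.
    """
    return client.xreadgroup(
        option['group'],
        consumer,
        {stream: '>'},                        # '>' = messages never delivered to this group
        count=option['read_count'],
        block=option['block_second'] * 1000,  # Redis takes the block time in milliseconds
    )
```

With a real server, `client` could be created with `redis.Redis.from_url(PUBSUB_REDIS_URL)`, and the consumer group would have to exist first (for example via `xgroup_create`).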

See examples

License

MIT
