Skip to main content

flask-redis-stream-pubsub

Project description

flask-redis-stream-pubsub

A message queue implemented based on Redis Stream

Advantages:

  1. High Speed: Redis stores data in memory, which offers very high read and write performance. This is particularly advantageous for applications that require low latency.
  2. Data Persistence: Redis can offer data persistence by saving data to disk, meaning that messages will not be lost even if Redis restarts.
  3. Wide Range of Use Cases: Redis can be used not only as a message queue but also as a cache, database, distributed lock, and more, reducing the complexity of the technology stack.
  4. Message Traceability: Supports persistent storage of messages, allowing consumers to trace back messages.
  5. Support for Multiple Consumers: Allows multiple consumers to compete for messages, speeding up the consumption rate.
  6. Blocking Read: Supports blocking reads, ensuring there is no risk of missing messages.
  7. Message Acknowledgment Mechanism: Supports consumer acknowledgment mechanisms, ensuring that messages are consumed at least once.

Application Scenarios:

  1. Asynchronous Processing: Suitable for scenarios that require asynchronous processing, such as order handling and payment notifications.
  2. Load leveling (Peak Shaving and Valley Filling): During peak traffic periods, using a message queue can smooth out requests, preventing system overload.
  3. Data Broadcasting: Suitable for scenarios where data needs to be broadcast to multiple consumers.
  4. Off-peak and Flow Control: When the volume of data is too large, the message queue can allow for a certain amount of message accumulation, achieving off-peak processing.
  5. Eventual Consistency: In distributed systems, using a message queue can achieve eventual consistency.
  6. Simple Small to Medium-sized Projects: If the functionality is simple and the volume of access is not high, Redis Stream can be considered as a message queue.

⚙ Installation

pip install flask-redis-stream-pubsub

⚡ Quickstart

发送消息

from flask import Flask

from flask_redis_stream_pubsub.pubsub import Producer

app = Flask(__name__)
app.config.from_object("example.config")

if __name__ == '__main__':
    producer = Producer()

    producer.init_app(app)
    
    #发送一条消息
    msgid = producer.publish("hello_word", {'name': 'dog'})
    print(msgid)


    # 批量发送
    sess = producer.session
    sess.add("hello_word", {'name': 'cat1'})
    sess.add("hello_word", {'name': 'cat2'})
    sess.add("hello_word", {'name': 'cat3'})
    sess.add("hello_word", {'name': 'cat4'})
    sess.add("hello_word", {'name': 'cat5'})

    msgids = sess.publish()
    print(msgids)

订阅消息

import logging

from flask import Flask, current_app
from flask_redis_stream_pubsub.pubsub import Consumer, Msg

app = Flask(__name__)
app.config.from_object("example.config")
app.logger.setLevel(logging.INFO)

if __name__ == '__main__':
    cs = Consumer(__name__)


    @cs.subscribe("hello_word")
    def hello_word(msg: Msg):
        """ 业务代码没抛出异常, 就代表消费成功 """
        current_app.logger.info(msg)


    @cs.subscribe("hello_word_retry", retry_count=3, timeout=30)
    def hello_word_retry(msg: Msg):
        """ 重试3次, 每次间隔30秒 """
        current_app.logger.info(msg)
        raise RuntimeError("I will retry 3 times, with a 30 second interval between each attempt")


    @cs.subscribe("hello_word_cron", cron="*/5 * * * * *", retry_count=0)
    def hello_word_cron(msg: Msg):
        """ 每5秒执行一次, 不重试 """
        current_app.logger.info(msg)


    cs.init_app(app)
    cs.run("consumer:app")

example.config

PUBSUB_REDIS_URL = 'redis://:password@host:6379/0'
PUBSUB_REDIS_OPTION = {
    'group':'pubsub_g1', #消息分组 -> redis XGROUP groupname
    'workers':10, #消费进程数
    'retry_count':32, #默认重试次数
    'timeout_second':300, #默认超时时间,未来超时时间消费成功,将进行重试
    'block_second': 6, #读取消息的阻塞时间 -> xreadgroup block=6
    'read_count': 16, #一次最多读取的消息数量 -> xreadgroup read_count=16
}

See examples

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flask_redis_stream_pubsub-0.0.4.tar.gz (10.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

flask_redis_stream_pubsub-0.0.4-py3-none-any.whl (9.5 kB view details)

Uploaded Python 3

File details

Details for the file flask_redis_stream_pubsub-0.0.4.tar.gz.

File metadata

  • Download URL: flask_redis_stream_pubsub-0.0.4.tar.gz
  • Upload date:
  • Size: 10.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.4 CPython/3.9.6 Darwin/24.0.0

File hashes

Hashes for flask_redis_stream_pubsub-0.0.4.tar.gz
Algorithm Hash digest
SHA256 511f3c9895ec080100d73e61904cfd7af02166a06c2243d63b391eb2ffc273b8
MD5 eccf38339c094707eaaea116f4e76821
BLAKE2b-256 441f9012568f76c3adb7c27a17fb415d9a72d7950d42c2c7a79d35beb777f8b3

See more details on using hashes here.

File details

Details for the file flask_redis_stream_pubsub-0.0.4-py3-none-any.whl.

File metadata

File hashes

Hashes for flask_redis_stream_pubsub-0.0.4-py3-none-any.whl
Algorithm Hash digest
SHA256 77542de15cd754e98fc55256a61407baed25130eac2ff8967fb00989f12bf2ac
MD5 ccf7b8c34d06bc2f05a95276473f31fb
BLAKE2b-256 3e87e1ff005b627243c637139d4ae1d9121caba0a5b5fb02e9264f2c3fac9a8e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page