Skip to main content

Lightweight message queue with pluggable drivers (memory/Redis), persistence, priorities, retries and a dead-letter queue.

Project description

MagicQueue (Python)

CI

轻量、可插拔的消息队列库 —— Go 版 MagicQueue 的 Python 移植,API 与语义对齐。

特性

  • 可插拔驱动:内存(开发/测试)与 Redis(生产/分布式),可继承 Driver 自定义。
  • 三档消息优先级:高 / 普通 / 低,同档 FIFO。
  • 批量入队 enqueue_batch:Redis 用 pipeline 单次往返;持久化用 sqlite 批量写;失败整体回滚。
  • Redis 多 key BLPOP:按优先级阻塞消费,无 LLEN 轮询、无空转。
  • 持久化与崩溃恢复:基于标准库 sqlite3,重启自动重投未确认消息。
  • 自动重试:指数退避 + 抖动,异步调度(threading.Timer)不阻塞 worker。
  • 死信队列:重试耗尽的消息可转入死信队列而非丢弃。
  • 优雅关闭:取消信号 + 等待在途任务完成(可用作上下文管理器)。
  • 处理器异常隔离:单条消息抛异常不会拖垮 worker,可注册回调。
  • 可注入 Logger:实现 log(msg) 接入任意日志框架。

交付语义为 at-least-once(至少一次):消息可能被重复投递,处理器应保证幂等

安装

pip install -e .            # 仅内存驱动
pip install -e ".[redis]"   # 含 Redis 驱动

需要 Python 3.9+。持久化使用标准库 sqlite3,无额外依赖。

快速开始

from magicqueue import MQueue, Payload, JobResult

q = MQueue("svc").use_memory()
q.set_handler("email", "notify", lambda p: JobResult.ok())  # 返回 fail(..) 会触发重试
q.start_workers(4)

q.enqueue(Payload("email", group="notify"))

q.stop()  # 优雅关闭;也可用 with MQueue(...) as q: ...

处理器可以是普通可调用对象,也可以是实现 execute(payload) -> JobResult 的对象。

性能基准(内存驱动横向对比)

同一台机器、统一方法下三语言版本的吞吐对比(内存驱动,排除 Redis/磁盘干扰)。

测试环境:Intel Xeon Platinum 8375C @ 2.90GHz(2 vCPU)/ 7.8 GiB RAM / Ubuntu 22.04 / Go 1.22、Rust 1.83(release)、CPython 3.12。 方法:每项 N = 200,000 消息,取 3 次运行的中位数;批量大小 1000;端到端为 4 workers + 空 handler(pre-enqueue 后计时至全部处理完)。

指标 Go Rust Python
单条入队 enqueue(条/秒) 663,013 1,293,833 97,660
批量入队 enqueue_batch(条/秒,batch=1000) 727,392 1,109,984 98,832
端到端处理(条/秒,4 workers) 404,835 1,075,262 45,624

说明:Go/Rust 为编译型、Python 为解释型且受 GIL 限制,量级差异属预期。数字为单台 2 vCPU 云主机上的近似值,仅用于版本间相对比较,绝对值会随硬件波动。

复现:Go go run ./bench、Rust cargo run --release --example bench、Python python bench.py

消息优先级

Payload.priority(int)按符号映射三档,默认 0

priority 档位 子队列后缀
> 0 :p2
== 0 普通 :p1
< 0 :p0

同一档内保持 FIFO。消费顺序:高 → 普通 → 低

q.enqueue(Payload("task", priority=10))   # 高
q.enqueue(Payload("task"))                # 普通
q.enqueue(Payload("task", priority=-1))   # 低

实现:每个逻辑队列拆成三个子队列(key 加后缀 :p2/:p1/:p0)。 Redis 消费用多 key BLPOP key:p2 key:p1 key:p0 timeout——Redis 按参数顺序返回首个非空队列的元素, 一次调用既实现优先级又是阻塞消费,取代了 LLEN 轮询 + LPOP 的空转与竞态

批量入队

ids = q.enqueue_batch([
    Payload("task", priority=10),
    Payload("task"),
    Payload("task", priority=-1),
])
  • Redis 用 pipeline 单次往返完成所有 RPUSH
  • 持久化消息用 sqlite executemany 在一个事务内落盘。
  • 任一条校验失败则整体不入队;推送失败会回滚已持久化的条目。
  • 自定义驱动可重写 push_batch 优化,否则自动退化为逐条 push

持久化

q = MQueue("svc").use_memory().use_persistence("./data/mq.db")
q.enqueue(Payload("job", is_persist=True))

启用后,start_workers 会先把尚未确认的消息重投回驱动再启动消费者。 成功或永久失败(进入死信队列)后才会从持久层删除。

持久层后端:sqlite(默认)或 LevelDB

持久层通过 Store 抽象可插拔,内置两种实现,语义一致:

方法 后端 依赖 与其他语言版本对齐
use_persistence(path) 标准库 sqlite3 无额外依赖(默认)
use_leveldb(path) LevelDB(plyvel pip install "magicqueue[leveldb]" Go 版 LevelDB / Rust 版 sled
# 与 Go/Rust 版一致的 LevelDB 持久化
q = MQueue("svc").use_memory().use_leveldb("./data/mq.leveldb")
q.enqueue(Payload("job", is_persist=True))

plyvel 在 Linux/macOS 提供预编译 wheel,通常无需系统级 libleveldb。 也可实现自定义 Store 并用 use_store(store) 注入。

Redis

import redis
from magicqueue import MQueue

client = redis.Redis.from_url("redis://127.0.0.1:6379")
q = MQueue("svc").use_redis(client)        # 复用调用方的 client
# 或:q = MQueue("svc").use_redis_url("redis://127.0.0.1:6379")

重试与死信队列

from magicqueue import Options

q.with_options(Options(
    retry_base_delay=0.5,
    retry_max_delay=30.0,
    enable_dead_letter=True,
))
q.enqueue(Payload("job", max_retry=5))

退避时长为 base * 2^(retry-1),封顶 retry_max_delay,并叠加 [0.5x, 1.0x] 抖动。

示例

路径 说明
examples/basic_memory.py 内存队列最简上手
examples/multiple_handlers.py 多 (topic, group) 处理器
examples/batch_priority.py 批量入队 + 优先级排序
examples/retry_deadletter.py 重试与死信队列
examples/graceful_shutdown.py 优雅关闭(SIGINT/SIGTERM)
examples/persistence.py sqlite 持久化与崩溃恢复
examples/redis_example.py Redis 驱动(需本地 Redis)

运行:python examples/basic_memory.py

架构

enqueue / enqueue_batch
        │  (JSON) + 可选 sqlite 持久化
        ▼
   Driver.push  ── 内存 / Redis(按优先级分 :p2/:p1/:p0 子队列)
        ▲                       │
        │ requeue(重试)          │ bpop(多 key BLPOP,高→普通→低)
        │                       ▼
   死信队列 ◄── 重试耗尽    consume 线程 ──► jobs 队列 ──► worker 池 ──► Handler
                                                              │
                                                          成功→ack(删除持久层)
                                                          失败→重试/死信

开发

pip install -e ".[dev]"
ruff check src tests examples
pytest -q
python examples/basic_memory.py

License

MIT,见 LICENSE。Author: JackyZhang8。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

magicqueue-0.1.1.tar.gz (21.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

magicqueue-0.1.1-py3-none-any.whl (20.8 kB view details)

Uploaded Python 3

File details

Details for the file magicqueue-0.1.1.tar.gz.

File metadata

  • Download URL: magicqueue-0.1.1.tar.gz
  • Upload date:
  • Size: 21.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for magicqueue-0.1.1.tar.gz
Algorithm Hash digest
SHA256 f765f09e1b36eedaba204fa85a6f7a0826c1ba06938c768bd0e0835796eb4943
MD5 91a34ed45c9a5f0193b9d8446a6233fe
BLAKE2b-256 44386f20664063fd1f868e9a24f1132f1722f36f828909332ad093fc25bbc600

See more details on using hashes here.

File details

Details for the file magicqueue-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: magicqueue-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 20.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for magicqueue-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 45dc4bea4382f42e4e8877e5a63e07e1f625b1815198eb20c1374d0d82a18b08
MD5 497d3b1f20b2606e71ad571b10420b08
BLAKE2b-256 2f1df6a35334ba80e40d48a26845506679830daea8618b0b9b23c1104965f5df

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page