Lightweight message queue with pluggable drivers (memory/Redis), persistence, priorities, retries and a dead-letter queue.
Project description
MagicQueue (Python)
轻量、可插拔的消息队列库 —— Go 版 MagicQueue 的 Python 移植,API 与语义对齐。
特性
- 可插拔驱动:内存(开发/测试)与 Redis(生产/分布式),可继承
Driver自定义。 - 三档消息优先级:高 / 普通 / 低,同档 FIFO。
- 批量入队
enqueue_batch:Redis 用 pipeline 单次往返;持久化用 sqlite 批量写;失败整体回滚。 - Redis 多 key
BLPOP:按优先级阻塞消费,无LLEN轮询、无空转。 - 持久化与崩溃恢复:基于标准库
sqlite3,重启自动重投未确认消息。 - 自动重试:指数退避 + 抖动,异步调度(
threading.Timer)不阻塞 worker。 - 死信队列:重试耗尽的消息可转入死信队列而非丢弃。
- 优雅关闭:取消信号 + 等待在途任务完成(可用作上下文管理器)。
- 处理器异常隔离:单条消息抛异常不会拖垮 worker,可注册回调。
- 可注入 Logger:实现
log(msg)接入任意日志框架。
交付语义为 at-least-once(至少一次):消息可能被重复投递,处理器应保证幂等。
安装
pip install -e . # 仅内存驱动
pip install -e ".[redis]" # 含 Redis 驱动
需要 Python 3.9+。持久化使用标准库 sqlite3,无额外依赖。
快速开始
from magicqueue import MQueue, Payload, JobResult
q = MQueue("svc").use_memory()
q.set_handler("email", "notify", lambda p: JobResult.ok()) # 返回 fail(..) 会触发重试
q.start_workers(4)
q.enqueue(Payload("email", group="notify"))
q.stop() # 优雅关闭;也可用 with MQueue(...) as q: ...
处理器可以是普通可调用对象,也可以是实现 execute(payload) -> JobResult 的对象。
性能基准(内存驱动横向对比)
同一台机器、统一方法下三语言版本的吞吐对比(内存驱动,排除 Redis/磁盘干扰)。
测试环境:Intel Xeon Platinum 8375C @ 2.90GHz(2 vCPU)/ 7.8 GiB RAM / Ubuntu 22.04 / Go 1.22、Rust 1.83(release)、CPython 3.12。 方法:每项 N = 200,000 消息,取 3 次运行的中位数;批量大小 1000;端到端为 4 workers + 空 handler(pre-enqueue 后计时至全部处理完)。
| 指标 | Go | Rust | Python |
|---|---|---|---|
单条入队 enqueue(条/秒) |
663,013 | 1,293,833 | 97,660 |
批量入队 enqueue_batch(条/秒,batch=1000) |
727,392 | 1,109,984 | 98,832 |
| 端到端处理(条/秒,4 workers) | 404,835 | 1,075,262 | 45,624 |
说明:Go/Rust 为编译型、Python 为解释型且受 GIL 限制,量级差异属预期。数字为单台 2 vCPU 云主机上的近似值,仅用于版本间相对比较,绝对值会随硬件波动。
复现:Go
go run ./bench、Rustcargo run --release --example bench、Pythonpython bench.py。
消息优先级
Payload.priority(int)按符号映射三档,默认 0:
| priority | 档位 | 子队列后缀 |
|---|---|---|
> 0 |
高 | :p2 |
== 0 |
普通 | :p1 |
< 0 |
低 | :p0 |
同一档内保持 FIFO。消费顺序:高 → 普通 → 低。
q.enqueue(Payload("task", priority=10)) # 高
q.enqueue(Payload("task")) # 普通
q.enqueue(Payload("task", priority=-1)) # 低
实现:每个逻辑队列拆成三个子队列(key 加后缀 :p2/:p1/:p0)。
Redis 消费用多 key BLPOP key:p2 key:p1 key:p0 timeout——Redis 按参数顺序返回首个非空队列的元素,
一次调用既实现优先级又是阻塞消费,取代了 LLEN 轮询 + LPOP 的空转与竞态。
批量入队
ids = q.enqueue_batch([
Payload("task", priority=10),
Payload("task"),
Payload("task", priority=-1),
])
- Redis 用 pipeline 单次往返完成所有
RPUSH。 - 持久化消息用 sqlite
executemany在一个事务内落盘。 - 任一条校验失败则整体不入队;推送失败会回滚已持久化的条目。
- 自定义驱动可重写
push_batch优化,否则自动退化为逐条push。
持久化
q = MQueue("svc").use_memory().use_persistence("./data/mq.db")
q.enqueue(Payload("job", is_persist=True))
启用后,start_workers 会先把尚未确认的消息重投回驱动再启动消费者。
成功或永久失败(进入死信队列)后才会从持久层删除。
持久层后端:sqlite(默认)或 LevelDB
持久层通过 Store 抽象可插拔,内置两种实现,语义一致:
| 方法 | 后端 | 依赖 | 与其他语言版本对齐 |
|---|---|---|---|
use_persistence(path) |
标准库 sqlite3 |
无额外依赖(默认) | — |
use_leveldb(path) |
LevelDB(plyvel) |
pip install "magicqueue[leveldb]" |
Go 版 LevelDB / Rust 版 sled |
# 与 Go/Rust 版一致的 LevelDB 持久化
q = MQueue("svc").use_memory().use_leveldb("./data/mq.leveldb")
q.enqueue(Payload("job", is_persist=True))
plyvel在 Linux/macOS 提供预编译 wheel,通常无需系统级libleveldb。 也可实现自定义Store并用use_store(store)注入。
Redis
import redis
from magicqueue import MQueue
client = redis.Redis.from_url("redis://127.0.0.1:6379")
q = MQueue("svc").use_redis(client) # 复用调用方的 client
# 或:q = MQueue("svc").use_redis_url("redis://127.0.0.1:6379")
重试与死信队列
from magicqueue import Options
q.with_options(Options(
retry_base_delay=0.5,
retry_max_delay=30.0,
enable_dead_letter=True,
))
q.enqueue(Payload("job", max_retry=5))
退避时长为 base * 2^(retry-1),封顶 retry_max_delay,并叠加 [0.5x, 1.0x] 抖动。
示例
| 路径 | 说明 |
|---|---|
examples/basic_memory.py |
内存队列最简上手 |
examples/multiple_handlers.py |
多 (topic, group) 处理器 |
examples/batch_priority.py |
批量入队 + 优先级排序 |
examples/retry_deadletter.py |
重试与死信队列 |
examples/graceful_shutdown.py |
优雅关闭(SIGINT/SIGTERM) |
examples/persistence.py |
sqlite 持久化与崩溃恢复 |
examples/redis_example.py |
Redis 驱动(需本地 Redis) |
运行:python examples/basic_memory.py
架构
enqueue / enqueue_batch
│ (JSON) + 可选 sqlite 持久化
▼
Driver.push ── 内存 / Redis(按优先级分 :p2/:p1/:p0 子队列)
▲ │
│ requeue(重试) │ bpop(多 key BLPOP,高→普通→低)
│ ▼
死信队列 ◄── 重试耗尽 consume 线程 ──► jobs 队列 ──► worker 池 ──► Handler
│
成功→ack(删除持久层)
失败→重试/死信
开发
pip install -e ".[dev]"
ruff check src tests examples
pytest -q
python examples/basic_memory.py
License
MIT,见 LICENSE。Author: JackyZhang8。
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file magicqueue-0.1.1.tar.gz.
File metadata
- Download URL: magicqueue-0.1.1.tar.gz
- Upload date:
- Size: 21.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f765f09e1b36eedaba204fa85a6f7a0826c1ba06938c768bd0e0835796eb4943
|
|
| MD5 |
91a34ed45c9a5f0193b9d8446a6233fe
|
|
| BLAKE2b-256 |
44386f20664063fd1f868e9a24f1132f1722f36f828909332ad093fc25bbc600
|
File details
Details for the file magicqueue-0.1.1-py3-none-any.whl.
File metadata
- Download URL: magicqueue-0.1.1-py3-none-any.whl
- Upload date:
- Size: 20.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
45dc4bea4382f42e4e8877e5a63e07e1f625b1815198eb20c1374d0d82a18b08
|
|
| MD5 |
497d3b1f20b2606e71ad571b10420b08
|
|
| BLAKE2b-256 |
2f1df6a35334ba80e40d48a26845506679830daea8618b0b9b23c1104965f5df
|