A high-performance, SQLite-based distributed task queue system with Rust-backed core operations. Supports task mounting, message listening, priority handling, retry mechanisms, and automatic cleanup of expired messages. Ideal for building reliable, scalable background task processing systems.
Project description
SQLite 任务队列系统
这个项目是一个基于 SQLite 的高性能任务队列系统,使用 Rust 和 Python 混合编程实现。系统提供了任务调度、消息队列管理和任务生命周期管理的完整解决方案。
主要特性
-
🚀 高性能:使用 Rust 实现核心操作,确保高性能
-
📚 多分片支持:支持数据库分片,提高并发处理能力
-
⏱️ 智能调度:提供接收调度器、任务调度器和清理调度器
-
🔒 任务生命周期管理:支持任务状态跟踪、重试机制和过期处理
-
📊 监控支持:内置资源监控功能
-
🧩 任务挂载系统:通过装饰器轻松添加新任务
项目结构
src/
├── core/ # Rust 核心实现
│ ├── lib.rs # 主模块
│ ├── queue_operation.rs # 队列操作实现
│ └── task_mounter.rs # 任务挂载实现
│
├── queue_sqlite/ # Python 实现
│ ├── constant/ # 常量定义(消息优先级、状态、类型)
│ ├── core/ # 核心接口
│ ├── model/ # 数据模型
│ ├── queue_operation/ # 队列操作封装
│ ├── scheduler/ # 调度器实现
│ └── task_cycle/ # 任务生命周期管理
│
tests/ # 测试代码
核心组件
-
消息模型 (MessageItem)
定义了任务消息的数据结构,包含:
-
消息ID、类型、状态
-
内容、创建时间、更新时间
-
优先级、来源、目标
-
重试次数、过期时间
-
标签和元数据
-
-
队列操作 (QueueOperation)
提供对 SQLite 队列的基本操作:
-
初始化数据库
-
入队和出队操作
-
获取队列长度和已完成消息
-
更新状态和结果
-
删除消息和清理过期消息
-
-
调度系统
包含三个主要调度器:
接收调度器 (ReceiveScheduler)
-
处理消息发送
-
管理回调函数
-
接收已完成消息
任务调度器 (TaskScheduler)
-
从队列中获取任务
-
调用任务函数
-
更新任务状态和结果
清理调度器 (CleanupScheduler)
-
清理过期消息
-
删除旧消息(默认清理7天前的消息)
-
-
任务挂载系统 (TaskMounter) 提供装饰器挂载任务函数:
@TaskMounter.task(meta={"task_name": "example"}) def example_task(message_item: MessageItem): # 任务逻辑 return result
使用示例
基本使用
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
# 初始化调度器
scheduler = QueueScheduler(
receive_thread_num=4,
task_thread_num=8,
shard_num=12
)
# 启动调度器
scheduler.start_queue_scheduler()
# 创建消息
message = MessageItem(
content={"data": "example"},
destination="task_name"
)
# 定义回调函数
def callback(message_item):
print(f"任务完成: {message_item.id}")
# 发送消息
scheduler.send_message(message, callback)
# ... 程序运行 ...
# 停止调度器
scheduler.stop_queue_scheduler()
压力测试
# tests/test_stress.py
class TestStress:
@classmethod
def test_stress(cls):
TASK_NUM = 10000
scheduler = QueueScheduler(receive_thread_num=4, task_thread_num=8, shard_num=12)
scheduler.start_queue_scheduler()
# 发送大量任务
for i in range(TASK_NUM):
message = MessageItem(content={"num": i}, destination="example")
scheduler.send_message(message, cls._callback)
# 等待所有任务完成
while scheduler.queue_operation.get_queue_length() > 0:
time.sleep(0.5)
scheduler.stop_queue_scheduler()
安装与运行
前提条件
Python 3.7+
Rust 工具链
SQLite 开发文件
安装步骤
-
克隆仓库:
git clone https://gitee.com/cai-xinpenge/queue_sqlite.git cd sqlite-task-queue
-
安装 Python 依赖:
pip install -r requirements.txt
-
构建 Rust 核心模块:
cd src/core maturin develop --release
将会在
src/core/target/release目录下生成core.dll或core.so文件。 将该文件复制到queue_sqlite/core目录下(dll文件需改名为pyd后缀)。 -
运行测试:
pytest tests/ -
性能指标
在标准开发机器上(8核CPU,16GB内存):
可处理 10,000+ 任务(斐波那契数列前500项计算)/分钟
平均任务延迟 < 50ms
CPU 使用率 < 60%
内存占用 < 500MB
贡献指南
欢迎贡献代码!请遵循以下步骤:
-
Fork 仓库
创建新分支 (git checkout -b feature/your-feature)
提交更改 (git commit -am 'Add some feature')
推送到分支 (git push origin feature/your-feature)
-
创建 Pull Request
许可证
本项目采用 MIT 许可证。
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file queue_sqlite-0.2.1.tar.gz.
File metadata
- Download URL: queue_sqlite-0.2.1.tar.gz
- Upload date:
- Size: 41.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
43842a50333cd93131d26e43a5c6fdbe0084d062c871595f69c9b0bbdff4519a
|
|
| MD5 |
268b5f88a8f9966d24f058169bc7d33d
|
|
| BLAKE2b-256 |
66c02dbb2cf71cffcfdbab1edb81dc743a664181116f6a5b152f4b60b22a4cc0
|
File details
Details for the file queue_sqlite-0.2.1-py3-none-any.whl.
File metadata
- Download URL: queue_sqlite-0.2.1-py3-none-any.whl
- Upload date:
- Size: 45.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a8847581f87ed9232e53767ed4e7f942c34112044c9aab4d57af2e0219089fd2
|
|
| MD5 |
91adb4b45a09667445e468b68733a517
|
|
| BLAKE2b-256 |
b135bdb8e94140aa4bde0a828143b4599bbe2aec42dbf3e622810a942e7410ae
|