Skip to main content

A high-performance, SQLite-based distributed task queue system with Rust-backed core operations. Supports task mounting, message listening, priority handling, retry mechanisms, and automatic cleanup of expired messages. Ideal for building reliable, scalable background task processing systems.

Project description

SQLite 任务队列系统

这个项目是一个基于 SQLite 的高性能任务队列系统,使用 Rust 和 Python 混合编程实现。系统提供了任务调度、消息队列管理和任务生命周期管理的完整解决方案。

主要特性

  • 🚀 高性能:使用 Rust 实现核心操作,确保高性能

  • 📚 多分片支持:支持数据库分片,提高并发处理能力

  • ⏱️ 智能调度:提供接收调度器、任务调度器和清理调度器

  • 🔒 任务生命周期管理:支持任务状态跟踪、重试机制和过期处理

  • 📊 监控支持:内置资源监控功能

  • 🧩 任务挂载系统:通过装饰器轻松添加新任务

项目结构

src/
├── core/                # Rust 核心实现
│   ├── lib.rs           # 主模块
│   ├── queue_operation.rs # 队列操作实现
│   └── task_mounter.rs  # 任务挂载实现
│
├── queue_sqlite/        # Python 实现
│   ├── constant/        # 常量定义(消息优先级、状态、类型)
│   ├── core/            # 核心接口
│   ├── model/           # 数据模型
│   ├── queue_operation/ # 队列操作封装
│   ├── scheduler/       # 调度器实现
│   └── task_cycle/      # 任务生命周期管理
│
tests/                  # 测试代码

核心组件

  1. 消息模型 (MessageItem)

    定义了任务消息的数据结构,包含:

    • 消息ID、类型、状态

    • 内容、创建时间、更新时间

    • 优先级、来源、目标

    • 重试次数、过期时间

    • 标签和元数据

  2. 队列操作 (QueueOperation)

    提供对 SQLite 队列的基本操作:

    • 初始化数据库

    • 入队和出队操作

    • 获取队列长度和已完成消息

    • 更新状态和结果

    • 删除消息和清理过期消息

  3. 调度系统

    包含三个主要调度器:

    接收调度器 (ReceiveScheduler)

    • 处理消息发送

    • 管理回调函数

    • 接收已完成消息

    任务调度器 (TaskScheduler)

    • 从队列中获取任务

    • 调用任务函数

    • 更新任务状态和结果

    清理调度器 (CleanupScheduler)

    • 清理过期消息

    • 删除旧消息(默认清理7天前的消息)

  4. 任务挂载系统 (TaskMounter) 提供装饰器挂载任务函数:

    @TaskMounter.task(meta={"task_name": "example"})
    def example_task(message_item: MessageItem):
        # 任务逻辑
        return result
    

使用示例

基本使用

from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem

# 初始化调度器
scheduler = QueueScheduler(
    receive_thread_num=4, 
    task_thread_num=8, 
    shard_num=12
)

# 启动调度器
scheduler.start_queue_scheduler()

# 创建消息
message = MessageItem(
    content={"data": "example"},
    destination="task_name"
)

# 定义回调函数
def callback(message_item):
    print(f"任务完成: {message_item.id}")

# 发送消息
scheduler.send_message(message, callback)

# ... 程序运行 ...

# 停止调度器
scheduler.stop_queue_scheduler()

压力测试

# tests/test_stress.py
class TestStress:
    @classmethod
    def test_stress(cls):
        TASK_NUM = 10000
        scheduler = QueueScheduler(receive_thread_num=4, task_thread_num=8, shard_num=12)
        scheduler.start_queue_scheduler()
        
        # 发送大量任务
        for i in range(TASK_NUM):
            message = MessageItem(content={"num": i}, destination="example")
            scheduler.send_message(message, cls._callback)
        
        # 等待所有任务完成
        while scheduler.queue_operation.get_queue_length() > 0:
            time.sleep(0.5)
        
        scheduler.stop_queue_scheduler()

安装与运行

前提条件

Python 3.7+

Rust 工具链

SQLite 开发文件

安装步骤

  1. 克隆仓库:

    git clone https://gitee.com/cai-xinpenge/queue_sqlite.git
    cd sqlite-task-queue
    
  2. 安装 Python 依赖:

    pip install -r requirements.txt
    
  3. 构建 Rust 核心模块:

    cd src/core
    maturin develop --release
    

    将会在 src/core/target/release 目录下生成 core.dllcore.so 文件。 将该文件复制到 queue_sqlite/core 目录下(dll文件需改名为pyd后缀)。

  4. 运行测试:

    pytest tests/
    
  5. 性能指标

    在标准开发机器上(8核CPU,16GB内存):

    可处理 10,000+ 任务(斐波那契数列前500项计算)/分钟

    平均任务延迟 < 50ms

    CPU 使用率 < 60%

    内存占用 < 500MB

贡献指南

欢迎贡献代码!请遵循以下步骤:

  1. Fork 仓库

    创建新分支 (git checkout -b feature/your-feature)

    提交更改 (git commit -am 'Add some feature')

    推送到分支 (git push origin feature/your-feature)

  2. 创建 Pull Request

许可证

本项目采用 MIT 许可证。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

queue_sqlite-0.2.1.tar.gz (41.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

queue_sqlite-0.2.1-py3-none-any.whl (45.4 kB view details)

Uploaded Python 3

File details

Details for the file queue_sqlite-0.2.1.tar.gz.

File metadata

  • Download URL: queue_sqlite-0.2.1.tar.gz
  • Upload date:
  • Size: 41.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.11

File hashes

Hashes for queue_sqlite-0.2.1.tar.gz
Algorithm Hash digest
SHA256 43842a50333cd93131d26e43a5c6fdbe0084d062c871595f69c9b0bbdff4519a
MD5 268b5f88a8f9966d24f058169bc7d33d
BLAKE2b-256 66c02dbb2cf71cffcfdbab1edb81dc743a664181116f6a5b152f4b60b22a4cc0

See more details on using hashes here.

File details

Details for the file queue_sqlite-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: queue_sqlite-0.2.1-py3-none-any.whl
  • Upload date:
  • Size: 45.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.11

File hashes

Hashes for queue_sqlite-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 a8847581f87ed9232e53767ed4e7f942c34112044c9aab4d57af2e0219089fd2
MD5 91adb4b45a09667445e468b68733a517
BLAKE2b-256 b135bdb8e94140aa4bde0a828143b4599bbe2aec42dbf3e622810a942e7410ae

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page