Skip to main content

A high-performance, SQLite-based distributed task queue system with Rust-backed core operations. Supports task mounting, message listening, priority handling, retry mechanisms, and automatic cleanup of expired messages. Ideal for building reliable, scalable background task processing systems.

Project description

Queue SQLite - 高性能 SQLite 任务队列系统

python-3.11+ rust-1.65+ license-MIT version-0.2.1

一个基于 SQLite 的高性能任务队列系统,采用 Rust 核心操作,支持任务挂载、消息监听、优先级处理、重试机制和自动清理过期消息。适合构建可靠、可扩展的后台任务处理系统。

🌟 特性

核心优势

  • 🚀 高性能:Rust 核心提供毫秒级任务处理
  • 💾 持久化存储:基于 SQLite 的可靠消息存储
  • 🔄 多调度器支持:标准、异步、Qt 三种调度模式
  • 🎯 智能分片:自动哈希分片,支持横向扩展
  • 📊 全面监控:内置资源使用监控和队列状态查看

功能亮点

  • ✅ 任务装饰器:使用 @task 装饰器轻松注册任务
  • ✅ 监听装饰器:使用 @listener 装饰器实现数据变更监听
  • ✅ 优先级队列:支持 LOW/NORMAL/HIGH/URGENT 四级优先级
  • ✅ 重试机制:可配置的最大重试次数和延迟重试
  • ✅ 过期清理:自动清理过期和完成的消息
  • ✅ 批量操作:支持消息批量入队和处理
  • ✅ 异步支持:原生支持 async/await 异步任务
  • ✅ Qt 集成:可选 Qt 调度器用于 GUI 应用

📦 安装

前置要求

  • Python 3.11+
  • Rust 1.65+ (用于编译核心扩展)
  • SQLite 3.35+

安装方式

方式一:从源码安装(推荐)

# 克隆仓库
git clone https://github.com/chakcy/queue_sqlite.git
cd queue_sqlite

# 安装 Rust(如果未安装)
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# 安装 Python 依赖
pip install -r requirements.txt

# 安装开发模式
pip install -e .

方式二:从 PyPI 安装

pip install queue-sqlite

🚀 快速开始

基本使用

from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from queue_sqlite.constant import MessagePriority
from queue_sqlite.mounter import task


# 1. 注册任务
@task(meta={"max_retries": 3, "delay": 1})
def process_image(message_item):
    """处理图片任务"""
    data = message_item.content
    # 处理逻辑
    return {"status": "success", "processed": data["image_id"]}


# 2. 创建调度器
scheduler = QueueScheduler(scheduler_type="standard")

# 3. 启动调度器
scheduler.start()

# 4. 发送任务
for i in range(10):
    message = MessageItem(
        content={"image_id": i, "path": f"/images/{i}.jpg"},
        destination="process_image",  # 任务函数名
        priority=MessagePriority.HIGH,  # HIGH 优先级
        tags="image_processing",
    )

    def callback(result_message):
        print(f"任务完成: {result_message.id}, 结果: {result_message.result}")

    scheduler.send_message(message, callback)

# 5. 等待任务完成
import time

while scheduler.queue_operation.get_queue_length() > 0:
    print(f"剩余任务: {scheduler.queue_operation.get_queue_length()}")
    time.sleep(1)

# 6. 停止调度器
scheduler.stop()

异步任务示例

import asyncio
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from queue_sqlite.mounter import task


@task(meta={"name": "async_processor", "max_retries": 2})
async def async_data_fetcher(message_item):
    """异步数据获取任务"""
    url = message_item.content["url"]
    # 模拟异步 HTTP 请求
    await asyncio.sleep(0.5)
    return {"url": url, "data": "fetched", "status": 200}


async def main():
    scheduler = QueueScheduler(scheduler_type="async")
    scheduler.start()

    # 发送异步任务
    message = MessageItem(
        content={"url": "https://api.example.com/data"},
        destination="async_data_fetcher",
    )

    scheduler.send_message(message, lambda m: print(f"完成: {m.id}"))

    await asyncio.sleep(5)
    scheduler.stop()


asyncio.run(main())

数据监听示例

from queue_sqlite import QueueScheduler
from queue_sqlite.mounter import listener

# 注册监听器
@listener()
def user_activity_log(data):
    """监听用户活动数据"""
    print(f"用户活动: {data}")

@listener()
def system_alert(data):
    """监听系统告警"""
    print(f"系统告警: {data}")

# 创建调度器
scheduler = QueueScheduler()
scheduler.start()

# 更新监听数据(会自动触发监听函数)
scheduler.update_listen_data("user_activity_log", "用户登录")
scheduler.update_listen_data("user_activity_log", "用户购买")
scheduler.update_listen_data("system_alert", "CPU使用率过高")

⚙️ 配置选项

调度器配置

from queue_sqlite import SchedulerConfig, QueueScheduler

config = SchedulerConfig(
    receive_thread_num=2,    # 接收线程数
    task_thread_num=8,       # 任务执行线程数
    shard_num=4,             # 数据库分片数
    queue_name="production", # 队列名称
    meta={"app": "myapp"}    # 自定义元数据
)

scheduler = QueueScheduler(
    scheduler_type="standard",  # standard | async | qt
    config=config
)

消息配置

from queue_sqlite import MessageItem
from queue_sqlite.constant import MessagePriority, MessageType
from datetime import datetime, timedelta

message = MessageItem(
    # 必需字段
    content={"data": "任务数据"},
    destination="task_function_name",
    
    # 可选字段
    id="custom-uuid",  # 默认自动生成
    type=MessageType.TASK,
    priority=MessagePriority.HIGH,
    source="web_api",
    tags="urgent,processing",
    
    # 时间控制
    expire_time=datetime.now() + timedelta(hours=1),  # 1小时后过期
    retry_count=0,
    
    # 自定义元数据
    metadata={"user_id": 123, "request_id": "abc123"}
)

📊 系统架构

架构图

┌─────────────────────────────────────────────────────────┐
│                    Python application                   |
│  ┌─────────────┐  ┌─────────────┐  ┌────────────────┐   │
│  │   @task     │  │ @listener   │  │ QueueScheduler │   │
│  │             │  │             │  │                │   │
│  └─────────────┘  └─────────────┘  └────────────────┘   │
└─────────────────────────────────────────────────────────┘
                            │
┌─────────────────────────────────────────────────────────┐
│                  Python Service                         │
│  ┌──────────────┐  ┌─────────────┐  ┌─────────────┐     │
│  │ TaskMounter  │  │ TaskCycle   │  │ Schedulers  │     │
│  │ ListenMounter│  │ AsyncCycle  │  │             │     │
│  └──────────────┘  └─────────────┘  └─────────────┘     │
└─────────────────────────────────────────────────────────┘
                            │
┌─────────────────────────────────────────────────────────┐
│                   Rust core                             │
│  ┌─────────────────────────────────────────┐            │
│  │      queue_sqlite_core                  │            │
│  │  • shared sqlite database               │            │
│  │  • SQLite Optimization                  │            │
│  │  • Connection pool                      │            │
│  └─────────────────────────────────────────┘            │
└─────────────────────────────────────────────────────────┘
                            │
┌─────────────────────────────────────────────────────────┐
│                  SQLite database                        │
│  ┌───────────────────────────────────────────────┐      │
│  │      shared database (cache/queue_name/)      │      │
│  │  • queue_shard_0.db                           │      │
│  │  • queue_shard_1.db                           │      │
│  │  • listen.db                                  │      │
│  └───────────────────────────────────────────────┘      │
└─────────────────────────────────────────────────────────┘

组件说明

  1. MessageIte: 核心数据模型,包含消息的所有属性和方法
  2. TaskMounter: 任务过载器,通过装饰器注册任务函数
  3. ListenMounter:监听挂载器,通过装饰器注册监听函数
  4. TaskCycle:任务生命周期管理器,处理重试和状态更新
  5. QueueScheduler:统一调度器接口,支持三种实现:
    • StandardQueueScheduler:统一调度器接口,支持三种实现:
    • AsyncQueueScheduler:异步/等待实现
    • QtQueueScheduler:Qt 线程池实现(GUI应用)
  6. CleanupScheduler:自动清理过期消息
  7. ShardedQueueOperation:Rust 实现的高性能分片队列操作

🧪 测试

运行测试套件

# 运行所有测试
python -m -v -s pytest tests/

# 运行特定测试
python -m pytest tests/test_stress.py -v
python -m pytest tests/test_async_scheduler.py -v

性能测试示例

from tests.test_stress import TestStress

# 压力测试:处理 10000 个任务
TestStress.test_stress()

# 异步调度器测试
from tests.test_async_scheduler import TestAsyncScheduler
TestAsyncScheduler.test_async_scheduler()

📈 性能指标

基准测试结果

指标 标准调度器 异步调度器 Qt 调度器
单核 QPS 5,000+ 8,000+ 6,000+
内存占用 50-100MB 60-120MB 70-150MB
延迟(p95) <50ms <30ms <40ms
最大并发 1,000+ 2,000+ 1500+

扩展性测试

  • 10 分片:支持 50,000+ 并发任务
  • 自动负载均衡:分片间任务均匀分布
  • 线性扩展:增加分片数可线性提升吞吐量

📄 许可证

本项目采用 MIT 许可证 - 查看 LICENSE 文件了解详情。

📞 联系与支持

🙏 致谢

感谢以下开源项目:

  • SQLite - 轻量级嵌入式数据库
  • PyO3 - Rust-Python 绑定
  • r2d2 - Rust 数据库连接池

Queue SQLite - 为您的应用提供可靠、高效的任务队列解决方案。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

queue_sqlite-0.2.3.tar.gz (137.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

queue_sqlite-0.2.3-py3-none-any.whl (49.5 kB view details)

Uploaded Python 3

File details

Details for the file queue_sqlite-0.2.3.tar.gz.

File metadata

  • Download URL: queue_sqlite-0.2.3.tar.gz
  • Upload date:
  • Size: 137.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.9

File hashes

Hashes for queue_sqlite-0.2.3.tar.gz
Algorithm Hash digest
SHA256 ae300d8e0ba3c02fc4f20ead1cd60930a5e3cda53e8b223cbf932454b9f9d835
MD5 bd8896b905d2b549f86b6ccc7646ef9f
BLAKE2b-256 fa8c00ba21f93e2f708e2b4bc20ad85928d3c36387910d8217367084fda58fc8

See more details on using hashes here.

File details

Details for the file queue_sqlite-0.2.3-py3-none-any.whl.

File metadata

  • Download URL: queue_sqlite-0.2.3-py3-none-any.whl
  • Upload date:
  • Size: 49.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.9

File hashes

Hashes for queue_sqlite-0.2.3-py3-none-any.whl
Algorithm Hash digest
SHA256 ceb6b3ac89eec2e30c34d0e11f66451e57ca06cfb91d16c342799315e58ce5a6
MD5 bc73c24aaed1ad75ced3abf7525fb811
BLAKE2b-256 8615211d2526d7a1495c78b5cb3a7240f326a63743355a211d02d0ad04198817

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page