Skip to main content

A simple message queue demo

Project description

MESSAGE QUEUE SQLITE

[TOC]

介绍

这是一个基于消息队列的任务处理系统。使用 SQLite 作为中间件实现了异步任务的提交与处理,支持多线程处理任务和结果的回调。

特性

  • 异步任务处理: 利用线程池实现任务的并发处理。
  • 动态任务注册: 可以通过装饰器轻松注册新任务。
  • 结果回调: 支持任务执行后的结果回调函数。
  • 状态管理: 通过状态枚举管理任务的生命周期,包括未开始、运行中、已完成、失败等状态

目录结构

message_queue_sqlite/ 
 ├── cache/ 
 │    └── __init__.py # 缓存模块 
 ├── config/ 
 │    └── __init__.py # 配置模块 
 ├── constants/ # 常量模块 
 │    ├── __init__.py 
 │    └── task_status.py # 任务状态常量 
 ├── middleware/ # 中间件模块 
 │    ├── __init__.py 
 │    ├── client.py # 客户端中间件 
 │    └── server.py # 服务器中间件 
 ├── model/ # 结果模型模块 
 │    ├── __init__.py │ 
 |    └── task_result.py # 任务结果模型 
 ├── task_service/ # 任务服务模块 
 │    ├── service/ 
 │    │    ├── __init__.py 
 │    │    └── services.py # 服务回调模块 
 │    ├── task/ 
 │    │    ├── __init__.py 
 │    │    ├── discover.py # 动态任务挂载模块 
 │    │    ├── task_base.py # 任务基类 
 │    │    └── tasks.py # 任务函数管理模块
 │    └── __init__.py 
 └── __init__.py # 初始化模块 

安装

  1. 创建虚拟环境

    mkdir demo
    cd demo
    python -m venv venv
    source venv/bin/activate
    
  2. 克隆仓库

    git clone https://gitee.com/cai-xinpenge/message_queue.git
    

    cd message_queue

  3. 安装

    pip install .
    

使用示例

请参考以下示例代码以了解如何使用 message_queue_sqlite

目录结构

demo/
├── app/
|    ├── engine/
|    |    ├── __init__.py
|    |    └── test.py # 回调函数
|    └── __init__.py
main.py

定义任务

app/engine/test.py

from message_queue_sqlite import task_function

@task_function(use_cache=True)
def test(message):
    print(message)
    return "test"

@task_function()
def test1(message):
    print(f"{message}1")
    return "test1"

app/engine/__init__.py

from .test import test, test1

__all__ = ["test", "test1"]

动态挂载任务并初始化服务

app/__init__.py

from message_queue_sqlite import discover_and_mount_ts, init_server
from message_queue_sqlite.config import Config 

# 动态挂在 app.engine 目录下的任务
tasks = discover_and_mount_ts("app.engine")
tasks.get_all_task_names()

# 初始化服务
config = Config(middleware_path="message_queue", max_workers=5, middleware_num=1)
server, client, send_message = init_server(tasks, config) 
# config 参数可选,默认 middleware_path 为 message_queue,max_workers 为 5,middleware_num 为 1

__all__ = ["model", "server"]

Config 参数可选

  • middleware_path 为中间件路径(不包括文件名)
  • max_workers 为线程池大小
  • middleware_num 为中间件数量(默认为 1)。

启动服务

main.py

from app import server, client, send_message
from message_queue_sqlite import stop_server, start_server
import sys
import time
from threading import Thread


if __name__ == '__main__':
    # 启动服务
    Thread(target=start_server, args=(server, client)).start()
    # 循环发送任务
    while(True):
        try:
            keyworded_args = {'message': 'hello world'}
            callback = lambda x: print(x)
            send_message('test', keyworded_args, callback, 1, True)
            send_message('test1', keyworded_args, callback, 2)
            time.sleep(0.002)
        except KeyboardInterrupt:
            break
    # 停止服务
    sys.exit(stop_server(server, client))

任务调用

# 缓存模式
send_message('test', keyworded_args, callback, priority=1, use_cache=True)  # priority 值越大优先级越高
# 非缓存模式
send_message('test1', keyworded_args, callback, priority=2)

缓存模式

当task任务的参数类型不方便被序列化时,可以选择使用缓存模式。该模式会将参数存入 cache 模块中,但执行任务时会先从 cache 中获取参数,由于 cache 只在运行时存在,所以该模式不支持持久化,若程序意外退出,缓存数据也会丢失,该模式主要针对参数类型不方便被序列化的场景。

多中间件负载均衡

区别于 1.x 版本,2.x 版本支持多中间件负载均衡,可以根据任务的执行时间和任务的负载情况,动态调整中间件的数量,以提高任务处理的效率。send_message 函数会根据指定的中间件数量,将任务发送到不同的中间件,以降低 sqlite 并发处理的压力。

任务状态

任务的生命周期通过枚举类型 TaskStatus 管理,包括:

  • NOT_STARTED: 任务尚未开始
  • RUNNING: 任务正在执行
  • FINISHED: 任务执行完成
  • FAILED: 任务执行失败
  • CALLBACKED: 任务结果已经回调

版本更新

v0.0.1

  • 实现基本功能,包括异步任务处理、动态任务注册、结果回调、状态管理。

v0.0.2

  • 代码优化

v1.0.0

  • 实现缓存模式

v1.0.1

  • 修复了一些 bug

v1.0.2

  • 性能优化

v2.0.0

  • 实现多中间件负载均衡

v2.0.1

  • 说明文档更新

联系

如有问题,请联系作者:

许可

该项目遵循 MIT 许可证。请查看 LICENSE 文件以获取更多信息。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

message_queue_sqlite-2.0.1.tar.gz (14.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

message_queue_sqlite-2.0.1-py3-none-any.whl (19.0 kB view details)

Uploaded Python 3

File details

Details for the file message_queue_sqlite-2.0.1.tar.gz.

File metadata

  • Download URL: message_queue_sqlite-2.0.1.tar.gz
  • Upload date:
  • Size: 14.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.8.10

File hashes

Hashes for message_queue_sqlite-2.0.1.tar.gz
Algorithm Hash digest
SHA256 8aa32d641f5b483b95837fc8c7d0238678ee636b426740b057b53caf0435ca45
MD5 e4b561acfa1206ba20a570f84cf31252
BLAKE2b-256 e47d0f26b3e0e759f05d6c46dd189e7431a10ed13a6c4a51a086429aa36f6c04

See more details on using hashes here.

File details

Details for the file message_queue_sqlite-2.0.1-py3-none-any.whl.

File metadata

File hashes

Hashes for message_queue_sqlite-2.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 dfacd8e4a7ec36e2f37cfa7b5b0492138c93f9fecee9e9eae1a73da5057b29a5
MD5 bc7b4239aa39b0472883ae6b66f51236
BLAKE2b-256 c82b983eea88cd3676ac92d16797037fff4bbdb330c6f213e9236e4c61b95545

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page