Skip to main content

极简分布式延迟任务队列 - 基于 Redis,支持 Cron 周期任务、异步协程、多执行器

Project description

mini-job

极简分布式延迟任务队列 — 基于 Redis,支持 Cron 周期任务、异步协程和多执行器。

特性

特性 说明
延迟任务 设定延迟秒数,到期自动执行
Cron 周期调度 支持标准 cron 表达式(分 时 日 月 星期)
三种执行器 async 协程(IO 密集)、thread 线程(通用)、process 进程(CPU 密集)
队列级执行器隔离 Redis Key {ns}:ready:{executor} 三队列隔离,消费者只拉取专属队列,零竞争
死信队列 失败的一次性任务自动进入死信队列,可排查重试
可见性超时回收 消费者崩溃后任务自动回收重入队列,不丢任务
命名空间隔离 多环境共用同一 Redis 实例,Key 前缀隔离
监控指标 内置 QueueMetrics,统计各生命周期计数
背压控制 队列深度超阈值自动告警
Pydantic 配置 集中配置管理,环境变量覆盖,类型校验
Lua 原子操作 抢占和回收均为 Redis 端原子执行,无竞态
优雅关闭 SIGTERM/SIGINT 信号处理,等待任务完成

安装

pip install mini-job              # 核心依赖
pip install mini-job[script]      # 含 pandas/numpy(脚本执行模式)

依赖:Python >= 3.12,Redis >= 7.4,croniter,pydantic-settings,python-dotenv

快速开始

1. 确保 Redis 运行

redis-cli ping  # PONG

2. 生产者 — 发布任务

from mini_job import DelayQueue

dq = DelayQueue(namespace="myapp")

# 注册脚本(动态执行模式)
dq.register_script(
    "send_email",
    """
def handler(payload):
    to_email = payload.get('to')
    print(f'发送邮件到: {to_email}')
    return {'status': 'sent', 'to': to_email}
    """,
)

# 发布任务 — executor 参数指定执行器类型
dq.publish(
    "send_email",
    {"to": "user@example.com", "subject": "欢迎", "content": "注册成功"},
    executor="async",   # async / thread / process
)

# 延迟 30 秒执行
dq.publish("send_email", {...}, delay_seconds=30)

# 每天凌晨 2 点执行(cron 表达式:分 时 日 月 星期)
dq.publish("send_email", {...}, cron="0 2 * * *")

# 查询任务结果
result = dq.get_task_result(task_id)

3. 消费者 — 按类型独立启动

from mini_job import DelayQueue

# 注册本地函数
def send_sms(payload):
    print(f"发送短信 -> {payload['phone']}")

TASK_REGISTRY = {
    "send_sms": send_sms,
}

dq = DelayQueue(namespace="myapp")
dq.start(
    task_registry=TASK_REGISTRY,
    executor_type="async",    # 本进程只消费 async 任务
)

启动不同执行器类型的消费者(3 个终端):

python consumer.py async     # 协程消费者 — IO 密集任务
python consumer.py thread    # 线程消费者 — 通用任务
python consumer.py process   # 进程消费者 — CPU 密集任务

核心概念

执行器类型

类型 适用场景 实现 推荐并发数
async IO 密集(发邮件、HTTP 请求、DB 操作) asyncio 协程 100~500
thread 通用任务、阻塞操作 ThreadPoolExecutor 30~100
process CPU 密集(数据处理、报表生成) ProcessPoolExecutor CPU 核数

任务路由表

TASK_REGISTRY = {
    # 简单格式:默认 async 执行器
    "send_sms": send_sms,

    # 带配置格式:指定执行器类型
    "daily_report": (daily_report, {"executor": "thread"}),
}

状态生命周期

pending → running → completed
                ↘ failed → 死信队列(一次性任务)
                           下次重试(周期任务)

Redis Key 设计

{namespace}:ready:{executor}  — 按执行器隔离的就绪 ZSet(async/thread/process)
{namespace}:processing:{id}   — 消费者专属处理列表
{namespace}:processing:timeout — 全局超时追踪 ZSet
{namespace}:dead_letter        — 死信队列
{namespace}:dead_letter:detail — 死信详情
{namespace}:task:meta          — 任务元数据
{namespace}:task:result:{id}   — 任务结果(独立 TTL)
{namespace}:scripts            — 注册脚本

API 参考

DelayQueue

dq = DelayQueue(namespace="myapp")
# 或使用配置对象
from mini_job import QueueConfig
dq = DelayQueue(QueueConfig(namespace="myapp"))

生产者方法:

方法 说明
publish(func, payload, delay_seconds=0, cron=None, executor="async") 发布任务 → 返回 task_id
register_script(name, content, language="python", use=[]) 注册动态脚本
get_script(name) 获取脚本信息
delete_script(name) 删除脚本
list_scripts() 列出所有脚本
get_task_result(task_id) 查询任务状态和结果

消费者方法:

方法 说明
start(task_registry, executor_type="async", **kwargs) 启动消费者
stop() 手动触发优雅关闭

start() 参数:

参数 默认值 说明
task_registry (必填) 任务路由表 {"name": func}
executor_type "async" 执行器类型:async / thread / process
poll_interval 0.5 轮询间隔(秒)
grab_limit 80 每次最多抢占任务数
worker_threads 50 工作线程/协程/进程数
task_timeout 30 单个任务超时(秒)
visibility_timeout 60 可见性超时(秒)

配置

通过 Pydantic Settings 管理,支持 .env 文件、环境变量覆盖、类型校验。

队列配置 DQ_*

参数 环境变量 默认值 类型 说明
namespace DQ_NAMESPACE "dq" str Redis Key 命名空间前缀,多环境隔离
consumer_id DQ_CONSUMER_ID 自动生成 str 消费者唯一标识,默认 worker- + 8 位 hex
result_ttl DQ_RESULT_TTL 86400 int 任务结果保留时间(秒),默认 1 天
reclaim_interval DQ_RECLAIM_INTERVAL 10 int 超时回收检查间隔(轮询周期数),每 N 轮检查一次

Redis 连接配置 DQ_REDIS_*

参数 环境变量 默认值 类型 说明
host DQ_REDIS_HOST "localhost" str Redis 主机地址
port DQ_REDIS_PORT 6379 int Redis 端口
db DQ_REDIS_DB 0 int Redis 数据库编号
password DQ_REDIS_PASSWORD None str Redis 密码(可选)
max_connections DQ_REDIS_MAX_CONNECTIONS 50 int 连接池最大连接数
socket_timeout DQ_REDIS_SOCKET_TIMEOUT 5.0 float 单次操作超时(秒)
socket_connect_timeout DQ_REDIS_SOCKET_CONNECT_TIMEOUT 5.0 float 连接建立超时(秒)
retry_on_timeout DQ_REDIS_RETRY_ON_TIMEOUT True bool 超时是否自动重试
health_check_interval DQ_REDIS_HEALTH_CHECK_INTERVAL 30 int 连接健康检查间隔(秒)

消费者配置 DQ_CONSUMER_*

参数 环境变量 默认值 类型 说明
poll_interval DQ_CONSUMER_POLL_INTERVAL 0.5 float 轮询间隔(秒),影响任务延迟精度
grab_limit DQ_CONSUMER_GRAB_LIMIT 80 int 每次最多抢占任务数,建议 worker × 1.5~2
worker_threads DQ_CONSUMER_WORKER_THREADS 50 int 工作协程/线程/进程数
task_timeout DQ_CONSUMER_TASK_TIMEOUT 30 int 单个任务执行超时(秒),超时后标记失败
visibility_timeout DQ_CONSUMER_VISIBILITY_TIMEOUT 60 int 可见性超时(秒),消费者需在此时间内完成任务
shutdown_timeout DQ_CONSUMER_SHUTDOWN_TIMEOUT 30 int 优雅关闭最大等待时间(秒)
max_queue_depth DQ_CONSUMER_MAX_QUEUE_DEPTH 10000 int 队列深度告警阈值,超阈值打印 WARNING

示例 .env

# 队列
DQ_NAMESPACE=production
DQ_CONSUMER_ID=web-server-01

# Redis
DQ_REDIS_HOST=redis.example.com
DQ_REDIS_PORT=6379
DQ_REDIS_PASSWORD=secret

# 消费者
DQ_CONSUMER_POLL_INTERVAL=0.3
DQ_CONSUMER_GRAB_LIMIT=100
DQ_CONSUMER_WORKER_THREADS=80
DQ_CONSUMER_TASK_TIMEOUT=60
DQ_CONSUMER_VISIBILITY_TIMEOUT=120

监控

# 获取监控指标快照
snapshot = dq.metrics.snapshot()
# {'published': 1000, 'completed': 980, 'failed': 15, 'timeout': 5, ...}

指标说明:

指标 含义
published 已发布任务总数
completed 成功完成数
failed 执行失败数
timeout 超时任务数
dead_lettered 进入死信队列数
reclaimed 超时回收重入队数

项目结构

mini_job/
├── __init__.py           # 公共导出
├── config.py             # Pydantic Settings 配置
├── core/
│   ├── delay_queue.py    # DelayQueue 核心
│   └── task.py           # 任务模型
├── executor/
│   ├── base.py           # 执行器抽象基类
│   ├── async_io.py       # 协程执行器
│   ├── thread.py         # 线程执行器
│   └── process.py        # 进程执行器
├── redis/
│   ├── client.py         # Redis 连接 + Lua 脚本
│   └── scripts.lua       # 原子 Lua 脚本
├── utils/
│   ├── retry.py          # 重试装饰器
│   ├── metrics.py        # 监控指标
│   └── decorators.py     # 任务装饰器
├── consumer.py           # 消费者示例
└── producer.py           # 生产者示例

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mini_job-0.2.1.tar.gz (22.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mini_job-0.2.1-py3-none-any.whl (29.6 kB view details)

Uploaded Python 3

File details

Details for the file mini_job-0.2.1.tar.gz.

File metadata

  • Download URL: mini_job-0.2.1.tar.gz
  • Upload date:
  • Size: 22.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for mini_job-0.2.1.tar.gz
Algorithm Hash digest
SHA256 78b2f9359f390b369691390780a3a2d336edc46a1f974bb438c0c293ae7c4aeb
MD5 e9919b62bc2e09269e68abfdcc0db7ad
BLAKE2b-256 e68de50bb905f26e822ed127eae41ba29c417dc9f60c0c03a1c0052d2724fd24

See more details on using hashes here.

File details

Details for the file mini_job-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: mini_job-0.2.1-py3-none-any.whl
  • Upload date:
  • Size: 29.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for mini_job-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 bdf5d26a9a8108399d0249afd6dda544d2cdd0a85d34dbcc1daafc2c46645c0c
MD5 315798c033d23682f87f47a423a9adfb
BLAKE2b-256 ec2926d3431edc362255bad969c1a1701eccb7a575009b37ba1f2d03fbba8821

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page