Skip to main content

极简分布式延迟任务队列 - 基于 Redis,支持 Cron 周期任务、异步协程、多执行器

Project description

mini-job

极简分布式延迟任务队列 — 基于 Redis,支持 Cron 周期任务、异步协程和多执行器。

特性

特性 说明
延迟任务 设定延迟秒数,到期自动执行
Cron 周期调度 支持标准 cron 表达式(分 时 日 月 星期)
三种执行器 async 协程(IO 密集)、thread 线程(通用)、process 进程(CPU 密集)
执行器隔离 每种消费者只消费匹配类型的任务,互不干扰
死信队列 失败的一次性任务自动进入死信队列,可排查重试
可见性超时回收 消费者崩溃后任务自动回收重入队列,不丢任务
命名空间隔离 多环境共用同一 Redis 实例,Key 前缀隔离
监控指标 内置 QueueMetrics,统计各生命周期计数
背压控制 队列深度超阈值自动告警
Pydantic 配置 集中配置管理,环境变量覆盖,类型校验
Lua 原子操作 抢占和回收均为 Redis 端原子执行,无竞态
优雅关闭 SIGTERM/SIGINT 信号处理,等待任务完成

安装

pip install mini-job
# 或开发模式
git clone <repo-url> && cd mini_job && uv sync

依赖:Python >= 3.12,Redis >= 7.4,croniter,pydantic-settings

快速开始

1. 确保 Redis 运行

redis-cli ping  # PONG

2. 生产者 — 发布任务

from mini_job import DelayQueue

dq = DelayQueue(namespace="myapp")

# 注册脚本(动态执行模式)
dq.register_script(
    "send_email",
    """
def handler(payload):
    to_email = payload.get('to')
    print(f'发送邮件到: {to_email}')
    return {'status': 'sent', 'to': to_email}
    """,
)

# 发布任务 — executor 参数指定执行器类型
dq.publish(
    "send_email",
    {"to": "user@example.com", "subject": "欢迎", "content": "注册成功"},
    executor="async",   # async / thread / process
)

# 延迟 30 秒执行
dq.publish("send_email", {...}, delay_seconds=30)

# 每天凌晨 2 点执行(cron 表达式:分 时 日 月 星期)
dq.publish("send_email", {...}, cron="0 2 * * *")

# 查询任务结果
result = dq.get_task_result(task_id)

3. 消费者 — 按类型独立启动

from mini_job import DelayQueue

# 注册本地函数
def send_sms(payload):
    print(f"发送短信 -> {payload['phone']}")

TASK_REGISTRY = {
    "send_sms": send_sms,
}

dq = DelayQueue(namespace="myapp")
dq.start(
    task_registry=TASK_REGISTRY,
    executor_type="async",    # 本进程只消费 async 任务
)

启动不同执行器类型的消费者(3 个终端):

python consumer.py async     # 协程消费者 — IO 密集任务
python consumer.py thread    # 线程消费者 — 通用任务
python consumer.py process   # 进程消费者 — CPU 密集任务

核心概念

执行器类型

类型 适用场景 实现 推荐并发数
async IO 密集(发邮件、HTTP 请求、DB 操作) asyncio 协程 100~500
thread 通用任务、阻塞操作 ThreadPoolExecutor 30~100
process CPU 密集(数据处理、报表生成) ProcessPoolExecutor CPU 核数

任务路由表

TASK_REGISTRY = {
    # 简单格式:默认 async 执行器
    "send_sms": send_sms,

    # 带配置格式:指定执行器类型
    "daily_report": (daily_report, {"executor": "thread"}),
}

状态生命周期

pending → running → completed
                ↘ failed → 死信队列(一次性任务)
                           下次重试(周期任务)

Redis Key 设计

{namespace}:ready              — 待执行任务 ZSet
{namespace}:processing:{id}   — 消费者处理列表
{namespace}:processing:timeout — 全局超时追踪 ZSet
{namespace}:dead_letter        — 死信队列
{namespace}:dead_letter:detail — 死信详情
{namespace}:task:meta          — 任务元数据
{namespace}:task:result:{id}   — 任务结果(独立 TTL)
{namespace}:scripts            — 注册脚本

API 参考

DelayQueue

dq = DelayQueue(namespace="myapp")
# 或使用配置对象
from mini_job import QueueConfig
dq = DelayQueue(QueueConfig(namespace="myapp"))

生产者方法:

方法 说明
publish(func, payload, delay_seconds=0, cron=None, executor="async") 发布任务 → 返回 task_id
register_script(name, content, language="python", use=[]) 注册动态脚本
get_script(name) 获取脚本信息
delete_script(name) 删除脚本
list_scripts() 列出所有脚本
get_task_result(task_id) 查询任务状态和结果

消费者方法:

方法 说明
start(task_registry, executor_type="async", **kwargs) 启动消费者
stop() 手动触发优雅关闭

start() 参数:

参数 默认值 说明
task_registry (必填) 任务路由表 {"name": func}
executor_type "async" 执行器类型:async / thread / process
poll_interval 0.5 轮询间隔(秒)
grab_limit 80 每次最多抢占任务数
worker_threads 50 工作线程/协程/进程数
task_timeout 30 单个任务超时(秒)
visibility_timeout 60 可见性超时(秒)

配置

通过 Pydantic Settings 管理,支持 .env 文件、环境变量覆盖、类型校验。

队列配置 DQ_*

参数 环境变量 默认值 类型 说明
namespace DQ_NAMESPACE "dq" str Redis Key 命名空间前缀,多环境隔离
consumer_id DQ_CONSUMER_ID 自动生成 str 消费者唯一标识,默认 worker- + 8 位 hex
result_ttl DQ_RESULT_TTL 86400 int 任务结果保留时间(秒),默认 1 天
reclaim_interval DQ_RECLAIM_INTERVAL 10 int 超时回收检查间隔(轮询周期数),每 N 轮检查一次

Redis 连接配置 DQ_REDIS_*

参数 环境变量 默认值 类型 说明
host DQ_REDIS_HOST "localhost" str Redis 主机地址
port DQ_REDIS_PORT 6379 int Redis 端口
db DQ_REDIS_DB 0 int Redis 数据库编号
password DQ_REDIS_PASSWORD None str Redis 密码(可选)
max_connections DQ_REDIS_MAX_CONNECTIONS 50 int 连接池最大连接数
socket_timeout DQ_REDIS_SOCKET_TIMEOUT 5.0 float 单次操作超时(秒)
socket_connect_timeout DQ_REDIS_SOCKET_CONNECT_TIMEOUT 5.0 float 连接建立超时(秒)
retry_on_timeout DQ_REDIS_RETRY_ON_TIMEOUT True bool 超时是否自动重试
health_check_interval DQ_REDIS_HEALTH_CHECK_INTERVAL 30 int 连接健康检查间隔(秒)

消费者配置 DQ_CONSUMER_*

参数 环境变量 默认值 类型 说明
poll_interval DQ_CONSUMER_POLL_INTERVAL 0.5 float 轮询间隔(秒),影响任务延迟精度
grab_limit DQ_CONSUMER_GRAB_LIMIT 80 int 每次最多抢占任务数,建议 worker × 1.5~2
worker_threads DQ_CONSUMER_WORKER_THREADS 50 int 工作协程/线程/进程数
task_timeout DQ_CONSUMER_TASK_TIMEOUT 30 int 单个任务执行超时(秒),超时后标记失败
visibility_timeout DQ_CONSUMER_VISIBILITY_TIMEOUT 60 int 可见性超时(秒),消费者需在此时间内完成任务
shutdown_timeout DQ_CONSUMER_SHUTDOWN_TIMEOUT 30 int 优雅关闭最大等待时间(秒)
max_queue_depth DQ_CONSUMER_MAX_QUEUE_DEPTH 10000 int 队列深度告警阈值,超阈值打印 WARNING

示例 .env

# 队列
DQ_NAMESPACE=production
DQ_CONSUMER_ID=web-server-01

# Redis
DQ_REDIS_HOST=redis.example.com
DQ_REDIS_PORT=6379
DQ_REDIS_PASSWORD=secret

# 消费者
DQ_CONSUMER_POLL_INTERVAL=0.3
DQ_CONSUMER_GRAB_LIMIT=100
DQ_CONSUMER_WORKER_THREADS=80
DQ_CONSUMER_TASK_TIMEOUT=60
DQ_CONSUMER_VISIBILITY_TIMEOUT=120

监控

# 获取监控指标快照
snapshot = dq.metrics.snapshot()
# {'published': 1000, 'completed': 980, 'failed': 15, 'timeout': 5, ...}

指标说明:

指标 含义
published 已发布任务总数
completed 成功完成数
failed 执行失败数
timeout 超时任务数
dead_lettered 进入死信队列数
reclaimed 超时回收重入队数

项目结构

mini_job/
├── __init__.py           # 公共导出
├── config.py             # Pydantic Settings 配置
├── core/
│   ├── delay_queue.py    # DelayQueue 核心
│   └── task.py           # 任务模型
├── executor/
│   ├── base.py           # 执行器抽象基类
│   ├── async_io.py       # 协程执行器
│   ├── thread.py         # 线程执行器
│   └── process.py        # 进程执行器
├── redis/
│   ├── client.py         # Redis 连接 + Lua 脚本
│   └── scripts.lua       # 原子 Lua 脚本
├── utils/
│   ├── retry.py          # 重试装饰器
│   ├── metrics.py        # 监控指标
│   └── decorators.py     # 任务装饰器
├── consumer.py           # 消费者示例
└── producer.py           # 生产者示例

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mini_job-0.2.0.tar.gz (21.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mini_job-0.2.0-py3-none-any.whl (29.6 kB view details)

Uploaded Python 3

File details

Details for the file mini_job-0.2.0.tar.gz.

File metadata

  • Download URL: mini_job-0.2.0.tar.gz
  • Upload date:
  • Size: 21.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for mini_job-0.2.0.tar.gz
Algorithm Hash digest
SHA256 6a59ee4e739a4a08bf60702fc3327a06395b7076340f122b1caaff430d809823
MD5 0cd4a3a33eaa57d1635d04049974585f
BLAKE2b-256 cc6e69937dd48e25d78a975ab8f23c14d31feede5b104f8acec4aa634e48393f

See more details on using hashes here.

File details

Details for the file mini_job-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: mini_job-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 29.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for mini_job-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 55078fc21a5640d8df0e8143d6a8ccd0970235a05eb0362507153ef7c52e452b
MD5 c374cba07d9c10d5b5d4e21bb2f039a9
BLAKE2b-256 10e32817a1ae606428f0752066065625ce6f0b9c149985e4ac893168c48e9172

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page