A powerful and simple cron job scheduler that dominates APScheduler
Project description
🚀 nb_cron
下一代 Python 分布式定时任务框架(全面超越 APScheduler)
nb_cron 是一个强大、极简且专为云原生架构设计的定时任务调度库。它不仅彻底解决了 APScheduler 常年存在的序列化崩溃、多实例重复执行等痛点,更在架构理念上实现了**“业务逻辑与调度配置的物理隔离”**,让 Python 定时任务的管理进入真正的现代化阶段。
🥊 核心痛点对决:APScheduler vs nb_cron
| 痛点场景 | 😭 APScheduler 的历史包袱 | 🤩 nb_cron 的现代解决方案 |
|---|---|---|
| 云原生多副本部署 (K8s/多容器) |
致命弱点:多实例会重复执行任务,需手动硬编码第三方 Redis 锁,极易死锁。 | 天生云原生:内置分布式锁(Redis/Mongo/SQL),天然支持 K8s 多副本平滑扩容。对同一次触发点,默认只有一个实例可获得执行锁。 |
| 微服务跨项目调度 | 强耦合:任务代码和调度器必须在同一个项目中,无法集中化管理。 | 跨 Git 项目可视化编排:首创业务与调度解耦机制。A 项目只写函数,B 项目(调度中心)通过 Web UI 动态下发定时配置。 |
| 代码重构与序列化 | Pickle 地狱:存入 DB 的是函数内存地址,一旦代码重构(改名/移动文件路径),反序列化直接崩溃,任务全线瘫痪。 | 彻底抛弃 Pickle:首创 @cron_register 稳定名称注册表,存入 DB 的仅是纯字符串。代码随便重构,只要名字在,任务照样跑。 |
| 可视化管理后台 | 官方没有 UI,想要启停任务、看日志只能自己从头手搓前后端。 | 原生自带 Web UI:内置开箱即用的 Vue3 + Element Plus 现代化后台。前端已预编译到 nb_cron/web,Python 开发者无需安装 Node.js 即可一键启动。 |
| 精度与时区 | Cron 不支持秒级精度;时区配置与 Misfire 策略行为混乱。 | 强制 6 字段 Cron(支持秒级);默认本地时区(可传 tz),Misfire 容忍策略极简可控。 |
| 多项目共享存储 | 多项目共用一个 Redis 时极易发生 Key 冲突,互相踩踏任务。 | 强制物理隔离:初始化必须传 name 参数,按项目名称进行 Redis Key 的绝对隔离。 |
| 选择困难症 | 提供 7 种 Scheduler 类(Blocking, Background, AsyncIO...),新手永远选错。 | 大道至简:全局只有一个 NbCron 类,永远在后台非阻塞运行,同时兼容同步与 async 异步函数。 |
| 集群分布式消费 | 只能在本地调度并执行,面对海量重计算任务力不从心。 | 一键变身分布式 MQ:支持无缝切换至 FunboostExecutor,瞬间获得失败重试、指数退避、超时杀死等工业级分布式消息队列消费能力。 |
✨ 为什么 nb_cron 是“下一代”框架?
传统的定时任务框架,往往把**“业务代码”和“定时规则(如每天凌晨两点)”**死死绑在一起。 nb_cron 带来了全新的架构理念:
- 配置即数据,无需重启服务:通过 Web UI 随时修改任务的 Cron 表达式或启停任务,配置实时写入 Redis 并生效,你的业务进程全程无需重启。
- 函数定义与任务调度的物理分离:让后端开发人员只管专心写业务函数并打上
@cron_register标记;让运维或运营人员在独立的 Web 页面上,通过下拉菜单选择已注册的函数,在nb_cron_ui 的前端去创建定时任务。
安装
⚠️ 重要提示:PyPI 上的包名是
very_nb_cron(不是nb_cron),因为nb_cron已被他人占用。代码中的 import 仍然是from nb_cron import ...,只是安装时用pip install very_nb_cron。
# 基础安装(内存存储)
pip install very_nb_cron
# Redis 存储 + FastAPI Web(推荐,生产环境一行搞定)
pip install very_nb_cron[redis,fastapi]
# Redis + Flask
pip install very_nb_cron[redis,flask]
# 全部功能
pip install very_nb_cron[all]
各可选组件:
| 组件 | 安装命令 | 说明 |
|---|---|---|
| redis | pip install very_nb_cron[redis] |
Redis 存储 + 分布式锁 |
| mongo | pip install very_nb_cron[mongo] |
MongoDB 存储 + 分布式锁 |
| sqlalchemy | pip install very_nb_cron[sqlalchemy] |
SQLite/MySQL/PostgreSQL 存储 |
| fastapi | pip install very_nb_cron[fastapi] |
FastAPI Web 框架集成 |
| flask | pip install very_nb_cron[flask] |
Flask Web 框架集成 |
| django | pip install very_nb_cron[django] |
Django + Ninja 框架集成 |
快速开始
最简用法
from nb_cron import NbCron, cron_register
cron = NbCron("my_project") # name 必传,隔离不同项目
@cron.job("0 */5 * * * *") # 每5分钟执行(6字段:秒 分 时 日 月 周)
@cron_register('my_task') # 必须注册稳定名称
def my_task():
print("Hello nb_cron!")
cron.start() # 不阻塞,定时任务后台运行,进程不会退出
完整示例
from datetime import timedelta, timezone
from nb_cron import NbCron, cron_register, add_cron_register, explain_cron
# 创建调度器(name 必传,选一个存储后端)
cron = NbCron("my_project") # 内存存储
# cron = NbCron("my_project", "redis://localhost:6379/0") # Redis(推荐)
# cron = NbCron("my_project", tz=timezone(timedelta(hours=8))) # 指定东八区
# cron = NbCron("my_project", tz=timezone.utc) # UTC
# ── 装饰器方式(@cron_register 在下,@cron.job 在上) ──
# 无参数任务
@cron.job("0 */5 * * * *")
@cron_register('report')
def report_task():
print("生成报告")
# 有参数任务(必须在装饰器中传 args/kwargs)
@cron.job("0 30 9 * * *", args=("admin@example.com",), kwargs={"report_type": "daily"})
@cron_register('send_report_email')
def send_report_email(to_address: str, report_type: str = "daily"):
print(f"发送{report_type}报表到 {to_address}")
# 有参数任务(装饰器中直接传参)
@cron.job("0 0 2 * * *", args=("backup_db",), kwargs={"compress": True})
@cron_register('backup')
def backup_database(db_name: str, compress: bool = False):
print(f"备份数据库 {db_name}, 压缩={compress}")
# 异步任务也支持
@cron.job("30 0 9 * * 1-5", trigger="cron")
@cron_register('async_work')
async def async_task():
print("异步任务也支持!")
# 间隔任务
@cron.job("@every 30s", trigger="interval")
@cron_register('heartbeat')
def heartbeat():
print("心跳")
# 日期任务(一次性)
@cron.job("2026-10-01 09:00:00", trigger="date")
@cron_register('national_day')
def national_day_task():
print("国庆节任务")
# ── 非装饰器写法(适合第三方函数或动态注册) ──
# 方式 1:add_cron_register + add_job
def send_sms(phone: str, message: str):
"""发送短信(第三方函数)"""
print(f"发送短信到 {phone}: {message}")
add_cron_register('send_sms', send_sms)
cron.add_job(
'send_sms',
"0 0 8 * * *",
trigger="cron",
job_id="morning_sms",
name="早安短信",
args=("13800138000", "早上好!"),
)
# 方式 2:直接 add_job(自动注册)
from nb_cron import cron_register
def cleanup_logs(days: int = 7):
"""清理日志"""
print(f"清理{days}天前的日志")
cron_register('cleanup_logs', cleanup_logs)
cron.add_job(
cleanup_logs, # 传函数对象
"0 0 3 * * 0",
trigger="cron",
job_id="weekly_cleanup",
name="每周清理",
kwargs={"days": 30}, # 覆盖默认参数
)
# 方式 3:批量注册任务
def process_order(order_id: int):
"""处理订单"""
print(f"处理订单 {order_id}")
add_cron_register('process_order', process_order)
# 动态添加多个不同参数的任务
cron.add_job('process_order', "@every 5m", job_id="process_order_batch1", args=(1001,))
cron.add_job('process_order', "@every 5m", job_id="process_order_batch2", args=(1002,))
cron.add_job('process_order', "@every 5m", job_id="process_order_batch3", args=(1003,))
# ── 启动 ──
cron.start()
# ── 管理 ──
cron.pause_job("backup")
cron.resume_job("backup")
cron.trigger_job("backup")
cron.remove_job("backup")
jobs = cron.get_jobs()
# ── Cron 翻译 ──
print(explain_cron("0 30 9 * * *", "zh")) # "每天 09:30:00 执行"
print(explain_cron("0 30 9 * * *", "en")) # "At 09:30:00, every day"
项目隔离(name 参数)
NbCron 的第一个参数 name 是必传的,用于隔离不同项目的数据:
- Redis: keys 格式为
nb_cron:{name}:jobs、nb_cron:{name}:metrics、nb_cron:{name}:due、nb_cron:{name}:functions、nb_cron:{name}:lock:* - MongoDB: collections 为
nb_cron_{name}_jobs、nb_cron_{name}_metrics、nb_cron_{name}_locks - SQLAlchemy: 表名为
nb_cron_{name}_jobs、nb_cron_{name}_metrics、nb_cron_{name}_locks - Web UI: 侧边栏标题显示
name,方便区分
# 同一个 Redis,不同项目互不干扰
cron_a = NbCron("billing_service", "redis://localhost:6379/0")
cron_b = NbCron("user_service", "redis://localhost:6379/0")
# cron_a 只看到 billing_service:jobs 下的任务
# cron_b 只看到 user_service:jobs 下的任务
不传 name 或传空字符串会直接报错:
NbCron("") # ValueError: NbCron name 不能为空
NbCron() # TypeError: missing required argument 'name'
时区支持
nb_cron 默认使用本地时区。所有时间(next_run_time、日期表达式解析等)都基于调度器的时区。
from datetime import timedelta, timezone
# 默认:本地时区(推荐)
cron = NbCron("my_project")
# 显式指定时区
cron = NbCron("my_project", tz=timezone(timedelta(hours=8))) # 东八区
cron = NbCron("my_project", tz=timezone.utc) # UTC
# Python 3.9+ 可用 zoneinfo
from zoneinfo import ZoneInfo
cron = NbCron("my_project", tz=ZoneInfo("Asia/Shanghai"))
tz 参数接受任何 datetime.tzinfo 对象。
三种触发器类型
nb_cron 支持三种触发器类型,通过 trigger 参数显式指定(也可以不传,自动推断):
| trigger 值 | 含义 | expression 示例 |
|---|---|---|
"cron" |
6 字段 cron 表达式 | "0 */5 * * * *" |
"interval" |
固定间隔重复执行 | "@every 30s", "5m", "2h" |
"date" |
指定时间执行一次 | "2026-10-01 09:00:00", "2026年10月01日" |
自动推断 vs 显式指定
# 自动推断(不传 trigger,nb_cron 自动判断)
@cron.job("0 */5 * * * *") # → cron
@cron.job("@every 30s") # → interval
@cron.job("2026-10-01 09:00:00") # → date
# 显式指定(推荐,语义更清晰)
@cron.job("0 */5 * * * *", trigger="cron")
@cron.job("@every 30s", trigger="interval")
@cron.job("2026-10-01 09:00:00", trigger="date")
# trigger="interval" 时支持简写(不需要 @every 前缀)
@cron.job("30s", trigger="interval") # 等价于 @every 30s
@cron.job("5m", trigger="interval") # 等价于 @every 5m
@cron.job("2h", trigger="interval") # 等价于 @every 2h
注意: 以上所有
@cron.job下面都需要@cron_register('名称')装饰器。
函数注册(强制)
nb_cron 强制要求所有定时函数必须通过 @cron_register 注册一个稳定名称(cron_func_name)。
这确保函数标识不依赖文件路径——重命名文件、移动函数不会影响调度。
from nb_cron import cron_register, add_cron_register
# 装饰器注册
@cron_register('daily_backup')
def backup_db():
print("备份")
backup_db.cron_func_name # → 'daily_backup' (IDE 可补全)
# 函数调用注册(适合第三方函数)
def send_email():
print("发邮件")
add_cron_register('send_email', send_email)
# 也支持 cron_register 两参数形式
cron_register('send_email', send_email)
cron_register 的更多写法(已支持)
# 1) 无参形式:默认使用函数名
@cron_register()
def send_daily_email():
pass
assert send_daily_email.cron_func_name == "send_daily_email"
# 2) 无括号形式:同样默认使用函数名
@cron_register
def refresh_cache():
pass
assert refresh_cache.cron_func_name == "refresh_cache"
注册冲突检查(已内置)
- 同一个函数不能注册两个不同名字
- 同一个名字不能绑定两个不同函数
- 冲突会抛
ValueError,避免运行期覆盖导致的隐性 bug
与 @cron.job 配合
@cron_register 放下面(靠近函数),@cron.job 放上面:
@cron.job("0 0 2 * * *", trigger="cron") # 第二步:读 .cron_func_name,注册调度
@cron_register('daily_backup') # 第一步:设 .cron_func_name,注册函数
def backup_db():
print("备份")
add_job 三种传参方式
# 1. 传函数对象(自动读 .cron_func_name)
cron.add_job(backup_db, "0 0 2 * * *", trigger="cron")
# 2. 传注册名字符串
cron.add_job('daily_backup', "0 0 2 * * *", trigger="cron")
# 3. 传 .cron_func_name(IDE 安全,等价于方式 2)
cron.add_job(backup_db.cron_func_name, "0 0 2 * * *", trigger="cron")
# 4. 带参数的任务(args 和 kwargs)
def send_email(to: str, subject: str, body: str = ""):
print(f"发送邮件到 {to}: {subject}")
cron_register('send_email', send_email)
# 在装饰器中传参
@cron.job("0 9 * * * *", args=("admin@example.com", "日报"), kwargs={"body": "这是日报内容"})
@cron_register('daily_report_email')
def daily_report_email(to: str, subject: str, body: str = ""):
print(f"发送日报到 {to}")
# 或在 add_job 中传参
cron.add_job(
'send_email',
"0 9 * * * *",
trigger="cron",
job_id="morning_email",
args=("user@example.com", "晨报"),
kwargs={"body": "这是晨报内容"},
)
# 5. 批量添加同函数不同参数的任务
def process_batch(batch_id: int):
print(f"处理批次 {batch_id}")
cron_register('process_batch', process_batch)
cron.add_job('process_batch', "@every 10m", job_id="batch_1", args=(1,))
cron.add_job('process_batch', "@every 10m", job_id="batch_2", args=(2,))
cron.add_job('process_batch', "@every 10m", job_id="batch_3", args=(3,))
未注册直接报错
@cron.job("0 */5 * * * *")
def simple_task(): # ❌ ValueError: 函数 'simple_task' 未注册 cron_func_name
pass
函数找不到的处理
如果 Redis 中存储了某个 job 但对应的定时函数没有被导入或已被删除:
- 失败次数 +1(计入 metrics)
- job 状态变为
error - 前端显示红色"异常"标签,不再显示"运行中"误导用户
- 日志打印 ERROR 级别信息
- 继续调度(
cron/interval任务会在后续触发点继续重试) - 自动恢复(函数重新可解析且执行成功后,状态会自动恢复为
active) - 注意:
date一次性任务天生只触发一次,若触发窗口内函数缺失,不会再按时间表自动重试
Web UI 管理后台(重点)
nb_cron 自带漂亮的管理后台,包含:
- 仪表盘:任务总数、运行中/已暂停/异常卡片、24小时执行趋势图、成功率饼图
- 任务管理:列表搜索/筛选、暂停/恢复/立即执行/删除操作、新建任务对话框
- 任务详情:执行指标图表、最近10次执行记录、错误日志
- Cron 工具:Cron 表达式翻译器
- 中英文切换
支持 FastAPI、Flask、Django 三种框架一键启动。
方式一:FastAPI 启动(推荐)
pip install very_nb_cron[redis,fastapi]
创建 app.py:
from nb_cron import NbCron, cron_register
from nb_cron.web import get_fastapi_app
cron = NbCron("my_project", "redis://localhost:6379/0")
# 无参数任务
@cron.job("*/10 * * * * *", trigger="cron", name="心跳检测")
@cron_register('heartbeat')
def heartbeat():
print("heartbeat OK")
# 有参数任务(必须在装饰器中传 args/kwargs)
@cron.job("0 */5 * * * *", trigger="cron", name="数据同步", args=(), kwargs={"source": "mysql", "target": "redis"})
@cron_register('sync_data')
def sync_data(source: str = "mysql", target: str = "redis"):
print(f"sync from {source} to {target}")
# 有参数任务(装饰器中传参)
@cron.job("0 30 2 * * *", trigger="cron", name="每日备份", args=("prod_db",), kwargs={"compress": True})
@cron_register('daily_backup')
def daily_backup(db_name: str, compress: bool = False):
print(f"backup {db_name}, compress={compress}")
# 非装饰器写法:第三方函数
def send_email(to: str, subject: str, body: str):
"""发送邮件(第三方库函数)"""
print(f"邮件已发送到 {to}: {subject}")
cron_register('send_email', send_email)
cron.add_job(
'send_email',
"0 0 9 * * 1-5",
trigger="cron",
job_id="morning_report_email",
name="晨报邮件",
args=("admin@example.com", "每日晨报", "这是晨报内容"),
)
# 批量添加同函数不同参数的任务
def process_queue(queue_name: str):
"""处理队列"""
print(f"processing queue: {queue_name}")
cron_register('process_queue', process_queue)
cron.add_job('process_queue', "@every 1m", job_id="process_queue_1", args=("queue_1",))
cron.add_job('process_queue', "@every 1m", job_id="process_queue_2", args=("queue_2",))
cron.add_job('process_queue', "@every 1m", job_id="process_queue_3", args=("queue_3",))
app = get_fastapi_app(cron)
@app.on_event("startup")
def startup():
cron.start()
@app.on_event("shutdown")
def shutdown():
cron.stop()
启动:
uvicorn app:app --host 0.0.0.0 --port 8000 --reload
打开浏览器访问:
| 地址 | 说明 |
|---|---|
| http://localhost:8000/nb_cron/ui/ | 管理后台 UI 页面 |
| http://localhost:8000/nb_cron/api/jobs | REST API - 任务列表 |
| http://localhost:8000/nb_cron/api/health | 健康检查(含时区信息) |
| http://localhost:8000/nb_cron/api/dashboard/stats | 仪表盘统计数据 |
| http://localhost:8000/nb_cron/api/cron/explain?expression=0+*/5+*+*+*+* | Cron 翻译 |
| http://localhost:8000/docs | FastAPI 自动生成的 Swagger 文档 |
方式二:Flask 启动
pip install very_nb_cron[redis,flask]
创建 app.py:
from nb_cron import NbCron, cron_register
from nb_cron.web import get_flask_app
cron = NbCron("my_project", "redis://localhost:6379/0")
# 无参数任务
@cron.job("*/10 * * * * *", trigger="cron", name="心跳")
@cron_register('heartbeat')
def heartbeat():
print("heartbeat OK")
# 有参数任务(必须在装饰器中传 args/kwargs)
@cron.job("0 */5 * * * *", trigger="cron", name="同步", kwargs={"source": "mysql", "target": "redis"})
@cron_register('sync_data')
def sync_data(source: str = "mysql", target: str = "redis"):
print(f"sync from {source} to {target}")
# 非装饰器写法
def cleanup(days: int = 7):
"""清理日志"""
print(f"cleanup logs older than {days} days")
cron_register('cleanup', cleanup)
cron.add_job(
'cleanup',
"0 0 3 * * 0",
trigger="cron",
job_id="weekly_cleanup",
name="每周清理",
kwargs={"days": 30},
)
app = get_flask_app(cron)
if __name__ == "__main__":
cron.start()
app.run(host="0.0.0.0", port=5000, debug=False)
启动:
# 开发模式
python app.py
# 生产模式(注意: 只用 1 个 worker,或用 Redis 存储自动防重复)
gunicorn app:app -w 1 -b 0.0.0.0:5000
访问 http://localhost:5000/nb_cron/ui/
方式三:Django 启动
pip install very_nb_cron[redis,django]
Step 1 — 创建调度器配置文件 your_project/cron_config.py:
from nb_cron import NbCron, cron_register
cron = NbCron("my_project", "redis://localhost:6379/0")
# 无参数任务
@cron.job("*/10 * * * * *", trigger="cron", name="心跳")
@cron_register('heartbeat')
def heartbeat():
print("heartbeat OK")
# 有参数任务(必须在装饰器中传 args/kwargs)
@cron.job("0 */5 * * * *", trigger="cron", name="同步", kwargs={"source": "mysql", "target": "redis"})
@cron_register('sync_data')
def sync_data(source: str = "mysql", target: str = "redis"):
print(f"sync from {source} to {target}")
# 非装饰器写法:批量添加任务
def process_order(order_id: int):
"""处理订单"""
print(f"processing order {order_id}")
cron_register('process_order', process_order)
cron.add_job('process_order', "@every 5m", job_id="process_order_1", args=(1001,))
cron.add_job('process_order', "@every 5m", job_id="process_order_2", args=(1002,))
cron.add_job('process_order', "@every 5m", job_id="process_order_3", args=(1003,))
Step 2 — 在 urls.py 中挂载路由:
from django.contrib import admin
from django.urls import path
from your_project.cron_config import cron
from nb_cron.web import get_django_urls
urlpatterns = [
path('admin/', admin.site.urls),
] + get_django_urls(cron)
Step 3 — 在 apps.py 中启动调度器(防止 reload 重复启动):
import os
from django.apps import AppConfig
class YourAppConfig(AppConfig):
name = 'your_app'
def ready(self):
if os.environ.get('RUN_MAIN') == 'true':
from your_project.cron_config import cron
cron.start()
启动:
python manage.py runserver 0.0.0.0:8000
访问 http://localhost:8000/nb_cron/ui/
前端 UI 构建说明
nb_cron 的管理后台前端源码位于 nb_cron_ui/ 目录,使用以下技术栈:
- Vue 3 — 响应式前端框架
- Element Plus — UI 组件库
- ECharts — 图表库
- Pinia — 状态管理
- Vue I18n — 中英文国际化
- Vue Router — 路由管理
- Vite — 构建工具
构建前端(发布前必须执行)
cd nb_cron_ui
npm install # 安装依赖
npm run build # 编译,输出到 nb_cron/web/static/
构建完成后,nb_cron/web/static/ 目录下会生成 index.html 和 assets/ 目录,
Python 后端会自动读取并在 /nb_cron/ui/ 路径下提供服务。
前端开发模式
如果你要修改前端代码:
# 终端 1:启动后端 API(以 FastAPI 为例)
uvicorn your_app:app --port 8000
# 终端 2:启动前端开发服务器(自动代理 API 到 8000 端口)
cd nb_cron_ui
npm run dev
Vite 开发服务器会自动将 /nb_cron/api/ 请求代理到 http://localhost:8000,
实现前后端分离开发、热更新。
前端目录结构
nb_cron_ui/
├── package.json # 依赖配置
├── vite.config.js # Vite 配置(base路径、构建输出、API代理)
├── index.html # 入口 HTML
├── src/
│ ├── main.js # Vue 应用入口
│ ├── App.vue # 根组件
│ ├── router/index.js # 路由配置
│ ├── stores/app.js # Pinia 状态管理(侧边栏、标签页、语言)
│ ├── i18n/ # 国际化
│ │ ├── index.js # i18n 配置
│ │ ├── zh.js # 中文翻译
│ │ └── en.js # 英文翻译
│ ├── api/index.js # Axios API 封装
│ ├── views/
│ │ ├── Dashboard.vue # 仪表盘(统计卡片 + ECharts 图表)
│ │ ├── JobList.vue # 任务列表(搜索/操作/状态展示/新建任务)
│ │ ├── JobDetail.vue # 任务详情(指标 + 执行记录 + 错误日志)
│ │ └── CronTool.vue # Cron 表达式翻译工具
│ └── components/
│ ├── AppLayout.vue # 整体布局(侧边栏 + 顶栏 + 内容区)
│ ├── Sidebar.vue # 左侧导航栏
│ └── TabsBar.vue # 右侧多标签栏
Cron 表达式
nb_cron 强制 6 字段 cron 表达式,消除歧义:
┌──────────── 秒 (0-59)
│ ┌────────── 分 (0-59)
│ │ ┌──────── 时 (0-23)
│ │ │ ┌────── 日 (1-31)
│ │ │ │ ┌──── 月 (1-12)
│ │ │ │ │ ┌── 周 (0-6, 0=周日)
│ │ │ │ │ │
* * * * * *
传入 5 字段会直接报错,避免用户写错。
常用示例
| 表达式 | 含义 |
|---|---|
* * * * * * |
每秒 |
0 * * * * * |
每分钟整秒 |
*/10 * * * * * |
每10秒 |
0 */5 * * * * |
每5分钟 |
0 0 * * * * |
每小时整点 |
0 30 9 * * * |
每天 09:30:00 |
0 0 9 * * 1-5 |
工作日 09:00:00 |
0 0 0 1 * * |
每月1号 00:00:00 |
0 0 2 * * 0 |
每周日 02:00:00 |
间隔表达式
除了 cron,也支持 @every 简写(trigger 自动推断为 interval):
@cron.job("@every 30s") # 每30秒(自动推断)
@cron.job("@every 5m", trigger="interval") # 每5分钟(显式指定)
@cron.job("2h", trigger="interval") # 每2小时(简写,显式指定时可省略 @every)
@cron.job("@every 1d") # 每天
@cron.job("@every 1w") # 每周
日期表达式(一次性任务)
支持多种日期格式(trigger 自动推断为 date),日期按调度器的时区解析:
@cron.job("2026-10-01 09:00:00", trigger="date") # ISO 格式
@cron.job("2026-10-01", trigger="date") # 仅日期(当天 00:00:00)
@cron.job("2026/10/01 09:00:00", trigger="date") # 斜线分隔
@cron.job("2026年10月01日", trigger="date") # 中文日期
Cron 翻译功能
from nb_cron import explain_cron
print(explain_cron("0 30 9 * * *", "zh")) # "每天09:30:00执行"
print(explain_cron("0 30 9 * * *", "en")) # "At 09:30:00, every day"
print(explain_cron("0 0 9 * * 1-5", "zh")) # "每周一至周五09:00:00执行"
print(explain_cron("*/5 * * * * *", "zh")) # "每5秒执行"
REST API 也支持翻译:GET /nb_cron/api/cron/explain?expression=0+*/5+*+*+*+*
分布式部署
nb_cron 在 Redis/MongoDB/SQLAlchemy 存储模式下自动支持分布式:
cron = NbCron("my_project", "redis://localhost:6379/0")
原理: 每次任务触发时,调度器先用 SET NX PX (Redis) 或 findOneAndUpdate (MongoDB) 获取分布式锁。锁的 key 包含任务 ID 和触发时间戳,确保同一次触发只有一个实例执行。
不需要额外配置,不需要 leader election,开箱即用。
REST API
所有 API 前缀:/nb_cron/api/
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /jobs |
获取所有任务(含指标) |
| GET | /jobs/{job_id} |
获取单个任务详情 |
| POST | /jobs |
创建任务 |
| DELETE | /jobs/{job_id} |
删除任务 |
| POST | /jobs/{job_id}/pause |
暂停任务 |
| POST | /jobs/{job_id}/resume |
恢复任务 |
| POST | /jobs/{job_id}/trigger |
立即触发一次 |
| GET | /jobs/{job_id}/metrics |
获取任务指标 |
| GET | /dashboard/stats |
仪表盘统计(含 error_count) |
| GET | /cron/explain?expression=... |
Cron 表达式翻译 |
| GET | /functions |
获取已注册函数列表 |
| GET | /health |
健康检查(含时区信息) |
创建任务 API
POST /nb_cron/api/jobs
{
"func_ref": "daily_backup",
"expression": "0 0 2 * * *",
"trigger": "cron",
"job_id": "my_job",
"name": "我的任务",
"args": ["backup_db"],
"kwargs": {"compress": true},
"max_instances": 1
}
func_ref: 函数的cron_func_name(通过@cron_register注册的稳定名称)trigger: 可选"cron"/"interval"/"date",不传则自动推断args: 位置参数列表,会按顺序传给函数kwargs: 关键字参数字典,会作为命名参数传给函数
创建带参数的任务示例
发送邮件任务:
{
"func_ref": "send_email",
"expression": "0 9 * * * *",
"trigger": "cron",
"job_id": "morning_email",
"name": "晨报邮件",
"args": ["admin@example.com", "每日晨报"],
"kwargs": {"body": "这是晨报内容"}
}
批量处理任务(同函数不同参数):
{
"func_ref": "process_queue",
"expression": "@every 1m",
"trigger": "interval",
"job_id": "process_queue_1",
"name": "处理队列 1",
"args": ["queue_1"]
}
{
"func_ref": "process_queue",
"expression": "@every 1m",
"trigger": "interval",
"job_id": "process_queue_2",
"name": "处理队列 2",
"args": ["queue_2"]
}
获取已注册函数
GET /nb_cron/api/functions
从存储后端(Redis/MongoDB/SQLAlchemy)读取所有已注册的函数名称列表,支持跨 Git 项目共享。
跨项目工作原理:
- 项目 A 中用
@cron_register标记的函数,会在cron.start()时自动同步到 Redis - 项目 B 的 Web UI 通过此 API 读取 Redis 中的函数列表
- 即使项目 A 没有运行,函数列表依然可用(持久化在 Redis 中)
示例:
# 项目 A:只标记函数,不添加定时任务
@cron_register('send_email')
def send_email(to, subject):
...
@cron_register('generate_report')
def generate_report(type):
...
cron.start() # 函数名自动同步到 Redis
# 项目 B:Web UI 调用 API
GET /nb_cron/api/functions
# 返回:{"functions": ["send_email", "generate_report"]}
# Web UI 下拉框显示这些函数,用户可以选择并创建定时任务
存储后端
| 后端 | URL 格式 | 分布式锁 | 适用场景 |
|---|---|---|---|
| Memory | None(默认) |
进程内锁 | 开发/单实例 |
| Redis | redis://host:port/db |
SET NX PX | 生产/分布式(推荐) |
| MongoDB | mongodb://host:port/db |
findOneAndUpdate | 生产/分布式 |
| SQLAlchemy | sqlite:///path / mysql+pymysql://... |
INSERT conflict | 生产/分布式 |
任务指标
nb_cron 自动收集每个任务的执行指标(固定大小,不爆内存):
total_runs- 总执行次数success_count/fail_count- 成功/失败次数last_run_at- 最后执行时间last_error- 最后一次错误信息(截断500字符)avg_duration_ms/max_duration_ms/min_duration_ms- 执行耗时统计recent_results- 最近10次执行结果(环形缓冲区)hourly_stats- 24小时逐时统计(固定24个槽位)
Redis 中单个任务的指标占用不超过 2KB。
任务状态
| 状态 | 含义 | 前端显示 |
|---|---|---|
active |
正常运行中 | 绿色 运行中 |
paused |
已暂停 | 黄色 已暂停 |
error |
定时函数未找到 | 红色 异常 |
当函数未找到时(如函数被删除、模块未导入),job 自动标记为 error 状态并计入失败次数。修复函数后,cron/interval 任务会在后续触发成功后自动恢复为 active(date 一次性任务除外)。
API 参考
NbCron(name, store_url=None, max_workers=20, tick_seconds=1.0, misfire_grace_seconds=60, tz=None, executor=None)
创建调度器实例。
name: 必传,调度器名称。用于隔离不同项目的 Redis keys / MongoDB collections / SQL 表。多个项目共享同一个 Redis 时互不干扰。UI 侧边栏会显示此名称store_url: 存储后端 URL,None 表示内存存储max_workers: 线程池大小tick_seconds: 调度循环间隔(秒)misfire_grace_seconds: misfire 容忍时间(超过此时间的错过任务会被跳过)tz: 时区,默认None使用本地时区。接受任何datetime.tzinfo对象executor: 可选执行器,默认本地线程池。可传FunboostExecutor实现“调度与执行解耦”
@cron.job(expression, *, trigger=None, job_id=None, name=None, args=(), kwargs=None, max_instances=1)
装饰器,注册定时任务。同时支持同步函数和 async 函数。函数必须先用 @cron_register 注册。
expression: 触发表达式(cron / 间隔 / 日期时间)trigger: 触发器类型,可选"cron"/"interval"/"date",不传则自动推断max_instances: 预留并发控制参数(当前版本会存储并返回该字段,后续版本将提供严格并发上限控制)
cron.add_job(func, expression, *, trigger=None, job_id=None, name=None, ...)
编程方式添加任务,返回 Job 对象。
func: 函数对象 或cron_func_name字符串(通过@cron_register注册的名称)expression: 触发表达式trigger: 触发器类型,可选"cron"/"interval"/"date",不传则自动推断
@cron_register(cron_func_name) / add_cron_register(cron_func_name, func) / cron_register(cron_func_name, func)
给函数绑定路径无关的稳定名称。被装饰的函数会多出 .cron_func_name 属性。
from nb_cron import cron_register, add_cron_register
cron.start() / cron.stop(wait=True)
start()不阻塞,立即返回,后面的代码继续执行。- 主线程跑完后进程不会退出,定时任务持续运行。
Ctrl+C优雅停止。- 不需要任何
sleep/join/input/block参数。
@every 间隔任务首次何时执行
@every 5s 等 IntervalTrigger 首次 next_run 为「注册时刻的 now」,会在下一个调度 tick 内执行第一次,不会先空等 5 秒;之后每隔 5 秒一次。
cron.pause_job(job_id) / cron.resume_job(job_id)
暂停/恢复任务。
cron.trigger_job(job_id)
立即执行一次(不等待下次触发时间)。
cron.get_jobs() / cron.get_job(job_id) / cron.remove_job(job_id)
查询/删除任务。
explain_cron(expression, lang="en")
翻译 cron 表达式为人类可读文本。lang 支持 "zh"(中文)和 "en"(英文)。
get_fastapi_app(cron) / get_flask_app(cron) / get_django_urls(cron)
一行创建带 Web UI 的应用,分别返回 FastAPI app、Flask app、Django URL 列表。
from nb_cron.web import get_fastapi_app, get_flask_app, get_django_urls
示例代码
完整可运行的示例代码在 examples/ 目录下:
| 文件 | 说明 |
|---|---|
example_fastapi_redis.py |
FastAPI + Redis 完整示例,含多个任务 |
example_flask_redis.py |
Flask + Redis 完整示例 |
example_django_redis.py |
Django + Redis 集成指南(分步说明) |
example_memory_simple.py |
最简示例,内存存储,无需 Redis |
demo_clean_history_date_job.py |
一次性任务清理示例(手动清理 + 自动清理 + Funboost 多进程) |
demo_cross_git_project_manage_corn_tasks/proj1.py |
跨项目示例 - 项目 1:函数定义与定时任务 |
demo_cross_git_project_manage_corn_tasks/proj2_fastapi_cron.py |
跨项目示例 - 项目 2:Web UI 管理后台 |
跨 Git 项目示例(重点)
nb_cron 的核心特性:函数定义与任务调度分离,支持跨 Git 项目管理。
- 项目 1(
proj1.py):业务项目,用@cron_register标记函数,函数名自动同步到 Redis - 项目 2(
proj2_fastapi_cron.py):FastAPI 管理后台,通过 Web UI 为项目 1 的函数添加定时任务
快速开始
# 终端 1:启动项目 1
cd examples/demo_cross_git_project_manage_corn_tasks
python proj1.py
# 终端 2:启动项目 2
uvicorn proj2_fastapi_cron:app --reload
# 访问 Web UI:http://localhost:8000/nb_cron/ui/
工作原理
项目 1 (业务项目) Redis 项目 2 (管理后台)
@cron_register('func') → 函数名列表 → GET /nb_cron/api/functions
cron.start() 同步 → job 配置 ← POST /nb_cron/api/jobs 创建
执行函数 (本地进程) ← 调度任务 ← Web UI 操作
优势
- 职责分离:项目 1 专注业务,项目 2 专注调度
- 安全:只有
@cron_register标记的函数才暴露 - 灵活:项目 2 动态创建任务,无需修改项目 1 的代码
- 可维护:项目 1 重构不影响项目 2 的调度
应用场景
微服务架构、多租户 SaaS、DevOps 自动化、数据平台等。
Funboost 执行器(核弹级能力)
关于funboost的教程请参考:https://funboost.readthedocs.io/zh-cn/latest/index.html
nb_cron 默认的执行器是在本地线程池中直接调用函数。但如果你将 executor 指定为 FunboostExecutor,nb_cron 的任务触发时不在本地执行,而是通过 funboost 的 .push() 把任务推送到消息队列(Redis / RabbitMQ / Kafka / MEMORY_QUEUE 等),由 funboost worker 进程消费执行。
这意味着你瞬间获得了 funboost 的全部能力:
为什么 FunboostExecutor 这么强?
因为 funboost 的 BoosterParams 提供了工业级的任务消费能力,一个参数类就覆盖了 99% 的调度和函数运行控制需求:
| 能力 | BoosterParams 参数 | 说明 |
|---|---|---|
| 30+ 种消息队列 | broker_kind |
Redis / RabbitMQ / Kafka / RocketMQ / Celery / SQS ... 30+ 种中间件随意切换 |
| 精准控频 | qps |
指定每秒执行次数,支持小数(如 0.01 = 每100秒1次),无需关心并发数 |
| 分布式控频 | is_using_distributed_frequency_control |
多个消费者实例共享同一 qps 限额,总频率不超 |
| 智能并发池 | concurrent_num + concurrent_mode |
线程/协程/协程+多进程/单线程,自适应扩缩容,任务少时自动缩减线程 |
| 自动重试 | max_retry_times |
函数出错自动重试,支持指数退避(is_using_advanced_retry) |
| 指数退避重试 | advanced_retry_config |
1s → 2s → 4s → 8s → 16s → 32s → 60s...,支持 sleep 模式和 requeue 模式 |
| 死信队列 | is_push_to_dlx_queue_when_retry_max_times |
重试耗尽后自动进入死信队列,不丢消息 |
| 函数超时 | function_timeout |
运行超时自动杀死,防止任务卡死 |
| 消息过期 | msg_expire_seconds |
消息超过指定时间自动丢弃,不执行过期任务 |
| 任务去重 | do_task_filtering + task_filtering_expire_seconds |
相同参数的任务自动去重,防止重复执行 |
| 运行时间限制 | allow_run_time_cron |
只在指定 cron 时间段内消费执行,如 * 9-17 * * 1-5 仅工作日上班时间 |
| 执行结果持久化 | function_result_status_persistance_conf |
保存函数入参、运行结果、运行状态到 MongoDB,可追溯 |
| RPC 模式 | is_using_rpc_mode |
发布端可同步获取消费端的执行结果 |
| 多进程消费 | mp_consume(process_num=N) |
协程/线程 + 多进程叠加,性能炸裂 |
| 消费者分组 | booster_group |
按业务分组启动消费者,灵活管理 |
| 自定义并发池 | specify_concurrent_pool |
多个消费者共享一个线程池,节约资源 |
| async 支持 | specify_async_loop |
指定 event loop,支持 aiohttp 等要求同 loop 的异步库 |
安装
pip install very_nb_cron[redis] funboost
基本用法
from funboost import BoosterParams, BrokerEnum
from nb_cron import NbCron, cron_register
from nb_cron.executors.funboost_executor import FunboostExecutor
# 创建 funboost 执行器,BoosterParams 的所有参数都支持 IDE 自动补全
funboost_executor = FunboostExecutor(
BoosterParams(
queue_name="nb_cron_dispatch", # 消息队列名
broker_kind=BrokerEnum.REDIS, # 中间件类型,30+ 种可选
concurrent_num=50, # 并发数
qps=20, # 精准控频:每秒最多执行 20 次
max_retry_times=3, # 失败自动重试 3 次
is_using_distributed_frequency_control=True, # 分布式控频
)
)
# NbCron 构造时自动调用 executor.bind_cron(self)
# worker 执行完后直接用 cron.metrics.record() 写指标,无需重建 store
cron = NbCron("my_project", "redis://localhost:6379/0", executor=funboost_executor)
@cron.job("0 */5 * * * *", kwargs={"user_id": 42})
@cron_register('my_task')
def my_task(user_id: int):
print(f"processing user {user_id}")
# 启动 funboost 消费者 + nb_cron 调度器
funboost_executor.consume() # 单进程消费(非阻塞)
# funboost_executor.mp_consume(process_num=4) # 多进程消费,性能炸裂
cron.start()
工作原理
nb_cron 调度器 funboost 消息队列 funboost worker
cron tick 触发任务 ─push()→ Redis/RabbitMQ/Kafka ─consume()→ 解析 cron_func_name
计算 next_run_time 持久化存储,不丢消息 FunctionRegistry.resolve()
分布式锁防重复 执行函数 + 上报 metrics
关键区别:默认执行器是「调度 + 执行在同一进程」,FunboostExecutor 是「调度端 push,消费端执行」,天然解耦。
高级用法
BoosterParams 的 入参非常丰富,各种函数控制功能都有,所以nb_cron的执行器可以充分借助funboost的威力,所以作者没有给nb_cron默认的本地线程池executor加太多功能,例如重试功能/超时杀死功能等。因为你即使没有安装任何消息队列,也可以 BoosterParams(...,broker_kind=BrokerEnum.MEMORY_QUEUE) 来使用funboost的内存队列。
1. 指数退避重试
funboost_executor = FunboostExecutor(
BoosterParams(
queue_name="nb_cron_dispatch",
broker_kind=BrokerEnum.REDIS,
max_retry_times=5,
is_using_advanced_retry=True,
advanced_retry_config={
'retry_mode': 'requeue', # requeue 模式:消息发回队列延迟重试,不占线程
'retry_base_interval': 1.0, # 基础间隔 1s
'retry_multiplier': 2.0, # 指数退避倍数
'retry_max_interval': 60.0, # 最大间隔 60s
'retry_jitter': True, # 随机抖动,防止惊群
},
)
)
# 重试间隔:1s → 2s → 4s → 8s → 16s → 32s → 60s → 60s...
2. Worker 独立进程部署(横向扩展)
调度端和消费端可以完全分离部署,消费端可以独立横向扩展:
# scheduler.py — 调度端(只 push,不消费)
from nb_cron import NbCron, cron_register
from nb_cron.executors.funboost_executor import FunboostExecutor
from funboost import BoosterParams, BrokerEnum
funboost_executor = FunboostExecutor(
BoosterParams(queue_name="nb_cron_dispatch", broker_kind=BrokerEnum.REDIS)
)
cron = NbCron("my_project", "redis://localhost:6379/0", executor=funboost_executor)
@cron.job("0 */5 * * * *")
@cron_register('my_task')
def my_task():
print("执行任务")
cron.start() # 只调度,不消费
# worker.py — 消费端(独立进程,可部署多台机器)
from nb_cron import NbCron
from nb_cron.executors.funboost_executor import FunboostExecutor
from funboost import BoosterParams, BrokerEnum
import my_tasks # 触发 @cron_register,让注册表生效
funboost_executor = FunboostExecutor(
BoosterParams(queue_name="nb_cron_dispatch", broker_kind=BrokerEnum.REDIS)
)
cron = NbCron("my_project", "redis://localhost:6379/0", executor=funboost_executor)
funboost_executor.mp_consume(process_num=4) # 4 进程消费
3. 自定义指标回调
def my_recorder(job_id, success, duration_ms, error):
# 同时上报 Prometheus / 自定义监控系统
prometheus_metrics.labels(job_id=job_id).observe(duration_ms)
funboost_executor = FunboostExecutor(
BoosterParams(queue_name="nb_cron_dispatch", broker_kind=BrokerEnum.REDIS),
metrics_recorder=my_recorder,
)
FunboostExecutor vs 默认执行器
| 特性 | 默认 Executor | FunboostExecutor |
|---|---|---|
| 执行方式 | 本地线程池直接调用 | push 到消息队列,worker 消费执行 |
| 消息持久化 | ❌ 进程崩溃任务丢失 | ✅ 消息队列持久化,不丢任务 |
| 横向扩展 | ❌ 只能单进程 | ✅ worker 独立部署,无限扩展 |
| 精准控频 | ❌ | ✅ qps 参数,支持分布式控频 |
| 自动重试 | ❌ | ✅ max_retry_times + 指数退避 |
| 任务去重 | ❌ | ✅ do_task_filtering |
| 消息过期 | ❌ | ✅ msg_expire_seconds |
| 死信队列 | ❌ | ✅ 重试耗尽自动进入死信队列 |
| 函数超时 | ❌ | ✅ function_timeout |
| 运行时间限制 | ❌ | ✅ allow_run_time_cron |
| 30+ 种消息队列 | ❌ | ✅ broker_kind 一键切换 |
| 多进程消费 | ❌ | ✅ mp_consume(process_num=N) |
| 执行结果持久化 | ✅ (store) | ✅ (store + funboost MongoDB) |
运行示例
# FastAPI + Redis(推荐)
cd examples
pip install very_nb_cron[redis,fastapi]
uvicorn example_fastapi_redis:app --reload
# Flask + Redis
pip install very_nb_cron[redis,flask]
python example_flask_redis.py
# 最简示例(无需 Redis)
pip install very_nb_cron[fastapi]
uvicorn example_memory_simple:app --reload
# 跨 Git 项目示例(重点推荐)
# 终端 1:启动项目 1(业务项目)
cd examples/demo_cross_git_project_manage_corn_tasks
python proj1.py
# 终端 2:启动项目 2(FastAPI 管理后台)
uvicorn proj2_fastapi_cron:app --reload
历史一次性定时任务清理功能
nb_cron 会自动清理已执行完毕的一次性任务(date trigger),防止存储膨胀。
手动清理
from nb_cron import NbCron
cron = NbCron("my_project", "redis://localhost:6379/0")
# 清理 30 天前的历史任务
deleted_count = cron.cleanup_history_jobs(older_than_days=30)
print(f"删除了 {deleted_count} 个过期任务")
自动清理(推荐)
from nb_cron import NbCron
cron = NbCron("my_project", "redis://localhost:6379/0")
# 启用自动清理:每天 0 点清理 7 天前的任务
cron.enable_auto_cleanup(
cron_expression="0 0 0 * * *", # 每天 0 点执行(默认值)
older_than_days=7 # 清理 7 天前的任务(默认值)
)
参数说明
| 方法 | 参数 | 默认值 | 说明 |
|---|---|---|---|
cleanup_history_jobs |
older_than_days |
7 |
清理多少天前的历史任务 |
enable_auto_cleanup |
cron_expression |
"0 0 0 * * *" |
自动清理任务的 cron 表达式 |
enable_auto_cleanup |
older_than_days |
7 |
清理多少天前的任务 |
注意事项
- ✅ 只清理
date类型的一次性任务(cron和interval任务不受影响) - ✅ 只清理
next_run_time为None的任务(已过期的日期任务) - ✅ 只清理
updated_at时间超过older_than_days的任务 - ✅ 自动清理功能会创建一个内部任务,定时执行清理工作
- ⚠️ 清理时机:任务执行完毕后,
updated_at会被更新,因此不会立即被清理 - ⚠️ 内部任务
job_id格式为_auto_cleanup_{name},在任务列表中可见(用于排查) - ⚠️ 若使用
FunboostExecutor.mp_consume(),worker 进程需要导入业务函数模块,确保注册表可解析目标函数
License
MIT - ydf0509
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file very_nb_cron-0.1.2.tar.gz.
File metadata
- Download URL: very_nb_cron-0.1.2.tar.gz
- Upload date:
- Size: 890.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.7.16
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
80695bbd9a43eda22088b1a91dd99d69d7f03313df4cd5e3c72f77cb22747387
|
|
| MD5 |
5fe54f18c519f1b4ae793422cee5e30f
|
|
| BLAKE2b-256 |
d566aed193b941e32257b052a1bf1757341e8e2dd461657069110b2ccad8fc5a
|
File details
Details for the file very_nb_cron-0.1.2-py3-none-any.whl.
File metadata
- Download URL: very_nb_cron-0.1.2-py3-none-any.whl
- Upload date:
- Size: 880.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.7.16
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
14abd1694996f4876a4cb60d4f33bf717883d85e1988d79eed0d2cebac9ee510
|
|
| MD5 |
810715f033c307960d573db9a27df6a8
|
|
| BLAKE2b-256 |
5eb9ad976ead5069b14908524efd795cd745f9b9cc5f3147773b713d03c4cf97
|