基于 APScheduler 的任务调度服务器,支持多种 JobStore 后端
Project description
APSchedulerServer 使用教程
项目简介
APSchedulerServer 是一个基于 APScheduler 的任务调度服务器,支持多种 JobStore 后端(Redis、MySQL、Django 等),提供了灵活的任务配置和注册机制。
项目结构
APSchedulerServer/
├── src/ # 源代码目录
│ └── APSchedulerServer/ # 核心代码包
│ ├── __init__.py
│ ├── base_config.py # 基础配置类
│ ├── loader.py # 配置加载器
│ └── main.py # 主程序入口
├── APSchedulerConfig/ # 配置目录(用户需要创建)
│ ├── config.py # 调度器配置
│ └── registry_tasks.py # 任务注册
├── pyproject.toml # 项目配置文件
├── setup.py # 安装脚本
└── README.md # 项目文档
安装
基础安装
pip install APSchedulerServer
安装可选依赖
根据所选的 JobStore,安装额外的依赖:
# Redis JobStore
pip install APSchedulerServer[redis]
# MySQL JobStore
pip install APSchedulerServer[mysql]
# Django JobStore (需要在 Django 项目中使用)
pip install APSchedulerServer[django]
# SQLAlchemy JobStore
pip install APSchedulerServer[sqlalchemy]
# 安装所有可选依赖
pip install APSchedulerServer[all]
从源码安装
git clone https://github.com/yourusername/APSchedulerServer.git
cd APSchedulerServer
pip install -e .
快速开始
1. 基础配置
编辑 APSchedulerConfig/config.py,配置调度器和 JobStore:
from APSchedulerServer.base_config import BaseConfig
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.jobstores.django import DjangoJobStore
class ApsConfig(BaseConfig):
def __init__(self):
print("配置加载成功")
def set_job_stores(self):
"""配置 JobStore"""
return {
# Redis JobStore
'redis': RedisJobStore(
jobs_key='apscheduler.jobs',
run_times_key='apscheduler.run_times',
host='localhost',
port=6379,
db=0
),
# MySQL JobStore
'mysql': SQLAlchemyJobStore(
url='mysql+mysqlconnector://user:password@localhost/scheduler_db'
),
# Django JobStore
'django': DjangoJobStore(),
# Memory JobStore (默认)
'default': MemoryJobStore()
}
def set_scheduler(self):
"""配置调度器"""
scheduler = super().set_scheduler()
# 可以在这里添加自定义配置
return scheduler
2. 任务注册
编辑 APSchedulerConfig/registry_tasks.py,注册你的定时任务:
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
import datetime
def set_registry_tasks(scheduler):
"""注册所有定时任务"""
# 示例1: 使用 Cron 表达式(每天 8:30 执行)
scheduler.add_job(
func=daily_task,
trigger=CronTrigger(hour=8, minute=30),
id='daily_task',
name='每日任务',
replace_existing=True
)
# 示例2: 使用间隔触发器(每 5 分钟执行一次)
scheduler.add_job(
func=interval_task,
trigger=IntervalTrigger(minutes=5),
id='interval_task',
name='间隔任务',
replace_existing=True
)
# 示例3: 使用日期触发器(在指定时间执行一次)
scheduler.add_job(
func=one_time_task,
trigger=DateTrigger(run_date=datetime.datetime(2024, 1, 1, 12, 0)),
id='one_time_task',
name='一次性任务',
replace_existing=True
)
print("所有任务注册完成")
# 定义任务函数
def daily_task():
"""每日任务"""
print("执行每日任务")
# 在这里添加你的业务逻辑
def interval_task():
"""间隔任务"""
print("执行间隔任务")
# 在这里添加你的业务逻辑
def one_time_task():
"""一次性任务"""
print("执行一次性任务")
# 在这里添加你的业务逻辑
3. 运行调度器
方法1:使用命令行工具(推荐)
apscheduler-server
方法2:使用 Python 模块方式
在项目根目录下执行:
python -m APSchedulerServer.main
方法3:直接运行
from APSchedulerServer.main import main
if __name__ == '__main__':
main()
JobStore 配置详解
Redis JobStore
from apscheduler.jobstores.redis import RedisJobStore
def set_job_stores(self):
return {
'redis': RedisJobStore(
jobs_key='apscheduler.jobs', # 存储任务的键名
run_times_key='apscheduler.run_times', # 存储运行时间的键名
host='localhost', # Redis 主机
port=6379, # Redis 端口
db=0, # Redis 数据库
password='your_password' # Redis 密码(可选)
)
}
MySQL JobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
def set_job_stores(self):
return {
'mysql': SQLAlchemyJobStore(
url='mysql+mysqlconnector://user:password@localhost/scheduler_db',
tablename='my_custom_jobs_table' # 自定义表名
)
}
mysql数据库配置
CREATE TABLE `apscheduler_jobs`
(
`id` varchar(191) NOT NULL,
`next_run_time` float(20) DEFAULT NULL,
`job_state` longblob NOT NULL,
PRIMARY KEY (`id`),
KEY `apscheduler_jobs_next_run_time` ON `apscheduler_jobs` (`next_run_time`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
支持的数据库 URL 格式:
- MySQL:
mysql+mysqlconnector://user:password@host/dbname - PostgreSQL:
postgresql://user:password@host/dbname - SQLite:
sqlite:///jobs.sqlite
Django JobStore
from apscheduler.jobstores.django import DjangoJobStore
def set_job_stores(self):
return {
'django': DjangoJobStore()
}
注意:使用 Django JobStore 需要将 apscheduler 添加到 Django 的 INSTALLED_APPS 中。
高级配置
修改执行器配置
编辑 APSchedulerServer/base_config.py 中的 SCHEDULER_EXECUTORS:
SCHEDULER_EXECUTORS = {
"default": {"type": "threadpool", "max_workers": 20},
"cpu": {"type": "processpool", "max_workers": 8}
}
修改任务默认参数
编辑 APSchedulerServer/base_config.py 中的 SCHEDULER_JOB_DEFAULTS:
SCHEDULER_JOB_DEFAULTS = {
"coalesce": True, # 合并错过的执行
"max_instances": 5, # 最大并发实例数
"misfire_grace_time": 300 # 错过执行的宽限期(秒)
}
时区配置
编辑 APSchedulerServer/base_config.py 中的 TIME_ZONE:
TIME_ZONE = "Asia/Shanghai"
常见问题
1. 找不到模块错误
确保在项目根目录下运行:
python -m APSchedulerServer.main
2. Redis 连接失败
检查 Redis 服务是否启动,以及连接配置是否正确。
3. MySQL 连接失败
确保 MySQL 服务已启动,并且数据库已创建。
4. 任务没有执行
检查任务的触发器配置是否正确,以及调度器是否正常运行。
示例项目
示例 1: 使用 Redis JobStore
# APSchedulerConfig/config.py
from APSchedulerServer.base_config import BaseConfig
from apscheduler.jobstores.redis import RedisJobStore
class ApsConfig(BaseConfig):
def __init__(self):
print("Redis JobStore 配置加载成功")
def set_job_stores(self):
return {
'default': RedisJobStore(
host='localhost',
port=6379,
db=0
)
}
示例 2: 使用 MySQL JobStore
# APSchedulerConfig/config.py
from APSchedulerServer.base_config import BaseConfig
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
class ApsConfig(BaseConfig):
def __init__(self):
print("MySQL JobStore 配置加载成功")
def set_job_stores(self):
return {
'default': SQLAlchemyJobStore(
url='mysql+mysqlconnector://user:password@localhost/scheduler_db'
)
}
示例 3: 使用 Django JobStore
# APSchedulerConfig/config.py
from APSchedulerServer.base_config import BaseConfig
from apscheduler.jobstores.django import DjangoJobStore
class ApsConfig(BaseConfig):
def __init__(self):
print("Django JobStore 配置加载成功")
def set_job_stores(self):
return {
'default': DjangoJobStore()
}
示例 4: 使用 SQLAlchemy JobStore
# APSchedulerConfig/config.py
from APSchedulerServer.base_config import BaseConfig
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
class ApsConfig(BaseConfig):
def __init__(self):
print("SQLAlchemy JobStore 配置加载成功")
def set_job_stores(self):
return {
'default': SQLAlchemyJobStore(
url='postgresql://user:password@localhost/scheduler_db'
)
}
最佳实践
-
选择合适的 JobStore:
- 开发环境:使用 MemoryJobStore
- 生产环境:使用 Redis 或 SQLAlchemy JobStore
- Django 项目:使用 Django JobStore
- 需要跨平台支持:使用 SQLAlchemy JobStore(支持 PostgreSQL、MySQL、SQLite 等)
-
任务设计:
- 任务函数应该是幂等的(可以安全地多次执行)
- 避免长时间运行的任务
- 合理设置任务的并发实例数
-
监控和日志:
- 添加任务执行日志
- 监控任务执行状态
- 设置任务失败告警
-
错误处理:
- 在任务函数中添加适当的异常处理
- 设置合理的重试策略
许可证
本项目采用 MIT 许可证。
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file apschedulerserver-0.1.3-py3-none-any.whl.
File metadata
- Download URL: apschedulerserver-0.1.3-py3-none-any.whl
- Upload date:
- Size: 8.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
587b21d5d35bf4662f59c5cd0602eb0aa7a8dc22eae7a188e9727aee6a62658d
|
|
| MD5 |
5ce38c79a21de1922f97a3c4becfae82
|
|
| BLAKE2b-256 |
cafa5e4e68adfe57f9f2a3afef5cb1cbe4638b24be6336ff970d8f9e9970cbe4
|