Skip to main content

基于 APScheduler 的任务调度服务器,支持多种 JobStore 后端

Project description

APSchedulerServer 使用教程

项目简介

APSchedulerServer 是一个基于 APScheduler 的任务调度服务器,支持多种 JobStore 后端(Redis、MySQL、Django 等),提供了灵活的任务配置和注册机制。

项目结构

APSchedulerServer/
├── src/                     # 源代码目录
│   └── APSchedulerServer/   # 核心代码包
│       ├── __init__.py
│       ├── base_config.py   # 基础配置类
│       ├── loader.py        # 配置加载器
│       └── main.py          # 主程序入口
├── APSchedulerConfig/       # 配置目录(用户需要创建)
│   ├── config.py           # 调度器配置
│   └── registry_tasks.py   # 任务注册
├── pyproject.toml          # 项目配置文件
├── setup.py               # 安装脚本
└── README.md              # 项目文档

安装

基础安装

pip install APSchedulerServer

安装可选依赖

根据所选的 JobStore,安装额外的依赖:

# Redis JobStore
pip install APSchedulerServer[redis]

# MySQL JobStore
pip install APSchedulerServer[mysql]

# Django JobStore (需要在 Django 项目中使用)
pip install APSchedulerServer[django]

# SQLAlchemy JobStore
pip install APSchedulerServer[sqlalchemy]

# 安装所有可选依赖
pip install APSchedulerServer[all]

从源码安装

git clone https://github.com/yourusername/APSchedulerServer.git
cd APSchedulerServer
pip install -e .

快速开始

1. 基础配置

编辑 APSchedulerConfig/config.py,配置调度器和 JobStore:

from APSchedulerServer.base_config import BaseConfig
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.jobstores.django import DjangoJobStore


class ApsConfig(BaseConfig):

    def __init__(self):
        print("配置加载成功")

    def set_job_stores(self):
        """配置 JobStore"""
        return {
            # Redis JobStore
            'redis': RedisJobStore(
                jobs_key='apscheduler.jobs',
                run_times_key='apscheduler.run_times',
                host='localhost',
                port=6379,
                db=0
            ),
            # MySQL JobStore
            'mysql': SQLAlchemyJobStore(
                url='mysql+mysqlconnector://user:password@localhost/scheduler_db'
            ),
            # Django JobStore
            'django': DjangoJobStore(),
            # Memory JobStore (默认)
            'default': MemoryJobStore()
        }

    def set_scheduler(self):
        """配置调度器"""
        scheduler = super().set_scheduler()
        # 可以在这里添加自定义配置
        return scheduler

2. 任务注册

编辑 APSchedulerConfig/registry_tasks.py,注册你的定时任务:

from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
import datetime


def set_registry_tasks(scheduler):
    """注册所有定时任务"""

    # 示例1: 使用 Cron 表达式(每天 8:30 执行)
    scheduler.add_job(
        func=daily_task,
        trigger=CronTrigger(hour=8, minute=30),
        id='daily_task',
        name='每日任务',
        replace_existing=True
    )

    # 示例2: 使用间隔触发器(每 5 分钟执行一次)
    scheduler.add_job(
        func=interval_task,
        trigger=IntervalTrigger(minutes=5),
        id='interval_task',
        name='间隔任务',
        replace_existing=True
    )

    # 示例3: 使用日期触发器(在指定时间执行一次)
    scheduler.add_job(
        func=one_time_task,
        trigger=DateTrigger(run_date=datetime.datetime(2024, 1, 1, 12, 0)),
        id='one_time_task',
        name='一次性任务',
        replace_existing=True
    )

    print("所有任务注册完成")


# 定义任务函数
def daily_task():
    """每日任务"""
    print("执行每日任务")
    # 在这里添加你的业务逻辑


def interval_task():
    """间隔任务"""
    print("执行间隔任务")
    # 在这里添加你的业务逻辑


def one_time_task():
    """一次性任务"""
    print("执行一次性任务")
    # 在这里添加你的业务逻辑

3. 运行调度器

方法1:使用命令行工具(推荐)

apscheduler-server

方法2:使用 Python 模块方式

在项目根目录下执行:

python -m APSchedulerServer.main

方法3:直接运行

from APSchedulerServer.main import main

if __name__ == '__main__':
    main()

JobStore 配置详解

Redis JobStore

from apscheduler.jobstores.redis import RedisJobStore


def set_job_stores(self):
    return {
        'redis': RedisJobStore(
            jobs_key='apscheduler.jobs',  # 存储任务的键名
            run_times_key='apscheduler.run_times',  # 存储运行时间的键名
            host='localhost',  # Redis 主机
            port=6379,  # Redis 端口
            db=0,  # Redis 数据库
            password='your_password'  # Redis 密码(可选)
        )
    }

MySQL JobStore

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore


def set_job_stores(self):
    return {
        'mysql': SQLAlchemyJobStore(
            url='mysql+mysqlconnector://user:password@localhost/scheduler_db',
            tablename='my_custom_jobs_table'  # 自定义表名
        )
    }

mysql数据库配置

CREATE TABLE `apscheduler_jobs`
(
    `id`            varchar(191) NOT NULL,
    `next_run_time` float(20) DEFAULT NULL,
    `job_state`     longblob     NOT NULL,
    PRIMARY KEY (`id`),
    KEY `apscheduler_jobs_next_run_time` ON `apscheduler_jobs` (`next_run_time`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

支持的数据库 URL 格式:

  • MySQL: mysql+mysqlconnector://user:password@host/dbname
  • PostgreSQL: postgresql://user:password@host/dbname
  • SQLite: sqlite:///jobs.sqlite

Django JobStore

from apscheduler.jobstores.django import DjangoJobStore


def set_job_stores(self):
    return {
        'django': DjangoJobStore()
    }

注意:使用 Django JobStore 需要将 apscheduler 添加到 Django 的 INSTALLED_APPS 中。

高级配置

修改执行器配置

编辑 APSchedulerServer/base_config.py 中的 SCHEDULER_EXECUTORS

SCHEDULER_EXECUTORS = {
    "default": {"type": "threadpool", "max_workers": 20},
    "cpu": {"type": "processpool", "max_workers": 8}
}

修改任务默认参数

编辑 APSchedulerServer/base_config.py 中的 SCHEDULER_JOB_DEFAULTS

SCHEDULER_JOB_DEFAULTS = {
    "coalesce": True,  # 合并错过的执行
    "max_instances": 5,  # 最大并发实例数
    "misfire_grace_time": 300  # 错过执行的宽限期(秒)
}

时区配置

编辑 APSchedulerServer/base_config.py 中的 TIME_ZONE

TIME_ZONE = "Asia/Shanghai"

常见问题

1. 找不到模块错误

确保在项目根目录下运行:

python -m APSchedulerServer.main

2. Redis 连接失败

检查 Redis 服务是否启动,以及连接配置是否正确。

3. MySQL 连接失败

确保 MySQL 服务已启动,并且数据库已创建。

4. 任务没有执行

检查任务的触发器配置是否正确,以及调度器是否正常运行。

示例项目

示例 1: 使用 Redis JobStore

# APSchedulerConfig/config.py
from APSchedulerServer.base_config import BaseConfig
from apscheduler.jobstores.redis import RedisJobStore


class ApsConfig(BaseConfig):
    def __init__(self):
        print("Redis JobStore 配置加载成功")

    def set_job_stores(self):
        return {
            'default': RedisJobStore(
                host='localhost',
                port=6379,
                db=0
            )
        }

示例 2: 使用 MySQL JobStore

# APSchedulerConfig/config.py
from APSchedulerServer.base_config import BaseConfig
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore


class ApsConfig(BaseConfig):
    def __init__(self):
        print("MySQL JobStore 配置加载成功")

    def set_job_stores(self):
        return {
            'default': SQLAlchemyJobStore(
                url='mysql+mysqlconnector://user:password@localhost/scheduler_db'
            )
        }

示例 3: 使用 Django JobStore

# APSchedulerConfig/config.py
from APSchedulerServer.base_config import BaseConfig
from apscheduler.jobstores.django import DjangoJobStore


class ApsConfig(BaseConfig):
    def __init__(self):
        print("Django JobStore 配置加载成功")

    def set_job_stores(self):
        return {
            'default': DjangoJobStore()
        }

示例 4: 使用 SQLAlchemy JobStore

# APSchedulerConfig/config.py
from APSchedulerServer.base_config import BaseConfig
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore


class ApsConfig(BaseConfig):
    def __init__(self):
        print("SQLAlchemy JobStore 配置加载成功")

    def set_job_stores(self):
        return {
            'default': SQLAlchemyJobStore(
                url='postgresql://user:password@localhost/scheduler_db'
            )
        }

最佳实践

  1. 选择合适的 JobStore

    • 开发环境:使用 MemoryJobStore
    • 生产环境:使用 Redis 或 SQLAlchemy JobStore
    • Django 项目:使用 Django JobStore
    • 需要跨平台支持:使用 SQLAlchemy JobStore(支持 PostgreSQL、MySQL、SQLite 等)
  2. 任务设计

    • 任务函数应该是幂等的(可以安全地多次执行)
    • 避免长时间运行的任务
    • 合理设置任务的并发实例数
  3. 监控和日志

    • 添加任务执行日志
    • 监控任务执行状态
    • 设置任务失败告警
  4. 错误处理

    • 在任务函数中添加适当的异常处理
    • 设置合理的重试策略

许可证

本项目采用 MIT 许可证。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

apschedulerserver-0.1.3-py3-none-any.whl (8.4 kB view details)

Uploaded Python 3

File details

Details for the file apschedulerserver-0.1.3-py3-none-any.whl.

File metadata

File hashes

Hashes for apschedulerserver-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 587b21d5d35bf4662f59c5cd0602eb0aa7a8dc22eae7a188e9727aee6a62658d
MD5 5ce38c79a21de1922f97a3c4becfae82
BLAKE2b-256 cafa5e4e68adfe57f9f2a3afef5cb1cbe4638b24be6336ff970d8f9e9970cbe4

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page