Skip to main content

动态的带结果保存的定时任务管理器

Project description

动态的带结果保存的定时任务管理器-DynamicScheduler

安装

pip install dynamic-scheduler

介绍

主要是结合 apscheduler 完成定时任务动态修改定时任务功能、增加定时任务结果缓存功能

test

  1. 运行下方demo
  2. 重新config类,指定数据库schema
  3. 尝试修改数据库表 cron中cron表达式
  4. 验证定时任务是否修改
  5. 查看apscheduler_jobs表任务结果

demo

from DynamicScheduler.dynamic_scheduler import DynamicSchedulerProxy
from DynamicScheduler.scheduler_utils import SchedulerConfig

import datetime, os, threading
def task():
    # print(datetime.datetime.now())
    # time.sleep(10)
    print('start time: ', datetime.datetime.now(), os.getpid(), threading.current_thread().name)
    return datetime.datetime.now()

class Config(SchedulerConfig):
    DEFAULT_DB_URL = 'postgresql+psycopg2://postgres:123456@{your_ip}:5432/postgres?utf-8'
    SCHEDULER_SCHEMA = 'public'

s = DynamicSchedulerProxy(config_class=Config)

s.add_job_with_default(func=task, cron_str='* * */5', id='task')
s.clear_history()  # 先清除缓存表
s.add_result_listener()
s.start()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

dynamic_scheduler-1.0.0-py3-none-any.whl (7.5 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page