Skip to main content

Celery-based timing tasks

Project description

基于celery实现的一个定时器

工作流程

celerytimer隐藏了其中大部分内容,让您专注于手头的任务

celerytimer是基于celery实现的定时器,需要配合celery使用,使用celerytimer访问受保护的资源非常简单

from celery import shared_task
from celerytimer.celerytimer import TimingTasks

obj = TimingTasks("127.0.0.1", 3306, "root", "123456", 'runtest')	# 数据库的相关配置

@obj.executed_task("push_task", "推送测试", "flag", ["account_id"])
@shared_task
def push_data(executed_data, a):
    print("值一:", executed_data)
    print("值三:", a)
    print("测试成功")


executed_data = {1602256560: [{"account_id": 83022257, "channel_id": 55824}, ]}
push_data(executed_data, "a")

功能

  • 可动态设置定时任务:限制在未来一小时内(celery中存在定时任务超过一小时未执行会重复执行)
  • 过滤掉已执行的任务

注意点

  • 该定时器可以处理未来一小时内的任务,超出部分会抛弃,建议:脚本每10钟执行一次
  • celerytimer装饰器必须在@shared_task上方

安装

要安装 celerytimer,你可以使用pip

$ pip install celerytimer

历史

V1.1 (2020年10月09日)

  • 基于celery的定时器

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

celerytimer-1.2.tar.gz (6.5 kB view hashes)

Uploaded Source

Built Distribution

celerytimer-1.2-py3-none-any.whl (9.1 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page