Celery-based timing tasks
Project description
基于celery实现的一个定时器
工作流程
celerytimer隐藏了其中大部分内容,让您专注于手头的任务
celerytimer是基于celery实现的定时器,需要配合celery使用,使用celerytimer访问受保护的资源非常简单
from celery import shared_task
from celerytimer.celerytimer import TimingTasks
obj = TimingTasks("127.0.0.1", 3306, "root", "123456", 'runtest') # 数据库的相关配置
@obj.executed_task("push_task", "推送测试", "flag", ["account_id"])
@shared_task
def push_data(executed_data, a):
print("值一:", executed_data)
print("值三:", a)
print("测试成功")
executed_data = {1602256560: [{"account_id": 83022257, "channel_id": 55824}, ]}
push_data(executed_data, "a")
功能
- 可动态设置定时任务:限制在未来一小时内(celery中存在定时任务超过一小时未执行会重复执行)
- 过滤掉已执行的任务
注意点
- 该定时器可以处理
未来一小时内
的任务,超出部分会抛弃,建议:脚本每10钟执行一次 - celerytimer装饰器必须在@shared_task上方
安装
要安装 celerytimer,你可以使用pip
$ pip install celerytimer
历史
V1.1 (2020年10月09日)
- 基于celery的定时器
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
celerytimer-1.2.tar.gz
(6.5 kB
view hashes)
Built Distribution
Close
Hashes for celerytimer-1.2-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | debbd95e27df9187c2b69667d9c81197ea6d2ec3a754a8f287066c090a02b1da |
|
MD5 | a6b13c442e18a2ccf1a3373ae74b2cdd |
|
BLAKE2b-256 | 9773a30990f63c62e31a2a7afa92e50e37e26443d215d4f6c791f3bd18d32309 |