基于celery的回调服务。业务系统创建celery回调任务,celery-callback-worker执行回调任务,业务系统在回调任务中处理异步任务。
Project description
celery-callback-service
基于celery的回调服务。业务系统创建celery回调任务,celery-callback-worker执行回调任务,业务系统在回调任务中处理异步任务。
使用方法
启动celery-callback-worker
(celery worker)
celeryconfig.py
worker_concurrency = 10
worker_pool = "threads"
broker_url = "redis://redis/0"
result_backend = "redis://redis/1"
accept_content = ["application/json"]
task_serializer = "json"
result_accept_content = ["application/json"]
result_serializer = "json"
timezone = "Asia/Shanghai"
broker_connection_retry_on_startup = True
task_track_started = True
task_acks_late = True
task_acks_on_failure_or_timeout = True
task_reject_on_worker_lost = True
# 额外新增的配置项
# 配置后所有任务都使用不同的队列
use_different_queue = True
from hybrid_cipher import HybridCipher
celery_callback_service_cipher_key = """
-----BEGIN RSA PRIVATE KEY-----
xxxxxx 请自行生成rsa证书
xxxxxx from Crypto.PublicKey import RSA
xxxxxx sk = RSA.generate(1024)
xxxxxx print(sk.export_key().decode())
xxxxxx
xxxxxx celery-callback-worker侧使用私钥
xxxxxx celery-callback-service业务侧使用公钥
-----END RSA PRIVATE KEY-----
""".strip()
celery_callback_service_cipher = HybridCipher(celery_callback_service_cipher_key)
start.sh
#!/bin/bash
celery -l celery_callback_service.celery_tasks:app worker -l DEBUG
执行./start.sh
启动celery-callback-worker
。
业务程序中引入celery-callback-service
celeryconfig.py
# 与celery-callback-worker中的celeryconfig.py保持一致
# 这是celery-callback-worker中的私钥需要更新为相应的公钥
pro/settings.py
INSTALLED_APPS = [
...
"django_admin_daterange_listfilter",
"celery_callback_service",
...
]
# 这里用于设置回调接口访问的APIKEYS
# 允许多个,请定期更换
CELERY_CALLBACK_SERVICE_APIKEYS = [
"yEcU2IrtVGslTgw6JmkoTo4Trkplnyg8",
"kBhZB8yKKFmXoAzFHP7HVembYsAeOyBk",
]
# 这里的服务地址必须`celery-callback-worker`能够访问的地址
CELERY_CALLBACK_SERVICE_ADDRESS = "http://127.0.0.1:8000"
初始化celery-callback-service
相关数据表
python manage.py migrate celery_callback_service
app/tasks.py
def task1(arg1, arg2):
pass
task1.execution_lock_timeout = 60 # 设置回调任务的锁定时间。默认60秒。
task1.delay_seconds = 5 # 设置n秒后再回调。默认是5秒后再回调。主要是避免在业务接口中创建的Task还没有commit到数据库,导致回调时Task任务不可见。
注意:task1等回调函数,必须全局可见。
app/services.py
from celery_callback_service.client import start_callback_service
from .tasks import task1
def service_func1(*args, **kwargs):
...
start_callback_service(task1, arg1, arg2...)
...
配置项
业务侧settings.py
额外配置项
- CELERY_CALLBACK_SERVICE_APIKEYS
- 不设置的话,优先继承
DJANGO_APIS_APIKEYS
配置项的值 - 如果
DJANGO_APIS_APIKEYS
也没有设置的话,则自动生成随机授权码,并打印输出。
- 不设置的话,优先继承
- CELERY_CALLBACK_SERVICE_ADDRESS
- 如果不设置,则在启动时告警。不设置的话,是没有办法正常回调的。
业务侧celeryconfig.py
额外配置项
- celery_callback_service_cipher: 无默认值,必须的配置项
- celery_callback_retry_countdown_step: 5
- celery_callback_retry_countdown_max: 300
- celery_callback_max_retries: 2048
回调异常的说明
- 如果回调执行过程中出现程序逻辑异常,则会结束该回调任务,并把错误信息记录在记录表中。
- 如果回调执行过程中出现网络异常、网关异常等HTTP状态码非200的情况,则会将任务纳入重试。
- 重试延迟规则是:5秒(可配置) * 重试次数,最大延迟300秒(可配置)。
版本记录
v0.1.6
- 版本首发。
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for celery-callback-service-0.1.6.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | adb5d1b8d3cc4eec28d6f144ef57e02f2c518b3468ca52f0dfb0c79beacdf9a6 |
|
MD5 | ed39c141c6197dda73fee8adcee7e2eb |
|
BLAKE2b-256 | 6f3bb8f66b5c25db567730cfbf9b1b47e4910017290ff04f557931fb18afff36 |
Close
Hashes for celery_callback_service-0.1.6-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 66eed6a76e69497c92d6f9f8345f65d3c33b95370631f13da473f644ad4cec87 |
|
MD5 | e1fb49c3b767364494671ecb2a0a24e9 |
|
BLAKE2b-256 | cf71862f3751f09b47e9339ef660c38497e3d1059b5579ad19d11a843649e63b |