基于celery的回调服务。业务系统创建celery回调任务,celery-callback-worker执行回调任务,业务系统在回调任务中处理异步任务。
Project description
celery-callback-service
基于celery的回调服务。业务系统创建celery回调任务,celery-callback-worker执行回调任务,业务系统在回调任务中处理异步任务。
使用方法
启动celery-callback-worker
(celery worker)
celeryconfig.py
worker_concurrency = 10
worker_pool = "threads"
broker_url = "redis://redis/0"
result_backend = "redis://redis/1"
accept_content = ["application/json"]
task_serializer = "json"
result_accept_content = ["application/json"]
result_serializer = "json"
timezone = "Asia/Shanghai"
broker_connection_retry_on_startup = True
task_track_started = True
task_acks_late = True
task_acks_on_failure_or_timeout = True
task_reject_on_worker_lost = True
# 额外新增的配置项
# 配置后所有任务都使用不同的队列
use_different_queue = True
from hybrid_cipher import HybridCipher
celery_callback_service_cipher_key = """
-----BEGIN RSA PRIVATE KEY-----
xxxxxx 请自行生成rsa证书
xxxxxx from Crypto.PublicKey import RSA
xxxxxx sk = RSA.generate(1024)
xxxxxx print(sk.export_key().decode())
xxxxxx
xxxxxx celery-callback-worker侧使用私钥
xxxxxx celery-callback-service业务侧使用公钥
-----END RSA PRIVATE KEY-----
""".strip()
celery_callback_service_cipher = HybridCipher(celery_callback_service_cipher_key)
start.sh
#!/bin/bash
celery -l celery_callback_service.celery_tasks:app worker -l DEBUG
执行./start.sh
启动celery-callback-worker
。
业务程序中引入celery-callback-service
celeryconfig.py
# 与celery-callback-worker中的celeryconfig.py保持一致
# 这是celery-callback-worker中的私钥需要更新为相应的公钥
pro/settings.py
INSTALLED_APPS = [
...
"django_admin_daterange_listfilter",
"celery_callback_service",
...
]
# 这里用于设置回调接口访问的APIKEYS
# 允许多个,请定期更换
CELERY_CALLBACK_SERVICE_APIKEYS = [
"yEcU2IrtVGslTgw6JmkoTo4Trkplnyg8",
"kBhZB8yKKFmXoAzFHP7HVembYsAeOyBk",
]
# 这里的服务地址必须`celery-callback-worker`能够访问的地址
CELERY_CALLBACK_SERVICE_ADDRESS = "http://127.0.0.1:8000"
初始化celery-callback-service
相关数据表
python manage.py migrate celery_callback_service
app/tasks.py
def task1(arg1, arg2):
pass
task1.execution_lock_timeout = 60 # 设置回调任务的锁定时间。默认60秒。
task1.delay_seconds = 5 # 设置n秒后再回调。默认是5秒后再回调。主要是避免在业务接口中创建的Task还没有commit到数据库,导致回调时Task任务不可见。
注意:task1等回调函数,必须全局可见。
app/services.py
from celery_callback_service.client import start_callback_service
from .tasks import task1
def service_func1(*args, **kwargs):
...
start_callback_service(task1, arg1, arg2...)
...
配置项
业务侧settings.py
额外配置项
- CELERY_CALLBACK_SERVICE_APIKEYS
- 不设置的话,优先继承
DJANGO_APIS_APIKEYS
配置项的值 - 如果
DJANGO_APIS_APIKEYS
也没有设置的话,则自动生成随机授权码,并打印输出。
- 不设置的话,优先继承
- CELERY_CALLBACK_SERVICE_ADDRESS
- 如果不设置,则在启动时告警。不设置的话,是没有办法正常回调的。
业务侧celeryconfig.py
额外配置项
- celery_callback_service_cipher: 无默认值,必须的配置项
- celery_callback_retry_countdown_step: 5
- celery_callback_retry_countdown_max: 300
- celery_callback_max_retries: 2048
回调异常的说明
- 如果回调执行过程中出现程序逻辑异常,则会结束该回调任务,并把错误信息记录在记录表中。
- 如果回调执行过程中出现网络异常、网关异常等HTTP状态码非200的情况,则会将任务纳入重试。
- 重试延迟规则是:5秒(可配置) * 重试次数,最大延迟300秒(可配置)。
版本记录
v0.1.6
- 版本首发。
v0.1.7
- 管理界面添加重新推送、添加删除标记、取消删除标记等指处理动作。
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file celery-callback-service-0.1.7.tar.gz
.
File metadata
- Download URL: celery-callback-service-0.1.7.tar.gz
- Upload date:
- Size: 15.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 2357532fdecbb0540ff381ca0ba2e83540318387f6f9fccb08cd1f9adffaaeec |
|
MD5 | 8d3d5b2a6dc242b786ea38b8f92fdb4e |
|
BLAKE2b-256 | eb987d45ca4f6a2ae4a790ed5c69253135b1425c901d90f13e2d7d70df079ee4 |
File details
Details for the file celery_callback_service-0.1.7-py3-none-any.whl
.
File metadata
- Download URL: celery_callback_service-0.1.7-py3-none-any.whl
- Upload date:
- Size: 20.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 3cd1632d9b12f62af0e9a7a01568fa76f2baa0b5e3ac4399b44affa230f8bbd5 |
|
MD5 | 344b896e8dbccc0ef1163dc7a1d5c3c7 |
|
BLAKE2b-256 | 2a014364938f335597f4fef808f69700f18e7e394385d48775765aeb813eab8d |