Skip to main content

基于celery的回调服务。业务系统创建celery回调任务,celery-callback-worker执行回调任务,业务系统在回调任务中处理异步任务。

Project description

celery-callback-service

基于celery的回调服务。业务系统创建celery回调任务,celery-callback-worker执行回调任务,业务系统在回调任务中处理异步任务。

使用方法

启动celery-callback-worker(celery worker)

celeryconfig.py

worker_concurrency = 10
worker_pool = "threads"
broker_url = "redis://redis/0"
result_backend = "redis://redis/1"
accept_content = ["application/json"]
task_serializer = "json"
result_accept_content = ["application/json"]
result_serializer = "json"
timezone = "Asia/Shanghai"
broker_connection_retry_on_startup = True
task_track_started = True
task_acks_late = True
task_acks_on_failure_or_timeout = True
task_reject_on_worker_lost = True
# 额外新增的配置项
# 配置后所有任务都使用不同的队列
use_different_queue = True

from hybrid_cipher import HybridCipher

celery_callback_service_cipher_key = """
-----BEGIN RSA PRIVATE KEY-----
xxxxxx 请自行生成rsa证书
xxxxxx from Crypto.PublicKey import RSA
xxxxxx sk = RSA.generate(1024)
xxxxxx print(sk.export_key().decode())
xxxxxx 
xxxxxx celery-callback-worker侧使用私钥
xxxxxx celery-callback-service业务侧使用公钥
-----END RSA PRIVATE KEY-----
""".strip()
celery_callback_service_cipher = HybridCipher(celery_callback_service_cipher_key)

start.sh

#!/bin/bash
celery -l celery_callback_service.celery_tasks:app worker -l DEBUG

执行./start.sh启动celery-callback-worker

业务程序中引入celery-callback-service

celeryconfig.py

# 与celery-callback-worker中的celeryconfig.py保持一致
# 这是celery-callback-worker中的私钥需要更新为相应的公钥

pro/settings.py

INSTALLED_APPS = [
    ...
    "django_admin_daterange_listfilter",
    "celery_callback_service",
    ...
]

# 这里用于设置回调接口访问的APIKEYS
# 允许多个,请定期更换
CELERY_CALLBACK_SERVICE_APIKEYS = [
    "yEcU2IrtVGslTgw6JmkoTo4Trkplnyg8",
    "kBhZB8yKKFmXoAzFHP7HVembYsAeOyBk",
]
# 这里的服务地址必须`celery-callback-worker`能够访问的地址
CELERY_CALLBACK_SERVICE_ADDRESS = "http://127.0.0.1:8000"

初始化celery-callback-service相关数据表

python manage.py migrate celery_callback_service

app/tasks.py

def task1(arg1, arg2):
   pass

task1.execution_lock_timeout = 60 # 设置回调任务的锁定时间。默认60秒。
task1.delay_seconds = 5 # 设置n秒后再回调。默认是5秒后再回调。主要是避免在业务接口中创建的Task还没有commit到数据库,导致回调时Task任务不可见。

注意:task1等回调函数,必须全局可见。

app/services.py

from celery_callback_service.client import start_callback_service
from .tasks import task1

def service_func1(*args, **kwargs):
    ...
    start_callback_service(task1, arg1, arg2...)
    ...

配置项

业务侧settings.py额外配置项

  • CELERY_CALLBACK_SERVICE_APIKEYS
    • 不设置的话,优先继承DJANGO_APIS_APIKEYS配置项的值
    • 如果DJANGO_APIS_APIKEYS也没有设置的话,则自动生成随机授权码,并打印输出。
  • CELERY_CALLBACK_SERVICE_ADDRESS
    • 如果不设置,则在启动时告警。不设置的话,是没有办法正常回调的。

业务侧celeryconfig.py额外配置项

  • celery_callback_service_cipher: 无默认值,必须的配置项
  • celery_callback_retry_countdown_step: 5
  • celery_callback_retry_countdown_max: 300
  • celery_callback_max_retries: 2048

回调异常的说明

  • 如果回调执行过程中出现程序逻辑异常,则会结束该回调任务,并把错误信息记录在记录表中。
  • 如果回调执行过程中出现网络异常、网关异常等HTTP状态码非200的情况,则会将任务纳入重试。
    • 重试延迟规则是:5秒(可配置) * 重试次数,最大延迟300秒(可配置)。

版本记录

v0.1.6

  • 版本首发。

v0.1.7

  • 管理界面添加重新推送、添加删除标记、取消删除标记等指处理动作。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

celery-callback-service-0.1.7.tar.gz (15.7 kB view hashes)

Uploaded Source

Built Distribution

celery_callback_service-0.1.7-py3-none-any.whl (20.5 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page