Skip to main content

基于celery的回调服务。业务系统创建celery回调任务,celery-callback-worker执行回调任务,业务系统在回调任务中处理异步任务。

Project description

celery-callback-service

基于celery的回调服务。业务系统创建celery回调任务,celery-callback-worker执行回调任务,业务系统在回调任务中处理异步任务。

使用方法

启动celery-callback-worker(celery worker)

celeryconfig.py

worker_concurrency = 10
worker_pool = "threads"
broker_url = "redis://redis/0"
result_backend = "redis://redis/1"
accept_content = ["application/json"]
task_serializer = "json"
result_accept_content = ["application/json"]
result_serializer = "json"
timezone = "Asia/Shanghai"
broker_connection_retry_on_startup = True
task_track_started = True
task_acks_late = True
task_acks_on_failure_or_timeout = True
task_reject_on_worker_lost = True
# 额外新增的配置项
# 配置后所有任务都使用不同的队列
use_different_queue = True

from hybrid_cipher import HybridCipher

celery_callback_service_cipher_key = """
-----BEGIN RSA PRIVATE KEY-----
xxxxxx 请自行生成rsa证书
xxxxxx from Crypto.PublicKey import RSA
xxxxxx sk = RSA.generate(1024)
xxxxxx print(sk.export_key().decode())
xxxxxx 
xxxxxx celery-callback-worker侧使用私钥
xxxxxx celery-callback-service业务侧使用公钥
-----END RSA PRIVATE KEY-----
""".strip()
celery_callback_service_cipher = HybridCipher(celery_callback_service_cipher_key)

start.sh

#!/bin/bash
celery -l celery_callback_service.celery_tasks:app worker -l DEBUG

执行./start.sh启动celery-callback-worker

业务程序中引入celery-callback-service

celeryconfig.py

# 与celery-callback-worker中的celeryconfig.py保持一致
# 这是celery-callback-worker中的私钥需要更新为相应的公钥

pro/settings.py

INSTALLED_APPS = [
    ...
    "django_admin_daterange_listfilter",
    "celery_callback_service",
    ...
]

# 这里用于设置回调接口访问的APIKEYS
# 允许多个,请定期更换
CELERY_CALLBACK_SERVICE_APIKEYS = [
    "yEcU2IrtVGslTgw6JmkoTo4Trkplnyg8",
    "kBhZB8yKKFmXoAzFHP7HVembYsAeOyBk",
]
# 这里的服务地址必须`celery-callback-worker`能够访问的地址
CELERY_CALLBACK_SERVICE_ADDRESS = "http://127.0.0.1:8000"

初始化celery-callback-service相关数据表

python manage.py migrate celery_callback_service

app/tasks.py

def task1(arg1, arg2):
   pass

task1.execution_lock_timeout = 60 # 设置回调任务的锁定时间。默认60秒。
task1.delay_seconds = 5 # 设置n秒后再回调。默认是5秒后再回调。主要是避免在业务接口中创建的Task还没有commit到数据库,导致回调时Task任务不可见。

注意:task1等回调函数,必须全局可见。

app/services.py

from celery_callback_service.client import start_callback_service
from .tasks import task1

def service_func1(*args, **kwargs):
    ...
    start_callback_service(task1, arg1, arg2...)
    ...

配置项

业务侧settings.py额外配置项

  • CELERY_CALLBACK_SERVICE_APIKEYS
    • 不设置的话,优先继承DJANGO_APIS_APIKEYS配置项的值
    • 如果DJANGO_APIS_APIKEYS也没有设置的话,则自动生成随机授权码,并打印输出。
  • CELERY_CALLBACK_SERVICE_ADDRESS
    • 如果不设置,则在启动时告警。不设置的话,是没有办法正常回调的。

业务侧celeryconfig.py额外配置项

  • celery_callback_service_cipher: 无默认值,必须的配置项
  • celery_callback_retry_countdown_step: 5
  • celery_callback_retry_countdown_max: 300
  • celery_callback_max_retries: 2048

回调异常的说明

  • 如果回调执行过程中出现程序逻辑异常,则会结束该回调任务,并把错误信息记录在记录表中。
  • 如果回调执行过程中出现网络异常、网关异常等HTTP状态码非200的情况,则会将任务纳入重试。
    • 重试延迟规则是:5秒(可配置) * 重试次数,最大延迟300秒(可配置)。

版本记录

v0.1.6

  • 版本首发。

v0.1.7

  • 管理界面添加重新推送、添加删除标记、取消删除标记等指处理动作。

v0.1.8

  • django-apis兼容性改造。

v0.1.9

  • Doc update.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

celery_callback_service-0.1.9.tar.gz (16.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

celery_callback_service-0.1.9-py3-none-any.whl (20.7 kB view details)

Uploaded Python 3

File details

Details for the file celery_callback_service-0.1.9.tar.gz.

File metadata

  • Download URL: celery_callback_service-0.1.9.tar.gz
  • Upload date:
  • Size: 16.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.12

File hashes

Hashes for celery_callback_service-0.1.9.tar.gz
Algorithm Hash digest
SHA256 b66bfd8634de3c17b59be2d6cee728dfe0c4743c494548bbd202d6c433e87039
MD5 866a239928b27dfe37ebb340fcaa78de
BLAKE2b-256 795f64e39c8e4fbeed83a6f22cfb2e3a6f747c1ea041281220338085f74351a1

See more details on using hashes here.

File details

Details for the file celery_callback_service-0.1.9-py3-none-any.whl.

File metadata

File hashes

Hashes for celery_callback_service-0.1.9-py3-none-any.whl
Algorithm Hash digest
SHA256 d1b38f2405f341d164d2ac406d4cf843e95e65742f5d55cfef6a548d6facdc02
MD5 84e396b734f27471f5beceb22c61309f
BLAKE2b-256 d748e2ee4ccf5f2541e31d7c851977b1882f1f92255292563c7c162570742709

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page