Skip to main content

Lightweight async task execution framework for Django

Project description

django-simpletask5

本项目由 opencode + deepseek-v4-flash 生成

一个轻量级的 Django 异步任务执行框架,提供声明式的任务模型、信号驱动的自动发布、Worker 进程异步执行,以及内置的 Cron 定时调度。

特性

  • 声明式任务模型 — 继承 Task 模型即可定义任务,自动处理创建/更新/删除事件
  • 信号驱动 — Django 信号自动拦截模型变更,发布 TaskExecution 到消息队列
  • 自定义事件 — 通过 task.trigger('event_name') 触发任意事件
  • 灵活的执行器映射 — 不同事件可绑定不同的执行器类
  • 队列路由 — 不同事件可路由到不同优先级队列
  • 重试与超时 — 失败自动重试(指数退避),支持超时检测
  • 加密字段 — 敏感数据自动加密存储
  • Cron 调度 — 内置 crontab 守护进程,支持代码注册与数据库覆盖
  • 归档统计 — 已完成执行记录自动归档为加密 JSONL,并生成日统计
  • Worker 注册中心 — 基于 Redis 的 Worker 心跳与状态追踪
  • Django Admin 集成 — 完整的后台管理界面

依赖

  • Python >= 3.8
  • Django >= 3.2(兼容至 5.2.x)
  • Kombu >= 5.3.0(消息队列,支持 RabbitMQ/Redis/内存)
  • Redis >= 4.0.0(分布式锁、Worker 注册中心)
  • 详见 pyproject.toml

安装

pip install django-simpletask5

django-simpletask5 使用 django-app-requires 自动管理依赖的 app(如 django_safe_fields),无需手动添加到 INSTALLED_APPS。只需在 settings.py 中调用一次 patch:

from django_app_requires import patch_all as django_app_requires_patch_all
django_app_requires_patch_all()

然后在 INSTALLED_APPS 中添加:

INSTALLED_APPS = [
    ...
    'django_simpletask5',
]

配置分布式锁:

DJANGO_SIMPLETASK_LOCK_CONFIG = {
    'global_lock_engine_class': 'globallock.redis_global_lock.RedisGlobalLock',
    'global_lock_engine_options': {
        'host': '127.0.0.1',
        'port': 6379,
        'db': 0,
    },
}

运行迁移:

python manage.py migrate django_simpletask5

快速开始

1. 定义任务模型

from django.db import models
from django_simpletask5.models import Task

class OrderTask(Task):
    order_id = models.CharField(max_length=64, unique=True)
    customer_name = models.CharField(max_length=128)
    amount = models.DecimalField(max_digits=10, decimal_places=2)
    status = models.CharField(max_length=32, default='pending')

    executor_class = {
        'create': 'myapp.executors.OrderCreateExecutor',
        'update': 'myapp.executors.OrderUpdateExecutor',
        'delete': 'myapp.executors.OrderDeleteExecutor',
    }

    simpletask_queue = {
        'create': 'django_simpletask5.queue.high_priority',
        'update': 'django_simpletask5.queue.default',
        'delete': 'django_simpletask5.queue.default',
    }

    trigger_update_fields = ['customer_name', 'amount', 'status']

2. 编写执行器

# myapp/executors.py
from django_simpletask5.executors.base import BaseExecutor
from django_simpletask5.models import TaskExecution

class OrderCreateExecutor(BaseExecutor):
    def execute(self, execution: TaskExecution) -> str | None:
        context = execution.get_context_dict()
        # 业务逻辑...
        return 'ok'

3. 启动 Worker

默认只监听 default 队列,high_priority 队列需要单独启动 Worker 处理。

# 启动 4 个 Worker 处理 default 队列
python manage.py django_simpletask_executor --workers 4

# 单独启动 Worker 处理 high_priority 队列
python manage.py django_simpletask_executor --queue django_simpletask5.queue.high_priority

# 指定执行器
python manage.py django_simpletask_executor --workers 2 --service myapp.executors.OrderCreateExecutor

4. 启动 Cron 调度

python manage.py django_simpletask_crontab

5. 触发自定义事件

order = OrderTask.objects.get(order_id='ORD-001')
order.trigger('refund', extra_context={'refund_amount': '50.00'})

Cron 任务

在代码中定义 Cron 任务

在任意 app 的 cronjobs.py 文件中使用 register_cronjob 注册:

# myapp/cronjobs.py
from django_simpletask5.cronjob_registry import register_cronjob

register_cronjob(
    name='health_check',
    cron_expression='*/5 * * * *',
    executor_class='django_simpletask5.executors.simple_request.SimpleRequestExecutor',
    context={
        'url': 'https://example.com/health',
        'method': 'GET',
        'timeout': 10,
    },
    description='定期健康检查',
)

从代码同步到数据库

注册的 Cron 任务需要同步到数据库才会生效。框架默认在 django_simpletask_crontab 启动时自动同步(可通过 DJANGO_SIMPLETASK_CRONJOB_AUTO_SYNC = False 关闭),也可以手动执行:

python manage.py django_simpletask_sync_cronjobs

同步后,用户可以在 Django Admin 中查看和修改 Cron 任务,被手动修改过的任务不会在后续同步中被覆盖(is_modified_by_user 标记保护)。

内置执行器

执行器 说明
PingPongExecutor 健康检查,返回 'pong'
BashScriptExecutor 执行 Shell 脚本
PythonScriptExecutor 执行 Python 代码
SimpleRequestExecutor 发起 HTTP 请求
StatusCheckExecutor 检测卡住的执行并标记超时(每 5 分钟)
RetryTimeoutExecutor 重试超时的执行(每 10 分钟)
ArchiveExecutor 归档已完成执行并生成统计(每天凌晨 2 点)

架构

Task 模型变更 → Django 信号 → 创建 TaskExecution 并发布到消息队列
                                                    ↓
Worker 消费消息 → 获取分布式锁 → 加载执行器 → 执行并保存结果
                                                    ↓
失败时自动重试,完成后归档

Releases

0.1.0

这是 django-simpletask5 的首个正式版本。核心功能包括:

  • 声明式任务模型 — 继承 Task 模型即可定义任务,自动处理创建/更新/删除事件
  • 信号驱动自动发布 — Django 信号自动拦截模型变更,发布 TaskExecution 到消息队列
  • Worker 异步执行 — 多 Worker 进程消费消息队列,支持队列路由和优先级
  • Cron 定时调度 — 内置 crontab 守护进程,支持代码注册与数据库覆盖
  • 重试与超时 — 失败自动重试(指数退避),支持超时检测
  • 加密字段 — 敏感数据自动加密存储
  • 归档统计 — 已完成执行记录自动归档为加密 JSONL,并生成日统计
  • Worker 注册中心 — 基于 Redis 的 Worker 心跳与状态追踪
  • Django Admin 集成 — 完整的后台管理界面与仪表盘

0.1.1

  • 修复: 修复 DjangoSimpletask5Config 缺少 ready() 方法导致信号监听未注册的问题

许可证

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

django_simpletask5-0.1.1.tar.gz (30.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

django_simpletask5-0.1.1-py3-none-any.whl (37.8 kB view details)

Uploaded Python 3

File details

Details for the file django_simpletask5-0.1.1.tar.gz.

File metadata

  • Download URL: django_simpletask5-0.1.1.tar.gz
  • Upload date:
  • Size: 30.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.9

File hashes

Hashes for django_simpletask5-0.1.1.tar.gz
Algorithm Hash digest
SHA256 1ed0e0eedf074b5a4c37fdff1832e8ae97c2b26eee28bbef02bf1e569ed67980
MD5 ca6bffd5d1eb60054a23535ccd125ad8
BLAKE2b-256 5aa44365a22825c321fc6ac15a9c85fcb0573d54b64aef07376d6f104fd03f2d

See more details on using hashes here.

File details

Details for the file django_simpletask5-0.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for django_simpletask5-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 2c8ec81469a0ef54df8831cfcb315d86ede48e3dee397c925d2eadca6486fec1
MD5 ef39b8413f3feb98449b11cd073ea279
BLAKE2b-256 291bd9a81e2019b461d8915a46fc9f30973a7a11802034a596b5b5626cf0f1c7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page