Skip to main content

Lightweight async task execution framework for Django

Project description

django-simpletask5

本项目由 opencode + deepseek-v4-flash 生成

一个轻量级的 Django 异步任务执行框架,提供声明式的任务模型、信号驱动的自动发布、Worker 进程异步执行,以及内置的 Cron 定时调度。

特性

  • 声明式任务模型 — 继承 Task 模型即可定义任务,自动处理创建/更新/删除事件
  • 信号驱动 — Django 信号自动拦截模型变更,发布 TaskExecution 到消息队列
  • 自定义事件 — 通过 task.trigger('event_name') 触发任意事件
  • 灵活的执行器映射 — 不同事件可绑定不同的执行器类
  • 队列路由 — 不同事件可路由到不同优先级队列
  • 重试与超时 — 失败自动重试(指数退避),支持超时检测
  • 加密字段 — 敏感数据自动加密存储
  • Cron 调度 — 内置 crontab 守护进程,支持代码注册与数据库覆盖
  • 归档统计 — 已完成执行记录自动归档为加密 JSONL,并生成日统计
  • Worker 注册中心 — 基于 Redis 的 Worker 心跳与状态追踪
  • Django Admin 集成 — 完整的后台管理界面

依赖

  • Python >= 3.8
  • Django >= 3.2(兼容至 5.2.x)
  • Kombu >= 5.3.0(消息队列,支持 RabbitMQ/Redis/内存)
  • Redis >= 4.0.0(分布式锁、Worker 注册中心)
  • 详见 pyproject.toml

安装

pip install django-simpletask5

django-simpletask5 使用 django-app-requires 自动管理依赖的 app(如 django_safe_fields),无需手动添加到 INSTALLED_APPS。只需在 settings.py 中调用一次 patch:

from django_app_requires import patch_all as django_app_requires_patch_all
django_app_requires_patch_all()

然后在 INSTALLED_APPS 中添加:

INSTALLED_APPS = [
    ...
    'django_simpletask5',
]

配置分布式锁:

DJANGO_SIMPLETASK_LOCK_CONFIG = {
    'global_lock_engine_class': 'globallock.redis_global_lock.RedisGlobalLock',
    'global_lock_engine_options': {
        'host': '127.0.0.1',
        'port': 6379,
        'db': 0,
    },
}

运行迁移:

python manage.py migrate django_simpletask5

快速开始

1. 定义任务模型

from django.db import models
from django_simpletask5.models import Task

class OrderTask(Task):
    order_id = models.CharField(max_length=64, unique=True)
    customer_name = models.CharField(max_length=128)
    amount = models.DecimalField(max_digits=10, decimal_places=2)
    status = models.CharField(max_length=32, default='pending')

    executor_class = {
        'create': 'myapp.executors.OrderCreateExecutor',
        'update': 'myapp.executors.OrderUpdateExecutor',
        'delete': 'myapp.executors.OrderDeleteExecutor',
    }

    simpletask_queue = {
        'create': 'django_simpletask5.queue.high_priority',
        'update': 'django_simpletask5.queue.default',
        'delete': 'django_simpletask5.queue.default',
    }

    trigger_update_fields = ['customer_name', 'amount', 'status']

2. 编写执行器

# myapp/executors.py
from django_simpletask5.executors.base import BaseExecutor
from django_simpletask5.models import TaskExecution

class OrderCreateExecutor(BaseExecutor):
    def execute(self, execution: TaskExecution) -> str | None:
        context = execution.get_context_dict()
        # 业务逻辑...
        return 'ok'

3. 启动 Worker

默认只监听 default 队列,high_priority 队列需要单独启动 Worker 处理。

# 启动 4 个 Worker 处理 default 队列
python manage.py django_simpletask_executor --workers 4

# 单独启动 Worker 处理 high_priority 队列
python manage.py django_simpletask_executor --queue django_simpletask5.queue.high_priority

# 指定执行器
python manage.py django_simpletask_executor --workers 2 --service myapp.executors.OrderCreateExecutor

4. 启动 Cron 调度

python manage.py django_simpletask_crontab

5. 触发自定义事件

order = OrderTask.objects.get(order_id='ORD-001')
order.trigger('refund', extra_context={'refund_amount': '50.00'})

Cron 任务

在代码中定义 Cron 任务

在任意 app 的 cronjobs.py 文件中使用 register_cronjob 注册:

# myapp/cronjobs.py
from django_simpletask5.cronjob_registry import register_cronjob

register_cronjob(
    name='health_check',
    cron_expression='*/5 * * * *',
    executor_class='django_simpletask5.executors.simple_request.SimpleRequestExecutor',
    context={
        'url': 'https://example.com/health',
        'method': 'GET',
        'timeout': 10,
    },
    description='定期健康检查',
)

从代码同步到数据库

注册的 Cron 任务需要同步到数据库才会生效。框架默认在 django_simpletask_crontab 启动时自动同步(可通过 DJANGO_SIMPLETASK_CRONJOB_AUTO_SYNC = False 关闭),也可以手动执行:

python manage.py django_simpletask_sync_cronjobs

同步后,用户可以在 Django Admin 中查看和修改 Cron 任务,被手动修改过的任务不会在后续同步中被覆盖(is_modified_by_user 标记保护)。

内置执行器

执行器 说明
PingPongExecutor 健康检查,返回 'pong'
BashScriptExecutor 执行 Shell 脚本
PythonScriptExecutor 执行 Python 代码
SimpleRequestExecutor 发起 HTTP 请求
StatusCheckExecutor 检测卡住的执行并标记超时(每 5 分钟)
RetryTimeoutExecutor 重试超时的执行(每 10 分钟)
ArchiveExecutor 归档已完成执行并生成统计(每天凌晨 2 点)

架构

Task 模型变更 → Django 信号 → 创建 TaskExecution 并发布到消息队列
                                                    ↓
Worker 消费消息 → 获取分布式锁 → 加载执行器 → 执行并保存结果
                                                    ↓
失败时自动重试,完成后归档

Releases

0.1.3

  • Admin 界面大升级 — 集成 Tabbed ChangeForm 分页签展示字段,CronJob 后台支持直接执行/启用/禁用操作;TaskExecution 列表优化,execution_id 显示前8字符并支持点击复制,重试次数合并显示,全局字段中文翻译
  • 归档管理增强 — 新增 django_simpletask_archive 管理命令,支持手动触发归档与过期清理;PurgeExpiredExecutor 定期自动清理过期记录;保留天数可配置;采用流式 AES-GCM 加密,大文件归档更高效
  • Dashboard 修复 — 卡片链接使用 reverse 正确跳转,无执行数据时成功率显示 -
  • 超时检测优化 — 模型新增 expire_time 字段,超时判定更准确高效

0.1.2

  • 修复: 修复 Transaction-Message 排序竞态条件,使用 transaction.on_commit 确保消息在事务提交后才发送到队列

0.1.1

  • 修复: 修复 DjangoSimpletask5Config 缺少 ready() 方法导致信号监听未注册的问题

0.1.0

这是 django-simpletask5 的首个正式版本。核心功能包括:

  • 声明式任务模型 — 继承 Task 模型即可定义任务,自动处理创建/更新/删除事件
  • 信号驱动自动发布 — Django 信号自动拦截模型变更,发布 TaskExecution 到消息队列
  • Worker 异步执行 — 多 Worker 进程消费消息队列,支持队列路由和优先级
  • Cron 定时调度 — 内置 crontab 守护进程,支持代码注册与数据库覆盖
  • 重试与超时 — 失败自动重试(指数退避),支持超时检测
  • 加密字段 — 敏感数据自动加密存储
  • 归档统计 — 已完成执行记录自动归档为加密 JSONL,并生成日统计
  • Worker 注册中心 — 基于 Redis 的 Worker 心跳与状态追踪
  • Django Admin 集成 — 完整的后台管理界面与仪表盘

许可证

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

django_simpletask5-0.1.3.tar.gz (36.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

django_simpletask5-0.1.3-py3-none-any.whl (45.5 kB view details)

Uploaded Python 3

File details

Details for the file django_simpletask5-0.1.3.tar.gz.

File metadata

  • Download URL: django_simpletask5-0.1.3.tar.gz
  • Upload date:
  • Size: 36.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.9

File hashes

Hashes for django_simpletask5-0.1.3.tar.gz
Algorithm Hash digest
SHA256 3058721bccb35a208ddbbea1314d635d6b592731ec20192a784bcea2a8487928
MD5 8a5391195356f0e1f4016a7bbc7cc059
BLAKE2b-256 c3a2e26c7a51e89419dc7f8fe42480bbb8942d56808d9459ee38d434a8a5984c

See more details on using hashes here.

File details

Details for the file django_simpletask5-0.1.3-py3-none-any.whl.

File metadata

File hashes

Hashes for django_simpletask5-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 c97e36295decfabdbd3c9d7d5a0bb3aeda1883d0c11c1b7f64e01f5e338e1bd0
MD5 976a65a002dab72758488c4f05cb39fa
BLAKE2b-256 db83226ba88973f3a89d6691a7f59aabc267b27a1e0ce572d8a27a0031cb6b1f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page