Lightweight async task execution framework for Django
Project description
django-simpletask5
本项目由 opencode + deepseek-v4-flash 生成
一个轻量级的 Django 异步任务执行框架,提供声明式的任务模型、信号驱动的自动发布、Worker 进程异步执行,以及内置的 Cron 定时调度。
特性
- 声明式任务模型 — 继承
Task模型即可定义任务,自动处理创建/更新/删除事件 - 信号驱动 — Django 信号自动拦截模型变更,发布
TaskExecution到消息队列 - 自定义事件 — 通过
task.trigger('event_name')触发任意事件 - 灵活的执行器映射 — 不同事件可绑定不同的执行器类
- 队列路由 — 不同事件可路由到不同优先级队列
- 重试与超时 — 失败自动重试(指数退避),支持超时检测
- 加密字段 — 敏感数据自动加密存储
- Cron 调度 — 内置 crontab 守护进程,支持代码注册与数据库覆盖
- 归档统计 — 已完成执行记录自动归档为加密 JSONL,并生成日统计
- Worker 注册中心 — 基于 Redis 的 Worker 心跳与状态追踪
- Django Admin 集成 — 完整的后台管理界面
依赖
- Python >= 3.8
- Django >= 3.2(兼容至 5.2.x)
- Kombu >= 5.3.0(消息队列,支持 RabbitMQ/Redis/内存)
- Redis >= 4.0.0(分布式锁、Worker 注册中心)
- 详见
pyproject.toml
安装
pip install django-simpletask5
django-simpletask5 使用 django-app-requires 自动管理依赖的 app(如 django_safe_fields),无需手动添加到 INSTALLED_APPS。只需在 settings.py 中调用一次 patch:
from django_app_requires import patch_all as django_app_requires_patch_all
django_app_requires_patch_all()
然后在 INSTALLED_APPS 中添加:
INSTALLED_APPS = [
...
'django_simpletask5',
]
配置分布式锁:
DJANGO_SIMPLETASK_LOCK_CONFIG = {
'global_lock_engine_class': 'globallock.redis_global_lock.RedisGlobalLock',
'global_lock_engine_options': {
'host': '127.0.0.1',
'port': 6379,
'db': 0,
},
}
运行迁移:
python manage.py migrate django_simpletask5
快速开始
1. 定义任务模型
from django.db import models
from django_simpletask5.models import Task
class OrderTask(Task):
order_id = models.CharField(max_length=64, unique=True)
customer_name = models.CharField(max_length=128)
amount = models.DecimalField(max_digits=10, decimal_places=2)
status = models.CharField(max_length=32, default='pending')
executor_class = {
'create': 'myapp.executors.OrderCreateExecutor',
'update': 'myapp.executors.OrderUpdateExecutor',
'delete': 'myapp.executors.OrderDeleteExecutor',
}
simpletask_queue = {
'create': 'django_simpletask5.queue.high_priority',
'update': 'django_simpletask5.queue.default',
'delete': 'django_simpletask5.queue.default',
}
trigger_update_fields = ['customer_name', 'amount', 'status']
2. 编写执行器
# myapp/executors.py
from django_simpletask5.executors.base import BaseExecutor
from django_simpletask5.models import TaskExecution
class OrderCreateExecutor(BaseExecutor):
def execute(self, execution: TaskExecution) -> str | None:
context = execution.get_context_dict()
# 业务逻辑...
return 'ok'
3. 启动 Worker
默认只监听 default 队列,high_priority 队列需要单独启动 Worker 处理。
# 启动 4 个 Worker 处理 default 队列
python manage.py django_simpletask_executor --workers 4
# 单独启动 Worker 处理 high_priority 队列
python manage.py django_simpletask_executor --queue django_simpletask5.queue.high_priority
# 指定执行器
python manage.py django_simpletask_executor --workers 2 --service myapp.executors.OrderCreateExecutor
4. 启动 Cron 调度
python manage.py django_simpletask_crontab
5. 触发自定义事件
order = OrderTask.objects.get(order_id='ORD-001')
order.trigger('refund', extra_context={'refund_amount': '50.00'})
Cron 任务
在代码中定义 Cron 任务
在任意 app 的 cronjobs.py 文件中使用 register_cronjob 注册:
# myapp/cronjobs.py
from django_simpletask5.cronjob_registry import register_cronjob
register_cronjob(
name='health_check',
cron_expression='*/5 * * * *',
executor_class='django_simpletask5.executors.simple_request.SimpleRequestExecutor',
context={
'url': 'https://example.com/health',
'method': 'GET',
'timeout': 10,
},
description='定期健康检查',
)
从代码同步到数据库
注册的 Cron 任务需要同步到数据库才会生效。框架默认在 django_simpletask_crontab 启动时自动同步(可通过 DJANGO_SIMPLETASK_CRONJOB_AUTO_SYNC = False 关闭),也可以手动执行:
python manage.py django_simpletask_sync_cronjobs
同步后,用户可以在 Django Admin 中查看和修改 Cron 任务,被手动修改过的任务不会在后续同步中被覆盖(is_modified_by_user 标记保护)。
内置执行器
| 执行器 | 说明 |
|---|---|
PingPongExecutor |
健康检查,返回 'pong' |
BashScriptExecutor |
执行 Shell 脚本 |
PythonScriptExecutor |
执行 Python 代码 |
SimpleRequestExecutor |
发起 HTTP 请求 |
StatusCheckExecutor |
检测卡住的执行并标记超时(每 5 分钟) |
RetryTimeoutExecutor |
重试超时的执行(每 10 分钟) |
ArchiveExecutor |
归档已完成执行并生成统计(每天凌晨 2 点) |
架构
Task 模型变更 → Django 信号 → 创建 TaskExecution 并发布到消息队列
↓
Worker 消费消息 → 获取分布式锁 → 加载执行器 → 执行并保存结果
↓
失败时自动重试,完成后归档
Releases
0.1.0
这是 django-simpletask5 的首个正式版本。核心功能包括:
- 声明式任务模型 — 继承
Task模型即可定义任务,自动处理创建/更新/删除事件 - 信号驱动自动发布 — Django 信号自动拦截模型变更,发布
TaskExecution到消息队列 - Worker 异步执行 — 多 Worker 进程消费消息队列,支持队列路由和优先级
- Cron 定时调度 — 内置 crontab 守护进程,支持代码注册与数据库覆盖
- 重试与超时 — 失败自动重试(指数退避),支持超时检测
- 加密字段 — 敏感数据自动加密存储
- 归档统计 — 已完成执行记录自动归档为加密 JSONL,并生成日统计
- Worker 注册中心 — 基于 Redis 的 Worker 心跳与状态追踪
- Django Admin 集成 — 完整的后台管理界面与仪表盘
0.1.1
- 修复: 修复
DjangoSimpletask5Config缺少ready()方法导致信号监听未注册的问题
许可证
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file django_simpletask5-0.1.1.tar.gz.
File metadata
- Download URL: django_simpletask5-0.1.1.tar.gz
- Upload date:
- Size: 30.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1ed0e0eedf074b5a4c37fdff1832e8ae97c2b26eee28bbef02bf1e569ed67980
|
|
| MD5 |
ca6bffd5d1eb60054a23535ccd125ad8
|
|
| BLAKE2b-256 |
5aa44365a22825c321fc6ac15a9c85fcb0573d54b64aef07376d6f104fd03f2d
|
File details
Details for the file django_simpletask5-0.1.1-py3-none-any.whl.
File metadata
- Download URL: django_simpletask5-0.1.1-py3-none-any.whl
- Upload date:
- Size: 37.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2c8ec81469a0ef54df8831cfcb315d86ede48e3dee397c925d2eadca6486fec1
|
|
| MD5 |
ef39b8413f3feb98449b11cd073ea279
|
|
| BLAKE2b-256 |
291bd9a81e2019b461d8915a46fc9f30973a7a11802034a596b5b5626cf0f1c7
|