Using for Debug, finding problem
Project description
1、介绍
本项目是对通用文件导出任务的抽象组件提取,通过使用本项目,可以简化通用文件导相关业务的开发。
2、使用方式
数据库表:
CREATE TABLE `download_task_main` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`org_id` bigint unsigned NOT NULL,
`task_name` varchar(64) DEFAULT NULL COMMENT '任务名称',
`platform` int NOT NULL COMMENT ' 生成平台 , 1 :cfg , 2:cti , 3 : open , 4: crm ',
`type` int NOT NULL COMMENT '导出类型 1: 导出联系人 , 2 :公司对账单导出 3 : 导出 计划item 4 :导出 通话记录统计分析 5 : 录音下载 6 :信修下载 7 .通话记录加录音下载',
`sub_type` int NOT NULL COMMENT '导出子类型 导出类型下 分出来的子类型 ',
`description` varchar(255) DEFAULT NULL COMMENT '描述',
`status` int DEFAULT '0' COMMENT ' 0 :任务创建中 , 1 : 文件上传中 ,2 :文件上传完成 ,-1 :文件上传失败 3 : 任务失效 4:取消任务',
`top` int DEFAULT '0' COMMENT '是否置顶 0:否 1:是',
`data` varchar(1024) DEFAULT NULL COMMENT '序列化字段',
`notify` tinyint DEFAULT '0' COMMENT '是否通知 0:否 1:是',
`remark` varchar(256) DEFAULT NULL COMMENT '备注',
`created_by` bigint DEFAULT '0' COMMENT '创建人 , agent org id ',
`created_time` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
`last_updated_time` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '最后更新时间',
PRIMARY KEY (`id`) USING BTREE,
KEY `index_created_time` (`created_time`),
KEY `index_org_id_created_time` (`org_id`,`created_time`),
KEY `index_type` (`type`),
KEY `index_type_org_id` (`type`,`org_id`),
KEY `index_org_id_platform` (`org_id`,`platform`),
KEY `index_task_name_org_id` (`task_name`,`org_id`),
KEY `index_sub_type_status` (`sub_type`,`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='下载任务表';
CREATE TABLE `download_item` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键 ',
`task_id` bigint NOT NULL COMMENT '主任务id',
`org_id` bigint unsigned NOT NULL COMMENT ' 公司ID ',
`type` int DEFAULT NULL COMMENT '下载类型。一般跟 task的 子类型相同 ',
`file_name` varchar(256) DEFAULT NULL COMMENT '文件名称',
`target_id` bigint DEFAULT NULL COMMENT '关联id',
`expire_time` timestamp NULL DEFAULT NULL COMMENT '过期时间',
`download_times` int DEFAULT NULL COMMENT '下载次数',
`url` varchar(1024) DEFAULT NULL COMMENT '文件下载地址',
`status` int DEFAULT '0' COMMENT '0 :未开始 , 1: 进行中 ,2:已完成 ,3 :失效(针对下载次数超限) -1 :任务失败 4:取消任务',
`data` varchar(1024) DEFAULT NULL COMMENT '序列化字段',
`remark` varchar(256) DEFAULT NULL COMMENT '备注',
`md5_value` varchar(45) DEFAULT NULL COMMENT '文件MD5',
`created_time` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
`last_updated_time` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '最后更新时间',
PRIMARY KEY (`id`) USING BTREE,
KEY `index_created_time` (`created_time`),
KEY `index_task_id` (`task_id`),
KEY `index_task_id_org_id` (`task_id`,`org_id`),
KEY `index_org_id_target_id` (`org_id`,`target_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='下载任务明细';
示例:
1、配置文件
[download_task]
dbHost=localhost
dbPort=3306
dbUser=root
dbPassword=12345678
dbName=cti
redisHost=localhost
redisPort=6379
redisDb=0
2、任务分发调度及状态管理
from duyan_download_task import DownloadTaskScheduler
def test_scheduler():
scheduler = DownloadTaskScheduler('config.ini', 'download_task_scheduler')
scheduler.start()
if __name__ == '__main__':
test_scheduler()
3、自定义导出业务逻辑-单个子任务
import json
from duyan_download_task import DownloadTaskBase, SubTypeInfo
from duyan_download_task.model import DownloadTaskMain, DownloadItem
class DemoTask(DownloadTaskBase):
def __init__(self, task_info: SubTypeInfo, config_path: str, logger_name: str):
super().__init__(task_info, config_path, logger_name)
def get_validated_data_params(self, task_item_id: int, data: str) -> dict or None:
self._logger.info(f"校验数据:[ID:{task_item_id}],data:{data}")
return json.loads(data)
def export(self, task_item: DownloadItem, data_obj: dict) -> tuple:
self._logger.info(f"校验导出:[item:{data_obj}],data:{data_obj}")
return None, None, None
def split_task(self, task: DownloadTaskMain) -> list:
pass
def task_test():
info = SubTypeInfo(task_name='DemoTask', type=10, sub_type=15, queue_key='demo_task_queue', is_multi_task=False)
task = DemoTask(info, 'config.ini', 'demo_task')
task.start()
if __name__ == '__main__':
task_test()
4、自定义导出业务逻辑-多个子任务
import json
import datetime
from duyan_download_task import DownloadTaskBase, SubTypeInfo
from duyan_download_task.model import DownloadTaskMain, DownloadItem
from duyan_download_task.constants import ItemStatus
class DemoMultiItemTask(DownloadTaskBase):
task_info = SubTypeInfo(task_name='DemoMultiItemTask', type=11, sub_type=11, queue_key='demo_multi_item_task_queue',
is_multi_task=True)
def __init__(self, config_path: str, logger_name: str):
super().__init__(self.task_info, config_path, logger_name)
def get_validated_data_params(self, task_item_id: int, data: str) -> dict or None:
self._logger.info(f"校验数据:[ID:{task_item_id}],data:{data}")
return json.loads(data)
def export(self, task_item: DownloadItem, data_obj: dict) -> tuple:
self._logger.info(f"校验导出:[item:{data_obj}],data:{data_obj}")
url = 'http://duyan-record-download.oss-cn-hangzhou.aliyuncs.com/100757218773722020-04-27.zip?OSSAccessKeyId=LTAI4GFAeToJVw9E8eoLK8Wu&Expires=1968908476&Signature=KugYlSKBX6d6ExiDWcJKk4KoA1Y%3D'
md5 = 'f9d991b4a0522dfa77da67d68c675261'
file_name = '100757218773722020-04-27.zip'
return url, md5, file_name
def split_task(self, task: DownloadTaskMain) -> list:
items = []
now = datetime.datetime.now()
for i in range(2):
item = DownloadItem()
item.org_id = task.org_id
item.task_id = task.id
item.type = self.task_info.sub_type
item.data = task.data
item.status = ItemStatus.READY.value
item.created_time = now
item.last_updated_time = now
items.append(item)
return items
def multi_item_task_test():
task = DemoMultiItemTask('config.ini', 'demo_task')
task.start()
if __name__ == '__main__':
multi_item_task_test()
任务参数
class DownloadTaskBase:
def __init__(self, task_info: SubTypeInfo, config_path: str, logger_name: str, max_job=1,
cron="0 0-59/2 * * * *"):
"""
init task
:param task_info:
SubTypeInfo 任务级别信息包括 名称、类型、子任务类型、任务队列、是否是多任务定义
task_name 名称 type 类型 sub_type 子任务类型 queue_key 任务队列 is_multi_task 是否是多任务定义
任务队列维护在redis中
:param config_path:
配置文件所在路径 如 ./config.ini
:param logger_name:
日志文件名称, 日志文件默认在./log目录下
:param max_job:
最大支持并发数量
:param cron:
任务执行周期cron表达式
秒 分 时 日 月 年
* 表示所有值
*/a 表示每隔a时间
a-b 表示a-b之间任何一个时间
a-b/c 表示a-b之间每隔c时间
取值范围:
秒 0-59
分 0-59
时 0-23
日 1-31
月 1-12
年 四位数字
示例:
0 */2 * * * * 每2分钟执行一次
*/1 * * * * * 每2秒执行一次
"""
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
duyan-download-task-0.0.8.tar.gz
(19.6 kB
view hashes)
Built Distribution
Close
Hashes for duyan-download-task-0.0.8.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 751aa171caa47274ba1623af12437d0bb4cd39c9dbfc3fc9f74dd995d2b5cb84 |
|
MD5 | 1af78a30c261bd75cd4e7b54a519cdff |
|
BLAKE2b-256 | 08d0364cbe07cf633105a0543f802d6992f3c5f7a6c81b475f529edaa5d87d53 |
Close
Hashes for duyan_download_task-0.0.8-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 2a8142cbfe9920d5429cb78278efd445a7d7cfcf9a86e32c75af841c7a48434e |
|
MD5 | 57c3c4300afdd3bc1e0933145e7cdec2 |
|
BLAKE2b-256 | b2ba108470a15c1b0c840e9b5131897059b7db9c70406bf697eb50f76b295d6c |