Skip to main content

NeoTask - 轻量级 Python 异步任务队列,支持即时/延时/周期任务,内置优先级队列、自动重试 | Lightweight Python Async Task Queue Manager

Project description

分布式任务调度系统(NeoTask)

轻量级 Python 异步任务队列管理器,无需额外服务,开箱即用。

NeoTask 是一个纯 Python 实现的异步任务队列调度系统,专为耗时任务(AI 生成、视频处理、数据爬取等)设计,支持定时任务、周期任务、延迟任务。无需部署 Redis、PostgreSQL 等外部服务,安装后即可在任意 Python 项目中直接使用。

中文 | English | 文档 | PyPI | 官网

License Python 3.8+ PyPI Downloads GitHub stars


特性

  • 零依赖部署 - 纯 Python 实现,无需 Redis/PostgreSQL
  • 即时任务 - 支持优先级调度,高优先级优先执行
  • 定时任务 - 支持延时执行、固定间隔、Cron 表达式
  • 异步并发 - 基于 asyncio,多 Worker 并发处理
  • 自动重试 - 失败任务自动重试,可配置次数
  • 持久化 - 内存/SQLite/Redis 多种存储后端
  • 任务编排 - DAG工作流引擎,支持复杂任务依赖管理和条件分支
  • 事件回调 - 支持任务生命周期事件监听

应用场景

场景 说明 推荐配置 使用入口
AI 文生图/视频生成 耗时任务排队,避免阻塞主流程 worker_concurrency=3 TaskPool
批量文件处理 转码、压缩、上传等批量操作 worker_concurrency=10 TaskPool
网页爬虫调度 分布式爬取,防止被封 storage_type="redis" TaskPool
定时报表发送 每天9点发送日报 cron="0 9 * * *" TaskScheduler
延迟通知 用户操作后5分钟发送提醒 delay_seconds=300 TaskScheduler
心跳检测 每30秒检测服务健康状态 interval_seconds=30 TaskScheduler
后台数据分析 夜间执行数据聚合任务 cron="0 2 * * *" TaskScheduler
数据处理流水线 ETL任务依赖编排 DAG工作流 WorkflowEngine
条件分支处理 根据结果执行不同分支 condition 表达式 WorkflowEngine

架构&演进

graph TB
    subgraph User["用户应用层"]
        APP[用户代码]
    end
    
    subgraph NeoTask["NeoTask 核心"]
        TP[TaskPool<br/>即时任务入口 v0.1]
        TS[TaskScheduler<br/>定时任务入口 v0.3]
        WF[WorkflowEngine<br/>工作流编排入口 v1.5]
        
        subgraph Core["共享核心组件"]
            LM[LifecycleManager<br/>任务生命周期管理]
            QS[QueueScheduler<br/>优先级+延迟队列]
            WP[WorkerPool<br/>Worker池/并发控制]
            FM[FutureManager<br/>异步等待/结果回调]
        end
        
        subgraph Internal["内部组件"]
            EB[EventBus<br/>事件总线 v0.2]
            MC[MetricsCollector<br/>指标收集 v0.2]
            LF[LockFactory<br/>分布式锁 v0.4]
        end
        
        EX[TaskExecutor<br/>用户业务逻辑]
    end
    
    subgraph Storage["存储层"]
        MEM[MemoryStorage]
        SQLITE[(SQLiteStorage)]
        REDIS[(RedisStorage v0.4)]
    end
    
    APP -->|即时任务| TP
    APP -->|定时任务| TS
    APP -->|工作流| WF
    TS -->|委托| TP
    WF -->|委托| TP
    TP --> LM
    TP --> QS
    TP --> WP
    TP --> FM
    
    LM --> MEM
    LM --> SQLITE
    LM --> REDIS
    
    WP --> EX
    WP --> EB
    WP --> MC
    WP --> LF

发展路线图

timeline
    title NeoTask 架构演进路线图
    
    section v0.1
        基础任务池 : 本地内存队列
                   : 异步执行引擎
                   : 内存/SQLite存储
                   
    section v0.2
        可观测性 : 事件总线
                 : 指标收集
                 : 健康检查
                 
    section v0.3
        定时调度 : 延时队列/时间轮
                 : 周期任务
                 : Cron表达式
                 
    section v0.4
        分布式基础 : Redis共享队列
                   : 分布式锁
                   
    section v0.5
        性能优化 : 预取机制
                 : 批量操作
                 : 连接池优化

    section v1.0
        高可用保障 : 看门狗续期
                   : 超时检测
                   : 故障自动恢复

    section v1.5
        任务编排 : DAG工作流
                 : 条件分支
                 : 并行执行

    section v2.0
        企业级特性 : 独立Web UI
                   : 多租户隔离
                   : Prometheus集成

快速上手

详细使用方式 请参阅 文档

安装

# 基础安装
pip install neotask

# 带 Redis 分布式支持
pip install neotask[redis]

# 完整安装
pip install neotask[full]

即时任务(TaskPool)

from neotask import TaskPool

async def process(data):
    return {"result": "done", "data": data}

# 创建任务池
pool = TaskPool(executor=process)

# 提交任务
task_id = pool.submit({"id": 123})

# 等待结果
result = pool.wait_for_result(task_id)

pool.shutdown()

定时任务(TaskScheduler)

from neotask import TaskScheduler

scheduler = TaskScheduler(executor=process)

# 延时 60 秒执行
scheduler.submit_delayed({"id": 123}, delay_seconds=60)

# 每 5 分钟执行一次
scheduler.submit_interval({"id": 123}, interval_seconds=300)

# 每天 9 点执行
scheduler.submit_cron({"id": 123}, "0 9 * * *")

scheduler.shutdown()

使用上下文管理器

with TaskPool(executor=process) as pool:
    task_id = pool.submit({"id": 123})
    result = pool.wait_for_result(task_id)

使用事件回调

from neotask import TaskPool

async def on_task_created(event):
    print(f"任务创建: {event.task_id}")

async def on_task_completed(event):
    print(f"任务完成: {event.task_id}, 结果: {event.data}")

async def on_task_failed(event):
    print(f"任务失败: {event.task_id}, 错误: {event.data}")

pool = TaskPool(executor=my_executor)
pool.start()

# 注册事件回调
pool.on_created(on_task_created)
pool.on_completed(on_task_completed)
pool.on_failed(on_task_failed)

task_id = pool.submit({"test": "event"})
result = pool.wait_for_result(task_id)

API 参考

方法 说明
pool.submit(data, priority=2, delay=0) 提交任务
pool.wait_for_result(task_id, timeout=300) 等待结果
pool.get_status(task_id) 获取状态
pool.cancel(task_id) 取消任务
scheduler.submit_delayed(data, delay) 延时任务
scheduler.submit_interval(data, interval) 周期任务
scheduler.submit_cron(data, cron) Cron 任务
engine.submit_workflow(definition) Submit workflow
engine.wait_workflow(execution_id) Wait for workflow

详细 API 请参阅 文档

配置示例

from neotask import TaskPool, TaskPoolConfig

config = TaskPoolConfig(
    worker_concurrency=10,      # 并发 Worker 数
    max_retries=3,              # 重试次数
    storage_type="sqlite",      # 存储类型
)

pool = TaskPool(executor=process, config=config)

详细使用示例请参阅 文档

贡献指南

开发环境设置

# 克隆仓库
git clone https://github.com/neopen/task-schedule-manager.git
cd task-schedule-manager

# 创建虚拟环境
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# 安装开发依赖
pip install -e ".[dev]"

# 运行测试
pytest tests/

# 查看测试覆盖率
pytest --cov=neotask tests/

# 运行特定模块测试
pytest tests/test_task_pool.py -v
pytest tests/test_task_scheduler.py -v

项目结构

neotask/
├── api/           # TaskPool, TaskScheduler, WorkflowEngine
├── core/          # 生命周期、队列、Worker
├── workflow/      # DAG引擎、条件分支、并行执行
├── executor/      # 任务执行器
├── scheduler/     # 定时任务
├── storage/       # 内存/SQLite/Redis
├── event/         # 事件总线
└── models/        # 数据模型

贡献流程

欢迎提交 Issue 和 Pull Request

  1. Fork 项目
  2. 创建特性分支 (git checkout -b feature/amazing)
  3. 提交更改 (git commit -m 'Add amazing feature')
  4. 推送分支 (git push origin feature/amazing)
  5. 提交 Pull Request

代码规范

  • 遵循 PEP 8 代码风格
  • 添加适当的 类型注解
  • 编写单元测试覆盖新功能(覆盖率 ≥ 80%)
  • 更新相关文档和示例代码
  • 提交信息遵循 Conventional Commits

测试要求

# 运行所有测试
pytest tests/

# 运行特定模块测试
pytest tests/test_task_pool.py -v
pytest tests/test_task_scheduler.py -v
pytest tests/test_workflow.py -v 

# 运行手动测试
python examples/01_simple.py
python examples/05_webui.py

问题反馈


许可证

MIT License © 2026 NeoPen


致谢

感谢所有贡献者和开源社区的支持。


联系方式

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

neotask-0.2.0.tar.gz (146.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

neotask-0.2.0-py3-none-any.whl (116.6 kB view details)

Uploaded Python 3

File details

Details for the file neotask-0.2.0.tar.gz.

File metadata

  • Download URL: neotask-0.2.0.tar.gz
  • Upload date:
  • Size: 146.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for neotask-0.2.0.tar.gz
Algorithm Hash digest
SHA256 d93c6586d5a5f40b640361323fb837944fafe1bb7c5e8eebb904c8d1901b449d
MD5 8a417c15b2943c1721d239d524f0ad78
BLAKE2b-256 36b76171c1548ad75332f3df559243148c985f52d29eca79dee9ec45db261853

See more details on using hashes here.

Provenance

The following attestation bundles were made for neotask-0.2.0.tar.gz:

Publisher: publish-pypi.yml on neopen/task-schedule-manager

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file neotask-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: neotask-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 116.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for neotask-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 bb965af266fcec0fe5854042800a876a4eff6eeb44a46bce3d3174e007937955
MD5 95cc2caf3b5b5f7c89d18ce06d2a7c0c
BLAKE2b-256 70b1516b92128e26cf403ca7161e302602413091ad7d08ed8682c2889df4aed6

See more details on using hashes here.

Provenance

The following attestation bundles were made for neotask-0.2.0-py3-none-any.whl:

Publisher: publish-pypi.yml on neopen/task-schedule-manager

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page