Skip to main content

NeoTask - 轻量级 Python 异步任务队列,支持即时/延时/周期任务,内置优先级队列、自动重试 | Lightweight Python Async Task Queue Manager

Project description

分布式任务调度系统(NeoTask)

轻量级 Python 异步任务队列管理器,无需额外服务,开箱即用。

NeoTask 是一个纯 Python 实现的异步任务队列调度系统,专为耗时任务(AI 生成、视频处理、数据爬取等)设计,支持定时任务、周期任务、延迟任务。无需部署 Redis、PostgreSQL 等外部服务,安装后即可在任意 Python 项目中直接使用。

中文 | English | 文档 | PyPI

PyPI version Python 3.8+ License


特性

  • 零依赖部署 - 纯 Python 实现,无需 Redis/PostgreSQL
  • 即时任务 - 支持优先级调度,高优先级优先执行
  • 定时任务 - 支持延时执行、固定间隔、Cron 表达式
  • 异步并发 - 基于 asyncio,多 Worker 并发处理
  • 自动重试 - 失败任务自动重试,可配置次数
  • 持久化 - 内存/SQLite/Redis 多种存储后端
  • 事件回调 - 支持任务生命周期事件监听

应用场景

场景 说明 推荐配置 使用入口
AI 文生图/视频生成 耗时任务排队,避免阻塞主流程 worker_concurrency=3 TaskPool
批量文件处理 转码、压缩、上传等批量操作 worker_concurrency=10 TaskPool
网页爬虫调度 分布式爬取,防止被封 storage_type="redis" TaskPool
定时报表发送 每天9点发送日报 cron="0 9 * * *" TaskScheduler
延迟通知 用户操作后5分钟发送提醒 delay_seconds=300 TaskScheduler
心跳检测 每30秒检测服务健康状态 interval_seconds=30 TaskScheduler
后台数据分析 夜间执行数据聚合任务 cron="0 2 * * *" TaskScheduler

架构&演进

graph TB
    subgraph User["用户应用层"]
        APP[用户代码]
    end
    
    subgraph NeoTask["NeoTask 核心"]
        TP[TaskPool<br/>即时任务入口]
        TS[TaskScheduler<br/>定时任务入口]
        
        subgraph Core["共享核心组件"]
            LM[LifecycleManager<br/>任务生命周期管理]
            QS[QueueScheduler<br/>优先级+延迟队列]
            WP[WorkerPool<br/>Worker池/并发控制]
            FM[FutureManager<br/>异步等待/结果回调]
        end
        
        subgraph Internal["内部组件"]
            EB[EventBus<br/>事件总线]
            MC[MetricsCollector<br/>指标收集]
            LF[LockFactory<br/>分布式锁]
        end
        
        EX[TaskExecutor<br/>用户业务逻辑]
    end
    
    subgraph Storage["存储层"]
        MEM[MemoryStorage]
        SQLITE[(SQLiteStorage)]
        REDIS[(RedisStorage)]
    end
    
    APP -->|即时任务| TP
    APP -->|定时任务| TS
    TS -->|委托| TP
    TP --> LM
    TP --> QS
    TP --> WP
    TP --> FM
    
    LM --> MEM
    LM --> SQLITE
    LM --> REDIS
    
    WP --> EX
    WP --> EB
    WP --> MC
    WP --> LF

发展路线图

timeline
    title NeoTask 架构演进路线图
    
    section MVP v0.1
        基础任务池 : 本地队列
                   : 异步执行
                   : 内存/Redis存储
                   
    section V1.0 v0.2
        监控能力 : 事件总线
                 : 指标收集
                 : 历史存储
        延时任务 : 延时执行
                 : 周期任务
                 : Cron表达式
                 
    section V2.0 v0.3
        分布式核心 : Redis共享队列
                   : 分布式锁
                   : 预取机制
                   
    section V3.0 v1.0
        高可用保障 : 看门狗续期
                   : 超时检测
                   : 故障自动恢复

    section V4.0 v1.5
        企业级 : 独立Web UI
               : 多租户
               : Prometheus集成

快速上手

安装

# 基础安装
pip install neotask

# 带 Redis 分布式支持
pip install neotask[redis]

# 完整安装
pip install neotask[full]

即时任务(TaskPool)

from neotask import TaskPool

async def process(data):
    return {"result": "done", "data": data}

# 创建任务池
pool = TaskPool(executor=process)

# 提交任务
task_id = pool.submit({"id": 123})

# 等待结果
result = pool.wait_for_result(task_id)

pool.shutdown()

定时任务(TaskScheduler)

from neotask import TaskScheduler

scheduler = TaskScheduler(executor=process)

# 延时 60 秒执行
scheduler.submit_delayed({"id": 123}, delay_seconds=60)

# 每 5 分钟执行一次
scheduler.submit_interval({"id": 123}, interval_seconds=300)

# 每天 9 点执行
scheduler.submit_cron({"id": 123}, "0 9 * * *")

scheduler.shutdown()

使用上下文管理器

with TaskPool(executor=process) as pool:
    task_id = pool.submit({"id": 123})
    result = pool.wait_for_result(task_id)

使用事件回调

from neotask import TaskPool

async def on_task_created(event):
    print(f"任务创建: {event.task_id}")

async def on_task_completed(event):
    print(f"任务完成: {event.task_id}, 结果: {event.data}")

async def on_task_failed(event):
    print(f"任务失败: {event.task_id}, 错误: {event.data}")

pool = TaskPool(executor=my_executor)
pool.start()

# 注册事件回调
pool.on_created(on_task_created)
pool.on_completed(on_task_completed)
pool.on_failed(on_task_failed)

task_id = pool.submit({"test": "event"})
result = pool.wait_for_result(task_id)

API 参考

方法 说明
pool.submit(data, priority=2, delay=0) 提交任务
pool.wait_for_result(task_id, timeout=300) 等待结果
pool.get_status(task_id) 获取状态
pool.cancel(task_id) 取消任务
scheduler.submit_delayed(data, delay) 延时任务
scheduler.submit_interval(data, interval) 周期任务
scheduler.submit_cron(data, cron) Cron 任务

详细 API 请参阅 文档

配置示例

from neotask import TaskPool, TaskPoolConfig

config = TaskPoolConfig(
    worker_concurrency=10,      # 并发 Worker 数
    max_retries=3,              # 重试次数
    storage_type="sqlite",      # 存储类型
)

pool = TaskPool(executor=process, config=config)

详细使用示例请参阅 文档

贡献指南

开发环境设置

# 克隆仓库
git clone https://github.com/neopen/task-schedule-manager.git
cd task-schedule-manager

# 创建虚拟环境
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# 安装开发依赖
pip install -e ".[dev]"

# 运行测试
pytest tests/

# 查看测试覆盖率
pytest --cov=neotask tests/

# 运行特定模块测试
pytest tests/test_task_pool.py -v
pytest tests/test_task_scheduler.py -v

项目结构

neotask/
├── api/           # TaskPool, TaskScheduler
├── core/          # 生命周期、队列、Worker
├── storage/       # 内存/SQLite/Redis
├── event/         # 事件总线
└── models/        # 数据模型

贡献流程

欢迎提交 Issue 和 Pull Request

  1. Fork 项目
  2. 创建特性分支 (git checkout -b feature/amazing)
  3. 提交更改 (git commit -m 'Add amazing feature')
  4. 推送分支 (git push origin feature/amazing)
  5. 提交 Pull Request

代码规范

  • 遵循 PEP 8 代码风格
  • 添加适当的 类型注解
  • 编写单元测试覆盖新功能(覆盖率 ≥ 80%)
  • 更新相关文档和示例代码
  • 提交信息遵循 Conventional Commits

测试要求

# 运行所有测试
pytest tests/

# 运行特定模块测试
pytest tests/unit/test_task.py

# 运行手动测试
python examples/01_simple.py
python examples/05_webui.py

问题反馈


许可证

MIT License © 2026 NeoPen


致谢

感谢所有贡献者和开源社区的支持。


联系方式

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

neotask-0.1.1.tar.gz (104.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

neotask-0.1.1-py3-none-any.whl (97.9 kB view details)

Uploaded Python 3

File details

Details for the file neotask-0.1.1.tar.gz.

File metadata

  • Download URL: neotask-0.1.1.tar.gz
  • Upload date:
  • Size: 104.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for neotask-0.1.1.tar.gz
Algorithm Hash digest
SHA256 2a155a3d8e232f0898b0610782101e03844ac879f026ccfdac2488ce19d45810
MD5 5a47a114092108976653713437267e3c
BLAKE2b-256 17715ab26ab16fff21cf5fe88fa18de84b2f4ac98d5fff22be3a6368176c801e

See more details on using hashes here.

Provenance

The following attestation bundles were made for neotask-0.1.1.tar.gz:

Publisher: publish-pypi.yml on neopen/task-schedule-manager

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file neotask-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: neotask-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 97.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for neotask-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 c72385b87e587395ff976db430c991a77805b8d9dfcd4659314716224cdb7eb5
MD5 22edac8f4c7efe5cc33b59b48649b1ba
BLAKE2b-256 2a9f9a9faf123b7aa075a8062e656990b111de2a9443e1f576f6d2b2267e3162

See more details on using hashes here.

Provenance

The following attestation bundles were made for neotask-0.1.1-py3-none-any.whl:

Publisher: publish-pypi.yml on neopen/task-schedule-manager

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page