Skip to main content

NeoTask - 轻量级 Python 异步任务队列,支持即时/延时/周期任务,内置优先级队列、自动重试 | Lightweight Python Async Task Queue Manager

Project description

Distributed Task Scheduling System (NeoTask)

Lightweight Python asynchronous task queue manager, no extra services required, ready to use out of the box.

NeoTask is a pure Python-based asynchronous task queue scheduling system specifically designed for time-consuming tasks (such as AI generation, video processing, data scraping, etc.). It supports scheduled tasks, periodic tasks, and delayed tasks. There is no need to deploy external services like Redis or PostgreSQL. After installation, it can be directly used in any Python project.

中文 | English | Documentation | PyPI | website

License Python 3.8+ PyPI Downloads


Features

  • Zero-Dependency Deployment - Pure Python implementation, no Redis/PostgreSQL required
  • Immediate Tasks - Supports priority scheduling, high-priority tasks execute first
  • Scheduled Tasks - Supports delayed execution, fixed intervals, and Cron expressions
  • Asynchronous Concurrency - Based on asyncio, multi-worker concurrent processing
  • Automatic Retry - Failed tasks automatically retry with configurable attempts
  • Persistence - Multiple storage backends: Memory/SQLite/Redis
  • DAG Workflow - Support for task orchestration, conditional branches, and parallel execution
  • Distributed Support - Distributed task scheduling, high availability, and fault tolerance
  • Event Callbacks - Supports task lifecycle event listeners

Use Cases

Scenario Description Recommended Configuration Entry Point
AI Text-to-Image/Video Generation Queue time-consuming tasks to avoid blocking main flow worker_concurrency=3 TaskPool
Batch File Processing Batch operations like transcoding, compression, uploading worker_concurrency=10 TaskPool
Web Scraping Scheduling Distributed scraping to prevent being blocked storage_type="redis" TaskPool
Scheduled Report Sending Send daily reports at 9 AM cron="0 9 * * *" TaskScheduler
Delayed Notifications Send reminders 5 minutes after user action delay_seconds=300 TaskScheduler
Heartbeat Detection Check service health status every 30 seconds interval_seconds=30 TaskScheduler
Background Data Analysis Execute data aggregation tasks at night cron="0 2 * * *" TaskScheduler

Architecture & Evolution

graph TB
    subgraph User["User Application Layer"]
        APP[User Code]
    end
    
    subgraph NeoTask["NeoTask Core"]
        TP[TaskPool<br/>Immediate Task Entry]
        TS[TaskScheduler<br/>Scheduled Task Entry]
        
        subgraph Core["Shared Core Components"]
            LM[LifecycleManager<br/>Task Lifecycle Management]
            QS[QueueScheduler<br/>Priority + Delayed Queue]
            WP[WorkerPool<br/>Worker Pool/Concurrency Control]
            FM[FutureManager<br/>Async Wait/Result Callback]
        end
        
        subgraph Internal["Internal Components"]
            EB[EventBus<br/>Event Bus]
            MC[MetricsCollector<br/>Metrics Collection]
            LF[LockFactory<br/>Distributed Lock]
        end
        
        EX[TaskExecutor<br/>User Business Logic]
    end
    
    subgraph Storage["Storage Layer"]
        MEM[MemoryStorage]
        SQLITE[(SQLiteStorage)]
        REDIS[(RedisStorage)]
    end
    
    APP -->|Immediate Task| TP
    APP -->|Scheduled Task| TS
    TS -->|Delegates| TP
    TP --> LM
    TP --> QS
    TP --> WP
    TP --> FM
    
    LM --> MEM
    LM --> SQLITE
    LM --> REDIS
    
    WP --> EX
    WP --> EB
    WP --> MC
    WP --> LF

Development Roadmap

timeline
    title NeoTask Architecture Evolution Roadmap
    
    section v0.1
        Basic Task Pool : Local Memory Queue
                        : Async Execution Engine
                        : Memory/SQLite Storage
                        
    section v0.2
        Observability : Event Bus
                      : Metrics Collection
                      : Health Check
                      
    section v0.3
        Scheduled Tasks : Delayed Queue/Time Wheel
                        : Periodic Tasks
                        : Cron Expression
                        
    section v0.4
        Distributed Base : Redis Shared Queue
                         : Distributed Lock
                         
    section v0.5
        Performance Optimization : Prefetch Mechanism
                                : Batch Operations
                                : Connection Pool

    section v1.0
        High Availability : Watchdog Renewal
                          : Timeout Detection
                          : Automatic Fault Recovery

    section v1.5
        Task Orchestration : DAG Workflow
                           : Conditional Branch
                           : Parallel Execution

    section v2.0
        Enterprise Features : Independent Web UI
                            : Multi-Tenancy Isolation
                            : Prometheus Integration

Quick Start

Installation

# Basic installation
pip install neotask

# With Redis distributed support
pip install neotask[redis]

# Full installation
pip install neotask[full]

Immediate Tasks (TaskPool)

from neotask import TaskPool

async def process(data):
    return {"result": "done", "data": data}

# Create task pool
pool = TaskPool(executor=process)

# Submit task
task_id = pool.submit({"id": 123})

# Wait for result
result = pool.wait_for_result(task_id)

pool.shutdown()

Scheduled Tasks (TaskScheduler)

from neotask import TaskScheduler

scheduler = TaskScheduler(executor=process)

# Execute after 60 seconds delay
scheduler.submit_delayed({"id": 123}, delay_seconds=60)

# Execute every 5 minutes
scheduler.submit_interval({"id": 123}, interval_seconds=300)

# Execute daily at 9 AM
scheduler.submit_cron({"id": 123}, "0 9 * * *")

scheduler.shutdown()

Using Context Manager

with TaskPool(executor=process) as pool:
    task_id = pool.submit({"id": 123})
    result = pool.wait_for_result(task_id)

Using Event Callbacks

from neotask import TaskPool

async def on_task_created(event):
    print(f"Task created: {event.task_id}")

async def on_task_completed(event):
    print(f"Task completed: {event.task_id}, Result: {event.data}")

async def on_task_failed(event):
    print(f"Task failed: {event.task_id}, Error: {event.data}")

pool = TaskPool(executor=my_executor)
pool.start()

# Register event callbacks
pool.on_created(on_task_created)
pool.on_completed(on_task_completed)
pool.on_failed(on_task_failed)

task_id = pool.submit({"test": "event"})
result = pool.wait_for_result(task_id)

API Reference

Method Description
pool.submit(data, priority=2, delay=0) Submit task
pool.wait_for_result(task_id, timeout=300) Wait for result
pool.get_status(task_id) Get status
pool.cancel(task_id) Cancel task
scheduler.submit_delayed(data, delay) Delayed task
scheduler.submit_interval(data, interval) Periodic task
scheduler.submit_cron(data, cron) Cron task
engine.submit_workflow(definition) Submit workflow (v1.5)
engine.wait_workflow(execution_id) Wait for workflow (v1.5)

Detailed API documentation can be found here

Configuration Example

from neotask import TaskPool, TaskPoolConfig

config = TaskPoolConfig(
    worker_concurrency=10,      # Number of concurrent workers
    max_retries=3,              # Number of retries
    storage_type="sqlite",      # Storage type
)

pool = TaskPool(executor=process, config=config)

Detailed usage examples can be found here

Contribution Guide

Setting Up Development Environment

# Clone repository
git clone https://github.com/neopen/neotask.git
cd neotask

# Create virtual environment
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest tests/

# View test coverage
pytest --cov=neotask tests/

# Run specific module tests
pytest tests/test_task_pool.py -v
pytest tests/test_task_scheduler.py -v

Project Structure

neotask/
├── api/           # TaskPool, TaskScheduler
├── core/          # Lifecycle, Queue, Worker
├── workflow/      # Workflow  Engine
├── engine/        # Task Orchestration
├── executor/      # Async Execution Engine
├── storage/       # Memory/SQLite/Redis
├── event/         # Event Bus
└── models/        # Data Models

Contribution Workflow

Welcome to submit Issues and Pull Requests

  1. Fork the project
  2. Create a feature branch (git checkout -b feature/amazing)
  3. Commit changes (git commit -m 'Add amazing feature')
  4. Push branch (git push origin feature/amazing)
  5. Submit Pull Request

Code Style

Testing Requirements

# Run all tests
pytest tests/

# Run specific module tests
pytest tests/unit/test_task.py

# Run manual tests
python examples/01_simple.py
python examples/05_webui.py

Issue Reporting

  • Submit Issue: https://github.com/neopen/neotask/issues
  • Feature Suggestions: Use Enhancement label
  • Bug Reports: Use Bug label and provide reproduction steps
  • Security Vulnerabilities: Please send email directly to the author's email

License

MIT License © 2026 NeoPen


Acknowledgments

Thanks to all contributors and the open source community for their support.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

neotask-0.4.0.tar.gz (238.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

neotask-0.4.0-py3-none-any.whl (144.7 kB view details)

Uploaded Python 3

File details

Details for the file neotask-0.4.0.tar.gz.

File metadata

  • Download URL: neotask-0.4.0.tar.gz
  • Upload date:
  • Size: 238.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for neotask-0.4.0.tar.gz
Algorithm Hash digest
SHA256 d4839c2509006ed1992c12d1cfc73cbfacbbe6058c64b26a1980b17bc680c06e
MD5 5508ccdf72b98204a26518e6f8d45a0c
BLAKE2b-256 61d5fa5303babd55ea185ff98039aed595121de1adbdb2959c14e25662ee3dd2

See more details on using hashes here.

Provenance

The following attestation bundles were made for neotask-0.4.0.tar.gz:

Publisher: publish-pypi.yml on neopen/neotask

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file neotask-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: neotask-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 144.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for neotask-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 be829e2f53561509a62df34551fb4acad504857d7a566483040421d8b53beb42
MD5 e821c6c4bbd7b2ef2a48ef0a05a399c7
BLAKE2b-256 e64000e3051f052764ec02543d85ec559b1b2aac24bf39f44a2a4c2ed3ff9895

See more details on using hashes here.

Provenance

The following attestation bundles were made for neotask-0.4.0-py3-none-any.whl:

Publisher: publish-pypi.yml on neopen/neotask

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page