NeoTask - Lightweight Python async task queue manager supporting immediate, delayed, and periodic tasks, with a built-in priority queue and automatic retry
Distributed Task Scheduling System (NeoTask)
Lightweight Python asynchronous task queue manager, no extra services required, ready to use out of the box.
NeoTask is a pure-Python asynchronous task queue and scheduler designed for time-consuming tasks such as AI generation, video processing, and data scraping. It supports immediate, delayed, and periodic (fixed-interval or Cron) tasks. No external service such as Redis or PostgreSQL has to be deployed; Redis is only an optional storage backend. Once installed, it can be used directly in any Python project.
Features
- Zero-Dependency Deployment - Pure Python implementation; no Redis/PostgreSQL service is required (Redis is an optional backend)
- Immediate Tasks - Supports priority scheduling; high-priority tasks execute first
- Scheduled Tasks - Supports delayed execution, fixed intervals, and Cron expressions
- Asynchronous Concurrency - Based on asyncio, multi-worker concurrent processing
- Automatic Retry - Failed tasks automatically retry with configurable attempts
- Persistence - Multiple storage backends: Memory/SQLite/Redis
- DAG Workflow - Support for task orchestration, conditional branches, and parallel execution
- Distributed Support - Distributed task scheduling, high availability, and fault tolerance
- Event Callbacks - Supports task lifecycle event listeners
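To make the priority and concurrency features above concrete, here is a minimal standard-library sketch of priority-ordered execution with several asyncio workers. It is not NeoTask's implementation: `run_by_priority` is a hypothetical helper, and the convention that a lower number means a higher priority is an assumption of this sketch.

```python
import asyncio
import itertools


async def run_by_priority(jobs, worker_concurrency=1):
    """Run (priority, payload) jobs; ASSUMPTION: lower numbers run first."""
    queue = asyncio.PriorityQueue()
    counter = itertools.count()  # tie-breaker keeps FIFO order within a priority
    for priority, payload in jobs:
        queue.put_nowait((priority, next(counter), payload))

    finished = []

    async def worker():
        # Each worker drains the queue until nothing is left.
        while True:
            try:
                _, _, payload = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            await asyncio.sleep(0)  # stand-in for real asynchronous work
            finished.append(payload)

    await asyncio.gather(*(worker() for _ in range(worker_concurrency)))
    return finished


order = asyncio.run(run_by_priority([(2, "normal"), (0, "urgent"), (1, "high")]))
print(order)  # ['urgent', 'high', 'normal']
```

With a single worker the completion order follows the priorities exactly; with more workers, tasks are still dequeued in priority order but may finish in a different order.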
Use Cases
| Scenario | Description | Recommended Configuration | Entry Point |
|---|---|---|---|
| AI Text-to-Image/Video Generation | Queue time-consuming tasks to avoid blocking the main flow | worker_concurrency=3 | TaskPool |
| Batch File Processing | Batch operations such as transcoding, compression, and uploading | worker_concurrency=10 | TaskPool |
| Web Scraping Scheduling | Distributed scraping to avoid being blocked | storage_type="redis" | TaskPool |
| Scheduled Report Sending | Send daily reports at 9 AM | cron="0 9 * * *" | TaskScheduler |
| Delayed Notifications | Send reminders 5 minutes after a user action | delay_seconds=300 | TaskScheduler |
| Heartbeat Detection | Check service health every 30 seconds | interval_seconds=30 | TaskScheduler |
| Background Data Analysis | Run data aggregation tasks at night (2 AM) | cron="0 2 * * *" | TaskScheduler |
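The Cron strings in the table use the common five-field layout (minute, hour, day of month, month, day of week). As a rough illustration of how such an expression maps to a run time, the sketch below computes the next run for the simple daily form `M H * * *` only. `next_daily_run` is a hypothetical helper written for this example, not part of NeoTask, and it deliberately rejects anything more complex.

```python
from datetime import datetime, timedelta


def next_daily_run(cron: str, now: datetime) -> datetime:
    """Return the next run time for a daily 'M H * * *' expression."""
    minute, hour, day, month, weekday = cron.split()
    if (day, month, weekday) != ("*", "*", "*"):
        raise ValueError("this sketch only handles 'M H * * *' expressions")
    candidate = now.replace(hour=int(hour), minute=int(minute), second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so schedule for tomorrow.
        candidate += timedelta(days=1)
    return candidate


now = datetime(2026, 1, 1, 8, 30)
print(next_daily_run("0 9 * * *", now))  # 2026-01-01 09:00:00
print(next_daily_run("0 2 * * *", now))  # 2026-01-02 02:00:00
```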
Architecture & Evolution
graph TB
subgraph User["User Application Layer"]
APP[User Code]
end
subgraph NeoTask["NeoTask Core"]
TP[TaskPool<br/>Immediate Task Entry]
TS[TaskScheduler<br/>Scheduled Task Entry]
subgraph Core["Shared Core Components"]
LM[LifecycleManager<br/>Task Lifecycle Management]
QS[QueueScheduler<br/>Priority + Delayed Queue]
WP[WorkerPool<br/>Worker Pool/Concurrency Control]
FM[FutureManager<br/>Async Wait/Result Callback]
end
subgraph Internal["Internal Components"]
EB[EventBus<br/>Event Bus]
MC[MetricsCollector<br/>Metrics Collection]
LF[LockFactory<br/>Distributed Lock]
end
EX[TaskExecutor<br/>User Business Logic]
end
subgraph Storage["Storage Layer"]
MEM[MemoryStorage]
SQLITE[(SQLiteStorage)]
REDIS[(RedisStorage)]
end
APP -->|Immediate Task| TP
APP -->|Scheduled Task| TS
TS -->|Delegates| TP
TP --> LM
TP --> QS
TP --> WP
TP --> FM
LM --> MEM
LM --> SQLITE
LM --> REDIS
WP --> EX
WP --> EB
WP --> MC
WP --> LF
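The storage layer in the diagram persists task state so that it can outlive a single process. The sketch below shows one way such a backend could look using only the standard `sqlite3` module. `TinySQLiteStore`, its table layout, and its method names are all assumptions made for illustration; they do not describe NeoTask's actual SQLiteStorage.

```python
import json
import sqlite3


class TinySQLiteStore:
    """Hypothetical stand-in for a task storage backend (not NeoTask's API)."""

    def __init__(self, path=":memory:"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS tasks ("
            "task_id TEXT PRIMARY KEY, status TEXT NOT NULL, payload TEXT NOT NULL)"
        )

    def save(self, task_id, status, payload):
        # 'with self.conn' commits on success and rolls back on error.
        with self.conn:
            self.conn.execute(
                "INSERT OR REPLACE INTO tasks (task_id, status, payload) VALUES (?, ?, ?)",
                (task_id, status, json.dumps(payload)),
            )

    def load(self, task_id):
        row = self.conn.execute(
            "SELECT status, payload FROM tasks WHERE task_id = ?", (task_id,)
        ).fetchone()
        if row is None:
            return None
        return {"status": row[0], "payload": json.loads(row[1])}


store = TinySQLiteStore()
store.save("t1", "pending", {"id": 123})
store.save("t1", "completed", {"id": 123})  # later state overwrites the earlier one
print(store.load("t1"))  # {'status': 'completed', 'payload': {'id': 123}}
```

Passing a file path instead of `":memory:"` keeps the rows across restarts, which is the point of a persistent backend.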
Development Roadmap
timeline
title NeoTask Architecture Evolution Roadmap
section v0.1
Basic Task Pool : Local Memory Queue
: Async Execution Engine
: Memory/SQLite Storage
section v0.2
Observability : Event Bus
: Metrics Collection
: Health Check
section v0.3
Scheduled Tasks : Delayed Queue/Time Wheel
: Periodic Tasks
: Cron Expression
section v0.4
Distributed Base : Redis Shared Queue
: Distributed Lock
section v0.5
Performance Optimization : Prefetch Mechanism
: Batch Operations
: Connection Pool
section v1.0
High Availability : Watchdog Renewal
: Timeout Detection
: Automatic Fault Recovery
section v1.5
Task Orchestration : DAG Workflow
: Conditional Branch
: Parallel Execution
section v2.0
Enterprise Features : Independent Web UI
: Multi-Tenancy Isolation
: Prometheus Integration
Quick Start
Installation
# Basic installation
pip install neotask
# With Redis distributed support
pip install neotask[redis]
# Full installation
pip install neotask[full]
Immediate Tasks (TaskPool)
from neotask import TaskPool

async def process(data):
    return {"result": "done", "data": data}

# Create task pool
pool = TaskPool(executor=process)

# Submit task
task_id = pool.submit({"id": 123})

# Wait for result
result = pool.wait_for_result(task_id)

pool.shutdown()
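The example above calls `submit` and `wait_for_result` synchronously even though the executor is an `async` function. One common way to offer that kind of blocking facade is to run an event loop in a background thread. The sketch below shows the pattern with the standard library only; `BackgroundLoopPool` is a hypothetical class for illustration and says nothing about how TaskPool is implemented internally.

```python
import asyncio
import threading
import uuid


class BackgroundLoopPool:
    """Illustrative sync facade over an async executor (not NeoTask code)."""

    def __init__(self, executor):
        self._executor = executor
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()
        self._futures = {}

    def submit(self, data):
        task_id = uuid.uuid4().hex
        # Schedule the coroutine on the background loop from this thread.
        self._futures[task_id] = asyncio.run_coroutine_threadsafe(
            self._executor(data), self._loop
        )
        return task_id

    def wait_for_result(self, task_id, timeout=300):
        # Blocks the calling thread until the coroutine finishes.
        return self._futures.pop(task_id).result(timeout=timeout)

    def shutdown(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def process(data):
    await asyncio.sleep(0.01)  # stand-in for slow work
    return {"result": "done", "data": data}


pool = BackgroundLoopPool(process)
task_id = pool.submit({"id": 123})
result = pool.wait_for_result(task_id)
pool.shutdown()
print(result)  # {'result': 'done', 'data': {'id': 123}}
```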
Scheduled Tasks (TaskScheduler)
from neotask import TaskScheduler
scheduler = TaskScheduler(executor=process)
# Execute after 60 seconds delay
scheduler.submit_delayed({"id": 123}, delay_seconds=60)
# Execute every 5 minutes
scheduler.submit_interval({"id": 123}, interval_seconds=300)
# Execute daily at 9 AM
scheduler.submit_cron({"id": 123}, "0 9 * * *")
scheduler.shutdown()
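Delayed and interval submissions both come down to "do not run this before time T". A minimal sketch of that idea is a heap ordered by due time, shown below. `DelayedQueue` and its methods are hypothetical names for this example, and the clock is passed in explicitly so the behavior is easy to follow; a real scheduler would read the clock itself.

```python
import heapq
import itertools


class DelayedQueue:
    """Illustrative delayed queue: items become visible once their due time passes."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker for identical due times

    def push(self, payload, now, delay_seconds=0.0):
        heapq.heappush(self._heap, (now + delay_seconds, next(self._seq), payload))

    def pop_due(self, now):
        """Return every payload whose due time is at or before `now`."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            _, _, payload = heapq.heappop(self._heap)
            due.append(payload)
        return due


queue = DelayedQueue()
queue.push("reminder", now=0, delay_seconds=300)
queue.push("report", now=0, delay_seconds=60)
print(queue.pop_due(now=59))   # []
print(queue.pop_due(now=60))   # ['report']
print(queue.pop_due(now=300))  # ['reminder']
```

A periodic task can be modeled on top of this by pushing the same payload again with `delay_seconds` set to the interval each time it is popped.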
Using Context Manager
with TaskPool(executor=process) as pool:
    task_id = pool.submit({"id": 123})
    result = pool.wait_for_result(task_id)
Using Event Callbacks
from neotask import TaskPool

async def on_task_created(event):
    print(f"Task created: {event.task_id}")

async def on_task_completed(event):
    print(f"Task completed: {event.task_id}, Result: {event.data}")

async def on_task_failed(event):
    print(f"Task failed: {event.task_id}, Error: {event.data}")

# my_executor is your own async executor function (for example, process from the Immediate Tasks example)
pool = TaskPool(executor=my_executor)
pool.start()

# Register event callbacks
pool.on_created(on_task_created)
pool.on_completed(on_task_completed)
pool.on_failed(on_task_failed)

task_id = pool.submit({"test": "event"})
result = pool.wait_for_result(task_id)
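The callbacks above follow a publish/subscribe pattern: handlers are registered per event type and called with an event object when that event fires. The sketch below illustrates the pattern in isolation. `MiniEventBus` and `TaskEvent` are hypothetical names, and the `task_id`/`data` fields simply mirror the attributes used in the example; none of this is NeoTask's EventBus.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Any


@dataclass
class TaskEvent:
    task_id: str
    data: Any = None


class MiniEventBus:
    """Illustrative async publish/subscribe bus (not NeoTask code)."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event_name, handler):
        self._handlers[event_name].append(handler)

    async def emit(self, event_name, event):
        # Handlers run one after another in registration order.
        for handler in self._handlers[event_name]:
            await handler(event)


seen = []


async def on_completed(event):
    seen.append((event.task_id, event.data))


async def main():
    bus = MiniEventBus()
    bus.on("completed", on_completed)
    await bus.emit("completed", TaskEvent("t1", {"result": "done"}))
    await bus.emit("failed", TaskEvent("t2", "boom"))  # no listener registered, so nothing happens


asyncio.run(main())
print(seen)  # [('t1', {'result': 'done'})]
```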
API Reference
| Method | Description |
|---|---|
| pool.submit(data, priority=2, delay=0) | Submit a task |
| pool.wait_for_result(task_id, timeout=300) | Wait for a task's result |
| pool.get_status(task_id) | Get a task's status |
| pool.cancel(task_id) | Cancel a task |
| scheduler.submit_delayed(data, delay_seconds) | Submit a delayed task |
| scheduler.submit_interval(data, interval_seconds) | Submit a periodic task |
| scheduler.submit_cron(data, cron) | Submit a Cron task |
| engine.submit_workflow(definition) | Submit a workflow (v1.5) |
| engine.wait_workflow(execution_id) | Wait for a workflow (v1.5) |
Detailed API documentation can be found here
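The workflow entries in the table refer to DAG-style orchestration, where a step runs only after the steps it depends on. As a rough picture of the ordering involved, the sketch below uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) to group steps into waves that could run in parallel. The step names and the dict-based definition are invented for this example; the actual format expected by `engine.submit_workflow` is not shown in this README.

```python
from graphlib import TopologicalSorter

# Hypothetical workflow: each key maps to the set of steps it depends on.
workflow = {
    "fetch": set(),
    "resize": {"fetch"},
    "caption": {"fetch"},
    "publish": {"resize", "caption"},
}

sorter = TopologicalSorter(workflow)
sorter.prepare()  # also raises CycleError if the graph is not a DAG

waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # steps whose dependencies are all done
    waves.append(ready)
    sorter.done(*ready)  # mark the whole wave as finished

print(waves)  # [['fetch'], ['caption', 'resize'], ['publish']]
```

Steps inside one wave have no dependencies on each other, so they are the natural candidates for parallel execution.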
Configuration Example
from neotask import TaskPool, TaskPoolConfig

config = TaskPoolConfig(
    worker_concurrency=10,   # Number of concurrent workers
    max_retries=3,           # Number of retries
    storage_type="sqlite",   # Storage type
)

pool = TaskPool(executor=process, config=config)
Detailed usage examples can be found here
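To illustrate what a `max_retries` setting typically implies, here is a small standalone sketch of a retry loop around an async executor. `run_with_retries` and its `base_delay` backoff parameter are assumptions made for this example; the README does not state how NeoTask spaces its retries or whether the first attempt counts toward the limit.

```python
import asyncio


async def run_with_retries(executor, data, max_retries=3, base_delay=0.0):
    """Call executor(data); on failure, retry up to max_retries more times."""
    attempt = 0
    while True:
        try:
            return await executor(data)
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            # ASSUMPTION: exponential backoff (base_delay, 2x, 4x, ...)
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


calls = {"count": 0}


async def flaky(data):
    # Fails twice, then succeeds on the third call.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return {"result": "done", "data": data}


result = asyncio.run(run_with_retries(flaky, {"id": 123}, max_retries=3))
print(result, calls["count"])  # {'result': 'done', 'data': {'id': 123}} 3
```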
Contribution Guide
Setting Up Development Environment
# Clone repository
git clone https://github.com/neopen/neotask.git
cd neotask
# Create virtual environment
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
# Install development dependencies
pip install -e ".[dev]"
# Run tests
pytest tests/
# View test coverage
pytest --cov=neotask tests/
# Run specific module tests
pytest tests/test_task_pool.py -v
pytest tests/test_task_scheduler.py -v
Project Structure
neotask/
├── api/ # TaskPool, TaskScheduler
├── core/ # Lifecycle, Queue, Worker
├── workflow/ # Workflow Engine
├── engine/ # Task Orchestration
├── executor/ # Async Execution Engine
├── storage/ # Memory/SQLite/Redis
├── event/ # Event Bus
└── models/ # Data Models
Contribution Workflow
Issues and Pull Requests are welcome.
- Fork the project
- Create a feature branch (git checkout -b feature/amazing)
- Commit changes (git commit -m 'Add amazing feature')
- Push the branch (git push origin feature/amazing)
- Submit a Pull Request
Code Style
- Follow PEP 8 code style
- Add appropriate type annotations
- Write unit tests for new features (coverage ≥ 80%)
- Update relevant documentation and example code
- Commit messages follow Conventional Commits
Testing Requirements
# Run all tests
pytest tests/
# Run specific module tests
pytest tests/unit/test_task.py
# Run manual tests
python examples/01_simple.py
python examples/05_webui.py
Issue Reporting
- Submit Issue: https://github.com/neopen/neotask/issues
- Feature Suggestions: Use Enhancement label
- Bug Reports: Use Bug label and provide reproduction steps
- Security Vulnerabilities: Please email the author directly
License
MIT License © 2026 NeoPen
Acknowledgments
Thanks to all contributors and the open source community for their support.