Package for basic task queue in the filesystem.
TaskCore
A lightweight, filesystem-based task queue system for Python. TaskCore provides a simple way to distribute and manage tasks across multiple processes or machines using the filesystem as the backend.
Features
- Simple Setup: No external dependencies; uses only the Python standard library
- Filesystem-based: Tasks are stored as JSON files on disk
- Multi-process Support: Multiple processes, or machines sharing a filesystem, can pull tasks from the same queue directory
- Fault Tolerant: Failed tasks are released back to the pending queue, and stale running tasks are reclaimed automatically
- Atomic Operations: Safe concurrent access using atomic file operations
- SLURM Integration: Picks up the job ID from SLURM_JOB_ID when running under a SLURM scheduler
Installation
pip install taskcore-lib
Or install from source:
git clone https://github.com/goncalorafaria/taskcore
cd taskcore
pip install -e .
Quick Start
Basic Usage
from taskcore import FileSystemTaskQueueClient

# Initialize the task queue
queue = FileSystemTaskQueueClient("/path/to/task/directory")

# Add tasks to the queue
task_config = {
    "learning_rate": 1e-4,
    "batch_size": 32,
    "epochs": 100,
    "model_name": "bert-base-uncased",
}
task_id = queue.add_task(task_config)
print(f"Added task with ID: {task_id}")

# Process tasks
def init_function(**config):
    # Initialize your model/trainer here (create_trainer is a placeholder for your own code)
    trainer = create_trainer(**config)
    return trainer, {"status": "initialized"}

def run_function(trainer):
    # Run your training/processing here
    trainer.train()

# Fetch and run tasks until none are available
while queue.fetch_and_run_task(init_func=init_function, func=run_function):
    pass
Multi-Process Example
import os
from taskcore import FileSystemTaskQueueClient

def main():
    # Use LOCAL_RANK for multi-process setups (e.g., with torch.distributed)
    rank = int(os.environ.get("LOCAL_RANK", 0))
    queue = FileSystemTaskQueueClient("/path/to/tasks", rank=rank)

    def init_func(**config):
        # Initialize your distributed training setup
        # (create_distributed_trainer is a placeholder for your own code)
        trainer = create_distributed_trainer(**config)
        return trainer, {"wandb_link": trainer.wandb_link}

    def run_func(trainer):
        trainer.train()

    # Process tasks until none are available
    while queue.fetch_and_run_task(init_func=init_func, func=run_func):
        pass

if __name__ == "__main__":
    main()
Task Generation Example
from taskcore import FileSystemTaskQueueClient

# Initialize queue
queue = FileSystemTaskQueueClient("/path/to/task/directory")

# Define hyperparameter sweep
learning_rates = [1e-4, 1e-5, 1e-6]
batch_sizes = [16, 32, 64]
model_names = ["bert-base-uncased", "roberta-base"]

# Generate all combinations
for lr in learning_rates:
    for batch_size in batch_sizes:
        for model_name in model_names:
            config = {
                "learning_rate": lr,
                "batch_size": batch_size,
                "model_name": model_name,
                "epochs": 100,
                "wandb_project": "my-experiment",
            }
            task_id = queue.add_task(config)
            print(f"Added task {task_id}: {config}")
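The nested loops above enumerate 3 × 3 × 2 = 18 configurations. A more compact way to build the same list is `itertools.product`, which varies its last iterable fastest and therefore yields the configs in the same order as the loops. This sketch only builds the dicts, so it runs without TaskCore; `sweep_configs` is an illustrative helper, and in practice each config would be passed to `queue.add_task`.

```python
import itertools

# Same sweep as the nested loops: 3 learning rates x 3 batch sizes x 2 models.
learning_rates = [1e-4, 1e-5, 1e-6]
batch_sizes = [16, 32, 64]
model_names = ["bert-base-uncased", "roberta-base"]

def sweep_configs():
    """Yield one config dict per combination, in the same order as nested loops."""
    for lr, batch_size, model_name in itertools.product(
        learning_rates, batch_sizes, model_names
    ):
        yield {
            "learning_rate": lr,
            "batch_size": batch_size,
            "model_name": model_name,
            "epochs": 100,
            "wandb_project": "my-experiment",
        }

configs = list(sweep_configs())
print(len(configs))  # 18
# Each entry would then be enqueued with queue.add_task(config).
```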
API Reference
FileSystemTaskQueueClient
The main client class for interacting with the task queue.
Constructor
FileSystemTaskQueueClient(base_dir: str, rank: int = 0)
- base_dir: Directory where task files are stored
- rank: Process rank for multi-process setups (default: 0)
Methods
add_task(task_dict: Dict) -> str
Add a new task to the queue.
task_id = queue.add_task({"param": "value"})
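The README does not specify how add_task lays a task out on disk. As a minimal sketch, assuming one JSON file per task named `<task_id>.json` inside `pending/` and a UUID-based task ID (both assumptions, not TaskCore's documented format), an atomic enqueue needs only the standard library: write to a temporary file in the same directory, then `os.replace` it into place. `add_task_sketch` is a hypothetical name.

```python
import json
import os
import tempfile
import uuid

def add_task_sketch(base_dir: str, task_dict: dict) -> str:
    """Hypothetical stand-in for add_task: write task_dict to pending/<id>.json atomically."""
    pending = os.path.join(base_dir, "pending")
    os.makedirs(pending, exist_ok=True)
    task_id = uuid.uuid4().hex  # assumption: TaskCore's real ID scheme may differ
    # Write to a temp file in the same directory first. The ".tmp" suffix keeps
    # readers that only look at *.json files from picking up a partial write.
    fd, tmp_path = tempfile.mkstemp(dir=pending, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(task_dict, f)
        # os.replace is atomic on POSIX when source and target are on the same
        # filesystem, so the final file appears fully written or not at all.
        os.replace(tmp_path, os.path.join(pending, f"{task_id}.json"))
    except BaseException:
        os.unlink(tmp_path)  # clean up the temp file if anything failed
        raise
    return task_id
```

Writing into the same directory matters: a rename across filesystems is not atomic (and `os.replace` fails outright in that case), so a temp file in `/tmp` would not give the same guarantee.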
fetch_and_run_task(init_func: Callable, func: Callable) -> bool
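Fetch a task, initialize it with init_func, and run it with func. init_func receives the task configuration as keyword arguments and returns a tuple of (object passed to func, dict of extra information). The method returns False when no task is available, which is what ends the while loops in the examples above.
def init_func(**config):
    trainer = create_trainer(**config)  # create_trainer: placeholder for your own code
    extra_info = {"status": "initialized"}
    return trainer, extra_info

def run_func(trainer):
    trainer.train()

success = queue.fetch_and_run_task(init_func, run_func)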
fetch_task() -> Tuple[str, Dict]
Fetch the next available task.
task_id, config = queue.fetch_task()
finish_current_task()
Mark the current task as completed.
release_current_task()
Release the current task back to the pending queue.
edit_current_task(new_dict: Dict)
Update the current task's configuration.
get_current_task() -> Dict
Get the configuration of the current task.
has_task() -> bool
Check if a task is currently being processed.
Directory Structure
TaskCore creates the following directory structure:
base_dir/
├── pending/ # Tasks waiting to be processed
├── running/ # Tasks currently being processed
└── finished/ # Completed tasks
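The three directories suggest that a task's state is simply which directory its file lives in. A minimal sketch of how a worker could claim and finish tasks under that assumption follows; the file naming and the helpers `claim_task` and `finish_task` are hypothetical, not TaskCore's code. It relies on `os.rename` being atomic within one local POSIX filesystem: when two workers race for the same file, only one rename succeeds. Network filesystems such as NFS may give weaker guarantees.

```python
import os
import tempfile
from typing import Optional

def claim_task(base_dir: str) -> Optional[str]:
    """Hypothetical: move one task file from pending/ to running/; return its ID or None."""
    pending = os.path.join(base_dir, "pending")
    running = os.path.join(base_dir, "running")
    os.makedirs(pending, exist_ok=True)
    os.makedirs(running, exist_ok=True)
    for name in sorted(os.listdir(pending)):
        if not name.endswith(".json"):
            continue  # skip temp files and anything else that is not a task
        try:
            # Whoever renames the file first owns the task; a losing worker
            # gets FileNotFoundError and tries the next candidate.
            os.rename(os.path.join(pending, name), os.path.join(running, name))
        except FileNotFoundError:
            continue
        return name[: -len(".json")]
    return None  # nothing left to claim

def finish_task(base_dir: str, task_id: str) -> None:
    """Hypothetical: move a claimed task from running/ to finished/."""
    finished = os.path.join(base_dir, "finished")
    os.makedirs(finished, exist_ok=True)
    os.rename(os.path.join(base_dir, "running", f"{task_id}.json"),
              os.path.join(finished, f"{task_id}.json"))

# Usage: two queued tasks are claimed in name order, then one is finished.
with tempfile.TemporaryDirectory() as base:
    os.makedirs(os.path.join(base, "pending"))
    for name in ("a.json", "b.json"):
        open(os.path.join(base, "pending", name), "w").close()
    first = claim_task(base)   # "a"
    second = claim_task(base)  # "b"
    empty = claim_task(base)   # None: pending/ is now empty
    finish_task(base, first)
    finished_files = os.listdir(os.path.join(base, "finished"))  # ["a.json"]
```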
Environment Variables
TaskCore uses these environment variables for distributed processing:
- SLURM_JOB_ID: Job ID from the SLURM scheduler
- MY_JOB_ID: Fallback job ID if SLURM_JOB_ID is not available
- LOCAL_RANK: Process rank in multi-process setups
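The precedence described for these variables can be sketched as a small resolver. `worker_identity` is a hypothetical helper, and how TaskCore actually uses the job ID internally is not documented here; the sketch only shows the SLURM_JOB_ID, then MY_JOB_ID, fallback order and the LOCAL_RANK default of 0 (matching the constructor's default rank). Taking the environment as a parameter makes it easy to test.

```python
import os
from typing import Mapping, Optional, Tuple

def worker_identity(env: Mapping[str, str] = os.environ) -> Tuple[Optional[str], int]:
    """Hypothetical: resolve (job_id, local_rank) from the documented variables."""
    # SLURM_JOB_ID wins; MY_JOB_ID is the fallback; None if neither is set.
    # Note that an empty SLURM_JOB_ID also falls through to MY_JOB_ID.
    job_id = env.get("SLURM_JOB_ID") or env.get("MY_JOB_ID")
    # LOCAL_RANK defaults to 0, like the client's rank parameter.
    local_rank = int(env.get("LOCAL_RANK", "0"))
    return job_id, local_rank

print(worker_identity({"SLURM_JOB_ID": "12345", "LOCAL_RANK": "2"}))  # ('12345', 2)
print(worker_identity({"MY_JOB_ID": "local-run"}))                    # ('local-run', 0)
```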
Error Handling
Tasks that fail during execution are automatically released back to the pending queue:
try:
    queue.fetch_and_run_task(init_func, run_func)
except Exception as e:
    print(f"Task failed: {e}")
    # Task is automatically released back to pending queue
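The README states that a failed task is released back to the pending queue, and the try/except in the example implies the exception still reaches the caller. A minimal sketch of that control flow, built from the documented method names fetch_task, release_current_task and finish_current_task, is below. `InMemoryQueue` and `fetch_and_run_task_sketch` are hypothetical stand-ins so the sketch runs without TaskCore, and signalling an empty queue with `(None, None)` is an assumption; this is not TaskCore's actual implementation.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

class InMemoryQueue:
    """Hypothetical stand-in exposing the documented method names; not TaskCore."""

    def __init__(self, tasks: Dict[str, Dict]):
        self.pending: Dict[str, Dict] = dict(tasks)
        self.finished: List[str] = []
        self.current: Optional[Tuple[str, Dict]] = None

    def fetch_task(self) -> Tuple[Optional[str], Optional[Dict]]:
        if not self.pending:
            return None, None  # assumption: how an empty queue is signalled
        task_id = next(iter(self.pending))
        self.current = (task_id, self.pending.pop(task_id))
        return self.current

    def finish_current_task(self) -> None:
        self.finished.append(self.current[0])
        self.current = None

    def release_current_task(self) -> None:
        task_id, config = self.current
        self.pending[task_id] = config  # back to the pending queue
        self.current = None


def fetch_and_run_task_sketch(queue: InMemoryQueue,
                              init_func: Callable[..., Tuple[Any, Dict]],
                              func: Callable[[Any], Any]) -> bool:
    task_id, config = queue.fetch_task()
    if task_id is None:
        return False  # nothing to do; ends a `while` loop over this call
    try:
        trainer, _extra_info = init_func(**config)
        func(trainer)
    except BaseException:
        queue.release_current_task()  # failed: return the task to pending
        raise                         # then let the caller see the error
    queue.finish_current_task()
    return True
```

Catching `BaseException` rather than `Exception` means a Ctrl-C (KeyboardInterrupt) during training also puts the task back instead of leaving it stranded in the running state.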
Stale Task Recovery
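TaskCore automatically reclaims stale tasks after a default timeout of 4 hours (14,400 seconds). To reclaim with a different timeout, pass it in seconds; the method is reached through the client's queue attribute:
queue.queue.reclaim_stale_tasks(timeout_seconds=3600)  # 1 hour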
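The README gives the default timeout but not how staleness is measured. A minimal sketch, assuming a running task is stale when its file in `running/` has not been modified for longer than `timeout_seconds`, and that reclaiming means moving the file back to `pending/`, is below. `reclaim_stale_tasks_sketch` is a hypothetical helper, not the method shown above.

```python
import os
import tempfile
import time
from typing import List

def reclaim_stale_tasks_sketch(base_dir: str, timeout_seconds: float = 4 * 3600) -> List[str]:
    """Hypothetical: move running/ tasks older than timeout_seconds back to pending/."""
    running = os.path.join(base_dir, "running")
    pending = os.path.join(base_dir, "pending")
    if not os.path.isdir(running):
        return []
    os.makedirs(pending, exist_ok=True)
    now = time.time()
    reclaimed = []
    for name in sorted(os.listdir(running)):
        path = os.path.join(running, name)
        try:
            # Assumption: staleness = time since last modification. A slow but
            # live worker would need to touch its file periodically to avoid
            # having its task reclaimed underneath it.
            if now - os.path.getmtime(path) > timeout_seconds:
                os.rename(path, os.path.join(pending, name))
                reclaimed.append(name)
        except FileNotFoundError:
            continue  # another process finished or reclaimed it meanwhile
    return reclaimed

# Usage: one task last touched two hours ago, one just now; 1-hour timeout.
with tempfile.TemporaryDirectory() as base:
    running = os.path.join(base, "running")
    os.makedirs(running)
    for name in ("old.json", "fresh.json"):
        open(os.path.join(running, name), "w").close()
    two_hours_ago = time.time() - 7200
    os.utime(os.path.join(running, "old.json"), (two_hours_ago, two_hours_ago))
    reclaimed = reclaim_stale_tasks_sketch(base, timeout_seconds=3600)  # ["old.json"]
    still_running = os.listdir(running)  # ["fresh.json"]
```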
License
MIT License - see LICENSE file for details.
File details
Details for the file taskcore_lib-0.0.2.tar.gz.
File metadata
- Download URL: taskcore_lib-0.0.2.tar.gz
- Upload date:
- Size: 8.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 3c529d2a49bd36f4d8c4ce43b2e6cba35afc6ac09526c387ebac37caa56b842d |
| MD5 | 6608be383c5c7ab855a64f1458aaed45 |
| BLAKE2b-256 | 67d3a3c14a67ef6e70b8eb2b14ce2d2e6ac7609e5a6f5f084b35c819944602d3 |
File details
Details for the file taskcore_lib-0.0.2-py3-none-any.whl.
File metadata
- Download URL: taskcore_lib-0.0.2-py3-none-any.whl
- Upload date:
- Size: 7.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e0e9e2dba91787ebec661236c6e7f34eafd3340ebe2b007159a4e268c88d2712 |
| MD5 | b867ef094d2abd29da0b1460ecdb71dd |
| BLAKE2b-256 | 4aa84c90cd2184411ee32f376cee2725a462981f584d17ba14e5d5d64273347c |