
A package for a basic filesystem-based task queue.


TaskCore

A lightweight, filesystem-based task queue system for Python. TaskCore distributes and manages tasks across multiple processes, or across multiple machines that share a filesystem (for example, nodes on a cluster), using plain files on disk as the backend.

Features

  • Simple Setup: No external dependencies; uses only the Python standard library
  • Filesystem-based: Tasks are stored as JSON files on disk
  • Multi-process Support: Built-in support for distributed task processing
  • Fault Tolerant: Automatic task recovery and stale task reclamation
  • Atomic Operations: Safe concurrent access using atomic file operations
  • SLURM Integration: Reads the job ID from SLURM_JOB_ID when running under the SLURM job scheduler
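Safe concurrent access of this kind is usually built on the fact that renaming a file is atomic within a single POSIX filesystem. The sketch below illustrates that idea under stated assumptions; it is not TaskCore's actual code. The helper name claim_next_task and the choice to claim files in sorted name order are hypothetical, while the pending/ and running/ folder names follow the layout described under Directory Structure.

```python
import os
from pathlib import Path
from typing import Optional


def claim_next_task(base_dir: Path) -> Optional[Path]:
    """Claim one task by moving it from pending/ to running/ (hypothetical helper).

    os.rename is atomic when source and destination are on the same POSIX
    filesystem: if several workers race for the same file, exactly one
    rename succeeds and the others get FileNotFoundError.
    """
    pending = base_dir / "pending"
    running = base_dir / "running"
    running.mkdir(parents=True, exist_ok=True)
    for candidate in sorted(pending.glob("*.json")):
        try:
            # The rename *is* the claim: only one worker can move this file.
            os.rename(candidate, running / candidate.name)
        except FileNotFoundError:
            continue  # another worker claimed it first; try the next file
        return running / candidate.name
    return None  # nothing left to claim
```

Because a worker only owns a task after its rename succeeds, two workers can never pick up the same file, even if they list pending/ at the same moment.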

Installation

pip install taskcore-lib

Or install from source:

git clone https://github.com/goncalorafaria/taskcore
cd taskcore
pip install -e .

Quick Start

Basic Usage

from taskcore import FileSystemTaskQueueClient

# Initialize the task queue
queue = FileSystemTaskQueueClient("/path/to/task/directory")

# Add tasks to the queue
task_config = {
    "learning_rate": 1e-4,
    "batch_size": 32,
    "epochs": 100,
    "model_name": "bert-base-uncased"
}

task_id = queue.add_task(task_config)
print(f"Added task with ID: {task_id}")

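# Process tasks
def init_function(**config):
    # Receives the task's config as keyword arguments.
    # create_trainer is a placeholder for your own setup code.
    trainer = create_trainer(**config)
    # Return the object to run plus a dict of extra information
    return trainer, {"status": "initialized"}

def run_function(trainer):
    # Receives the object returned by init_function
    trainer.train()

# Fetch and run tasks until none are left in the queue
while queue.fetch_and_run_task(init_func=init_function, func=run_function):
    pass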

Multi-Process Example

import os
from taskcore import FileSystemTaskQueueClient

def main():
    # Use LOCAL_RANK for multi-process setups (e.g., with torch.distributed)
    rank = int(os.environ.get("LOCAL_RANK", 0))
    queue = FileSystemTaskQueueClient("/path/to/tasks", rank=rank)
    
    def init_func(**config):
        # create_distributed_trainer is a placeholder for your own distributed setup
        trainer = create_distributed_trainer(**config)
        return trainer, {"wandb_link": trainer.wandb_link}
    
    def run_func(trainer):
        trainer.train()
    
    # Process tasks until none are available
    while queue.fetch_and_run_task(init_func=init_func, func=run_func):
        pass

if __name__ == "__main__":
    main()

Task Generation Example

import itertools

from taskcore import FileSystemTaskQueueClient

# Initialize queue
queue = FileSystemTaskQueueClient("/path/to/task/directory")

# Define hyperparameter sweep
learning_rates = [1e-4, 1e-5, 1e-6]
batch_sizes = [16, 32, 64]
model_names = ["bert-base-uncased", "roberta-base"]

# Generate all combinations (3 x 3 x 2 = 18 tasks)
for lr, batch_size, model_name in itertools.product(learning_rates, batch_sizes, model_names):
    config = {
        "learning_rate": lr,
        "batch_size": batch_size,
        "model_name": model_name,
        "epochs": 100,
        "wandb_project": "my-experiment"
    }

    task_id = queue.add_task(config)
    print(f"Added task {task_id}: {config}")

API Reference

FileSystemTaskQueueClient

The main client class for interacting with the task queue.

Constructor

FileSystemTaskQueueClient(base_dir: str, rank: int = 0)
  • base_dir: Directory where task files are stored
  • rank: Process rank for multi-process setups (default: 0)

Methods

add_task(task_dict: Dict) -> str

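Add a new task to the queue and return its task ID.

task_id = queue.add_task({"param": "value"})

fetch_and_run_task(init_func: Callable, func: Callable) -> bool

Fetch a task, call init_func with the task's configuration as keyword arguments, and pass the first value it returns to func. init_func returns a two-item tuple: the object to run and a dict of extra information. The return value is a bool; the examples above loop until it returns False, which happens once no task is available.

def init_func(**config):
    trainer = create_trainer(**config)  # placeholder for your own setup code
    extra_info = {"status": "initialized"}
    return trainer, extra_info

def run_func(trainer):
    trainer.train()

success = queue.fetch_and_run_task(init_func, run_func)

fetch_task() -> Tuple[str, Dict]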

Fetch the next available task.

task_id, config = queue.fetch_task()

finish_current_task()

Mark the current task as completed.

release_current_task()

Release the current task back to the pending queue.

edit_current_task(new_dict: Dict)

Update the current task's configuration.

get_current_task() -> Dict

Get the configuration of the current task.

has_task() -> bool

Check if a task is currently being processed.

Directory Structure

TaskCore creates the following directory structure:

base_dir/
├── pending/     # Tasks waiting to be processed
├── running/     # Tasks currently being processed
└── finished/    # Completed tasks
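Since tasks are stored as JSON files in these folders, adding a task amounts to writing a new file into pending/. The following is a minimal sketch, assuming a hypothetical add_task_file helper, random UUID hex task IDs, and an <id>.json naming scheme; TaskCore's real ID and file-naming formats are not documented here. Writing to a temporary file first and then moving it into place with os.replace keeps workers from ever reading a half-written task.

```python
import json
import os
import tempfile
import uuid
from pathlib import Path
from typing import Any, Dict


def add_task_file(base_dir: Path, task: Dict[str, Any]) -> str:
    """Write `task` to pending/<id>.json and return the id (hypothetical helper)."""
    # Create the three documented folders if they do not exist yet.
    for sub in ("pending", "running", "finished"):
        (base_dir / sub).mkdir(parents=True, exist_ok=True)
    pending = base_dir / "pending"
    task_id = uuid.uuid4().hex  # assumption: random hex IDs
    # Write to a temp file in the same directory so os.replace stays atomic.
    fd, tmp_path = tempfile.mkstemp(dir=pending, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(task, f)
        # Atomically publish the complete file under its final name.
        os.replace(tmp_path, pending / f"{task_id}.json")
    except BaseException:
        # Clean up the partial temp file if anything went wrong.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
    return task_id
```

The temporary file uses a .tmp suffix, so a worker that only looks for *.json files ignores it until the rename completes.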

Environment Variables

TaskCore uses these environment variables for distributed processing:

  • SLURM_JOB_ID: Job ID from SLURM scheduler
  • MY_JOB_ID: Fallback job ID if SLURM_JOB_ID is not available
  • LOCAL_RANK: Process rank in multi-process setups
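The three variables can be combined into a per-worker identifier along these lines. This is a sketch only: how TaskCore actually uses these values is not documented here, so the resolve_worker_id name, the "local" placeholder for runs outside SLURM, and the "<job>-<rank>" format are all assumptions.

```python
import os
from typing import Mapping, Optional


def resolve_worker_id(env: Optional[Mapping[str, str]] = None) -> str:
    """Combine the documented environment variables into one worker ID.

    Hypothetical helper: the "<job>-<rank>" format and the "local" fallback
    are assumptions, not TaskCore's documented behavior.
    """
    env = os.environ if env is None else env
    # SLURM_JOB_ID wins; MY_JOB_ID is the fallback; "local" if neither is set.
    job_id = env.get("SLURM_JOB_ID") or env.get("MY_JOB_ID") or "local"
    # LOCAL_RANK distinguishes processes within the same job (default 0).
    rank = int(env.get("LOCAL_RANK", "0"))
    return f"{job_id}-{rank}"
```

Passing an explicit mapping, such as resolve_worker_id({"SLURM_JOB_ID": "123", "LOCAL_RANK": "2"}), returns "123-2", which makes the fallback order easy to check without touching the real environment.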

Error Handling

If init_func or func raises an exception, the task is automatically released back to the pending queue and the exception is passed on to the caller:

try:
    queue.fetch_and_run_task(init_func, run_func)
except Exception as e:
    print(f"Task failed: {e}")
    # Task is automatically released back to pending queue
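The fetch, initialize, run, and finish cycle of fetch_and_run_task, including the release-on-failure behavior described above, can be sketched with a hypothetical in-memory stand-in for the queue. InMemoryQueue, fetch_and_run, and the choice to return None from fetch_task when the queue is empty are assumptions made for illustration; the method names mirror the documented API, but this is not TaskCore's implementation.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Task = Tuple[str, Dict[str, Any]]


class InMemoryQueue:
    """Hypothetical stand-in that mirrors the documented method names.

    Holds tasks in Python lists instead of files, purely for illustration.
    """

    def __init__(self, configs: List[Dict[str, Any]]) -> None:
        self.pending: List[Task] = [(f"task-{i}", c) for i, c in enumerate(configs)]
        self.finished: List[str] = []
        self.current: Optional[Task] = None

    def fetch_task(self) -> Optional[Task]:
        # Assumption: None signals an empty queue.
        if not self.pending:
            return None
        self.current = self.pending.pop(0)
        return self.current

    def finish_current_task(self) -> None:
        assert self.current is not None
        self.finished.append(self.current[0])
        self.current = None

    def release_current_task(self) -> None:
        assert self.current is not None
        self.pending.append(self.current)  # back in line for another attempt
        self.current = None


def fetch_and_run(
    queue: InMemoryQueue,
    init_func: Callable[..., Tuple[Any, Dict[str, Any]]],
    func: Callable[[Any], None],
) -> bool:
    """Return False if no task was available, True after a task finishes."""
    fetched = queue.fetch_task()
    if fetched is None:
        return False
    _task_id, config = fetched
    try:
        obj, _extra_info = init_func(**config)  # config keys become kwargs
        func(obj)
    except Exception:
        queue.release_current_task()  # failed: return the task to pending
        raise  # let the caller see the error, as in the example above
    queue.finish_current_task()
    return True


if __name__ == "__main__":
    demo = InMemoryQueue([{"lr": 1e-4}, {"lr": 1e-5}])
    while fetch_and_run(demo, lambda **cfg: (cfg["lr"], {}), print):
        pass
```

Re-raising after the release keeps the task recoverable by another worker while still letting the caller log or handle the failure.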

Stale Task Recovery

TaskCore automatically reclaims stale tasks after a timeout of 4 hours by default. To use a different timeout, pass it in seconds:

# Custom timeout in seconds
queue.queue.reclaim_stale_tasks(timeout_seconds=3600)  # 1 hour
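One common way to detect stale tasks is by file age: a task file left in running/ longer than the timeout is assumed to belong to a worker that died, and is moved back to pending/. The sketch below assumes staleness is measured by the file's modification time; TaskCore may track it differently (for example, with a timestamp stored in the task JSON), and reclaim_stale is a hypothetical name, not the library's reclaim_stale_tasks.

```python
import os
import time
from pathlib import Path
from typing import List, Optional


def reclaim_stale(
    base_dir: Path, timeout_seconds: float = 4 * 3600, now: Optional[float] = None
) -> List[str]:
    """Move task files older than timeout_seconds from running/ to pending/.

    Assumption: a task's age is its file modification time. Returns the
    names of the reclaimed files, sorted.
    """
    now = time.time() if now is None else now
    running = base_dir / "running"
    pending = base_dir / "pending"
    pending.mkdir(parents=True, exist_ok=True)
    reclaimed = []
    for path in running.glob("*.json"):
        try:
            if now - path.stat().st_mtime > timeout_seconds:
                os.rename(path, pending / path.name)  # atomic hand-back
                reclaimed.append(path.name)
        except FileNotFoundError:
            continue  # finished or reclaimed elsewhere in the meantime
    return sorted(reclaimed)
```

With an mtime-based rule, a healthy task that runs longer than the timeout also looks stale unless its worker periodically touches the file, so the timeout should comfortably exceed the longest expected task.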

License

MIT License - see LICENSE file for details.

Download files


Source Distribution

taskcore_lib-0.0.2.tar.gz (8.8 kB)

Uploaded Source

Built Distribution


taskcore_lib-0.0.2-py3-none-any.whl (7.9 kB)

Uploaded Python 3

File details

Details for the file taskcore_lib-0.0.2.tar.gz.

File metadata

  • Download URL: taskcore_lib-0.0.2.tar.gz
  • Upload date:
  • Size: 8.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.15

File hashes

Hashes for taskcore_lib-0.0.2.tar.gz
Algorithm     Hash digest
SHA256        3c529d2a49bd36f4d8c4ce43b2e6cba35afc6ac09526c387ebac37caa56b842d
MD5           6608be383c5c7ab855a64f1458aaed45
BLAKE2b-256   67d3a3c14a67ef6e70b8eb2b14ce2d2e6ac7609e5a6f5f084b35c819944602d3


File details

Details for the file taskcore_lib-0.0.2-py3-none-any.whl.

File metadata

  • Download URL: taskcore_lib-0.0.2-py3-none-any.whl
  • Upload date:
  • Size: 7.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.15

File hashes

Hashes for taskcore_lib-0.0.2-py3-none-any.whl
Algorithm     Hash digest
SHA256        e0e9e2dba91787ebec661236c6e7f34eafd3340ebe2b007159a4e268c88d2712
MD5           b867ef094d2abd29da0b1460ecdb71dd
BLAKE2b-256   4aa84c90cd2184411ee32f376cee2725a462981f584d17ba14e5d5d64273347c

