A basic task queue package backed by the filesystem.

TaskCore

A lightweight, filesystem-based task queue for Python. TaskCore distributes and manages tasks across multiple processes, or across machines that share a filesystem, using the filesystem itself as the backend.

Features

  • Simple Setup: No external dependencies; uses only the Python standard library
  • Filesystem-based: Tasks are stored as JSON files on disk
  • Multi-process Support: Built-in support for distributed task processing
  • Fault Tolerant: Automatic task recovery and stale task reclamation
  • Atomic Operations: Safe concurrent access using atomic file operations
  • SLURM Integration: Works with the SLURM job scheduler (reads SLURM_JOB_ID from the environment)
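
To illustrate how "JSON files on disk" and "atomic file operations" can fit together, here is a minimal sketch that writes a task to a temporary file and then renames it into place with os.replace, so a reader never sees a half-written task. The helper write_task_atomically and the uuid4-based file name are hypothetical assumptions for illustration, not TaskCore's actual code.

```python
import json
import os
import tempfile
import uuid


def write_task_atomically(pending_dir: str, task: dict) -> str:
    """Hypothetical helper: store `task` as <id>.json in `pending_dir` atomically."""
    os.makedirs(pending_dir, exist_ok=True)
    task_id = uuid.uuid4().hex  # assumed ID scheme, not necessarily TaskCore's
    # Create the temporary file in the same directory so the final rename stays
    # on one filesystem; readers that only pick up *.json files skip the .tmp file.
    fd, tmp_path = tempfile.mkstemp(dir=pending_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(task, f)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes hit disk before publishing
        # os.replace swaps the complete file into its final name in one step.
        os.replace(tmp_path, os.path.join(pending_dir, f"{task_id}.json"))
    except BaseException:
        # Remove the leftover temporary file if anything failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return task_id
```

The write-then-rename pattern is a common choice because the rename is the only step other processes can observe.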

Installation

pip install pytaskcore

Or install from source:

git clone https://github.com/goncalorafaria/taskcore
cd taskcore
pip install -e .

Quick Start

Basic Usage

from taskcore import FileSystemTaskQueueClient

# Initialize the task queue
queue = FileSystemTaskQueueClient("/path/to/task/directory")

# Add tasks to the queue
task_config = {
    "learning_rate": 1e-4,
    "batch_size": 32,
    "epochs": 100,
    "model_name": "bert-base-uncased"
}

task_id = queue.add_task(task_config)
print(f"Added task with ID: {task_id}")

# Process tasks
def init_function(**config):
    # Initialize your model/trainer here (create_trainer is a placeholder for your own code)
    trainer = create_trainer(**config)
    return trainer, {"status": "initialized"}

def run_function(trainer):
    # Run your training/processing here
    trainer.train()

# Fetch and run tasks
while queue.fetch_and_run_task(init_func=init_function, func=run_function):
    pass

Multi-Process Example

import os
from taskcore import FileSystemTaskQueueClient

def main():
    # Use LOCAL_RANK for multi-process setups (e.g., with torch.distributed)
    rank = int(os.environ.get("LOCAL_RANK", 0))
    queue = FileSystemTaskQueueClient("/path/to/tasks", rank=rank)
    
    def init_func(**config):
        # Initialize your distributed training setup (create_distributed_trainer is a placeholder)
        trainer = create_distributed_trainer(**config)
        return trainer, {"wandb_link": trainer.wandb_link}
    
    def run_func(trainer):
        trainer.train()
    
    # Process tasks until none are available
    while queue.fetch_and_run_task(init_func=init_func, func=run_func):
        pass

if __name__ == "__main__":
    main()

Task Generation Example

from itertools import product

from taskcore import FileSystemTaskQueueClient

# Initialize queue
queue = FileSystemTaskQueueClient("/path/to/task/directory")

# Define hyperparameter sweep
learning_rates = [1e-4, 1e-5, 1e-6]
batch_sizes = [16, 32, 64]
model_names = ["bert-base-uncased", "roberta-base"]

# Generate all combinations (3 x 3 x 2 = 18 tasks)
for lr, batch_size, model_name in product(learning_rates, batch_sizes, model_names):
    config = {
        "learning_rate": lr,
        "batch_size": batch_size,
        "model_name": model_name,
        "epochs": 100,
        "wandb_project": "my-experiment"
    }

    task_id = queue.add_task(config)
    print(f"Added task {task_id}: {config}")

API Reference

FileSystemTaskQueueClient

The main client class for interacting with the task queue.

Constructor

FileSystemTaskQueueClient(base_dir: str, rank: int = 0)
  • base_dir: Directory where task files are stored
  • rank: Process rank for multi-process setups (default: 0)

Methods

add_task(task_dict: Dict) -> str

Add a new task to the queue and return its task ID.

task_id = queue.add_task({"param": "value"})

fetch_and_run_task(init_func: Callable, func: Callable) -> bool

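Fetch the next available task, call init_func with the task's configuration as keyword arguments, then call func with the first value init_func returns. The return value indicates whether a task was fetched; it is False once no task is available, which is what ends the while loops in the examples.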

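def init_func(**config):
    trainer = create_trainer(**config)  # placeholder for your own setup code
    extra_info = {"status": "initialized"}  # metadata returned alongside the trainer
    return trainer, extra_info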

def run_func(trainer):
    trainer.train()

success = queue.fetch_and_run_task(init_func, run_func)
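
To make the callback contract concrete, here is a self-contained stand-in for what a single fetch_and_run_task call does with one task, based only on the description above: the configuration is unpacked into init_func as keyword arguments, and the first element of the returned pair goes to func. The run_one_task function, the DummyTrainer class, and ignoring extra_info are hypothetical simplifications, not the library's implementation.

```python
from typing import Any, Callable, Dict, Optional, Tuple


class DummyTrainer:
    """Hypothetical trainer used only to illustrate the callbacks."""

    def __init__(self, learning_rate: float, batch_size: int) -> None:
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.trained = False

    def train(self) -> None:
        self.trained = True


def init_func(**config: Any) -> Tuple[DummyTrainer, Dict[str, Any]]:
    # Each key in the task config becomes a keyword argument here.
    trainer = DummyTrainer(**config)
    return trainer, {"status": "initialized"}


def run_func(trainer: DummyTrainer) -> None:
    trainer.train()


def run_one_task(
    config: Optional[Dict[str, Any]],
    init_func: Callable[..., Tuple[Any, Dict[str, Any]]],
    func: Callable[[Any], None],
) -> bool:
    """Stand-in for one fetch_and_run_task call; `config` is None when the queue is empty."""
    if config is None:
        return False  # no task: a `while` loop over this call stops here
    trainer, _extra_info = init_func(**config)  # extra_info is ignored in this sketch
    func(trainer)
    return True
```

Because the config is unpacked with `**`, every key in a task dictionary must match a parameter that init_func (or whatever it forwards to) accepts.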

fetch_task() -> Tuple[str, Dict]

Fetch the next available task.

task_id, config = queue.fetch_task()

finish_current_task()

Mark the current task as completed.

release_current_task()

Release the current task back to the pending queue.

edit_current_task(new_dict: Dict)

Update the current task's configuration.

get_current_task() -> Dict

Get the configuration of the current task.

has_task() -> bool

Check if a task is currently being processed.

Directory Structure

TaskCore creates the following directory structure:

base_dir/
├── pending/     # Tasks waiting to be processed
├── running/     # Tasks currently being processed
└── finished/    # Completed tasks
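
One common way to drive a layout like this, sketched here as an assumption rather than TaskCore's actual code, is to move each task file between the three directories with os.rename. Within a single POSIX filesystem a rename is atomic, so when several workers try to claim the same file, exactly one rename succeeds and the others get FileNotFoundError. The helpers claim_next_task and finish_task are hypothetical.

```python
import json
import os
from typing import Dict, Optional, Tuple


def claim_next_task(base_dir: str) -> Optional[Tuple[str, Dict]]:
    """Hypothetical: move the first pending task (in filename order) into running/."""
    pending = os.path.join(base_dir, "pending")
    running = os.path.join(base_dir, "running")
    os.makedirs(pending, exist_ok=True)
    os.makedirs(running, exist_ok=True)
    for name in sorted(os.listdir(pending)):
        if not name.endswith(".json"):
            continue  # skip temporary or unrelated files
        src = os.path.join(pending, name)
        dst = os.path.join(running, name)
        try:
            os.rename(src, dst)  # atomic within one filesystem: only one worker wins
        except FileNotFoundError:
            continue  # another worker claimed this task first
        with open(dst) as f:
            return name[: -len(".json")], json.load(f)
    return None


def finish_task(base_dir: str, task_id: str) -> None:
    """Hypothetical: move a claimed task from running/ to finished/."""
    finished = os.path.join(base_dir, "finished")
    os.makedirs(finished, exist_ok=True)
    os.rename(
        os.path.join(base_dir, "running", f"{task_id}.json"),
        os.path.join(finished, f"{task_id}.json"),
    )
```

Network filesystems may give weaker rename guarantees than a local POSIX filesystem, so this sketch is only as safe as the storage it runs on.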

Environment Variables

TaskCore uses these environment variables for distributed processing:

  • SLURM_JOB_ID: Job ID from SLURM scheduler
  • MY_JOB_ID: Fallback job ID if SLURM_JOB_ID is not available
  • LOCAL_RANK: Process rank in multi-process setups
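
As a sketch of how these variables might be combined into a per-worker identifier, the function below assumes SLURM_JOB_ID takes precedence over MY_JOB_ID and that LOCAL_RANK distinguishes processes within one job. The worker_id name, the "local" fallback, and the "<job>-<rank>" format are hypothetical; TaskCore may label workers differently.

```python
import os
from typing import Mapping, Optional


def worker_id(env: Optional[Mapping[str, str]] = None) -> str:
    """Hypothetical: build an ID such as '12345-0' from the job ID and local rank."""
    env = os.environ if env is None else env
    # Prefer SLURM_JOB_ID, then the MY_JOB_ID fallback, then an assumed default.
    job_id = env.get("SLURM_JOB_ID") or env.get("MY_JOB_ID") or "local"
    rank = int(env.get("LOCAL_RANK", "0"))
    return f"{job_id}-{rank}"
```

Accepting an explicit mapping keeps the function easy to test without modifying the real process environment.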

Error Handling

Tasks that fail during execution are automatically released back to the pending queue, and the exception still propagates to the caller:

try:
    queue.fetch_and_run_task(init_func, run_func)
except Exception as e:
    print(f"Task failed: {e}")
    # Task is automatically released back to pending queue
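
The release-on-failure behavior follows a common pattern: mark the task finished only if the work completes, otherwise hand it back and re-raise so the caller still sees the error. The generic sketch below uses hypothetical finish and release callables in place of TaskCore's internals; it illustrates the pattern and is not the library's implementation.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def run_and_settle(
    work: Callable[[], T],
    finish: Callable[[], None],
    release: Callable[[], None],
) -> T:
    """Run `work`; call `finish` on success, or `release` and re-raise on failure."""
    try:
        result = work()
    except BaseException:
        release()  # hand the task back to the pending queue
        raise  # let the caller see the original exception
    finish()  # only reached when `work` returned normally
    return result
```

Catching BaseException here also releases the task on KeyboardInterrupt, which is usually what you want when a worker is stopped by hand.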

Stale Task Recovery

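TaskCore automatically reclaims stale tasks, i.e. tasks that have stayed in the running state longer than a timeout (default: 4 hours). To reclaim with a different timeout, call reclaim_stale_tasks on the client's underlying queue object: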

# Custom timeout in seconds
queue.queue.reclaim_stale_tasks(timeout_seconds=3600)  # 1 hour
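
How staleness is measured is not documented here. One plausible approach, sketched below as an assumption, treats a file in running/ as stale when its modification time is older than the timeout and moves it back to pending/. The reclaim_stale function and its now parameter (added to make it testable) are hypothetical and separate from TaskCore's reclaim_stale_tasks.

```python
import os
import time
from typing import List, Optional


def reclaim_stale(base_dir: str, timeout_seconds: float = 4 * 3600,
                  now: Optional[float] = None) -> List[str]:
    """Hypothetical: move running/ tasks older than the timeout back to pending/."""
    now = time.time() if now is None else now
    running = os.path.join(base_dir, "running")
    pending = os.path.join(base_dir, "pending")
    if not os.path.isdir(running):
        return []
    os.makedirs(pending, exist_ok=True)
    reclaimed = []
    for name in sorted(os.listdir(running)):
        path = os.path.join(running, name)
        try:
            age = now - os.path.getmtime(path)  # seconds since last modification
            if age > timeout_seconds:
                os.rename(path, os.path.join(pending, name))
                reclaimed.append(name)
        except FileNotFoundError:
            continue  # finished or reclaimed by another worker in the meantime
    return reclaimed
```

A modification-time rule only works if long-running workers touch their task file periodically; otherwise a healthy but slow task would be reclaimed.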

License

MIT License - see LICENSE file for details.

Download files

Source Distribution

pytaskcore-0.0.2.tar.gz (7.0 kB)

Built Distribution

pytaskcore-0.0.2-py3-none-any.whl (7.9 kB)

File details

Details for the file pytaskcore-0.0.2.tar.gz.

File metadata

  • Download URL: pytaskcore-0.0.2.tar.gz
  • Upload date:
  • Size: 7.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.15

File hashes

Hashes for pytaskcore-0.0.2.tar.gz
  • SHA256: 8924a9e1f601876d32ff9d0455bcad79c86ce9138cd97684758cc6b769e6527b
  • MD5: 2a63c4c43e3b0d6c43118edae11d3ce2
  • BLAKE2b-256: 09d36edc5bc139999b2d7fc290497c5543cc1eb5f3eb7eeb2182b08aa19d5601

File details

Details for the file pytaskcore-0.0.2-py3-none-any.whl.

File metadata

  • Download URL: pytaskcore-0.0.2-py3-none-any.whl
  • Upload date:
  • Size: 7.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.15

File hashes

Hashes for pytaskcore-0.0.2-py3-none-any.whl
  • SHA256: d0df8b69063b9042abb6a997de26955612c2ac1975b15543c1c11d7b79570fa0
  • MD5: 5e7bbcea60530e78dcf6f90654b56cbe
  • BLAKE2b-256: 729f8095f185c929b5d2e7cabcc0ce6912d097aa91f061a095fcf175ab4909aa
