
A lightweight task processing library written in pure Python


taskproc

A lightweight workflow building/execution/management tool written in pure Python. Internally, it depends on DiskCache, cloudpickle, networkx, and concurrent.futures.

Features

  • Decomposing long and complex computations into tasks, i.e., smaller units of work.
  • Executing them in a distributed way, with support for multithreading, multiprocessing, and local clusters/containers.
  • Creating/discarding caches per task and reusing them whenever possible.

Nonfeatures

  • Automatic retry
  • External service integration with remote clusters/containers or cloud platforms (GCP, AWS, ...)
  • Periodic scheduling
  • GUI dashboard

Installation

pip install taskproc

Usage

Basic usage

A workflow is a directed acyclic graph (DAG) of tasks, and a task is a unit of work represented by a class. Here is an example.

from taskproc import TaskBase, Req, Requires, Const, infer_task_type

# Define a task and **its entire upstream workflow** with a class definition.
# Inheriting `TaskBase` is necessary, as it takes care of storing and reusing the result and tracking the dependencies.
# The `infer_task_type` decorator helps the type checker infer the types of the task class. (optional)
@infer_task_type
class Choose(TaskBase):
    """ Compute the binomial coefficient. """
    # Inside a task, we first declare the values that must be computed upstream with the descriptor `Req`.
    # In this example, `Choose(n, k)` depends on `Choose(n - 1, k - 1)` and `Choose(n - 1, k)`,
    # so it requires two `int` values.
    prev1: Requires[int]
    prev2: Requires[int]

    def build_task(self, n: int, k: int):
        # The prerequisite tasks and the other instance attributes are prepared here.
        # It thus recursively defines all the tasks we need to run this task,
        # i.e., the entire upstream workflow.

        if 0 < k < n:
            self.prev1 = Choose(n - 1, k - 1)
            self.prev2 = Choose(n - 1, k)
        elif k == 0 or k == n:
            # We can just pass a value to a requirement slot directly without running tasks.
            self.prev1 = Const(0)
            self.prev2 = Const(1)
        else:
            raise ValueError(f'{(n, k)}')

    def run_task(self) -> int:
        # Here we define the main computation of the task,
        # which is delayed until it is necessary.

        # The return values of the prerequisite tasks are accessible via the descriptors:
        return self.prev1 + self.prev2

# To run the task as well as its upstream workflow, use the `run_graph()` method.
ans = Choose(6, 3).run_graph()  # `ans` should be 6 choose 3, which is 20.

# It greedily executes all the necessary tasks in parallel as much as possible
# and then returns the value of the task on which we call `run_graph()`.
# The return values of the intermediate tasks are cached at
# `{$CP_CACHE_DIR:-./.cache}/taskproc/{module_name}.{task_name}/results/...`
# and reused on the fly whenever possible.
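For comparison, the recursion that this workflow distributes and caches on disk can be written as ordinary memoized Python. Here `functools.cache` plays (in memory, within one process) the role taskproc's per-task result cache plays on disk across runs; this is an illustration, not taskproc code:

```python
from functools import cache

@cache
def choose(n: int, k: int) -> int:
    # Mirrors Choose.build_task/run_task: each (n, k) pair is
    # computed once and reused, like taskproc's on-disk cache.
    if k == 0 or k == n:
        return 1
    if 0 < k < n:
        return choose(n - 1, k - 1) + choose(n - 1, k)
    raise ValueError(f'{(n, k)}')

print(choose(6, 3))  # 20
```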

Deleting cache

It is possible to selectively discard cache:

# After some modification of `Choose(3, 3)`,
# selectively discard the cache corresponding to the modification.
Choose(3, 3).clear_task()

# `ans` is recomputed tracing back to the computation of `Choose(3, 3)`.
ans = Choose(6, 3).run_graph()

# Delete all the cache associated with `Choose`,
# equivalent to `rm -r {$CP_CACHE_DIR:-./.cache}/taskproc/{module_name}.Choose`.
Choose.clear_all_tasks()

Task IO

The arguments of the build_task method can be anything JSON-serializable, including tasks:

class MyTask(TaskBase):
    def build_task(self, param1, param2):
        ...

MyTask(
    param1={
        'upstream_task0': UpstreamTask(),
        'other_params': [1, 2],
        ...
    },
    param2={ ... }
).run_graph()

The output of the run_task method should be serializable with cloudpickle, which is then compressed with gzip. The compression level can be changed as follows (defaults to 9).

class NoCompressionTask(TaskBase, compress_level=0):
    ...
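The size/speed trade-off behind compress_level can be seen with the standard library alone. The sketch below uses stdlib pickle in place of cloudpickle so it is self-contained; the gzip levels behave the same way:

```python
import gzip
import pickle

payload = {'weights': list(range(10_000))}

# Level 9 (taskproc's default): smallest output, slowest compression.
best = gzip.compress(pickle.dumps(payload), compresslevel=9)
# Level 0: no compression; fastest, useful for outputs that are
# large and already compressed (e.g. media files, model checkpoints).
raw = gzip.compress(pickle.dumps(payload), compresslevel=0)

# Both round-trip losslessly; only size and CPU time differ.
assert pickle.loads(gzip.decompress(best)) == payload
print(len(raw), len(best))
```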

If the output is a dictionary, one can directly access its elements:

class MultiOutputTask(TaskBase):
    def run_task(self) -> dict[str, int]:
        return {'foo': 42, ...}

class DownstreamTask(TaskBase):
    dep: Requires[int]

    def build_task(self):
        self.dep = MultiOutputTask()['foo']

Job scheduling and prefixes

To run tasks on a job scheduler, one can add a command prefix to the task invocation.

class TaskWithJobScheduler(TaskBase, job_prefix=['jbsub', '-interactive', '-tty', '-queue x86_1h', '-cores 16+1', '-mem 64g']):
    ...

Data directories

Use task.task_directory to get a fresh path dedicated to each task. The directory is automatically created at {$CP_CACHE_DIR:-./.cache}/taskproc/{module_name}.{task_name}/data/{task_id}; its contents are cleared at each task call and persist until the task cache is cleared.

class TrainModel(TaskBase):
    def run_task(self) -> str:
        ...
        model_path = self.task_directory / 'model.bin'
        model.save(model_path)
        return model_path

Execution policy configuration

One can control task execution with a concurrent.futures.Executor:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

class MyTask(TaskBase):
    ...

# Limit the number of parallel workers
MyTask().run_graph(executor=ProcessPoolExecutor(max_workers=2))

# Thread-based parallelism
MyTask().run_graph(executor=ThreadPoolExecutor())
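The executor objects passed above implement the standard concurrent.futures interface, which works the same outside taskproc. A minimal stdlib-only illustration (the `work` function is made up for the example):

```python
from concurrent.futures import ThreadPoolExecutor

def work(x: int) -> int:
    # Stand-in for a task body; a ProcessPoolExecutor would suit
    # CPU-bound work, while threads suit I/O-bound work.
    return x * x

# max_workers bounds the number of tasks running at once.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(work, range(5)))

print(results)  # [0, 1, 4, 9, 16]
```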

One can also control the concurrency at a task/channel level:

class TaskUsingGPU(TaskBase, channel='<gpu>'):
    ...

class AnotherTaskUsingGPU(TaskBase, channel=['<gpu>', '<memory>']):
    ...

# Queue-level concurrency control
SomeDownstreamTask().run_graph(rate_limits={'<gpu>': 1})
SomeDownstreamTask().run_graph(rate_limits={'<memory>': 1})

# Task-level concurrency control
SomeDownstreamTask().run_graph(rate_limits={TaskUsingGPU.task_name: 1})
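The effect of a rate limit can be pictured as a semaphore: at most N tasks tagged with a channel run at once, regardless of how many workers are available. A conceptual stdlib sketch, not taskproc's implementation:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gpu_slots = threading.Semaphore(1)  # like rate_limits={'<gpu>': 1}
in_use = 0
peak = 0
lock = threading.Lock()

def gpu_task(i: int) -> int:
    global in_use, peak
    with gpu_slots:  # at most one '<gpu>' task holds the slot at a time
        with lock:
            in_use += 1
            peak = max(peak, in_use)
        result = i * 2  # stand-in for the GPU-bound work
        with lock:
            in_use -= 1
    return result

# Four workers are available, but the channel limit serializes the tasks.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(gpu_task, range(8)))

assert peak == 1  # concurrency never exceeded the channel limit
```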

Commandline tool

We can use taskproc from the command line via python -m taskproc.app path/to/taskfile.py, where taskfile.py defines the Main task as follows:

# taskfile.py

class Main(TaskBase):
    ...

The command runs the Main() task and stores the cache right next to taskfile.py as .cache/taskproc/.... Please refer to python -m taskproc.app --help for more info.

Other useful properties

  • TaskBase.task_id: An integer ID for each task
  • TaskBase.task_args: The arguments of the task
  • TaskBase.task_stdout: Path to the task's stdout
  • TaskBase.task_stderr: Path to the task's stderr

TODO

  • Task graph visualizer
