taskproc
A lightweight workflow building/execution/management tool written in pure Python.
Internally, it depends on DiskCache, cloudpickle, networkx, and concurrent.futures.
Features
- Decomposing long and complex computation into tasks, i.e., smaller units of work.
- Executing them in parallel, with support for multithreading, multiprocessing, and local clusters/containers.
- Creating/discarding caches per task and reusing them whenever possible.
Nonfeatures
- Automatic retry
- External service integration with remote clusters/containers or cloud platforms (GCP, AWS, ...)
- Periodic scheduling
- GUI dashboard
Installation
pip install taskproc
Usage
Basic usage
A workflow is a directed acyclic graph (DAG) of tasks, and a task is a unit of work represented by a class. Here is an example.
from taskproc import TaskBase, Req, Requires, Const, infer_task_type

# Define a task and **its entire upstream workflow** with a class definition.
# Inheriting `TaskBase` is necessary, as it takes care of all the work of storing and reusing the result and tracking the dependencies.
# The `infer_task_type` decorator helps the type checker infer the types of the task class. (optional)
@infer_task_type
class Choose(TaskBase):
    """ Compute the binomial coefficient. """
    # Inside a task, we first declare the values that must be computed upstream
    # with the descriptor `Req`, annotated as `Requires[...]`.
    # In this example, `Choose(n, k)` depends on `Choose(n - 1, k - 1)` and `Choose(n - 1, k)`,
    # so it requires two `int` values.
    prev1: Requires[int]
    prev2: Requires[int]

    def build_task(self, n: int, k: int):
        # The prerequisite tasks and the other instance attributes are prepared here.
        # It thus recursively defines all the tasks we need to run this task,
        # i.e., the entire upstream workflow.
        if 0 < k < n:
            self.prev1 = Choose(n - 1, k - 1)
            self.prev2 = Choose(n - 1, k)
        elif k == 0 or k == n:
            # We can just pass a value to a requirement slot directly without running tasks.
            self.prev1 = Const(0)
            self.prev2 = Const(1)
        else:
            raise ValueError(f'{(n, k)}')

    def run_task(self) -> int:
        # Here we define the main computation of the task,
        # which is delayed until it is necessary.
        # The return values of the prerequisite tasks are accessible via the descriptors:
        return self.prev1 + self.prev2

# To run the task as well as its upstream workflow, use the `run_graph()` method.
ans = Choose(6, 3).run_graph()  # `ans` should be 6 choose 3, which is 20.
# It greedily executes all the necessary tasks as parallel as possible
# and then spits out the return value of the task on which we call `run_graph()`.
# The return values of the intermediate tasks are cached at
# `{$CP_CACHE_DIR:-./.cache}/taskproc/{module_name}.{task_name}/results/...`
# and reused on the fly whenever possible.
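The recursion above is Pascal's rule, C(n, k) = C(n - 1, k - 1) + C(n - 1, k). As a quick sanity check (independent of the library itself), the graph results can be compared against Python's built-in math.comb:

import math

# Every result produced by the task graph should match the closed-form binomial coefficient.
for n in range(1, 7):
    for k in range(n + 1):
        assert Choose(n, k).run_graph() == math.comb(n, k)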
Deleting cache
It is possible to selectively discard caches:

# After some modification of `Choose(3, 3)`,
# selectively discard the cache corresponding to the modification.
Choose(3, 3).clear_task()

# `ans` is recomputed, tracing back to the computation of `Choose(3, 3)`.
ans = Choose(6, 3).run_graph()

# Delete all the caches associated with `Choose`,
# equivalent to `rm -r {$CP_CACHE_DIR:-./.cache}/taskproc/{module_name}.Choose`.
Choose.clear_all_tasks()
Task IO
The arguments of the `build_task` method can be anything JSON serializable, including `Task`s:

class MyTask(TaskBase):
    def build_task(self, param1, param2):
        ...

MyTask(
    param1={
        'upstream_task0': UpstreamTask(),
        'other_params': [1, 2],
        ...
    },
    param2={ ... },
).run_graph()
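For a concrete instance of the pattern above, here is a minimal sketch; `LoadData`, `Stats`, and their parameters are hypothetical names for illustration, not part of the library:

from taskproc import TaskBase, Requires

class LoadData(TaskBase):
    def build_task(self, path: str):
        self.path = path

    def run_task(self) -> list[int]:
        # Stand-in for real data loading from `self.path`.
        return [1, 2, 3]

class Stats(TaskBase):
    data: Requires[list[int]]

    def build_task(self, data, scale: float):
        self.data = data    # an upstream task passed in as an argument
        self.scale = scale  # a plain JSON-serializable value

    def run_task(self) -> float:
        return self.scale * sum(self.data)

total = Stats(data=LoadData(path='input.txt'), scale=0.5).run_graph()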
The output of the `run_task` method should be serializable with cloudpickle, which is then compressed with gzip. The compression level can be changed as follows (defaults to 9):

class NoCompressionTask(TaskBase, compress_level=0):
    ...
If the output is a dictionary, one can directly access its elements:

class MultiOutputTask(TaskBase):
    def run_task(self) -> dict[str, int]:
        return {'foo': 42, ...}

class DownstreamTask(TaskBase):
    dep: Requires[int]

    def build_task(self):
        self.dep = MultiOutputTask()['foo']

    def run_task(self) -> int:
        # Only the 'foo' element of the upstream output is required here.
        return self.dep + 1
Job scheduling and prefixes
To run tasks on job schedulers, one can add a prefix to the invocation of each task:

class TaskWithJobScheduler(TaskBase, job_prefix=['jbsub', '-interactive', '-tty', '-queue x86_1h', '-cores 16+1', '-mem 64g']):
    ...
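The prefix is presumably prepended to the command that launches each task, so other schedulers should work the same way; a hypothetical Slurm variant for illustration (the `srun` options are illustrative):

class TaskWithSlurm(TaskBase, job_prefix=['srun', '--gres=gpu:1', '--time=01:00:00']):
    ...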
Data directories
Use `task.task_directory` to get a fresh path dedicated to each task. The directory is automatically created at `{$CP_CACHE_DIR:-./.cache}/taskproc/{module_name}.{task_name}/data/{task_id}`; the contents of the directory are cleared at each task call and persist until the task is cleared.

class TrainModel(TaskBase):
    def run_task(self) -> str:
        ...
        model_path = self.task_directory / 'model.bin'
        model.save(model_path)
        return str(model_path)
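A downstream task can then consume the saved file through the returned path; a minimal sketch, where `EvaluateModel` and `load_model` are hypothetical names for illustration:

class EvaluateModel(TaskBase):
    model_path: Requires[str]

    def build_task(self):
        self.model_path = TrainModel()

    def run_task(self) -> float:
        model = load_model(self.model_path)  # hypothetical loader for the saved file
        return model.score()                 # hypothetical evaluation metric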
Execution policy configuration
One can control the task execution with a `concurrent.futures.Executor` class:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

class MyTask(TaskBase):
    ...

# Limit the number of parallel workers
MyTask().run_graph(executor=ProcessPoolExecutor(max_workers=2))

# Thread-based parallelism
MyTask().run_graph(executor=ThreadPoolExecutor())
One can also control the concurrency at a task/channel level:

class TaskUsingGPU(TaskBase, channel='<gpu>'):
    ...

class AnotherTaskUsingGPU(TaskBase, channel=['<gpu>', '<memory>']):
    ...

# Channel-level concurrency control
SomeDownstreamTask().run_graph(rate_limits={'<gpu>': 1})
SomeDownstreamTask().run_graph(rate_limits={'<memory>': 1})

# Task-level concurrency control
SomeDownstreamTask().run_graph(rate_limits={TaskUsingGPU.task_name: 1})
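Since `rate_limits` is a mapping, several channels can presumably be throttled in a single call; a sketch under that assumption:

# Assumed usage: multiple rate limits passed at once.
SomeDownstreamTask().run_graph(rate_limits={'<gpu>': 1, '<memory>': 2})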
Commandline tool
We can use taskproc from the command line like `python -m taskproc.app path/to/taskfile.py`, where `taskfile.py` defines the `Main` task as follows:

# taskfile.py
class Main(TaskBase):
    ...

The command runs the `Main()` task and stores the cache right next to `taskfile.py`, as `.cache/taskproc/...`. Please refer to `python -m taskproc.app --help` for more info.
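For example, a complete `taskfile.py` could look like the following minimal sketch, using only the API shown above:

# taskfile.py
from taskproc import TaskBase

class Main(TaskBase):
    def build_task(self):
        pass

    def run_task(self) -> str:
        return 'hello from taskproc'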
Other useful properties
- `TaskBase.task_id`: An integer ID for each task
- `TaskBase.task_args`: The arguments of the task
- `TaskBase.task_stdout`: Path to the task's stdout
- `TaskBase.task_stderr`: Path to the task's stderr
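For instance, these can be inspected after a run; a minimal sketch, assuming they are plain attributes on a task instance:

task = Choose(6, 3)
task.run_graph()
print(task.task_id)      # integer id of this task
print(task.task_args)    # the arguments the task was built with
print(task.task_stdout)  # path to the captured stdout
print(task.task_stderr)  # path to the captured stderr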
TODO
- Task graph visualizer