A lightweight task processing library written in pure Python
Project description
taskproc
A Python library for building/executing/managing task pipelines that is
- Minimal yet sufficient for general use.
- Focused on the composability of tasks for building large/complex pipelines effortlessly.
Features
- User defines a long and complex computation by composing shorter and simpler units of work, i.e., tasks.
taskproc
automatically executes them in a distributed way, supporting multithreading/multiprocessing and third-party job-controlling commands such asjbsub
anddocker
. It also creates/reuses/discards a cache per task automatically.- Full type-hinting support.
Nonfeatures
- Periodic scheduling
- Automatic retry
- External service integration (GCP, AWS, ...)
- Graphical user interface
Installation
pip install taskproc
Documentation
Basics
We define a task by class.
from taskproc import Task, Const, Graph
class Choose(Task):
""" Compute the binomial coefficient. """
def __init__(self, n: int, k: int):
# Declaration of the upstream tasks.
# Any instance of `Task` registered as an attirbute is considered as an upstream task.
if 0 < k < n:
self.left = Choose(n - 1, k - 1)
self.right = Choose(n - 1, k)
elif k == 0 or k == n:
# We can also set dummy tasks with their value already computed.
self.left = Const(0)
self.right = Const(1)
else:
raise ValueError(f'{(n, k)}')
def run_task(self) -> int:
# The main computation of the task, which is delayed until necessary.
# The return values of the prerequisite tasks are accessible via `.get_result()`.
return self.left.get_result() + self.right.get_result()
# A task pipeline is constructed with instantiation, which should be done inside the `Graph` context.
with Graph('./cache'): # Specifies the directory to store the cache.
task = Choose(6, 3) # Builds a pipeline to compute 6 chooses 3.
# Use the `run_graph()` method to run the pipeline.
ans, stats = task.run_graph() # `ans` should be 6 chooses 3, which is 20. `stats` is the execution statistics.
Commandline Interface
taskproc
has a utility classmethod to run with commandline arguments, which is useful if all you need is to run a single task.
For example, if you have
# taskfile.py
from taskproc import Task, DefaultCliArguments
# ...
class Main(Task):
def __init__(self):
self.result = Choose(100, 50)
def run_task(self):
print(self.result.get_result())
# Optionally you can configure default CLI arguments.
DefaultCliArguments(
# ...
).populate()
Then Main
task can be run with CLI:
taskproc /path/to/taskfile.py -o /path/to/cache/directory
or
python -m taskproc /path/to/taskfile.py -o /path/to/cache/directory
Besides, if you have the entrypoint inside some module, you can run it with
# taskfile.py
...
class Main(Task):
...
# Must call the entrypoint explicitly.
if __name__ == '__main__':
Main.cli()
and
python -m module.path.to.taskfile -o /path/to/cache/directory
See also taskproc /path/to/taskfile.py --help
or python -m module.path.to.taskfile --help
for more details.
Futures and Task Composition
To be more precise, any attributes of a task implementing the Future
protocol are considered as upstream tasks.
For example, Task
s and Const
s are Future
s.
One can pass a future into the initialization of another task to compose the computation.
from taskproc import Future
class DownstreamTask(Task):
def __init__(self, upstream: Future[int], other_args: Any):
self.upstream = upstream # Register upstream task
...
class Main(Task):
def __init__(self):
self.composed = DownstreamTask(
upstream=UpstreamTaskProducingInt(),
...
)
FutureList
and FutureDict
can be used to aggregate multiple futures into one, allowing us to register a batch of upstream futures.
from taskproc import FutureList, FutureDict
class SummarizeScores(Task):
def __init__(self, task_dict: dict[str, Future[float]]):
self.score_list = FutureList([ScoringTask(i) for i in range(10)])
self.score_dict = FutureDict(task_dict)
def run_task(self) -> float:
# `.get_result()` evaluates `FutureList[T]` and `FutureDict[K, T]` into
# `list[T]` and `dict[K, T]`, respectively.
return sum(self.score_dict.get_result().values()) / len(self.score_dict.get_result())
If a future is wrapping a sequence or a mapping, one can directly access its element with the standard indexing operation.
The result is also a Future
.
class MultiOutputTask(Task):
def run_task(self) -> dict[str, int]:
return {'foo': 42, ...}
class DownstreamTask(Task):
def __init__(self):
self.dep = MultiOutputTask()['foo'] # type of Future[int]
Input and Output Specifications
In general, tasks can be initialized with any JSON serializable objects which may or may not contain futures. Any non-jsonable objects can be also passed, as the output of a task.
SomeTask(1, 'foo', bar={'baz': TaskProducingNonJsonableObj(), 'other': [1, 2, 3]})
On the other hand, the output of a task, i.e., the return value of the .run_task()
method, should be serializable with cloudpickle
.
Data Directories
Use task.task_directory
to get a fresh path dedicated to each task.
The directory is automatically created and managed along with the cache.
class TrainModel(Task):
def run_task(self) -> str:
...
model_path = self.task_directory / 'model.bin'
model.save(model_path)
return model_path
Task Label for Computational Resource Control
Each task class can be tagged with multiple labels. The task labels are useful to configure prefix commands and concurrency limits for controlling of computational resources.
class TaskUsingGPU(Task):
task_label = 'gpu'
...
class AnotherTaskUsingGPU(Task):
task_label = ['gpu', 'memory']
...
with Graph('./cache'):
# Label-wise prefix/concurrency control
SomeDownstreamTask().run_graph(
# The number of tasks labeled with "gpu" running simultaneously is at most 2 (resp. "memory" is at most 1).
rate_limits={
'gpu': 2,
'memory': 1,
TaskUsingGPU.task_name: 5, # Each task is also labeld with `cls.task_name` by default.
},
prefixes={
# Task labeled with "gpu" is run with the job-dispatching command "jbsub ...".
# Left labels prevail in the prefix collision.
'gpu': 'jbsub -wait -queue x86_1h -cores 16+1 -mem 64g'
}
)
Advanced Topics
Execution Policy Configuration
One can control the policy of parallelism with concurrent.futures.Executor
classes.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
class MyTask(Task):
...
with Graph('./cache'):
# Limit the number of parallel workers
MyTask().run_graph(executor=ProcessPoolExecutor(max_workers=2))
# Thread-based parallelism
MyTask().run_graph(executor=ThreadPoolExecutor())
Selective Cache Deletion
It is possible to selectively discard cache:
with Graph('./cache'):
# Selectively discard the cache of a specific task.
Choose(3, 3).clear_task()
# `ans` is recomputed tracing back to the computation of `Choose(3, 3)`.
ans, _ = Choose(6, 3).run_graph()
# Delete all the cache associated with `Choose`.
Choose.clear_all_tasks()
One can also manage caches directly from the disk location, i.e., ./cache
in the above.
Cache Compression
The task output is compressed with gzip
.
The compression level can be changed as follows (defaults to 9), trading the space efficiency with the time efficiency.
class NoCompressionTask(Task):
task_compress_level = 0
...
Built-in properties/methods
Below is the list of the built-in attributes/properties/methods of Task
. Do not override these attributes in the subclass.
Name | Owner | Type | Description |
---|---|---|---|
run_task |
instance | method | Run the task |
task_name |
class | property | String id of the task class |
task_directory |
instance | property | Path to the data directory of the task |
run_graph |
instance | method | Run the task after necessary upstream tasks and save the results in the cache |
cli |
class | method | run_graph with command line arguments |
clear_task |
instance | method | Clear the cache of the task instance |
clear_all_tasks |
class | method | Clear the cache of the task class |
get_task_config |
class | method | Get task config from the current graph |
task_worker |
instance | attribute | Task worker of instance |
task_config |
instance | attribute | Task config of instance |
task_compress_level |
instance | attribute | Compression level of instance |
task_label |
instance | attribute | Label of instance |
get_result |
instance | method | Get the result of the task (fails if the result is missing) |
to_json |
instance | method | Serialize itself as a JSON dictionary |
get_workers |
instance | method | Get the dictionary of the workers |
Browsing caches
Show the whole task dependency tree:
tree -l /<path_to_cache_directory>/<task_name>/results/<root_task_id>
Show finished tasks only:
tree -l -P result.pkl.gz --prune /<path_to_cache_directory>/<task_name>/results/<root_task_id>
Show finished tasks + running tasks:
tree -l -P *.txt --prune /<path_to_cache_directory>/<task_name>/results/<root_task_id>
TODO
-
Known issue
- Current task argument serialization is not ideal since JSON is mapping two different values into the same text representation (e.g., tuple and list). Consider using consistency check
x == json.loads(json.dumps(x))
, or redesign the format.
- Current task argument serialization is not ideal since JSON is mapping two different values into the same text representation (e.g., tuple and list). Consider using consistency check
-
Feature enhancement
- Error handling policy (eager/lazy) as an argument.
- Task-state tracker.
- Simple task graph visualizer.
- Pydantic/dataclass support in task arguments (as an incompatible, but better-UX object with TypedDict).
- Dynamic prefix generation with prefix template (e.g., for specifying the log locations).
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for taskproc-0.25.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 6939dd7b833c1d02503544e31c782ca396a42ab44890c63fc008d64bca46ad2e |
|
MD5 | 04e883ab0aae124cfa06872db7d8a9ef |
|
BLAKE2b-256 | c68e23fc53c1fb6069f29df782275d1282ba2c74983c551db4c8120d5cc21144 |