# taskproc

A lightweight task processing library written in pure Python.
A Python library for building/executing/managing task pipelines that is
- Minimal yet sufficient for general use.
- Focused on the composability of tasks for building large/complex pipelines effortlessly.
## Features
- User defines a long and complex computation by composing shorter and simpler units of work, i.e., tasks. `taskproc` automatically executes them in a distributed way, supporting multithreading/multiprocessing and third-party job-controlling commands such as `jbsub` and `docker`. It also creates/reuses/discards a cache per task automatically.
- Full type-hinting support.
## Nonfeatures
- Periodic scheduling
- Automatic retry
- External service integration (GCP, AWS, ...)
- Graphical user interface
## Installation

```shell
pip install taskproc
```
## Documentation

### Basics

We define a task by subclassing `Task`.
```python
from taskproc import Task, Const, Cache

class Choose(Task):
    """ Compute the binomial coefficient. """

    def __init__(self, n: int, k: int):
        # Declaration of the upstream tasks.
        # Any instance of `Task` registered as an attribute is considered an upstream task.
        if 0 < k < n:
            self.left = Choose(n - 1, k - 1)
            self.right = Choose(n - 1, k)
        elif k == 0 or k == n:
            # We can also set dummy tasks with their values already computed.
            self.left = Const(0)
            self.right = Const(1)
        else:
            raise ValueError(f'{(n, k)}')

    def run_task(self) -> int:
        # The main computation of the task, which is delayed until necessary.
        # The return values of the upstream tasks are accessible via `.get_result()`.
        return self.left.get_result() + self.right.get_result()
```
```python
# A task pipeline is constructed by instantiation, which should be done inside the `Cache` context.
with Cache('./cache'):  # Specifies the cache directory.
    task = Choose(6, 3)  # Builds a pipeline to compute 6 choose 3.

    # Use the `run_graph()` method to run the pipeline.
    ans, stats = task.run_graph()  # `ans` should be 6 choose 3, i.e., 20. `stats` holds the execution statistics.
```
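The recursion in `Choose` is Pascal's rule. As a sanity check, here is a plain-Python sketch of the same computation with a hypothetical helper `choose` (no caching, parallelism, or taskproc involved):

```python
def choose(n: int, k: int) -> int:
    # Pascal's rule: C(n, k) = C(n - 1, k - 1) + C(n - 1, k),
    # with base cases C(n, 0) = C(n, n) = 1 (the `Const(0) + Const(1)` branch above).
    if 0 < k < n:
        return choose(n - 1, k - 1) + choose(n - 1, k)
    if k == 0 or k == n:
        return 1
    raise ValueError(f'{(n, k)}')

print(choose(6, 3))  # 20
```

Unlike this naive recursion, taskproc caches each `Choose(n, k)` node, so shared subproblems are computed only once.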
### Command-line Interface

`taskproc` has a utility classmethod for running a task with command-line arguments, which is useful if all you need is to run a single task.
For example:

```shell
python -m taskproc /your/taskfile.py -o path/to/cache/directory  # Calling with a script
python -m your.taskfile -o path/to/cache/directory               # Calling with a module
```
```python
# taskfile.py
from taskproc import Task, DefaultArguments

# ...

class Main(Task):
    def __init__(self):
        self.result = Choose(100, 50)

    def run_task(self):
        print(self.result.get_result())

# # Optionally, you can configure default arguments.
# DefaultArguments(
#     prefix={ ... },
#     rate_limits={ ... },
#     ...
# ).populate()

# # If the file is a module, one must call the entrypoint task explicitly.
# if __name__ == '__main__':
#     Main.cli()
```
See also `python -m taskproc your/script.py --help` or `python -m your.module --help` for more details.
### Futures and Task Composition

To be more precise, any attribute of a task implementing the `Future` protocol is considered an upstream task.
For example, `Task`s and `Const`s are `Future`s.
One can pass a future into the initializer of another task to compose the computation.
```python
from typing import Any

from taskproc import Future

class DownstreamTask(Task):
    def __init__(self, upstream: Future[int], other_args: Any):
        self.upstream = upstream  # Register the upstream task.
        ...

class Main(Task):
    def __init__(self):
        self.composed = DownstreamTask(
            upstream=UpstreamTaskProducingInt(),
            ...
        )
```
`FutureList` and `FutureDict` can be used to aggregate multiple futures into one, allowing us to register a batch of upstream futures.
```python
from taskproc import FutureList, FutureDict

class SummarizeScores(Task):
    def __init__(self, task_dict: dict[str, Future[float]]):
        self.score_list = FutureList([ScoringTask(i) for i in range(10)])
        self.score_dict = FutureDict(task_dict)

    def run_task(self) -> float:
        # `.get_result()` evaluates `FutureList[T]` and `FutureDict[K, T]` into
        # `list[T]` and `dict[K, T]`, respectively.
        scores = self.score_dict.get_result()
        return sum(scores.values()) / len(scores)
```
If a future wraps a sequence or a mapping, one can directly access its elements with the standard indexing operation.
The result is also a `Future`.
```python
class MultiOutputTask(Task):
    def run_task(self) -> dict[str, int]:
        return {'foo': 42, ...}

class DownstreamTask(Task):
    def __init__(self):
        self.dep = MultiOutputTask()['foo']  # Of type `Future[int]`.
```
### Input and Output Specifications

In general, tasks can be initialized with any JSON-serializable objects, which may or may not contain futures. Non-JSON-serializable objects can also be passed indirectly, as the output of an upstream task.

```python
SomeTask(1, 'foo', bar={'baz': TaskProducingNonJsonableObj(), 'other': [1, 2, 3]})
```

On the other hand, the output of a task, i.e., the return value of the `.run_task()` method, should be serializable with `cloudpickle`.
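To illustrate why `cloudpickle` (rather than JSON or stdlib `pickle`) is needed for outputs, consider that stdlib `pickle` rejects objects such as inline lambdas, which `cloudpickle` is designed to handle:

```python
import pickle

# Task *arguments* must be JSON-serializable, but the *output* only needs to be
# cloudpickle-serializable. Stdlib `pickle` (like JSON) cannot serialize an
# inline lambda, whereas cloudpickle can, so such objects may be task outputs.
try:
    pickle.dumps(lambda x: x * 2)
    lambda_is_picklable = True
except Exception:
    lambda_is_picklable = False

assert not lambda_is_picklable
```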
### Data Directories

Use `task.task_directory` to get a fresh path dedicated to each task.
The directory is automatically created and managed along with the cache.
```python
class TrainModel(Task):
    def run_task(self) -> str:
        ...
        model_path = self.task_directory / 'model.bin'
        model.save(model_path)
        return str(model_path)
```
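The `/` in the snippet above is standard `pathlib` path composition. A minimal sketch, using a temporary directory as a hypothetical stand-in for the `task_directory` that taskproc would supply and manage:

```python
import tempfile
from pathlib import Path

# Hypothetical stand-in for `self.task_directory`; the real directory is
# created and managed by taskproc alongside the cache.
task_directory = Path(tempfile.mkdtemp())

model_path = task_directory / 'model.bin'  # `/` joins path components.
model_path.write_bytes(b'\x00' * 8)        # Stand-in for `model.save(model_path)`.

assert model_path.name == 'model.bin' and model_path.exists()
```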
### Task Labels for Computational Resource Control

Each task class can be tagged with multiple labels. Task labels are useful for configuring prefix commands and concurrency limits to control computational resources.
```python
class TaskUsingGPU(Task):
    task_label = 'gpu'
    ...

class AnotherTaskUsingGPU(Task):
    task_label = ['gpu', 'memory']
    ...

with Cache('./cache'):
    # Label-wise prefix/concurrency control
    SomeDownstreamTask().run_graph(
        rate_limits={'gpu': 1, 'memory': 2},  # At most 1 task labeled "gpu" (resp. 2 labeled "memory") runs simultaneously.
        prefixes={'gpu': 'jbsub -wait -queue x86_1h -cores 16+1 -mem 64g'},  # Tasks labeled "gpu" are run with the job-dispatching command "jbsub ...".
    )
```
## Advanced Topics

### Execution Policy Configuration

One can control the parallelism policy with `concurrent.futures.Executor` classes.
```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

class MyTask(Task):
    ...

with Cache('./cache'):
    # Limit the number of parallel workers.
    MyTask().run_graph(executor=ProcessPoolExecutor(max_workers=2))

    # Thread-based parallelism.
    MyTask().run_graph(executor=ThreadPoolExecutor())
```
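For reference, the stdlib executor behavior this relies on can be seen without taskproc at all; a minimal sketch:

```python
from concurrent.futures import ThreadPoolExecutor

def square(x: int) -> int:
    return x * x

# `max_workers` caps the number of concurrent workers;
# `executor.map` preserves the input order regardless of completion order.
with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(square, range(5)))

print(results)  # [0, 1, 4, 9, 16]
```

`ProcessPoolExecutor` has the same interface but runs tasks in separate processes, sidestepping the GIL for CPU-bound work at the cost of pickling overhead.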
### Selective Cache Deletion

It is possible to selectively discard caches:

```python
with Cache('./cache'):
    # Selectively discard the cache of a specific task.
    Choose(3, 3).clear_task()

    # `ans` is recomputed, tracing back to the computation of `Choose(3, 3)`.
    ans, _ = Choose(6, 3).run_graph()

    # Delete all the cache associated with `Choose`.
    Choose.clear_all_tasks()
```
One can also manage caches directly on disk, i.e., under `./cache` in the example above.
### Cache Compression

The task output is compressed with `gzip`.
The compression level defaults to 9 and can be changed as follows, trading space efficiency for time efficiency.

```python
class NoCompressionTask(Task):
    task_compress_level = 0
    ...
```
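Level 0 disables compression entirely. The underlying stdlib `gzip` trade-off can be sketched directly:

```python
import gzip

data = b'taskproc ' * 10_000

stored = gzip.compress(data, compresslevel=0)  # No compression, only gzip framing.
fast = gzip.compress(data, compresslevel=1)    # Fast, larger output.
best = gzip.compress(data, compresslevel=9)    # taskproc's default: slowest, smallest.

# Every level round-trips losslessly; levels only trade CPU time for size.
assert gzip.decompress(best) == data
assert len(best) < len(stored) and len(fast) < len(stored)
```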
### Built-in Properties/Methods

Below is the list of the built-in properties/methods of `Task`. Do not override these attributes in subclasses.

| Name | Owner | Type | Description |
|---|---|---|---|
| `task_name` | class | property | String ID of the task class |
| `task_id` | instance | property | Integer ID of the task, unique within the same task class |
| `task_args` | instance | property | The arguments of the task in JSON |
| `task_directory` | instance | property | Path to the data directory of the task |
| `task_stdout` | instance | property | Path to the task's stdout |
| `task_stderr` | instance | property | Path to the task's stderr |
| `run_task` | instance | method | Run the task |
| `run_graph` | instance | method | Run the task after the necessary upstream tasks and save the results in the cache |
| `get_result` | instance | method | Get the result of the task (fails if the result is missing) |
| `to_json` | instance | method | Serialize itself as a JSON dictionary |
| `clear_task` | instance | method | Clear the cache of the task instance |
| `clear_all_tasks` | class | method | Clear the cache of the task class |
| `cli` | class | method | `run_graph` with command-line arguments |
## TODO

- Potential bug
  - The current task argument serialization is not ideal, since JSON maps two different values to the same text representation (e.g., tuple and list). Consider adding the consistency check `x == json.loads(json.dumps(x))`, or redesigning the format.
- Enhancement
  - Simple task graph visualizer.
  - Pydantic/dataclass support in task arguments (as an incompatible, but better-UX alternative to TypedDict).
  - Dynamic prefix generation with prefix templates (e.g., for specifying log locations).
  - Remove `DiscCache` and use `shelve`.
  - Use the methods in `TaskWorker` directly.
  - `TaskConfig` should be accessible outside the `Cache` context.
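The tuple/list ambiguity mentioned above is easy to demonstrate, and the proposed consistency check catches it:

```python
import json

# A tuple and a list serialize to the same JSON text, so two distinct task
# arguments can collide in their serialized form.
assert json.dumps((1, 2, 3)) == json.dumps([1, 2, 3]) == '[1, 2, 3]'

# The proposed check flags arguments that do not survive a round trip.
x, y = (1, 2, 3), [1, 2, 3]
assert json.loads(json.dumps(x)) != x  # The tuple comes back as a list.
assert json.loads(json.dumps(y)) == y  # The list round-trips fine.
```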