Parallel task graph framework
Project description
TaskGraph is a library developed to help manage complicated computational pipelines made up of long-running individual tasks. Many of these tasks can be executed in parallel, almost all of them write results to disk, and results from one part of the pipeline can often be reused. TaskGraph manages all of this for you. With it you can schedule tasks with dependencies, avoid recomputing results that have already been computed, and allot multiple CPU cores to execute tasks in parallel if desired.
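To make the idea of dependency scheduling and result reuse concrete, here is a minimal standard-library sketch. It is not TaskGraph's implementation: `run_pipeline` and `_write_text` are hypothetical names, tasks run serially, and "already computed" is reduced to "the target file exists", which is a simplifying assumption (the Caveats section describes how TaskGraph checks whether files have changed). It requires Python 3.9+ for `graphlib`.

```python
import os
import tempfile
from graphlib import TopologicalSorter


def run_pipeline(tasks, dependencies):
    """Run tasks in dependency order, skipping any whose target exists.

    tasks: dict mapping task name -> (func, target_path)
    dependencies: dict mapping task name -> set of prerequisite task names
    Returns the list of task names that were actually executed.
    """
    executed = []
    for name in TopologicalSorter(dependencies).static_order():
        func, target_path = tasks[name]
        if os.path.exists(target_path):
            continue  # result already on disk, so reuse it
        func(target_path)
        executed.append(name)
    return executed


def _write_text(text):
    """Return a task function that writes `text` to its target path."""
    def _task(target_path):
        with open(target_path, 'w') as target_file:
            target_file.write(text)
    return _task


workspace_dir = tempfile.mkdtemp()
tasks = {
    'a': (_write_text('a'), os.path.join(workspace_dir, 'a.txt')),
    'b': (_write_text('b'), os.path.join(workspace_dir, 'b.txt')),
    'sum': (_write_text('a+b'), os.path.join(workspace_dir, 'sum.txt')),
}
# 'sum' may only run once both 'a' and 'b' have finished.
dependencies = {'a': set(), 'b': set(), 'sum': {'a', 'b'}}

first_run = run_pipeline(tasks, dependencies)   # executes all three tasks
second_run = run_pipeline(tasks, dependencies)  # targets exist, nothing runs
```

On the first call every task executes, with `sum` last; on the second call all targets are found on disk and nothing is recomputed.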
TaskGraph Dependencies
TaskGraph is written in pure Python, but if the psutil package is installed, the multiprocessing worker processes will be niced (run at a lower scheduling priority).
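As an illustration of what "niced" means, the sketch below lowers the priority of the current process when psutil can be imported and does nothing otherwise. This is an assumption about the general technique, not a copy of TaskGraph's code; `lower_process_priority` is a hypothetical helper name and the niceness value of 10 is arbitrary.

```python
import os

try:
    import psutil  # optional third-party dependency
except ImportError:
    psutil = None


def lower_process_priority():
    """Lower this process's scheduling priority if psutil is available.

    Returns True if psutil was used, False if psutil is missing.
    """
    if psutil is None:
        return False
    process = psutil.Process(os.getpid())
    if os.name == 'nt':
        # Windows uses priority classes rather than niceness values.
        process.nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
    else:
        # On POSIX a higher niceness means a lower priority. Never go below
        # the current niceness, since that would need extra privileges.
        process.nice(max(process.nice(), 10))
    return True


priority_lowered = lower_process_priority()
```

Treating psutil as optional in this way keeps the pure-Python install working while letting long-running workers yield to interactive processes when the package is present.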
Example Use
Install TaskGraph with
pip install taskgraph
Then
import os
import pickle
import logging

import taskgraph

logging.basicConfig(level=logging.DEBUG)


def _create_list_on_disk(value, length, target_path):
    """Pickle a list containing `value` repeated `length` times to disk."""
    target_list = [value] * length
    with open(target_path, 'wb') as target_file:
        pickle.dump(target_list, target_file)


def _sum_lists_from_disk(list_a_path, list_b_path, target_path):
    """Read two lists, add them elementwise, and save the result."""
    with open(list_a_path, 'rb') as list_a_file:
        list_a = pickle.load(list_a_file)
    with open(list_b_path, 'rb') as list_b_file:
        list_b = pickle.load(list_b_file)
    target_list = [a + b for a, b in zip(list_a, list_b)]
    with open(target_path, 'wb') as target_file:
        pickle.dump(target_list, target_file)


if __name__ == '__main__':
    workspace_dir = 'workspace'
    # create a taskgraph that uses 4 multiprocessing subprocesses when possible
    task_graph = taskgraph.TaskGraph(workspace_dir, 4)
    target_a_path = os.path.join(workspace_dir, 'a.dat')
    target_b_path = os.path.join(workspace_dir, 'b.dat')
    result_path = os.path.join(workspace_dir, 'result.dat')
    value_a = 5
    value_b = 10
    list_len = 10
    task_a = task_graph.add_task(
        func=_create_list_on_disk,
        args=(value_a, list_len, target_a_path),
        target_path_list=[target_a_path])
    task_b = task_graph.add_task(
        func=_create_list_on_disk,
        args=(value_b, list_len, target_b_path),
        target_path_list=[target_b_path])
    # sum_task runs only after task_a and task_b have both completed
    sum_task = task_graph.add_task(
        func=_sum_lists_from_disk,
        args=(target_a_path, target_b_path, result_path),
        target_path_list=[result_path],
        dependent_task_list=[task_a, task_b])
    task_graph.close()
    task_graph.join()

    # expect that result is a list `list_len` long with `value_a+value_b` in it
    with open(result_path, 'rb') as result_file:
        result = pickle.load(result_file)
Caveats
Taskgraph’s default method of checking whether a file has changed (hash_algorithm='sizetimestamp') uses the filesystem’s modification timestamp, interpreted in integer nanoseconds. This check is only as accurate as the filesystem’s timestamp. For example:
FAT and FAT32 timestamps have a 2-second modification timestamp resolution
exFAT has a 10 millisecond timestamp resolution
NTFS has a 100 nanosecond timestamp resolution
HFS+ has a 1 second timestamp resolution
APFS has a 1 nanosecond timestamp resolution
ext3 has a 1 second timestamp resolution
ext4 has a 1 nanosecond timestamp resolution
If you suspect timestamp resolution to be an issue on your filesystem, you may wish to store your files on a filesystem with more accurate timestamps or else consider using a different hash_algorithm.
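The trade-off described above can be sketched with the standard library alone. The two helper functions below are hypothetical and are not TaskGraph's implementation; they only contrast a cheap size-plus-timestamp fingerprint with a content hash. Which values `hash_algorithm` accepts is not stated here, so check the library's documentation before relying on a particular name.

```python
import hashlib
import os
import tempfile


def size_timestamp_fingerprint(path):
    """Cheap fingerprint: file size plus modification time in nanoseconds."""
    stats = os.stat(path)
    return (stats.st_size, stats.st_mtime_ns)


def content_fingerprint(path, algorithm='sha256'):
    """Slower fingerprint that hashes the file's bytes."""
    hasher = hashlib.new(algorithm)
    with open(path, 'rb') as source_file:
        for block in iter(lambda: source_file.read(2**16), b''):
            hasher.update(block)
    return hasher.hexdigest()


with tempfile.TemporaryDirectory() as workspace_dir:
    path = os.path.join(workspace_dir, 'data.txt')
    with open(path, 'w') as data_file:
        data_file.write('abc')
    stats = os.stat(path)
    before_cheap = size_timestamp_fingerprint(path)
    before_hash = content_fingerprint(path)

    # Rewrite with same-length content, then restore the original timestamps
    # to imitate two writes landing within one coarse timestamp tick.
    with open(path, 'w') as data_file:
        data_file.write('xyz')
    os.utime(path, ns=(stats.st_atime_ns, stats.st_mtime_ns))

    after_cheap = size_timestamp_fingerprint(path)
    after_hash = content_fingerprint(path)

# The cheap fingerprint cannot tell the two versions apart; the hash can.
change_missed_by_cheap_check = before_cheap == after_cheap
change_seen_by_hash = before_hash != after_hash
```

A content hash catches the change at the cost of reading the whole file, which is why a size-and-timestamp check is attractive as a default for large outputs.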
Running Tests
Taskgraph includes a tox configuration for automating test runs across multiple Python versions, both with and without psutil installed. To execute all tests in all configurations, run:
$ tox
Alternatively, if you’re only trying to run tests on a single configuration (say, Python 3.7 without psutil), you’d run:
$ tox -e py37
Or if you’d like to run the tests for the combination of Python 3.7 with psutil, you’d run:
$ tox -e py37-psutil
If you don’t have multiple Python installations already available on your system, an easy way to run the full matrix is to use tox-conda (https://github.com/tox-dev/tox-conda), which uses conda environments to manage the available versions of Python:
$ pip install tox-conda
$ tox