
Parallel task graph framework

Project description

TaskGraph is a library developed to help manage complicated computational software pipelines built from long-running individual tasks. Many such tasks can execute in parallel, almost all of them write their results to disk, and results from earlier parts of the pipeline can often be reused. TaskGraph manages all of this for you: it schedules tasks with dependencies, avoids recomputing results that have already been computed, and can allot multiple CPU cores to execute tasks in parallel if desired.
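To make the idea concrete, here is a minimal, hypothetical sketch of the bookkeeping TaskGraph automates: run each task only after its dependencies are done, and skip any task whose target files already exist on disk. This is an illustration only, not TaskGraph's API or implementation.

```python
import os


def run_pipeline(tasks):
    """Run a tiny task graph sequentially.

    tasks maps a task name to a tuple (func, dependency_names, target_paths).
    A task runs only after its dependencies, and is skipped entirely if all
    of its target files already exist (the "avoid recompute" idea).
    """
    done = set()

    def run(name):
        if name in done:
            return
        func, deps, targets = tasks[name]
        for dep in deps:  # ensure dependencies complete first
            run(dep)
        if all(os.path.exists(path) for path in targets):
            done.add(name)  # results already on disk; skip recomputation
            return
        func()
        done.add(name)

    for name in tasks:
        run(name)
```

A real scheduler like TaskGraph also fingerprints inputs (so changed inputs trigger a rerun) and can dispatch independent tasks to worker processes; this sketch only shows the dependency ordering and skip-if-done logic.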

TaskGraph Dependencies

TaskGraph is written in pure Python, but if the psutil package is installed, the distributed multiprocessing worker processes will be niced.

Example Use

Install TaskGraph with

pip install taskgraph

Then

import os
import pickle
import logging

import taskgraph

logging.basicConfig(level=logging.DEBUG)

def _create_list_on_disk(value, length, target_path):
    """Create a list of `length` copies of `value` and pickle it to disk."""
    target_list = [value] * length
    with open(target_path, 'wb') as target_file:
        pickle.dump(target_list, target_file)


def _sum_lists_from_disk(list_a_path, list_b_path, target_path):
    """Read two pickled lists, add them elementwise, and save the result."""
    with open(list_a_path, 'rb') as list_a_file:
        list_a = pickle.load(list_a_file)
    with open(list_b_path, 'rb') as list_b_file:
        list_b = pickle.load(list_b_file)
    target_list = [a + b for a, b in zip(list_a, list_b)]
    with open(target_path, 'wb') as target_file:
        pickle.dump(target_list, target_file)

# create a taskgraph that uses 4 multiprocessing subprocesses when possible
if __name__ == '__main__':
    workspace_dir = 'workspace'
    task_graph = taskgraph.TaskGraph(workspace_dir, 4)
    target_a_path = os.path.join(workspace_dir, 'a.dat')
    target_b_path = os.path.join(workspace_dir, 'b.dat')
    result_path = os.path.join(workspace_dir, 'result.dat')
    result_2_path = os.path.join(workspace_dir, 'result2.dat')
    value_a = 5
    value_b = 10
    list_len = 10
    task_a = task_graph.add_task(
        func=_create_list_on_disk,
        args=(value_a, list_len, target_a_path),
        target_path_list=[target_a_path])
    task_b = task_graph.add_task(
        func=_create_list_on_disk,
        args=(value_b, list_len, target_b_path),
        target_path_list=[target_b_path])
    sum_task = task_graph.add_task(
        func=_sum_lists_from_disk,
        args=(target_a_path, target_b_path, result_path),
        target_path_list=[result_path],
        dependent_task_list=[task_a, task_b])

    task_graph.close()
    task_graph.join()

    # expect a list `list_len` long with `value_a+value_b` in each slot
    with open(result_path, 'rb') as result_file:
        result = pickle.load(result_file)
    assert result == [value_a + value_b] * list_len

Caveats

  • TaskGraph’s default method of checking whether a file has changed (hash_algorithm='sizetimestamp') uses the file's size and its filesystem modification timestamp, interpreted in integer nanoseconds. This check is only as accurate as the filesystem’s timestamp resolution. For example:

    • FAT and FAT32 timestamps have a 2-second modification timestamp resolution

    • exFAT has a 10 millisecond timestamp resolution

    • NTFS has a 100 nanosecond timestamp resolution

    • HFS+ has a 1 second timestamp resolution

    • APFS has a 1 nanosecond timestamp resolution

    • ext3 has a 1 second timestamp resolution

    • ext4 has a 1 nanosecond timestamp resolution

    If you suspect timestamp resolution to be an issue on your filesystem, you may wish to store your files on a filesystem with more accurate timestamps or else consider using a different hash_algorithm.
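To see why resolution matters, here is a minimal sketch of a size-plus-timestamp fingerprint in the spirit of 'sizetimestamp' (a hypothetical illustration, not TaskGraph's internal code). On a coarse-resolution filesystem, two different versions of a file written within the same timestamp tick produce identical fingerprints, so a change can go undetected:

```python
import os


def sizetimestamp_fingerprint(path):
    """Fingerprint a file by (size, mtime in integer nanoseconds).

    Hypothetical sketch of the idea behind hash_algorithm='sizetimestamp'.
    st_mtime_ns is only as precise as the filesystem allows, e.g. 2 s on
    FAT32 but 1 ns on ext4 or APFS.
    """
    stat = os.stat(path)
    return (stat.st_size, stat.st_mtime_ns)
```

If two writes of same-sized content land on the same timestamp tick, this fingerprint cannot tell them apart, which is exactly the failure mode the caveat above describes.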

Running Tests

TaskGraph includes a tox configuration for automating test runs across multiple Python versions, with and without psutil installed. To execute the tests in every configuration, run:

$ tox

Alternatively, if you’re only trying to run tests on a single configuration (say, Python 3.7 without psutil), you’d run:

$ tox -e py37

Or if you’d like to run the tests for the combination of Python 3.7 with psutil, you’d run:

$ tox -e py37-psutil

If you don’t have multiple python installations already available on your system, an easy way to accomplish this is to use tox-conda (https://github.com/tox-dev/tox-conda) which will use conda environments to manage the versions of python available:

$ pip install tox-conda
$ tox
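The environment names above suggest a tox configuration using generative environment lists and factor-conditional dependencies. The following is a hypothetical sketch of that shape, not the project's actual tox.ini:

```ini
[tox]
# expands to py37, py37-psutil, py38, py38-psutil, ...
envlist = py3{7,8,9}{,-psutil}

[testenv]
deps =
    pytest
    # only environments with the "psutil" factor install psutil
    psutil: psutil
commands = pytest {posargs}
```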

Download files

Source Distribution

taskgraph-0.11.1.tar.gz (42.4 kB)

Built Distribution

taskgraph-0.11.1-py3-none-any.whl (23.0 kB)

File details

Details for the file taskgraph-0.11.1.tar.gz.

File metadata

  • Download URL: taskgraph-0.11.1.tar.gz
  • Upload date:
  • Size: 42.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.13

File hashes

Hashes for taskgraph-0.11.1.tar.gz:

  • SHA256: 536cf4fc4dde6ae9f953363b52917f3eb961313178053694a154d872b5f3fc3d
  • MD5: dd003c8c598f631eb3a5f367605a791b
  • BLAKE2b-256: 1fa60e8b2eaaf5f2d307e60a93b75f7df586ae59fa44a8428b690a06678cf28e

File details

Details for the file taskgraph-0.11.1-py3-none-any.whl.

File metadata

  • Download URL: taskgraph-0.11.1-py3-none-any.whl
  • Upload date:
  • Size: 23.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.13

File hashes

Hashes for taskgraph-0.11.1-py3-none-any.whl:

  • SHA256: 32f4c98f89d06a210ab473d14c03fd807543c469e2b6ac191376d4b617ff675c
  • MD5: 0c4aa094ff6f6b176781989c0b8b0780
  • BLAKE2b-256: 27c4b2b88d64a6b369fd9869725e32ee45770741c4e93a8365915918a20dfaeb
