Parallel task graph framework.

About TaskGraph

TaskGraph is a library developed to help manage complicated computational software pipelines made up of long-running individual tasks. Many of these tasks can be executed in parallel, almost all of them write results to disk, and results from earlier parts of the pipeline can often be reused. TaskGraph manages all of this for you. With it you can schedule tasks with dependencies, avoid recomputing results that have already been computed, and use multiple CPU cores to execute tasks in parallel if desired.

TaskGraph Dependencies

TaskGraph is written in pure Python, but if the psutil package is installed, the multiprocessing worker processes will be niced.

Example Use

Install TaskGraph with

pip install taskgraph

Then

import os
import pickle

import taskgraph


def _create_list_on_disk(value, length, target_path):
    """Create a list of `length` copies of `value` and pickle it to `target_path`."""
    target_list = [value] * length
    with open(target_path, 'wb') as target_file:
        pickle.dump(target_list, target_file)


def _sum_lists_from_disk(list_a_path, list_b_path, target_path):
    """Read two pickled lists, add them elementwise, and pickle the result."""
    with open(list_a_path, 'rb') as list_a_file:
        list_a = pickle.load(list_a_file)
    with open(list_b_path, 'rb') as list_b_file:
        list_b = pickle.load(list_b_file)
    target_list = []
    for a, b in zip(list_a, list_b):
        target_list.append(a + b)
    with open(target_path, 'wb') as target_file:
        pickle.dump(target_list, target_file)


# workspace directory that holds the task results and the TaskGraph cache
workspace_dir = 'taskgraph_example_workspace'
if not os.path.exists(workspace_dir):
    os.makedirs(workspace_dir)

# create a TaskGraph that uses 4 multiprocessing subprocesses when possible
task_graph = taskgraph.TaskGraph(workspace_dir, 4)
target_a_path = os.path.join(workspace_dir, 'a.dat')
target_b_path = os.path.join(workspace_dir, 'b.dat')
result_path = os.path.join(workspace_dir, 'result.dat')
result_2_path = os.path.join(workspace_dir, 'result2.dat')
value_a = 5
value_b = 10
list_len = 10
task_a = task_graph.add_task(
    func=_create_list_on_disk,
    args=(value_a, list_len, target_a_path),
    target_path_list=[target_a_path])
task_b = task_graph.add_task(
    func=_create_list_on_disk,
    args=(value_b, list_len, target_b_path),
    target_path_list=[target_b_path])
sum_task = task_graph.add_task(
    func=_sum_lists_from_disk,
    args=(target_a_path, target_b_path, result_path),
    target_path_list=[result_path],
    dependent_task_list=[task_a, task_b])

# signal that no more tasks will be added, then wait for them to finish
task_graph.close()
task_graph.join()

# expect that result is a list `list_len` long with `value_a+value_b` in each entry
with open(result_path, 'rb') as result_file:
    result = pickle.load(result_file)
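
As a quick check (a minimal sketch reusing the names from the example above), the result can be verified directly; re-running the same script against the same workspace directory reuses the results already written to disk rather than recomputing them:

# the summed list should be `list_len` copies of `value_a + value_b`
assert result == [value_a + value_b] * list_len  # i.e. [15] * 10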

Running Tests

TaskGraph includes a tox configuration for automating builds across Python 2.7 and 3.6, with and without psutil installed. To execute all tests in all configurations, run:

$ tox

Alternatively, if you’re only trying to run tests on a single configuration (say, Python 3.6 without psutil), you’d run:

$ tox -e py36

Or if you’d like to run the tests for the combination of Python 2.7 with psutil, you’d run:

$ tox -e py27-psutil

TaskGraph Release History

0.7.0 (2018-10-22)

  • Fixed an issue where very long strings might be interpreted as paths, which crashed on Windows because the path is too long.

  • Fixed a deadlock issue where a Task might raise an unhandled exception as a new task was added to the TaskGraph.

  • Fixed the occasional BrokenPipeError that could occur when a Task encountered an unhandled exception.

  • Added an n_retries parameter to add_task that lets TaskGraph attempt to reexecute a failing Task up to n_retries times before terminating the TaskGraph (see the sketch after this list).

  • Removed the delayed_start option.
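
A rough sketch of the n_retries parameter, reusing the add_task call pattern and the result_2_path name from the example near the top of this page (the failing task itself is hypothetical):

# sketch only: retry a failing task up to 3 times before the whole
# TaskGraph is terminated (n_retries was added in this release)
retry_task = task_graph.add_task(
    func=_sum_lists_from_disk,
    args=(target_a_path, target_b_path, result_2_path),
    target_path_list=[result_2_path],
    dependent_task_list=[task_a, task_b],
    n_retries=3)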

0.6.1 (2018-08-14)

  • Resolving an issue with duplicate logging being printed to stdout when n_workers > 0. Logging is now only handled in the process that contains the TaskGraph instance.

  • Updated main logging message to indicate which tasks, by task name, are currently active and how many tasks are ready to execute but can’t because there is not an open worker.

  • Attempted to fix an issue where processes in the process pool were not terminating on a Linux system by aggressively joining all threads and processes when possible.

  • Fixed an issue that would cause tasks that had been previously calculated to prematurely trigger child tasks even if the parent tasks of the current task needed to be reexecuted.

0.6.0 (2018-07-24)

  • Added a delayed_start flag to TaskGraph to allow for delayed execution of taskgraph tasks. If enabled in threaded or multiprocess mode, calls to add_task will not execute tasks until the join method is invoked on the TaskGraph. This allows for finer control over execution order when tasks are passed non-equivalent priority levels.

  • Fixing an issue where a non-JSON serializable object would cause add_task to crash. Now TaskGraph is more tolerant of non-JSON serializable objects and will log warnings when parameters cannot be serialized.

  • The TaskGraph constructor now has an option to report an ongoing logging message at a set interval (see the sketch after this list). The message reports how many tasks have been committed and completed.

  • Fixed a bug that would cause TaskGraph to needlessly reexecute a task if the only change was the order of the target_path_list or dependent_task_list variables.

  • Fixed a bug that would cause a task to reexecute between runs if an input argument was a file that would be generated by a task that had not yet executed.

  • Made a code change that makes it very likely that tasks will be executed in priority order if added to a TaskGraph in delayed execution mode.

  • Refactored internal TaskGraph scheduling to fix a design error that made it likely tasks would be needlessly reexecuted. This also simplified TaskGraph flow control and yielded slight performance improvements.

  • Fixed an issue discovered when a scipy.sparse matrix was passed as an argument and add_task crashed on infinite recursion. Type checking of arguments has been simplified and now iteration only occurs on the Python set, dict, list, and tuple types.

  • Fixed an issue where the TaskGraph was not joining the worker process pool on a closed/joined TaskGraph, or when the TaskGraph object was being deconstructed. This would occasionally cause a race condition where the TaskGraph might still have a cache .json file open. Discovered through a flaky build test.

  • Added functionality to the TaskGraph object to propagate log messages from workers back to the parent process. This only applies for cases where a TaskGraph instance is started with n_workers > 0.

  • Fixed an issue where a function that was passed as an argument would cause a reexecution on a separate run because the __repr__ of a function includes its pointer address.

  • Adjusted logging levels so that detailed task information is shown on DEBUG but basic status updates are shown in INFO.
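
A minimal sketch of the interval logging option mentioned above. The keyword name reporting_interval (in seconds) is an assumption based on this entry, and workspace_dir is reused from the example at the top of this page:

# sketch: log a status message every 5 seconds reporting how many tasks
# have been committed and completed (keyword name is an assumption)
task_graph = taskgraph.TaskGraph(workspace_dir, 4, reporting_interval=5.0)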

0.5.2 (2018-06-20)

  • Fixing an issue where a Task would hang on a join if the number of workers in TaskGraph was -1, a call to add_task had a non-None value passed to target_path_list, and the resulting task was joined after a second run of the same program.

0.5.1 (2018-06-20)

  • Fixing an issue where TaskGraph would hang on a join if the number of workers was -1 and a call to add_task had None passed to target_path_list.

0.5.0 (2018-05-04)

  • TaskGraph now supports Python 2 and 3 (tested with Python 2.7 and 3.6).

  • Fixed an issue with taskgraph.TaskGraph that prevented a multiprocessed graph from executing on POSIX systems when psutil was installed.

  • Adding matrix-based test automation (python 2.7, python 3.6, with/without psutil) via tox.

  • Updating repository path to https://bitbucket.org/natcap/taskgraph.

0.4.0 (2018-04-18)

  • Auto-versioning now happens via setuptools_scm, replacing previous calls to natcap.versioner.

  • Added an option to the TaskGraph constructor to allow negative values in the n_workers argument to indicate that the entire object should run in the main thread. A value of 0 indicates that no multiprocessing will be used but concurrency will be allowed for non-blocking add_task calls.

  • Added an abstract class task.EncapsulatedTaskOp that can be used to instantiate a class that needs scope in order to be used as an operation passed to a process. The advantage of using EncapsulatedTaskOp is that the __name__ hash used by TaskGraph to determine if a task is unique is calculated in the superclass, and the subclass need only worry about implementing __call__.

  • Added an optional scalar priority argument to TaskGraph.add_task to indicate the priority preference of the task to be executed (a sketch of this and EncapsulatedTaskOp follows this list). A higher priority task whose dependencies are satisfied will be executed before one with a lower priority.
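
A minimal sketch of the EncapsulatedTaskOp pattern combined with the priority argument described above. The import location (taskgraph.EncapsulatedTaskOp), the ScaleList class, and the scaled.dat path are illustrative assumptions; task_graph, workspace_dir, result_path, and sum_task are reused from the example at the top of this page:

import os
import pickle

import taskgraph


class ScaleList(taskgraph.EncapsulatedTaskOp):
    """Multiply every element of a pickled list by `scale`."""

    def __init__(self, scale):
        # the superclass hashes the constructor arguments so TaskGraph can
        # tell whether this operation has changed between runs
        super(ScaleList, self).__init__(scale)
        self.scale = scale

    def __call__(self, list_path, target_path):
        with open(list_path, 'rb') as list_file:
            source_list = pickle.load(list_file)
        with open(target_path, 'wb') as target_file:
            pickle.dump([self.scale * x for x in source_list], target_file)


scaled_path = os.path.join(workspace_dir, 'scaled.dat')
# a higher priority task whose dependencies are satisfied executes before
# lower priority ones
scale_task = task_graph.add_task(
    func=ScaleList(2),
    args=(result_path, scaled_path),
    target_path_list=[scaled_path],
    dependent_task_list=[sum_task],
    priority=10)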

0.3.0 (2017-11-17)

  • Refactor of the core scheduler. The old scheduler used asynchronicity to attempt to test if a Task was complete, occasionally testing all Tasks in the potential work queue per task completion. The scheduler now uses bookkeeping to keep track of all dependencies and submits tasks for work only when all dependencies are satisfied.

  • TaskGraph and Task .join methods now have a timeout parameter, and join now returns False if it terminates because of a timeout (see the sketch after this list).

  • More robust error reporting and shutdown of TaskGraph if any tasks fail during execution using pure threading or multiprocessing.
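
A minimal sketch of the join timeout behavior noted above, reusing the task_graph object from the example at the top of this page (the timeout is passed positionally here, which is an assumption about the signature):

# join returns False if the graph has not finished within 5 seconds
if not task_graph.join(5.0):
    print('tasks are still running after 5 seconds')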

0.2.7 (2017-11-09)

  • Fixed a critical error from the last hotfix that prevented taskgraph from avoiding recomputation of already completed tasks.

0.2.6 (2017-11-07)

  • Fixed an issue from the previous hotfix that could cause taskgraph to exceed the number of available threads if enough tasks were added with long running dependencies.

  • Additional error checking and flow control ensures that a TaskGraph will catastrophically fail and report useful exception logging if a task fails during runtime.

  • Fixed a deadlock issue where a failure on a subtask would occasionally cause a TaskGraph to hang.

  • Task.is_complete raises a RuntimeError if the task is complete but failed.

  • More efficient handling of topological progression of task execution to attempt to maximize total possible CPU load.

  • Fixing an issue from the last release that caused the test cases to fail. (Don’t use 0.2.5 at all).

0.2.5 (2017-10-11)

  • Fixed a bug where tasks with satisfied dependencies or no dependencies were blocked on dependent tasks added to the task graph earlier in the main thread execution.

  • Indicating that psutil is an optional dependency through the setup function.

0.2.4 (2017-09-19)

  • Empty release. Possible bug with PyPI release, so re-releasing with a bumped up version.

0.2.3 (2017-09-18)

  • More robust testing on a chain of tasks that might fail because an ancestor failed.

0.2.2 (2017-08-15)

  • Changed how TaskGraph determines if work is complete. It now records target paths in a file token with modified time and file size. When checking if work is complete, the token is loaded and the target file stats are compared for each file.

0.2.1 (2017-08-11)

  • Handling cases where a function might be an object or something else whose source code can’t be imported.

  • Using natcap.versioner for versioning.

0.2.0 (2017-07-31)

  • Fixing an issue where types.StringType is not the same as types.StringTypes.

  • Redefined target in add_task to func to avoid naming collision with target_path_list in the same function.

0.1.1 (2017-07-31)

  • Fixing a typo in the __version__ number scheme.

  • Importing psutil if it exists.

0.1.0 (2017-07-29)

  • Initial release.

