A robust DAG implementation for parallel programming

paradag

paradag is a robust DAG package for easy parallel execution. It is implemented in pure Python and does not depend on any third-party packages.

A directed acyclic graph (DAG) is commonly used as a dependency graph, for example to describe the dependencies between tasks. The order of task execution must comply with those dependencies: tasks connected by a direct or indirect path must run in sequence, while tasks with no path between them can run in parallel.
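To make the "sequence versus parallel" rule concrete, here is a minimal standard-library sketch that groups tasks into waves, where every task in a wave has no path to any other task in the same wave. The function name `parallel_waves` and the task names are hypothetical and chosen for illustration; this is not how paradag itself schedules work.

```python
from collections import defaultdict

def parallel_waves(vertices, edges):
    """Group vertices into waves; a wave only starts once all earlier waves are done."""
    successors = defaultdict(list)
    indegree = {v: 0 for v in vertices}
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1

    # The first wave holds every vertex with no dependencies at all.
    wave = [v for v in vertices if indegree[v] == 0]
    waves = []
    while wave:
        waves.append(wave)
        next_wave = []
        for v in wave:
            for succ in successors[v]:
                indegree[succ] -= 1
                # A vertex becomes ready once its last dependency has finished.
                if indegree[succ] == 0:
                    next_wave.append(succ)
        wave = next_wave
    return waves

tasks = ["fetch", "parse", "lint", "report"]
deps = [("fetch", "parse"), ("fetch", "lint"), ("parse", "report"), ("lint", "report")]
waves = parallel_waves(tasks, deps)
print(waves)  # [['fetch'], ['parse', 'lint'], ['report']]
```

Here `parse` and `lint` share a wave because neither depends on the other, while `report` has to wait for both.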

Installation

$ pip install paradag

Create a DAG

Before running tasks, first create a DAG in which each vertex represents a task. A vertex can be any hashable object, such as an integer, a string, a tuple of hashable objects, or an instance of a user-defined class.

from paradag import DAG

class Vtx(object):
    def __init__(self, v):
        self.__value = v

vtx = Vtx(999)

dag = DAG()
dag.add_vertex(123, 'abcde', 'xyz', ('a', 'b', 3), vtx)

dag.add_edge(123, 'abcde')                  # 123 -> 'abcde'
dag.add_edge('abcde', ('a', 'b', 3), vtx)   # 'abcde' -> ('a', 'b', 3), 'abcde' -> vtx

add_edge accepts one starting vertex and one or more ending vertices. Take care not to create a cycle with add_edge; doing so raises a DAGCycleError.
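The cycle rule can be illustrated without the library. The sketch below uses a hypothetical `TinyDAG` class and a hypothetical `CycleError` exception as stand-ins for paradag's DAG and DAGCycleError; it rejects an edge whenever the starting vertex is already reachable from the ending vertex. How paradag performs its own check is not stated in this document, so treat this as one possible approach.

```python
class CycleError(Exception):
    """Hypothetical stand-in for paradag's DAGCycleError."""

class TinyDAG:
    """Hypothetical minimal graph for illustration, not paradag's DAG class."""

    def __init__(self):
        self._succ = {}

    def add_vertex(self, *vertices):
        for v in vertices:
            self._succ.setdefault(v, set())

    def _reachable(self, start, target):
        # Iterative depth-first search from start, looking for target.
        stack, seen = [start], set()
        while stack:
            v = stack.pop()
            if v == target:
                return True
            if v not in seen:
                seen.add(v)
                stack.extend(self._succ.get(v, ()))
        return False

    def add_edge(self, v_from, *v_tos):
        # Assumption of this sketch: unknown vertices are registered automatically.
        self.add_vertex(v_from, *v_tos)
        for v_to in v_tos:
            # v_from -> v_to closes a cycle exactly when v_from is already
            # reachable from v_to (this also covers v_from == v_to).
            if self._reachable(v_to, v_from):
                raise CycleError("{0} -> {1} would create a cycle".format(v_from, v_to))
            self._succ[v_from].add(v_to)

g = TinyDAG()
g.add_vertex("a", "b", "c")
g.add_edge("a", "b")
g.add_edge("b", "c")
try:
    g.add_edge("c", "a")      # a -> b -> c -> a would be a cycle
    cycle_rejected = False
except CycleError:
    cycle_rejected = True
print(cycle_rejected)  # True
```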

The common DAG properties are accessible:

print(dag.vertex_size())
print(dag.edge_size())

print(dag.successors('abcde'))
print(dag.predecessors(vtx))

print(dag.all_starts())
print(dag.all_terminals())

Run tasks in sequence

Write an executor and, optionally, a selector. The executor performs the actual work for each vertex: param maps a vertex to the argument passed to execute, and execute does the work.

from paradag import dag_run
from paradag import SequentialProcessor

class CustomExecutor:
    def param(self, vertex):
        return vertex

    def execute(self, param):
        print('Executing:', param)

print(dag_run(dag, processor=SequentialProcessor(), executor=CustomExecutor()))

dag_run is the core function for task scheduling.

Run tasks in parallel

Running tasks in parallel is very similar; just change the processor to MultiThreadProcessor.

from paradag import MultiThreadProcessor

dag_run(dag, processor=MultiThreadProcessor(), executor=CustomExecutor())

The default selector, FullSelector, tries to select as many tasks as possible to run in parallel. This can be adjusted with a custom selector. The following selector allows at most 4 tasks to run in parallel.

class CustomSelector(object):
    def select(self, running, idle):
        task_number = max(0, 4-len(running))
        return list(idle)[:task_number]

dag_run(dag, processor=MultiThreadProcessor(), selector=CustomSelector(), executor=CustomExecutor())
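A selector like this can be exercised on its own, without a DAG. The sketch below generalizes the limit into a constructor argument; the class name `CappedSelector` is hypothetical, and passing plain sets for `running` and `idle` is an assumption based only on the `len(...)` and `list(...)` calls in the example above.

```python
class CappedSelector(object):
    """Hypothetical variant of the selector above with a configurable limit."""

    def __init__(self, limit):
        self._limit = limit

    def select(self, running, idle):
        # Only hand out as many idle tasks as there are free slots.
        free_slots = max(0, self._limit - len(running))
        return list(idle)[:free_slots]

selector = CappedSelector(4)

# Nothing running: four of the six idle tasks are selected.
print(len(selector.select(set(), {"a", "b", "c", "d", "e", "f"})))  # 4

# Three running: only one slot is left.
print(len(selector.select({"a", "b", "c"}, {"d", "e", "f"})))       # 1

# Limit reached: nothing is selected until a running task finishes.
print(selector.select({"a", "b", "c", "d"}, {"e", "f"}))            # []
```

Which idle tasks are picked depends on the iteration order of `idle`, so only the count is predictable when a set is passed.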

When using MultiThreadProcessor, keep in mind that the executor's execute method may run in parallel. Try not to modify any variables outside the execute function, and pass all inputs through the param argument. Also make sure that the values returned by the param function are independent of each other.
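One way to follow this advice is sketched below. It uses the standard library's `concurrent.futures.ThreadPoolExecutor` as a stand-in for MultiThreadProcessor, and the class name `SafeExecutor` and its `settings` dictionary are hypothetical. The point is that `param` hands out an independent copy per task, and `execute` touches only that copy and returns its result.

```python
import copy
from concurrent.futures import ThreadPoolExecutor

class SafeExecutor:
    """Hypothetical executor whose execute method is safe to run in threads."""

    def __init__(self, settings):
        self._settings = settings  # shared template, never handed out directly

    def param(self, vertex):
        # Each task gets its own deep copy, so the params are independent.
        task_settings = copy.deepcopy(self._settings)
        task_settings["name"] = vertex
        return task_settings

    def execute(self, param):
        # Mutate only the task-local param and return the result,
        # instead of writing to state shared between threads.
        param["tags"].append("done")
        return "{0}:{1}".format(param["name"], ",".join(param["tags"]))

executor = SafeExecutor({"tags": ["base"]})
params = [executor.param(v) for v in ["a", "b", "c"]]

# ThreadPoolExecutor stands in for MultiThreadProcessor in this sketch.
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(executor.execute, params))

print(results)             # ['a:base,done', 'b:base,done', 'c:base,done']
print(executor._settings)  # {'tags': ['base']}
```

If `param` returned the shared dictionary itself, every thread would append to the same list and the results would depend on thread timing.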

Get task running status

The executor can also implement optional methods that report task running status.

class CustomExecutor:
    def param(self, vertex):
        return vertex

    def execute(self, param):
        print('Executing:', param)

    def report_start(self, vertices):
        print('Start to run:', vertices)

    def report_running(self, vertices):
        print('Current running:', vertices)

    def report_finish(self, vertices_result):
        for vertex, result in vertices_result:
            print('Finished running {0} with result: {1}'.format(vertex, result))

dag_run(dag, processor=MultiThreadProcessor(), executor=CustomExecutor())
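Because these reporting methods are optional, a scheduler has to cope with executors that define none of them. A common way to do that is sketched below; the helper `call_if_present` and both executor classes are hypothetical, and this document does not say whether paradag uses the same mechanism.

```python
def call_if_present(obj, method_name, *args):
    """Call obj.method_name(*args) if it exists and is callable, else return None."""
    method = getattr(obj, method_name, None)
    if callable(method):
        return method(*args)
    return None

class QuietExecutor:
    """Implements only the work itself, no reporting hooks."""
    def execute(self, param):
        return param

class ChattyExecutor(QuietExecutor):
    """Adds one optional hook and records what it was told."""
    def __init__(self):
        self.log = []

    def report_start(self, vertices):
        self.log.append(("start", sorted(vertices)))

quiet, chatty = QuietExecutor(), ChattyExecutor()
call_if_present(quiet, "report_start", {"a", "b"})   # no hook defined, skipped
call_if_present(chatty, "report_start", {"a", "b"})  # hook defined, called
print(chatty.log)  # [('start', ['a', 'b'])]
```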

Deliver result to descendants

If the result of one task is needed by its descendants, the executor can implement a deliver method.

class CustomExecutor:
    def __init__(self):
        self.__level = {}

    def param(self, vertex):
        return self.__level.get(vertex, 0)

    def execute(self, param):
        return param + 1

    def report_finish(self, vertices_result):
        for vertex, result in vertices_result:
            print('Vertex {0} finished, level: {1}'.format(vertex, result))

    def deliver(self, vertex, result):
        self.__level[vertex] = result

The result from a parent is delivered to the vertex before the vertex is executed.
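The flow can be traced with a small standard-library driver. Everything here is a sketch under stated assumptions: `run_in_order` is a hypothetical sequential stand-in for dag_run, and it assumes deliver is called once for each parent-to-child edge after the parent finishes. The `LevelExecutor` variant keeps the largest delivered value, so a vertex with several parents gets the same level regardless of which parent delivers last.

```python
from collections import defaultdict, deque

class LevelExecutor:
    """Variant of the executor above that tolerates several parents per vertex."""

    def __init__(self):
        self._level = {}

    def param(self, vertex):
        return self._level.get(vertex, 0)

    def execute(self, param):
        return param + 1

    def deliver(self, vertex, result):
        # Keep the largest parent result instead of the last one delivered.
        self._level[vertex] = max(self._level.get(vertex, 0), result)

def run_in_order(vertices, edges, executor):
    """Hypothetical sequential driver for illustration, not paradag's dag_run."""
    successors = defaultdict(list)
    indegree = {v: 0 for v in vertices}
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1

    ready = deque(v for v in vertices if indegree[v] == 0)
    results = {}
    while ready:
        vertex = ready.popleft()
        results[vertex] = executor.execute(executor.param(vertex))
        for child in successors[vertex]:
            # Assumed contract: the parent's result reaches the child
            # before the child can become ready and be executed.
            executor.deliver(child, results[vertex])
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    return results

edges = [("a", "b"), ("b", "d"), ("a", "d")]
levels = run_in_order(["a", "b", "d"], edges, LevelExecutor())
print(levels)  # {'a': 1, 'b': 2, 'd': 3}
```

Vertex `d` ends up at level 3 because its deepest parent, `b`, finished at level 2.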

Topological sorting

Topological sorting can also be done with the paradag.dag_run function. The return value of dag_run can be treated as the result of a topological sort.

A simple topological sorting without any execution:

from paradag import DAG, dag_run
from paradag import SingleSelector, RandomSelector, ShuffleSelector

dag = DAG()
dag.add_vertex(1, 2, 3, 4, 5)
dag.add_edge(1, 4)
dag.add_edge(4, 2, 5)

print(dag_run(dag))
print(dag_run(dag, selector=SingleSelector()))
print(dag_run(dag, selector=RandomSelector()))
print(dag_run(dag, selector=ShuffleSelector()))

A topological ordering is not necessarily unique, and the final order may vary with different selectors.
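Since several orderings can be valid, it helps to check an ordering rather than compare it against one expected list. The helper below is a hypothetical standard-library sketch named `is_topological_order`; it assumes the ordering is a plain sequence of vertices and uses the edges of the example DAG above (1 -> 4, 4 -> 2, 4 -> 5, with 3 unconnected).

```python
def is_topological_order(order, edges):
    """Return True if no vertex repeats and every edge points forward in order."""
    position = {vertex: index for index, vertex in enumerate(order)}
    if len(position) != len(order):
        return False  # a vertex appears more than once
    return all(
        src in position and dst in position and position[src] < position[dst]
        for src, dst in edges
    )

edges = [(1, 4), (4, 2), (4, 5)]

# Two different orders, both valid for the same DAG.
print(is_topological_order([1, 3, 4, 2, 5], edges))  # True
print(is_topological_order([3, 1, 4, 5, 2], edges))  # True

# Invalid: 2 appears before its predecessor 4.
print(is_topological_order([1, 2, 4, 5, 3], edges))  # False
```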
