
Distributed graph computation library



Citi/pargraph

Efficient, lightweight and reliable distributed computation engine.



Pargraph is a lightweight parallel graph computation library for Python. At its core, Pargraph consists of two modules: a graph creation tool and an embedded graph scheduler. You can use either or both modules in your code.

Installation

Install Pargraph via pip

pip install pargraph

If you want to use GraphBLAS for better graph scheduling performance, you may install the optional graphblas extra:

pip install "pargraph[graphblas]"
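To check whether the optional dependency actually ended up in your environment, you can look for it without importing Pargraph. This sketch assumes the extra pulls in the python-graphblas package, whose import name is graphblas; graphblas_available is a hypothetical helper, not part of Pargraph.

```python
import importlib.util


def graphblas_available() -> bool:
    """Hypothetical helper: True if a module named 'graphblas' is importable.

    Assumes the graphblas extra installs python-graphblas (import name: graphblas).
    """
    return importlib.util.find_spec("graphblas") is not None


print(graphblas_available())
```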

Graph creation

Pargraph provides a simple graph creation tool that allows you to build task graphs by decorating Python functions.

There are two decorators:

  • @delayed: Marks a function as a single task in the graph. Its body cannot call functions decorated with @delayed or @graph.
  • @graph: Marks a function as a graph that composes tasks. Its body may call functions decorated with @delayed or @graph.

Example

import numpy as np
from pargraph import graph, delayed


@delayed
def filter_array(array: np.ndarray, low: float, high: float) -> np.ndarray:
    # Half-open interval [low, high) so a value on a partition boundary lands in exactly one partition.
    return array[(array >= low) & (array < high)]


@delayed
def sort_array(array: np.ndarray) -> np.ndarray:
    return np.sort(array)


@delayed
def reduce_arrays(*arrays: np.ndarray) -> np.ndarray:
    return np.concatenate(arrays)


@graph
def map_reduce_sort(array: np.ndarray, partition_count: int) -> np.ndarray:
    return reduce_arrays(
        *(
            sort_array(filter_array(array, i / partition_count, (i + 1) / partition_count))
            for i in range(partition_count)
        )
    )

The map_reduce_sort function behaves like a normal Python function if called with concrete arguments.

import numpy as np

map_reduce_sort(np.random.rand(20), partition_count=4)

# [0.06253707 0.06795382 0.11492823 0.14512393 0.20183152 0.41109117
#  0.42613798 0.45156214 0.4714821  0.54000373 0.54902451 0.62671881
#  0.64402013 0.65147012 0.70903525 0.77846584 0.83861765 0.89170381
#  0.92492478 0.95370363]
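For reference, the computation this graph describes can be written as plain Python over lists, with no Pargraph involved: filter the input into partition_count buckets, sort each bucket, then concatenate. This sketch (map_reduce_sort_plain is a hypothetical name) assumes inputs in [0, 1), as np.random.rand produces, and uses half-open buckets [low, high) so a value exactly on a bucket boundary is counted once.

```python
import random
from typing import List


def map_reduce_sort_plain(values: List[float], partition_count: int) -> List[float]:
    """Sort values in [0, 1) by bucketing, sorting each bucket, then concatenating."""
    partitions = []
    for i in range(partition_count):
        low, high = i / partition_count, (i + 1) / partition_count
        # Map step: keep the values in the half-open bucket [low, high).
        bucket = [v for v in values if low <= v < high]
        # Each bucket is sorted independently.
        partitions.append(sorted(bucket))
    # Reduce step: buckets are already in ascending order, so concatenating them sorts.
    return [v for part in partitions for v in part]


data = [random.random() for _ in range(20)]
assert map_reduce_sort_plain(data, 4) == sorted(data)
```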

Use the to_graph method to generate a graph representation of the function.

map_reduce_sort.to_graph(partition_count=4).to_dot().write_png("map_reduce_sort.png")

(Figure: Map-Reduce Sort task graph)

Graph functions can also call other graph functions, including themselves recursively, to build increasingly complex graphs.

from typing import List


@graph
def map_reduce_sort_recursive(
    array: np.ndarray, partition_counts: List[int], _low: float = 0, _high: float = 1
) -> np.ndarray:
    # Base case: no more levels to split, so sort the partition directly.
    if len(partition_counts) == 0:
        return sort_array(array)

    # Take this level's partition count; the remaining counts apply to sub-partitions.
    partition_count, *partition_counts = partition_counts

    sorted_partitions = []
    for i in range(partition_count):
        low = _low + (_high - _low) * (i / partition_count)
        high = _low + (_high - _low) * ((i + 1) / partition_count)
        sorted_partitions.append(
            map_reduce_sort_recursive(filter_array(array, low, high), partition_counts, low, high)
        )

    return reduce_arrays(*sorted_partitions)


# partition_counts must be a list: here 4 partitions, each split again into 2.
map_reduce_sort_recursive.to_graph(partition_counts=[4, 2]).to_dot().write_png("map_reduce_sort_recursive.png")

(Figure: Map-Reduce Sort Recursive task graph)
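The graph grows quickly with each level of recursion. Assuming each call to a @delayed function becomes one node in the graph, the node counts follow directly from the recursion: each level adds partition_count filters plus one reduce, and each base case adds one sort. The task_counts helper below is hypothetical and only does this arithmetic; it does not build a graph.

```python
from typing import Dict, List


def task_counts(partition_counts: List[int]) -> Dict[str, int]:
    """Count calls to each @delayed function made by map_reduce_sort_recursive."""
    if not partition_counts:
        # Base case: a single sort_array call.
        return {"filter_array": 0, "sort_array": 1, "reduce_arrays": 0}
    p, *rest = partition_counts
    child = task_counts(rest)
    return {
        # p filters at this level, plus those inside the p recursive calls.
        "filter_array": p + p * child["filter_array"],
        "sort_array": p * child["sort_array"],
        # One reduce at this level, plus those inside the p recursive calls.
        "reduce_arrays": 1 + p * child["reduce_arrays"],
    }


print(task_counts([4]))     # {'filter_array': 4, 'sort_array': 4, 'reduce_arrays': 1}
print(task_counts([4, 2]))  # {'filter_array': 12, 'sort_array': 8, 'reduce_arrays': 5}
```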

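Use the to_dict method to convert the generated graph into a Dask-style dict graph, passing the graph's inputs (here, array) as keyword arguments. The result can be computed by a Dask scheduler, for example through a dask.distributed Client: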

import numpy as np
from distributed import Client

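with Client() as client:
    # to_dict is taken to return the dict graph and its output keys; both are unpacked into client.get
    result = client.get(*map_reduce_sort.to_graph(partition_count=4).to_dict(array=np.random.rand(20)))[0]
    print(result)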

# [0.06253707 0.06795382 0.11492823 0.14512393 0.20183152 0.41109117
#  0.42613798 0.45156214 0.4714821  0.54000373 0.54902451 0.62671881
#  0.64402013 0.65147012 0.70903525 0.77846584 0.83861765 0.89170381
#  0.92492478 0.95370363]

Graph scheduler

Pargraph adds task-graph parallelism to parallel backends that don't support it out of the box. Think of it as a mini graph scheduler that lives inside your application and sends tasks concurrently to a parallel backend of your choice.
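The core loop of such a scheduler can be sketched with only the standard library: find tasks whose inputs are all available, hand them to any object with a submit method, and release more tasks as futures complete. This is a simplified illustration, not Pargraph's implementation. mini_get is a hypothetical name; it assumes a Dask-style dict graph where a task is a tuple (callable, *args) and an argument that is a key of the graph stands for that key's value. It does not support nested tasks or lists inside arguments, and for brevity it runs every task in the graph rather than only those the requested key needs.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Any, Dict, Set


def _is_task(value: Any) -> bool:
    return isinstance(value, tuple) and len(value) > 0 and callable(value[0])


def _deps(graph: Dict[str, Any], value: Any) -> Set[str]:
    """Keys of the graph that a task's arguments refer to."""
    if not _is_task(value):
        return set()
    return {a for a in value[1:] if isinstance(a, str) and a in graph}


def mini_get(graph: Dict[str, Any], key: str, backend: Any) -> Any:
    results: Dict[str, Any] = {}
    waiting = {k: _deps(graph, v) for k, v in graph.items()}
    running: Dict[Future, str] = {}

    def submit_ready() -> bool:
        ready = [k for k, deps in waiting.items() if deps.issubset(results)]
        for k in ready:
            del waiting[k]
            value = graph[k]
            if _is_task(value):
                fn, *args = value
                # Swap key references for the results they point to.
                args = [results[a] if isinstance(a, str) and a in graph else a for a in args]
                running[backend.submit(fn, *args)] = k
            else:
                results[k] = value  # plain value: nothing to run
        return bool(ready)

    while key not in results:
        progressed = submit_ready()
        if running:
            # Block until at least one task finishes, then record its result.
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for fut in done:
                results[running.pop(fut)] = fut.result()
        elif not progressed:
            raise ValueError("graph has a cycle or the key is missing")
    return results[key]


def inc(i):
    return i + 1


def add(a, b):
    return a + b


dsk = {"x": 1, "y": (inc, "x"), "z": (add, "y", 10)}
with ThreadPoolExecutor(max_workers=4) as pool:
    print(mini_get(dsk, "z", pool))  # 12
```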

Pargraph's scheduler implements Dask's get API and accepts the same task graph format as Dask, making it a drop-in replacement for Dask in applications that don't need a fully-fledged graph scheduler.

If the optional graphblas extra is installed, graph scheduling is powered by GraphBLAS, a high-performance sparse matrix linear algebra library. This improves scheduling performance on large, complex graphs (e.g. graphs with 100k+ nodes) compared to the native Python implementation.
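Whatever the backend, a scheduler has to repeatedly work out which tasks have all their dependencies finished. The pure-Python sketch below does this with Kahn's algorithm, grouping keys into waves whose members can run concurrently; this per-node bookkeeping is the kind of work a sparse-matrix formulation can express as bulk operations. It does not use GraphBLAS, and how Pargraph maps scheduling onto GraphBLAS is not covered here. The generations helper and its input format (each key mapped to the set of keys it depends on, all of which must themselves be keys) are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Set


def generations(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group keys into waves; each key depends only on keys in earlier waves."""
    indegree = {k: len(d) for k, d in deps.items()}
    dependents: Dict[str, List[str]] = defaultdict(list)
    for k, d in deps.items():
        for parent in d:
            dependents[parent].append(k)

    wave = sorted(k for k, n in indegree.items() if n == 0)
    waves: List[List[str]] = []
    while wave:
        waves.append(wave)
        next_wave = []
        for k in wave:
            # Finishing k releases one dependency of each of its dependents.
            for child in dependents[k]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_wave.append(child)
        wave = sorted(next_wave)

    if sum(len(w) for w in waves) != len(deps):
        raise ValueError("dependency cycle detected")
    return waves


diamond = {"a": set(), "b": {"a"}, "c": {"a"}, "d": {"b", "c"}}
print(generations(diamond))  # [['a'], ['b', 'c'], ['d']]
```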

Usage

Initialize graph engine

from pargraph import GraphEngine

graph_engine = GraphEngine()

Choose a parallel backend

If you want to use a parallel backend other than the default local multiprocessing backend, you may initialize a different parallel backend and pass it into GraphEngine's constructor.

Example with a dask backend

from distributed import Client
from distributed.cfexecutor import ClientExecutor

dask_client = Client(...)
graph_engine = GraphEngine(ClientExecutor(dask_client))

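You may also implement your own parallel backend: any object with a submit method that takes a function and its arguments and returns a concurrent.futures.Future, mirroring concurrent.futures.Executor.submit.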

Example with a custom backend

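from concurrent.futures import Future

from pargraph import GraphEngine


class CustomBackend:
    """Runs each task synchronously in the calling thread."""

    def submit(self, fn, /, *args, **kwargs) -> Future:
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            # Report task errors through the future instead of raising from submit.
            future.set_exception(exc)
        return future


backend = CustomBackend()
graph_engine = GraphEngine(backend)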
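A backend that really runs tasks in parallel can delegate to an existing executor from concurrent.futures. In this sketch, ThreadBackend is a hypothetical class that wraps a ThreadPoolExecutor and counts submissions, which is handy for checking how many tasks a graph sends to the backend.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class ThreadBackend:
    """Hypothetical backend: delegates to a thread pool and counts submissions."""

    def __init__(self, max_workers: int = 4) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._lock = threading.Lock()
        self.submitted = 0

    def submit(self, fn, /, *args, **kwargs) -> Future:
        # The lock keeps the counter correct if submit is called from several threads.
        with self._lock:
            self.submitted += 1
        return self._pool.submit(fn, *args, **kwargs)

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)


backend = ThreadBackend()
futures = [backend.submit(pow, 2, n) for n in range(5)]
print([f.result() for f in futures], backend.submitted)  # [1, 2, 4, 8, 16] 5
backend.shutdown()
```

An instance can then be passed to GraphEngine in the same way as CustomBackend. Threads work well for I/O-bound tasks and for code that releases the GIL, such as many NumPy operations; CPU-bound pure-Python tasks generally benefit more from processes.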

Compute graph

Build the task graph and compute a key of your choice:

def inc(i):
    return i + 1


def add(a, b):
    return a + b


graph = {
    "x": 1,
    "y": (inc, "x"),
    "z": (add, "y", 10)
}
graph_engine.get(graph, "z")  # 12

You may also compute multiple keys if you like:

graph_engine.get(graph, ["x", "y", "z"])  # [1, 2, 12]
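To make the semantics of these get calls concrete, here is a sequential reference evaluator for the same Dask-style format: a task is a tuple whose first element is callable, any argument that names a key is replaced by that key's value, lists are traversed, and each key is computed at most once. toy_get is a hypothetical helper for checking expected results, not Pargraph's GraphEngine; it does no parallelism or cycle detection, and returns a list when given a list of keys.

```python
from typing import Any, Dict, List, Union


def _evaluate(dsk: Dict[str, Any], value: Any, cache: Dict[str, Any]) -> Any:
    """Evaluate one value from a Dask-style graph, memoizing key results."""
    if isinstance(value, tuple) and value and callable(value[0]):
        # A task: evaluate its arguments first, then call the function.
        fn, *args = value
        return fn(*(_evaluate(dsk, a, cache) for a in args))
    if isinstance(value, list):
        # Lists are traversed so they may contain keys or tasks.
        return [_evaluate(dsk, v, cache) for v in value]
    if isinstance(value, str) and value in dsk:
        # A key reference: compute that key once and reuse the result.
        if value not in cache:
            cache[value] = _evaluate(dsk, dsk[value], cache)
        return cache[value]
    return value  # any other literal


def toy_get(dsk: Dict[str, Any], keys: Union[str, List[str]]) -> Any:
    cache: Dict[str, Any] = {}

    def one(k: str) -> Any:
        if k not in dsk:
            raise KeyError(k)
        return _evaluate(dsk, k, cache)

    return [one(k) for k in keys] if isinstance(keys, list) else one(keys)


def inc(i):
    return i + 1


def add(a, b):
    return a + b


dsk = {"x": 1, "y": (inc, "x"), "z": (add, "y", 10)}
print(toy_get(dsk, "z"))                # 12
print(toy_get(dsk, ["x", "y", "z"]))    # [1, 2, 12]
```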

Contributing

Your contributions are at the core of making this a true open source project. Any contributions you make are greatly appreciated.

We welcome you to:

Please review our community contribution guidelines and functional contribution guidelines to get started 👍.

Code of Conduct

We are committed to making open source an enjoyable and respectful experience for our community. See CODE_OF_CONDUCT for more information.

License

This project is distributed under the Apache-2.0 License. See LICENSE for more information.

Contact

If you have a query or require support with this project, raise an issue. Otherwise, reach out to opensource@citi.com.


Download files

Download the file for your platform.

Source Distribution

pargraph-0.9.6.tar.gz (146.0 kB)

Built Distribution

pargraph-0.9.6-py3-none-any.whl (27.8 kB)

File details

Details for the file pargraph-0.9.6.tar.gz.

File metadata

  • Download URL: pargraph-0.9.6.tar.gz
  • Size: 146.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.2

File hashes

Hashes for pargraph-0.9.6.tar.gz
Algorithm Hash digest
SHA256 67e3ec47bcf83da5fb5040df3d12015e7ac33bdb4f034a1c59c8ed9caab4a3c5
MD5 71ae33ccaa079cda08e05bae0faaed8b
BLAKE2b-256 9f26b8f0bfb306c240750604463daebb254b54906158bba35cd9000cae2c1cd5

Provenance

The following attestation bundles were made for pargraph-0.9.6.tar.gz:

Publisher: pypi.yml on Citi/pargraph

File details

Details for the file pargraph-0.9.6-py3-none-any.whl.

File metadata

  • Download URL: pargraph-0.9.6-py3-none-any.whl
  • Size: 27.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.2

File hashes

Hashes for pargraph-0.9.6-py3-none-any.whl
Algorithm Hash digest
SHA256 279ff39df338326fb63ba40b3f51c7eda8f0a7718e9f6545ac08067e344ae912
MD5 216852d1d3b678cb79f6547df2f90ba1
BLAKE2b-256 2e57b997a0fc81a3c1d507bb2de0352b8851372e2e88e9ddbf4906b87bc97f22

Provenance

The following attestation bundles were made for pargraph-0.9.6-py3-none-any.whl:

Publisher: pypi.yml on Citi/pargraph
