Distributed graph computation library
Pargraph is a lightweight parallel graph computation library for Python. At its core, Pargraph consists of two modules: a graph creation tool and an embedded graph scheduler. You can use either or both modules in your code.
Installation
Install Pargraph via pip:
pip install pargraph
If you want to use GraphBLAS for better graph scheduling performance, you may install the optional graphblas extra:
pip install pargraph[graphblas]
Graph creation
Pargraph provides a simple graph creation tool that allows you to build task graphs by decorating Python functions.
There are two decorators:
- @delayed: Decorate a function to make it delayed. Cannot contain function calls decorated with @delayed or @graph.
- @graph: Decorate a function to make it a graph. May contain function calls decorated with @delayed or @graph.
Example
import numpy as np
from pargraph import graph, delayed

@delayed
def filter_array(array: np.ndarray, low: float, high: float) -> np.ndarray:
    return array[(array >= low) & (array <= high)]

@delayed
def sort_array(array: np.ndarray) -> np.ndarray:
    return np.sort(array)

@delayed
def reduce_arrays(*arrays: np.ndarray) -> np.ndarray:
    return np.concatenate(arrays)

@graph
def map_reduce_sort(array: np.ndarray, partition_count: int) -> np.ndarray:
    return reduce_arrays(
        *(
            sort_array(filter_array(array, i / partition_count, (i + 1) / partition_count))
            for i in range(partition_count)
        )
    )
The map_reduce_sort function behaves like a normal Python function if called with concrete arguments.
import numpy as np
map_reduce_sort(np.random.rand(20), partition_count=4)
# [0.06253707 0.06795382 0.11492823 0.14512393 0.20183152 0.41109117
# 0.42613798 0.45156214 0.4714821 0.54000373 0.54902451 0.62671881
# 0.64402013 0.65147012 0.70903525 0.77846584 0.83861765 0.89170381
# 0.92492478 0.95370363]
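To see what the graph computes, the same partition, sort, and concatenate pattern can be written without Pargraph. The following is only an illustrative sketch using plain lists instead of NumPy arrays; partition_sort is a hypothetical helper that is not part of the library, and it assumes all input values lie in [0, 1].

```python
from typing import List


def partition_sort(values: List[float], partition_count: int) -> List[float]:
    """Plain-Python analogue of map_reduce_sort for values in [0, 1]."""
    result: List[float] = []
    for i in range(partition_count):
        low = i / partition_count
        high = (i + 1) / partition_count
        # "Map" step: keep the values that fall in this partition (both bounds
        # inclusive, as in filter_array), then sort the partition on its own.
        partition = sorted(v for v in values if low <= v <= high)
        # "Reduce" step: partitions are visited in ascending order, so
        # appending them yields a globally sorted list.
        result.extend(partition)
    return result


print(partition_sort([0.9, 0.1, 0.3], 4))  # [0.1, 0.3, 0.9]
print(partition_sort([0.1, 0.5, 0.9], 2))  # [0.1, 0.5, 0.5, 0.9]
```

The second call shows a side effect of the inclusive bounds: a value that sits exactly on a partition boundary (here 0.5) lands in two partitions and appears twice. With random floats this is very unlikely, but it is worth keeping in mind for inputs that contain boundary values.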
Use the to_graph method to generate a graph representation of the function.
map_reduce_sort.to_graph(partition_count=4).to_dot().write_png("map_reduce_sort.png")
Moreover, you can compose graph functions with other graph functions to generate ever more complex graphs.
from typing import List

@graph
def map_reduce_sort_recursive(
    array: np.ndarray, partition_counts: List[int], _low: float = 0, _high: float = 1
) -> np.ndarray:
    # Base case: no partition levels left, so sort the remaining slice directly.
    if len(partition_counts) == 0:
        return sort_array(array)
    partition_count, *partition_counts = partition_counts
    sorted_partitions = []
    for i in range(partition_count):
        low = _low + (_high - _low) * (i / partition_count)
        high = _low + (_high - _low) * ((i + 1) / partition_count)
        sorted_partitions.append(map_reduce_sort_recursive(filter_array(array, low, high), partition_counts, low, high))
    return reduce_arrays(*sorted_partitions)
# partition_counts is a list with one entry per recursion level
map_reduce_sort_recursive.to_graph(partition_counts=[2, 2]).to_dot().write_png("map_reduce_sort_recursive.png")
Use the to_dict method to convert the generated graph to a dict graph.
import numpy as np
from distributed import Client
with Client() as client:
    client.get(map_reduce_sort.to_graph(partition_count=4).to_dict(array=np.random.rand(20)))[0]
# [0.06253707 0.06795382 0.11492823 0.14512393 0.20183152 0.41109117
# 0.42613798 0.45156214 0.4714821 0.54000373 0.54902451 0.62671881
# 0.64402013 0.65147012 0.70903525 0.77846584 0.83861765 0.89170381
# 0.92492478 0.95370363]
Graph scheduler
Pargraph brings graph parallelization to parallel backends that may not support it out of the box. Think of it as a mini graph scheduler that lives in your program/application and sends out tasks concurrently to a parallel backend of your choice.
It implements Dask's get API and supports the same task graph format as Dask, making it a drop-in replacement for Dask in applications that don't need a fully-fledged graph scheduler.
If the optional graphblas extra is installed, graph scheduling is powered by GraphBLAS, a high-performance sparse matrix linear algebra library. It offers better scheduling performance than native Python implementations on large and complex graphs (e.g. graphs with 100k+ nodes).
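To make the idea of an embedded scheduler concrete, here is a minimal sketch of how a Dask-style dict graph could be walked and its ready tasks handed to a backend through submit. This is not Pargraph's implementation: toy_get is a hypothetical function built on the standard library's graphlib and concurrent.futures, and it only handles top-level string keys as task arguments (no nested tasks or key aliases).

```python
import operator
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def toy_get(graph, keys, executor):
    """Evaluate a Dask-style dict graph, submitting ready tasks to `executor`."""

    def is_task(value):
        # In the Dask graph format, a task is a tuple whose first item is callable.
        return isinstance(value, tuple) and len(value) > 0 and callable(value[0])

    def is_key(arg):
        # Simplification: only string arguments that name a graph key are references.
        return isinstance(arg, str) and arg in graph

    def dependencies(value):
        return {arg for arg in value[1:] if is_key(arg)} if is_task(value) else set()

    sorter = TopologicalSorter({key: dependencies(value) for key, value in graph.items()})
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle

    results = {}
    pending = {}  # maps each in-flight Future to the key it computes
    while sorter.is_active():
        for key in sorter.get_ready():
            value = graph[key]
            if is_task(value):
                fn, *args = value
                resolved = [results[arg] if is_key(arg) else arg for arg in args]
                pending[executor.submit(fn, *resolved)] = key
            else:
                # Literal values need no computation.
                results[key] = value
                sorter.done(key)
        if pending:
            # Block until at least one task finishes, then unlock its dependents.
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                key = pending.pop(future)
                results[key] = future.result()
                sorter.done(key)

    if isinstance(keys, list):
        return [results[key] for key in keys]
    return results[keys]


example_graph = {
    "a": 2,
    "b": 3,
    "sum": (operator.add, "a", "b"),
    "product": (operator.mul, "sum", 10),
}
with ThreadPoolExecutor(max_workers=2) as executor:
    print(toy_get(example_graph, ["sum", "product"], executor))  # [5, 50]
```

Tasks whose dependencies are all satisfied are submitted together, so independent branches of the graph can run concurrently on whatever backend sits behind submit.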
Usage
Initialize graph engine
from pargraph import GraphEngine
graph_engine = GraphEngine()
Choose a parallel backend
If you want to use a parallel backend other than the default local multiprocessing backend, you may initialize a
different parallel backend and pass it into GraphEngine's constructor.
Example with a dask backend
from distributed import Client
from distributed.cfexecutor import ClientExecutor
dask_client = Client(...)
graph_engine = GraphEngine(ClientExecutor(dask_client))
You may also implement your own parallel backend by implementing the submit method.
Example with a custom backend
from concurrent.futures import Future

class CustomBackend:
    def __init__(self):
        pass

    def submit(self, fn, /, *args, **kwargs) -> Future:
        # Runs the task inline and returns an already-completed future.
        future = Future()
        future.set_result(fn(*args, **kwargs))
        return future

backend = CustomBackend()
graph_engine = GraphEngine(backend)
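The backend above runs every task inline, so nothing executes in parallel. As a hedged variation, a backend can forward submit to a standard-library thread pool instead. ThreadPoolBackend and its shutdown method are hypothetical names chosen for this sketch; the only interface taken from the text above is the submit method.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class ThreadPoolBackend:
    """Hypothetical backend that forwards submit() calls to a thread pool."""

    def __init__(self, max_workers: int = 4):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, fn, /, *args, **kwargs) -> Future:
        # The pool schedules fn on a worker thread and returns its Future;
        # exceptions raised by fn are stored on the Future, not raised here.
        return self._pool.submit(fn, *args, **kwargs)

    def shutdown(self) -> None:
        # Wait for in-flight tasks and release the worker threads.
        self._pool.shutdown(wait=True)


backend = ThreadPoolBackend(max_workers=2)
future = backend.submit(pow, 2, 10)
print(future.result())  # 1024
backend.shutdown()
```

An instance of such a class would be passed to GraphEngine in the same way as CustomBackend above. Delegating to an executor also means that a failing task surfaces through the returned future rather than raising inside submit.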
Compute graph
Build the task graph and compute a key of your choice:
def inc(i):
    return i + 1

def add(a, b):
    return a + b

graph = {
    "x": 1,
    "y": (inc, "x"),
    "z": (add, "y", 10)
}
graph_engine.get(graph, "z") # 12
You may also compute multiple keys if you like:
graph_engine.get(graph, ["x", "y", "z"]) # [1, 2, 12]
Contributing
Your contributions are at the core of making this a true open source project. Any contributions you make are greatly appreciated.
We welcome you to:
- Fix typos or touch up documentation
- Share your opinions on existing issues
- Help expand and improve our library by opening a new issue
Please review our community contribution guidelines and functional contribution guidelines to get started 👍.
Code of Conduct
We are committed to making open source an enjoyable and respectful experience for our community. See
CODE_OF_CONDUCT for more information.
License
This project is distributed under the Apache-2.0 License. See
LICENSE for more information.
Contact
If you have a query or require support with this project, raise an issue. Otherwise, reach out to opensource@citi.com.