A general purpose framework for building and running computational graphs.
ZnFlow
The ZnFlow package provides a basic structure for building computational
graphs based on functions or classes. It is designed as a lightweight
abstraction layer to
- learn graph computing.
- build your own packages on top of it.
Installation
pip install znflow
Usage
Connecting Functions
With ZnFlow you can connect functions to each other by using the @nodify
decorator. Inside the znflow.DiGraph the decorator will return a
FunctionFuture object that can be used to connect the function to other nodes.
The FunctionFuture object will also be used to retrieve the result of the
function. Outside the znflow.DiGraph the function behaves as a normal
function.
import znflow


@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2


# outside a graph the decorated function runs immediately
print(compute_mean(2, 8))
# >>> 5.0

with znflow.DiGraph() as graph:
    mean = compute_mean(2, 8)

graph.run()
print(mean.result)
# >>> 5.0

with znflow.DiGraph() as graph:
    n1 = compute_mean(2, 8)
    n2 = compute_mean(13, 7)
    n3 = compute_mean(n1, n2)

graph.run()
print(n3.result)
# >>> 7.5
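To make the deferred-evaluation idea concrete, here is a minimal sketch in plain Python of how a decorator could return a placeholder inside a context manager and a normal result outside of it. It is not ZnFlow's implementation: `ToyFuture`, `ToyGraph`, `toy_nodify` and `_active_graph` are hypothetical names invented for this illustration, and the sketch simply runs nodes in creation order.

```python
import functools

_active_graph = None  # hypothetical module-level state used only by this sketch


class ToyFuture:
    """Placeholder for a function call that was recorded but not yet run."""

    def __init__(self, func, args):
        self.func = func
        self.args = args
        self.result = None


class ToyGraph:
    """Records futures created inside the `with` block and runs them later."""

    def __init__(self):
        self.nodes = []

    def __enter__(self):
        global _active_graph
        _active_graph = self
        return self

    def __exit__(self, *exc_info):
        global _active_graph
        _active_graph = None

    def run(self):
        # Nodes are stored in creation order, so dependencies are resolved first.
        for node in self.nodes:
            args = [a.result if isinstance(a, ToyFuture) else a for a in node.args]
            node.result = node.func(*args)


def toy_nodify(func):
    @functools.wraps(func)
    def wrapper(*args):
        if _active_graph is None:
            return func(*args)  # outside a graph: behave like a normal function
        future = ToyFuture(func, args)
        _active_graph.nodes.append(future)
        return future

    return wrapper


@toy_nodify
def compute_mean(x, y):
    return (x + y) / 2


with ToyGraph() as graph:
    n1 = compute_mean(2, 8)
    n2 = compute_mean(13, 7)
    n3 = compute_mean(n1, n2)  # receives placeholders, not numbers

graph.run()
print(n3.result)  # 7.5
```

The point of the sketch is the split between building (inside the `with` block nothing is computed) and running (placeholders are replaced by results before each call).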
Connecting Classes
It is also possible to connect classes. They can be connected either directly or
via class attributes. This is possible by returning znflow.Connections inside
the znflow.DiGraph context manager. Outside the znflow.DiGraph the class
behaves as a normal class.
In the following example we use a dataclass, but it works with all Python
classes that inherit from znflow.Node.
import znflow
import dataclasses


@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2


@dataclasses.dataclass
class ComputeMean(znflow.Node):
    x: float
    y: float

    results: float = None

    def run(self):
        self.results = (self.x + self.y) / 2


with znflow.DiGraph() as graph:
    n1 = ComputeMean(2, 8)
    n2 = compute_mean(13, 7)
    # connecting classes and functions to a Node
    n3 = ComputeMean(n1.results, n2)

graph.run()
print(n3.results)
# >>> 7.5
Dask Support
ZnFlow comes with support for Dask to run your graph:
- in parallel.
- through e.g. SLURM (see https://jobqueue.dask.org/en/latest/api.html).
- with a nice GUI to track progress.
All you need to do is install ZnFlow with the Dask extra: pip install znflow[dask].
We can then extend the example from above. This will run n1 and n2 in parallel.
You can investigate the graph on the Dask dashboard (typically
http://127.0.0.1:8787/graph) or via the client object in Jupyter.
import znflow
import dataclasses
from dask.distributed import Client


@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2


@dataclasses.dataclass
class ComputeMean(znflow.Node):
    x: float
    y: float

    results: float = None

    def run(self):
        self.results = (self.x + self.y) / 2


client = Client()
deployment = znflow.deployment.DaskDeployment(client=client)

with znflow.DiGraph(deployment=deployment) as graph:
    n1 = ComputeMean(2, 8)
    n2 = compute_mean(13, 7)
    # connecting classes and functions to a Node
    n3 = ComputeMean(n1.results, n2)

graph.run()
print(n3)
# >>> ComputeMean(x=5.0, y=10.0, results=7.5)
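The reason n1 and n2 can run in parallel is that neither depends on the other, while n3 has to wait for both. The following sketch shows that dependency pattern with the standard library's `concurrent.futures` instead of Dask or ZnFlow, so it only illustrates the scheduling idea and says nothing about how `DaskDeployment` works internally.

```python
from concurrent.futures import ThreadPoolExecutor


def compute_mean(x, y):
    return (x + y) / 2


with ThreadPoolExecutor(max_workers=2) as pool:
    # n1 and n2 share no inputs, so both can be submitted at once.
    f1 = pool.submit(compute_mean, 2, 8)
    f2 = pool.submit(compute_mean, 13, 7)
    # n3 needs both results, so it blocks until they are available.
    n3 = compute_mean(f1.result(), f2.result())

print(n3)  # 7.5
```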
Working with lists
ZnFlow supports some special features for working with lists. In the following
example we want to combine two lists.
import znflow


@znflow.nodify
def arange(size: int) -> list:
    return list(range(size))


print(arange(2) + arange(3))
# >>> [0, 1, 0, 1, 2]

with znflow.DiGraph() as graph:
    lst = arange(2) + arange(3)

graph.run()
print(lst.result)
# >>> [0, 1, 0, 1, 2]
This functionality is restricted to lists. To flatten nested data of type
list[list], you can either use data: list = znflow.combine(data), which accepts
an optional attribute=None argument for use with classes, or simply use
data: list = sum(data, []).
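For readers unfamiliar with the `sum(data, [])` idiom, the following plain-Python snippet (outside of any graph) shows what it does: `sum` starts from the empty list and applies `+` to each sublist in turn. The `itertools.chain` variant is included only as a comparison for ordinary lists; whether it can be used inside a `znflow.DiGraph` is not covered here.

```python
from itertools import chain

data = [[0, 1], [0, 1, 2], [5]]

# sum() starts with [] and concatenates each sublist: [] + [0, 1] + [0, 1, 2] + [5]
flat_sum = sum(data, [])

# Standard-library alternative for plain lists that avoids repeated copying
flat_chain = list(chain.from_iterable(data))

print(flat_sum)    # [0, 1, 0, 1, 2, 5]
print(flat_chain)  # [0, 1, 0, 1, 2, 5]
```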
Attributes Access
Inside the with znflow.DiGraph() context manager, accessing class attributes
yields znflow.Connector objects. Sometimes, it may be required to obtain the
actual attribute value instead of a znflow.Connector object. It is not
recommended to run class methods inside the with znflow.DiGraph() context
manager since it should be exclusively used for building the graph and not for
actual computation.
In the case of properties or other descriptor-based attributes, it might be
necessary to access the actual attribute value. This can be achieved using the
znflow.get_attribute method, which supports all features from getattr and
can be imported as such:
from znflow import get_attribute as getattr
Here's an example of how to use znflow.get_attribute:
import znflow


class POW2(znflow.Node):
    """Compute the square of x."""

    x_factor: float = 0.5
    results: float = None
    _x: float = None

    @property
    def x(self):
        return self._x

    @x.setter
    def x(self, value):
        # using "self._x = value * self.x_factor" inside "znflow.DiGraph()" would run
        # "value * Connector(self, "x_factor")" which is not possible (TypeError)
        # therefore we use znflow.get_attribute.
        self._x = value * znflow.get_attribute(self, "x_factor")

    def run(self):
        self.results = self.x**2


with znflow.DiGraph() as graph:
    n1 = POW2()
    n1.x = 4.0

graph.run()
assert n1.results == 4.0
Alternatively, you can use the znflow.disable_graph decorator / context manager
to disable the graph for a specific block of code, or znflow.Property as a
drop-in replacement for property.
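The TypeError mentioned in the setter comment can be reproduced without ZnFlow. The sketch below uses a hypothetical `ToyConnector` class as a stand-in for a placeholder object that defines no arithmetic; it is not `znflow.Connector`, and resolving it with the built-in `getattr` only mimics the role that `znflow.get_attribute` plays in the example above.

```python
class ToyConnector:
    """Hypothetical stand-in for a placeholder created while building a graph."""

    def __init__(self, instance, attribute):
        self.instance = instance
        self.attribute = attribute


class Settings:
    x_factor = 0.5


placeholder = ToyConnector(Settings(), "x_factor")

# Multiplying a float by an object without arithmetic support fails.
try:
    4.0 * placeholder
except TypeError as err:
    error_message = str(err)

# Resolving the placeholder to the real value first makes the arithmetic work.
resolved = getattr(placeholder.instance, placeholder.attribute)
scaled = 4.0 * resolved
print(scaled)  # 2.0
```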
Groups
It is possible to create groups of znflow.nodify functions or znflow.Node
instances independent of the graph structure. To create a group, use
with graph.group(<name>). To access the group members, use
graph.get_group(<name>), which returns a znflow.Group.
import znflow


@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2


graph = znflow.DiGraph()

with graph.group("grp1"):
    n1 = compute_mean(2, 4)

assert n1.uuid in graph.get_group("grp1")
Supported Frameworks
ZnFlow includes tests to ensure compatibility with:
- "Plain classes"
- dataclasses
- ZnInit
- attrs
- pydantic (experimental)