Skip to main content

A general purpose framework for building and running computational graphs.

Project description

zincware Coverage Status PyPI version Binder

ZnFlow

The ZnFlow package provides a basic structure for building computational graphs based on functions or classes. It is designed as a lightweight abstraction layer to

  • learn graph computing.
  • build your own packages on top of it.

Installation

pip install znflow

Usage

Connecting Functions

With ZnFlow you can connect functions to each other by using the @nodify decorator. Inside the znflow.DiGraph the decorator will return a FunctionFuture object that can be used to connect the function to other nodes. The FunctionFuture object will also be used to retrieve the result of the function. Outside the znflow.DiGraph the function behaves as a normal function.

import znflow

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

print(compute_mean(2, 8))
# >>> 5

with znflow.DiGraph() as graph:
    mean = compute_mean(2, 8)

graph.run()
print(mean.result)
# >>> 5

with znflow.DiGraph() as graph:
    n1 = compute_mean(2, 8)
    n2 = compute_mean(13, 7)
    n3 = compute_mean(n1, n2)

graph.run()
print(n3.result)
# >>> 7.5

Connecting Classes

It is also possible to connect classes. They can be connected either directly or via class attributes. This is possible by returning znflow.Connections inside the znflow.DiGraph context manager. Outside the znflow.DiGraph the class behaves as a normal class.

In the following example we use a dataclass, but it works with all Python classes that inherit from znflow.Node.

import znflow
import dataclasses

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

@dataclasses.dataclass
class ComputeMean(znflow.Node):
    x: float
    y: float

    results: float = None

    def run(self):
        self.results = (self.x + self.y) / 2

with znflow.DiGraph() as graph:
    n1 = ComputeMean(2, 8)
    n2 = compute_mean(13, 7)
    # connecting classes and functions to a Node
    n3 = ComputeMean(n1.results, n2)

graph.run()
print(n3.results)
# >>> 7.5

Dask Support

ZnFlow comes with support for Dask to run your graph:

All you need to do is install ZnFlow with Dask pip install znflow[dask]. We can then extend the example from above. This will run n1 and n2 in parallel. You can investigate the graph on the Dask dashboard (typically http://127.0.0.1:8787/graph or via the client object in Jupyter.)

import znflow
import dataclasses
from dask.distributed import Client

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

@dataclasses.dataclass
class ComputeMean(znflow.Node):
    x: float
    y: float

    results: float = None

    def run(self):
        self.results = (self.x + self.y) / 2


client = Client()
deployment = znflow.deployment.DaskDeployment(client=client)


with znflow.DiGraph(deployment=deployment) as graph:
    n1 = ComputeMean(2, 8)
    n2 = compute_mean(13, 7)
    # connecting classes and functions to a Node
    n3 = ComputeMean(n1.results, n2)

graph.run()

print(n3)
# >>> ComputeMean(x=5.0, y=10.0, results=7.5)

Working with lists

ZnFlow supports some special features for working with lists. In the following example we want to combine two lists.

import znflow

@znflow.nodify
def arange(size: int) -> list:
    return list(range(size))

print(arange(2) + arange(3))
>>> [0, 1, 0, 1, 2]

with znflow.DiGraph() as graph:
    lst = arange(2) + arange(3)

graph.run()
print(lst.result)
>>> [0, 1, 0, 1, 2]

This functionality is restricted to lists. There are some further features that allow combining data: list[list] by either using data: list = znflow.combine(data) which has an optional attribute=None argument to be used in the case of classes or you can simply use data: list = sum(data, []).

Attributes Access

Inside the with znflow.DiGraph() context manager, accessing class attributes yields znflow.Connector objects. Sometimes, it may be required to obtain the actual attribute value instead of a znflow.Connector object. It is not recommended to run class methods inside the with znflow.DiGraph() context manager since it should be exclusively used for building the graph and not for actual computation.

In the case of properties or other descriptor-based attributes, it might be necessary to access the actual attribute value. This can be achieved using the znflow.get_attribute method, which supports all features from getattr and can be imported as such:

from znflow import get_attribute as getattr

Here's an example of how to use znflow.get_attribute:

import znflow

class POW2(znflow.Node):
    """Compute the square of x."""
    x_factor: float = 0.5
    results: float = None
    _x: float = None

    @property
    def x(self):
        return self._x

    @x.setter
    def x(self, value):
        # using "self._x = value * self.x_factor" inside "znflow.DiGraph()" would run
        # "value * Connector(self, "x_factor")" which is not possible (TypeError)
        # therefore we use znflow.get_attribute.
        self._x = value * znflow.get_attribute(self, "x_factor")

    def run(self):
        self.results = self.x**2

with znflow.DiGraph() as graph:
    n1 = POW2()
    n1.x = 4.0

graph.run()
assert n1.results == 4.0

Instead, you can also use the znflow.disable_graph decorator / context manager to disable the graph for a specific block of code or the znflow.Property as a drop-in replacement for property.

Groups

It is possible to create groups of znflow.nodify or znflow.Nodes independent from the graph structure. To create a group you can use with graph.group(<name>). To access the group members, use graph.get_group(<name>) -> znflow.Group.

import znflow

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

graph = znflow.DiGraph()

with graph.group("grp1"):
    n1 = compute_mean(2, 4)

assert n1.uuid in graph.get_group("grp1")

Supported Frameworks

ZnFlow includes tests to ensure compatibility with:

  • "Plain classes"
  • dataclasses
  • ZnInit
  • attrs
  • pydantic (experimental)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

znflow-0.2.1.tar.gz (19.8 kB view details)

Uploaded Source

Built Distribution

znflow-0.2.1-py3-none-any.whl (22.3 kB view details)

Uploaded Python 3

File details

Details for the file znflow-0.2.1.tar.gz.

File metadata

  • Download URL: znflow-0.2.1.tar.gz
  • Upload date:
  • Size: 19.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.12.5 Linux/6.5.0-1025-azure

File hashes

Hashes for znflow-0.2.1.tar.gz
Algorithm Hash digest
SHA256 a273421add51454887cbbbb3bcd0f31f59596f16b3bf21a6217b6235f2584c15
MD5 2d16dce4ab4829e94e92428e2c545203
BLAKE2b-256 3118d4730e55a02f5c3963f2b30319cae113973aa792f3e19ed2aca65fd5128e

See more details on using hashes here.

File details

Details for the file znflow-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: znflow-0.2.1-py3-none-any.whl
  • Upload date:
  • Size: 22.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.12.5 Linux/6.5.0-1025-azure

File hashes

Hashes for znflow-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 a4451a202f9b82856e28a64e75615e93fe6a9e54da3e7873103c1bf193e8f759
MD5 f91e2b4ea5aba1d92826bbc4685ff094
BLAKE2b-256 772d989d72e40b58caf48a79e1794f1fc305c54967b49e3d718ff5f4b6d66570

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page