
A general purpose framework for building and running computational graphs.



ZnFlow

The ZnFlow package provides a basic structure for building computational graphs based on functions or classes. It is designed as a lightweight abstraction layer to

  • learn graph computing.
  • build your own packages on top of it.

Installation

pip install znflow

Usage

Connecting Functions

With ZnFlow you can connect functions to each other by using the @nodify decorator. Inside the znflow.DiGraph the decorator will return a FunctionFuture object that can be used to connect the function to other nodes. The FunctionFuture object will also be used to retrieve the result of the function. Outside the znflow.DiGraph the function behaves as a normal function.

import znflow

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

print(compute_mean(2, 8))
# >>> 5.0

with znflow.DiGraph() as graph:
    mean = compute_mean(2, 8)

graph.run()
print(mean.result)
# >>> 5.0

with znflow.DiGraph() as graph:
    n1 = compute_mean(2, 8)
    n2 = compute_mean(13, 7)
    n3 = compute_mean(n1, n2)

graph.run()
print(n3.result)
# >>> 7.5
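
To make the deferred-execution idea concrete, here is a minimal pure-Python sketch of a decorator that returns a placeholder inside a context manager and calls the function directly outside of it. This is only an illustration and not how ZnFlow is implemented: ToyFuture, ToyGraph and toy_nodify are hypothetical names invented for this example.

```python
import functools


class ToyFuture:
    """Hypothetical placeholder for a call that has not been executed yet."""

    def __init__(self, fn, args):
        self.fn = fn
        self.args = args
        self.result = None
        self._done = False

    def compute(self):
        # Resolve upstream placeholders first, then call the wrapped function once.
        if not self._done:
            resolved = [a.compute() if isinstance(a, ToyFuture) else a for a in self.args]
            self.result = self.fn(*resolved)
            self._done = True
        return self.result


class ToyGraph:
    """Hypothetical context manager that records placeholders while active."""

    active = None  # the graph currently being built, if any

    def __init__(self):
        self.nodes = []

    def __enter__(self):
        ToyGraph.active = self
        return self

    def __exit__(self, *exc):
        ToyGraph.active = None

    def run(self):
        # Nodes were recorded in creation order, so inputs come before consumers.
        for node in self.nodes:
            node.compute()


def toy_nodify(fn):
    @functools.wraps(fn)
    def wrapper(*args):
        if ToyGraph.active is None:
            return fn(*args)  # outside a graph: behave like a normal function
        future = ToyFuture(fn, args)
        ToyGraph.active.nodes.append(future)
        return future

    return wrapper


@toy_nodify
def compute_mean(x, y):
    return (x + y) / 2


direct = compute_mean(2, 8)  # plain call, evaluated immediately

with ToyGraph() as graph:
    n1 = compute_mean(2, 8)
    n2 = compute_mean(13, 7)
    n3 = compute_mean(n1, n2)  # placeholders are passed as arguments

graph.run()
print(direct, n3.result)
```

The sketch only covers positional arguments and a single active graph; it is meant to show why results are read from a placeholder after the graph has run.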

Connecting Classes

It is also possible to connect classes. They can be connected either directly or via class attributes. This is possible by returning znflow.Connections inside the znflow.DiGraph context manager. Outside the znflow.DiGraph the class behaves as a normal class.

In the following example we use a dataclass, but it works with all Python classes that inherit from znflow.Node.

import znflow
import dataclasses

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

@dataclasses.dataclass
class ComputeMean(znflow.Node):
    x: float
    y: float

    results: float = None

    def run(self):
        self.results = (self.x + self.y) / 2

with znflow.DiGraph() as graph:
    n1 = ComputeMean(2, 8)
    n2 = compute_mean(13, 7)
    # connecting classes and functions to a Node
    n3 = ComputeMean(n1.results, n2)

graph.run()
print(n3.results)
# >>> 7.5

Dask Support

ZnFlow comes with support for Dask to run your graph.

All you need to do is install ZnFlow with the Dask extra: pip install znflow[dask]. We can then extend the example from above, which will run n1 and n2 in parallel. You can inspect the graph on the Dask dashboard (typically http://127.0.0.1:8787/graph) or via the client object in Jupyter.

import znflow
import dataclasses
from dask.distributed import Client

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

@dataclasses.dataclass
class ComputeMean(znflow.Node):
    x: float
    y: float

    results: float = None

    def run(self):
        self.results = (self.x + self.y) / 2


client = Client()
deployment = znflow.deployment.DaskDeployment(client=client)


with znflow.DiGraph(deployment=deployment) as graph:
    n1 = ComputeMean(2, 8)
    n2 = compute_mean(13, 7)
    # connecting classes and functions to a Node
    n3 = ComputeMean(n1.results, n2)

graph.run()

print(n3)
# >>> ComputeMean(x=5.0, y=10.0, results=7.5)
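
Why n1 and n2 can run in parallel follows from the dependency structure alone: neither depends on the other, while n3 depends on both. The following sketch uses only the standard library (graphlib, Python 3.9+) to group the three nodes into batches of independent work. It illustrates the idea and does not claim to reflect how ZnFlow or Dask schedule tasks internally; the dependencies dictionary is written by hand for this example.

```python
from graphlib import TopologicalSorter

# Map each node to the set of nodes it depends on (hand-written for this example).
dependencies = {
    "n1": set(),
    "n2": set(),
    "n3": {"n1", "n2"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

batches = []
while sorter.is_active():
    # Every node returned by get_ready() has all of its inputs available,
    # so the nodes within one batch could be executed concurrently.
    ready = sorted(sorter.get_ready())
    batches.append(ready)
    sorter.done(*ready)

print(batches)
# >>> [['n1', 'n2'], ['n3']]
```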

Working with lists

ZnFlow supports some special features for working with lists. In the following example we want to combine two lists.

import znflow

@znflow.nodify
def arange(size: int) -> list:
    return list(range(size))

print(arange(2) + arange(3))
# >>> [0, 1, 0, 1, 2]

with znflow.DiGraph() as graph:
    lst = arange(2) + arange(3)

graph.run()
print(lst.result)
# >>> [0, 1, 0, 1, 2]

This functionality is restricted to lists. To flatten nested data of type list[list], you can either use data: list = znflow.combine(data), which takes an optional attribute=None argument for use with classes, or simply use data: list = sum(data, []).
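
For readers unfamiliar with the sum(data, []) idiom, the following plain-Python sketch shows what it does outside of any graph: starting from an empty list, it concatenates the inner lists with +, which is the same list addition shown above. The itertools variant is included only for comparison on ordinary lists; whether it works with ZnFlow futures inside a graph is not something this example covers.

```python
import itertools

# Nested input: a list of lists, as produced by several list-returning nodes.
data = [[0, 1], [0, 1, 2], [5]]

# sum() with a start value of [] repeatedly applies "+" to the inner lists.
flat_sum = sum(data, [])

# Equivalent result for ordinary lists, without creating intermediate lists.
flat_chain = list(itertools.chain.from_iterable(data))

print(flat_sum)
# >>> [0, 1, 0, 1, 2, 5]
```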

Attributes Access

Inside the with znflow.DiGraph() context manager, accessing class attributes yields znflow.Connector objects. Sometimes, it may be required to obtain the actual attribute value instead of a znflow.Connector object. It is not recommended to run class methods inside the with znflow.DiGraph() context manager since it should be exclusively used for building the graph and not for actual computation.

In the case of properties or other descriptor-based attributes, it might be necessary to access the actual attribute value. This can be achieved using the znflow.get_attribute method, which supports all features from getattr and can be imported as such:

from znflow import get_attribute as getattr

Here's an example of how to use znflow.get_attribute:

import znflow

class POW2(znflow.Node):
    """Compute the square of x."""
    x_factor: float = 0.5
    results: float = None
    _x: float = None

    @property
    def x(self):
        return self._x

    @x.setter
    def x(self, value):
        # using "self._x = value * self.x_factor" inside "znflow.DiGraph()" would run
        # "value * Connector(self, "x_factor")" which is not possible (TypeError)
        # therefore we use znflow.get_attribute.
        self._x = value * znflow.get_attribute(self, "x_factor")

    def run(self):
        self.results = self.x**2

with znflow.DiGraph() as graph:
    n1 = POW2()
    n1.x = 4.0

graph.run()
assert n1.results == 4.0

Alternatively, you can use the znflow.disable_graph decorator / context manager to disable the graph for a specific block of code, or use znflow.Property as a drop-in replacement for property.

Groups

It is possible to group znflow.nodify functions or znflow.Node instances independently of the graph structure. To create a group, use with graph.group(<name>). To access the group members, use graph.get_group(<name>) -> znflow.Group.

import znflow

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

graph = znflow.DiGraph()

with graph.group("grp1"):
    n1 = compute_mean(2, 4)

assert n1.uuid in graph.get_group("grp1")

Supported Frameworks

ZnFlow includes tests to ensure compatibility with:

  • "Plain classes"
  • dataclasses
  • ZnInit
  • attrs
  • pydantic (experimental)
