
A lightweight framework for flow-based programming in python.

License: MIT

Flow-based Programming

+-------------------+          +---------------------+
|   Invite People   |          |   Birthday Party    |
|-------------------|          |---------------------|
o amount<4>         |   +----->o attendees<>         |
|            people o---+ +--->o cake<>              |
+-------------------+     |    +---------------------+
                          |
+-------------------+     |
|    Bake a cake    |     |
+-------------------+     |
o type<"Chocolate"> |     |
|              cake o-----+
+-------------------+

Benefits:

  • Visualize code
  • Re-usability
  • Streamlined code design
  • Built-in concurrency
  • Represent workflows one to one in the code

Quick Example

Consider this simple example of how to represent the construction of a house with Flowpipe:

from flowpipe import Graph, INode, Node, InputPlug, OutputPlug


class HireWorkers(INode):
    """A node can be derived from the INode interface.

    The plugs are defined in the init method.
    The compute method receives the inputs from any connected upstream nodes.
    """

    def __init__(self, amount=None, **kwargs):
        super(HireWorkers, self).__init__(**kwargs)
        InputPlug('amount', self, amount)
        OutputPlug('workers', self)

    def compute(self, amount):
        workers = ['John', 'Jane', 'Mike', 'Michelle']
        print('{0} workers are hired to build the house.'.format(amount))
        return {'workers.{0}'.format(i): workers[i] for i in range(amount)}


@Node(outputs=['workers'])
def Build(workers, section):
    """A node can also be created by the Node decorator.

    The inputs to the function are turned into InputPlugs, outputs are defined
    in the decorator itself. The wrapped function is used as the compute method.
    """
    print('{0} are building the {1}'.format(', '.join(workers.values()), section))
    return {'workers.{0}'.format(i): worker for i, worker in workers.items()}


@Node()
def Party(attendees):
    print('{0} and {1} are having a great party!'.format(
        ', '.join(list(attendees.values())[:-1]), list(attendees.values())[-1]))


# Create a graph with the necessary nodes
graph = Graph(name='How to build a house')
workers = HireWorkers(graph=graph, amount=4)
build_walls = Build(graph=graph, name='Build Walls', section='walls')
build_roof = Build(graph=graph, name='Build Roof', section='roof')
party = Party(graph=graph, name='Housewarming Party')

# Wire up the connections between the nodes
workers.outputs['workers']['0'].connect(build_walls.inputs['workers']['0'])
workers.outputs['workers']['1'].connect(build_walls.inputs['workers']['1'])
workers.outputs['workers']['2'].connect(build_roof.inputs['workers']['0'])
workers.outputs['workers']['3'].connect(build_roof.inputs['workers']['1'])
build_walls.outputs['workers']['0'] >> party.inputs['attendees']['0']
build_walls.outputs['workers']['1'] >> party.inputs['attendees']['2']
build_roof.outputs['workers']['0'] >> party.inputs['attendees']['1']
build_roof.outputs['workers']['1'] >> party.inputs['attendees']['3']
party.inputs['attendees']['4'].value = 'Homeowner'
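
The wiring above uses both `connect()` and the `>>` operator, which appear to be interchangeable in this example. As a rough illustration of how such an operator can be provided in Python, here is a minimal sketch. The `Plug` class below is a hypothetical stand-in written for this illustration and is not flowpipe's implementation:

```python
class Plug:
    """Hypothetical stand-in for a plug; not flowpipe's own class."""

    def __init__(self, name):
        self.name = name
        self.connections = []

    def connect(self, other):
        # Record the connection on both ends so either side can be inspected.
        if other not in self.connections:
            self.connections.append(other)
            other.connections.append(self)

    def __rshift__(self, other):
        # `a >> b` is treated as shorthand for `a.connect(b)`.
        self.connect(other)
        return other


out_plug = Plug("workers.0")
in_plug = Plug("attendees.0")
out_plug >> in_plug
print([plug.name for plug in out_plug.connections])
```

Overloading `__rshift__` keeps the wiring code reading left to right, in the same direction the data flows.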

Visualize the code as a graph or as a listing:

print(graph.name)
print(graph)
print(graph.list_repr())

Output:

How to build a house
+------------------------+          +------------------------+          +---------------------------+
|      HireWorkers       |          |       Build Roof       |          |    Housewarming Party     |
|------------------------|          |------------------------|          |---------------------------|
o amount<4>              |          o section<"roof">        |          % attendees                 |
|                workers %          % workers                |     +--->o  attendees.0<>            |
|             workers.0  o-----+--->o  workers.0<>           |     |--->o  attendees.1<>            |
|             workers.1  o-----|--->o  workers.1<>           |     |--->o  attendees.2<>            |
|             workers.2  o-----|    |                workers %     |--->o  attendees.3<>            |
|             workers.3  o-----|    |             workers.0  o-----|    o  attendees.4<"Homeowner>  |
+------------------------+     |    |             workers.1  o-----|    +---------------------------+
                               |    +------------------------+     |
                               |    +------------------------+     |
                               |    |      Build Walls       |     |
                               |    |------------------------|     |
                               |    o section<"walls">       |     |
                               |    % workers                |     |
                               +--->o  workers.0<>           |     |
                               +--->o  workers.1<>           |     |
                                    |                workers %     |
                                    |             workers.0  o-----+
                                    |             workers.1  o-----+
                                    +------------------------+

How to build a house
 HireWorkers
  [i] amount: 4
  [o] workers
   [o] workers.0 >> Build Walls.workers.0
   [o] workers.1 >> Build Walls.workers.1
   [o] workers.2 >> Build Roof.workers.0
   [o] workers.3 >> Build Roof.workers.1
 Build Roof
  [i] section: "roof"
  [i] workers
   [i] workers.0 << HireWorkers.workers.2
   [i] workers.1 << HireWorkers.workers.3
  [o] workers
   [o] workers.0 >> Housewarming Party.attendees.1
   [o] workers.1 >> Housewarming Party.attendees.3
 Build Walls
  [i] section: "walls"
  [i] workers
   [i] workers.0 << HireWorkers.workers.0
   [i] workers.1 << HireWorkers.workers.1
  [o] workers
   [o] workers.0 >> Housewarming Party.attendees.0
   [o] workers.1 >> Housewarming Party.attendees.2
 Housewarming Party
  [i] attendees
   [i] attendees.0 << Build Walls.workers.0
   [i] attendees.1 << Build Roof.workers.0
   [i] attendees.2 << Build Walls.workers.1
   [i] attendees.3 << Build Roof.workers.1
   [i] attendees.4: "Homeowner"

Now build the house:

graph.evaluate(mode='threading')  # Options are linear, threading and multiprocessing

Output:

4 workers are hired to build the house.
Michelle, Mike are building the roof
Jane, John are building the walls
Mike, John, Michelle, Jane and Homeowner are having a great party!

(Note: for more elaborate evaluation schemes, see Evaluators)
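
Whatever the mode, a node can only run once its upstream nodes have produced their outputs. The sketch below illustrates that ordering constraint for the house graph using the standard library's `graphlib` module (Python 3.9+). It is an illustration only and does not show how flowpipe computes its evaluation order; the `dependencies` mapping is written by hand for this example:

```python
from graphlib import TopologicalSorter

# Each key depends on the nodes in its value set (its upstream nodes).
dependencies = {
    "HireWorkers": set(),
    "Build Walls": {"HireWorkers"},
    "Build Roof": {"HireWorkers"},
    "Housewarming Party": {"Build Walls", "Build Roof"},
}

# A linear evaluation visits the nodes in an order that respects every dependency.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

"HireWorkers" always comes first and "Housewarming Party" last, while the two Build nodes may appear in either order because they do not depend on each other.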

We now know how to throw a party, so let's invite some people and re-use these skills for a birthday:

graph = Graph(name='How to throw a birthday party')

@Node(outputs=['people'])
def InvitePeople(amount):
    people = ['John', 'Jane', 'Mike', 'Michelle']
    d = {'people.{0}'.format(i): people[i] for i in range(amount)}
    d['people'] = {people[i]: people[i] for i in range(amount)}
    return d

invite = InvitePeople(graph=graph, amount=4)
birthday_party = Party(graph=graph, name='Birthday Party')
invite.outputs['people'] >> birthday_party.inputs['attendees']

print(graph.name)
print(graph)
graph.evaluate()

Output:

How to throw a birthday party
+-------------------+          +---------------------+
|   InvitePeople    |          |   Birthday Party    |
|-------------------|          |---------------------|
o amount<4>         |     +--->o attendees<>         |
|            people o-----+    +---------------------+
+-------------------+

Jane, Michelle, Mike and John are having a great party!

More Examples

There are more examples for common use cases of flowpipe:

The code for these examples: house_and_birthday.py!

Another simple example: world_clock.py!

How to make use of nested subgraphs: nested_graphs.py!

Using the command pattern with flowpipe successfully: workflow_design_pattern.py!

Use flowpipe on a remote cluster of machines, commonly referred to as a "render farm" in the VFX/Animation industry: vfx_render_farm_conversion.py!

An example graph showcasing a common workflow encountered in the VFX/Animation industry: vfx_rendering.py!

VFX Pipeline

If you are working in the VFX/Animation industry, please check out this extensive guide on how to use flowpipe in a VFX pipeline!

Evaluators

If your nodes just need sequential, threaded or multiprocessing evaluation, the Graph.evaluate() method will serve you just fine. If you want more control over the way your Graph is evaluated, Evaluators are for you. They can also be used to add, for example, logging or tracing to node evaluation.

Evaluators allow you to take control of the order in which nodes are evaluated, or of their scheduling. See flowpipe/evaluator.py for the evaluation schemes used by the Graph.evaluate() method.
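
To make "scheduling" more concrete, here is a rough sketch of one possible scheme: each node is submitted to a thread pool as soon as all of its upstream nodes have finished. It uses only the standard library (`graphlib` requires Python 3.9+) and is not taken from flowpipe. The `run_threaded` function, the `dependencies` mapping and the `compute` callback are all hypothetical names introduced for this illustration:

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_threaded(dependencies, compute, max_workers=4):
    """Call compute(name) for every node once its upstream nodes are done."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    finished = []
    pending = {}  # maps each running future to its node name
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Start every node whose upstream nodes have all finished.
            for name in sorter.get_ready():
                pending[pool.submit(compute, name)] = name
            # Block until at least one running node completes.
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                future.result()  # re-raise any exception from the node
                name = pending.pop(future)
                finished.append(name)
                sorter.done(name)  # unlocks the downstream nodes
    return finished


dependencies = {
    "HireWorkers": set(),
    "Build Walls": {"HireWorkers"},
    "Build Roof": {"HireWorkers"},
    "Housewarming Party": {"Build Walls", "Build Roof"},
}
finished = run_threaded(dependencies, compute=lambda name: name.upper())
print(finished)
```

The two Build nodes can run at the same time here, which is why their relative order in the result is not fixed.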

To write a custom evaluator, subclass flowpipe.evaluator.Evaluator and provide at least an _evaluate_nodes(self, nodes) method. This method should take a list of nodes and call their respective node.evaluate() methods (along with any other task you want to perform for each node being evaluated). To use an evaluator, create it and call its Evaluator.evaluate() method with the Graph to evaluate as an argument:

from flowpipe.evaluator import LinearEvaluator

# assuming you created a graph to evaluate above, called `graph`
lin_eval = LinearEvaluator()
lin_eval.evaluate(graph)
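
The subclassing pattern described above, with logging added per node, can be sketched without flowpipe installed. Everything in the block below is a hypothetical stand-in: the `Evaluator` base class only mimics the shape described in the text (an `evaluate()` entry point that delegates to `_evaluate_nodes(self, nodes)`), and for simplicity its `evaluate()` takes a plain list of nodes rather than a Graph. With flowpipe itself you would subclass flowpipe.evaluator.Evaluator instead:

```python
class Evaluator:
    """Hypothetical stand-in for the base class; not flowpipe's code."""

    def evaluate(self, nodes):
        # Assumption: a plain list of nodes is passed instead of a Graph.
        self._evaluate_nodes(nodes)

    def _evaluate_nodes(self, nodes):
        raise NotImplementedError


class LoggingEvaluator(Evaluator):
    """Evaluate nodes one after another and record each step."""

    def __init__(self):
        self.log = []

    def _evaluate_nodes(self, nodes):
        for node in nodes:
            self.log.append("evaluating " + node.name)
            node.evaluate()


class FakeNode:
    """Hypothetical node that only remembers whether it was evaluated."""

    def __init__(self, name):
        self.name = name
        self.evaluated = False

    def evaluate(self):
        self.evaluated = True


nodes = [FakeNode("a"), FakeNode("b")]
logging_eval = LoggingEvaluator()
logging_eval.evaluate(nodes)
print(logging_eval.log)
```

Keeping the per-node bookkeeping inside `_evaluate_nodes` leaves the nodes themselves unchanged, so the same graph can be run with or without logging.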
