Skip to main content

Lightweight flow-based programming framework.

Project description

Version Build Status Codacy_Badge_Grade Codacy_Badge_Coverage

Flow-based Pipeline

A lightweight framework for flow-based programming in python.

Install

pip install flowpipe

Example: Implement a world clock with Flowpipe

This is very simple and in the end it is just a one liner in python, but a simple example makes it easier to demonstrate the idea behind flowpipe.

Let's import the necessary classes:

from datetime import datetime
import logging
from time import time

from flowpipe import logger
from flowpipe.node import INode, Node
from flowpipe.plug import InputPlug, OutputPlug
from flowpipe.graph import Graph

Then the desired functionality has to be implemented into Nodes. We need two nodes, one to get the current time and one to convert it to a timezone.

@Node(outputs=['time'])
def CurrentTime():
    return {'time': time()}

The above Node is created via the shortcut decorator.

For more complex Nodes, the INode interface can be implemented directly.

class ConvertTime(INode):

    def __init__(self, time=None, timezone=0, city=None, **kwargs):
        super(ConvertTime, self).__init__(**kwargs)
        InputPlug('time', self)
        InputPlug('timezone', self, timezone)
        InputPlug('city', self, city)
        OutputPlug('converted_time', self)

    def compute(self, time, timezone, city):
        return {'converted_time': [time + timezone * 60 * 60, city]}

The World Clock will receive the times and locations and display it.

@Node()
def WorldClock(time1, time2, time3):
    print('-- World Clock -------------------')
    print('It is now {0} in {1}'.format(
        datetime.fromtimestamp(time1[0]).strftime("%H:%M"), time1[1]))
    print('It is now {0} in {1}'.format(
        datetime.fromtimestamp(time2[0]).strftime("%H:%M"), time2[1]))
    print('It is now {0} in {1}'.format(
        datetime.fromtimestamp(time3[0]).strftime("%H:%M"), time3[1]))
    print('----------------------------------')

Now we can create the Graph that represents the world clock:

graph = Graph(name="WorldClockGraph")

Now we create all the necessary Nodes:

current_time = CurrentTime(graph=graph)
van = ConvertTime(city='Vancouver', timezone=-8, graph=graph)
ldn = ConvertTime(city='London', timezone=0, graph=graph)
muc = ConvertTime(city='Munich', timezone=1, graph=graph)
world_clock = WorldClock(graph=graph)

By specifying the "graph" attribute on the Nodes get added to the Graph automatically.

The Nodes can now be wired together. The bitshift operator is used as a shorthand to connect the plugs.

current_time.outputs['time'] >> van.inputs['time']
current_time.outputs['time'] >> ldn.inputs['time']
current_time.outputs['time'] >> muc.inputs['time']
van.outputs['converted_time'] >> world_clock.inputs['time1']
ldn.outputs['converted_time'] >> world_clock.inputs['time2']
muc.outputs['converted_time'] >> world_clock.inputs['time3']

The Graph can be visualized.

print(graph)
 0 | +----------------+          +-------------------+          +---------------+
 1 | |  CurrentTime   |          |    ConvertTime    |          |  WorldClock   |
 2 | |----------------|          |-------------------|          |---------------|
 3 | |           time o-----+    o city<"Vancouver>  |     +--->o time1<>       |
 4 | +----------------+     +--->o time<>            |     |--->o time2<>       |
 5 |                        |    o timezone<-8>      |     |--->o time3<>       |
 6 |                        |    |    converted_time o-----+    +---------------+
 7 |                        |    +-------------------+     |
 8 |                        |    +-------------------+     |
 9 |                        |    |    ConvertTime    |     |
10 |                        |    |-------------------|     |
11 |                        |    o city<"Munich">    |     |
12 |                        |--->o time<>            |     |
13 |                        |    o timezone<1>       |     |
14 |                        |    |    converted_time o-----|
15 |                        |    +-------------------+     |
16 |                        |    +-------------------+     |
17 |                        |    |    ConvertTime    |     |
18 |                        |    |-------------------|     |
19 |                        |    o city<"London">    |     |
20 |                        +--->o time<>            |     |
21 |                             o timezone<0>       |     |
22 |                             |    converted_time o-----+
23 |                             +-------------------+

The Graph is can now be evaluated.

graph.evaluate()
-- World Clock -------------------
It is now 14:55 in Vancouver
It is now 22:55 in London
It is now 23:55 in Munich
----------------------------------

For a more verbose output, set the flowpipe logger to DEBUG.

import logging
from flowpipe import logger

logger.setLevel(logging.DEBUG)

graph.evaluate()
flowpipe DEBUG: Evaluating c:\projects\flowpipe\flowpipe\graph.pyc -> Graph.compute(**{})
flowpipe DEBUG: Evaluating C:\PROJECTS\flowpipe\tests\test_a.py -> CurrentTime.compute(**{})
flowpipe DEBUG: Evaluation result for C:\PROJECTS\flowpipe\tests\test_a.py -> CurrentTime: {
  "time": 1528040105.439
}
flowpipe DEBUG: Evaluating C:\PROJECTS\flowpipe\tests\test_a.py -> ConvertTime.compute(**{
  "city": "Munich",
  "time": 1528040105.439,
  "timezone": 1
})
flowpipe DEBUG: Evaluation result for C:\PROJECTS\flowpipe\tests\test_a.py -> ConvertTime: {
  "converted_time": [
    1528043705.439,
    "Munich"
  ]
}
flowpipe DEBUG: Evaluating C:\PROJECTS\flowpipe\tests\test_a.py -> ConvertTime.compute(**{
  "city": "Vancouver",
  "time": 1528040105.439,
  "timezone": -8
})
flowpipe DEBUG: Evaluation result for C:\PROJECTS\flowpipe\tests\test_a.py -> ConvertTime: {
  "converted_time": [
    1528011305.439,
    "Vancouver"
  ]
}
flowpipe DEBUG: Evaluating C:\PROJECTS\flowpipe\tests\test_a.py -> ConvertTime.compute(**{
  "city": "London",
  "time": 1528040105.439,
  "timezone": 0
})
flowpipe DEBUG: Evaluation result for C:\PROJECTS\flowpipe\tests\test_a.py -> ConvertTime: {
  "converted_time": [
    1528040105.439,
    "London"
  ]
}
flowpipe DEBUG: Evaluating C:\PROJECTS\flowpipe\tests\test_a.py -> WorldClock.compute(**{
  "time1": [
    1528011305.439,
    "Vancouver"
  ],
  "time2": [
    1528040105.439,
    "London"
  ],
  "time3": [
    1528043705.439,
    "Munich"
  ]
})
-- World Clock -------------------
It is now 08:35 in Vancouver
It is now 16:35 in London
It is now 17:35 in Munich
----------------------------------
flowpipe DEBUG: Evaluation result for C:\PROJECTS\flowpipe\tests\test_a.py -> WorldClock: {}
flowpipe DEBUG: Evaluation result for c:\projects\flowpipe\flowpipe\graph.pyc -> Graph: {}

Planned Features

  • Visual Editor
  • Celery Integration
  • API simplifications

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for flowpipe, version 0.5.0
Filename, size File type Python version Upload date Hashes
Filename, size flowpipe-0.5.0-py3-none-any.whl (14.6 kB) File type Wheel Python version py3 Upload date Hashes View hashes
Filename, size flowpipe-0.5.0.tar.gz (11.9 kB) File type Source Python version None Upload date Hashes View hashes

Supported by

Elastic Elastic Search Pingdom Pingdom Monitoring Google Google BigQuery Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN SignalFx SignalFx Supporter DigiCert DigiCert EV certificate StatusPage StatusPage Status page