
A simple language to describe dataflow between entities, implemented in Python.

Project description

A small and simple dataflow language, developed within the project sblog.


pip install sdataflow


sdataflow provides:

  • A small and simple language to define the relations of entities. An entity is a logic unit defined by the user (e.g. a data processing function); it generates some kind of outcome in response to some kind of input outcome (which might be generated by another entity). The relations of entities form a dataflow.

  • A command-line program, sdataflow, that generates an HTML file for debugging.

  • A scheduler that automatically runs entities and ships outcomes to their destinations.



Let’s start with the simplest case (a one-to-one relation):

A --> B

where entity B accepts the outcome of A as its input.

To define a one-to-more or more-to-one relation:

# one-to-more
A --> B
A --> C
A --> D

# more-to-one
B --> A
C --> A
D --> A

where in the one-to-more case, copies of the outcome of A are passed to B, C and D. In the more-to-one case, the outcomes of B, C and D are all passed to A.

And here’s the form of outcome dispatching, a mechanism for sending different kinds of outcome of an entity to different destinations. For instance, entity A generates two kinds of outcome, say [type1] and [type2], and passes outcomes of [type1] to B and outcomes of [type2] to C:

# one way.
A --> [type1]
A --> [type2]
[type1] --> B
[type2] --> C

# another way.
A --[type1]--> B
A --[type2]--> C

where an identifier enclosed in brackets (e.g. [type1]) represents the name of an outcome. In contrast to the form of outcome dispatching, A --> B simply passes the outcome of A, with the default name A (the name of the entity that generates the outcome), to entity B. Essentially, the form above (a statement containing brackets) overrides the name of the outcome and acts as a filter for outcome dispatching.

Outcomes can be used to define one-to-more and more-to-one relations as well, in the same way discussed above:

# one-to-more example.
A --> [type1]
A --> [type2]
[type1] --> B
[type1] --> C
[type2] --> D
[type2] --> E

# more-to-one example.
A --> [type1]
B --> [type1]
[type1] --> C
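To make the dispatching semantics concrete, here is a minimal sketch in plain Python (not sdataflow’s internals; `routes`, `dispatch` and `inboxes` are illustrative names) of a routing table keyed by outcome name, delivering a copy of each outcome to every registered destination:

```python
# Hypothetical routing table keyed by outcome name. One outcome may
# fan out to several entities (one-to-more), and several entities may
# emit the same outcome name (more-to-one).
routes = {
    'type1': ['B', 'C'],   # [type1] --> B, [type1] --> C
    'type2': ['D', 'E'],   # [type2] --> D, [type2] --> E
}

def dispatch(outcome_name, value, inboxes, routes):
    # Deliver a copy of (outcome_name, value) to every destination
    # registered for this outcome name.
    for dest in routes.get(outcome_name, []):
        inboxes.setdefault(dest, []).append((outcome_name, value))

inboxes = {}
dispatch('type1', 42, inboxes, routes)
dispatch('type2', 7, inboxes, routes)
# B and C each hold a copy of ('type1', 42); D and E hold ('type2', 7).
```

The brackets in the language thus act like keys into such a table, while the bare `A --> B` form falls back to the entity’s own name as the key.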

After loading the user-defined dataflow, two steps of analysis are applied:

  1. Build a DAG of the dataflow. Abort if an error occurs (e.g. a syntax error or a cyclic path).

  2. Apply a topological sort to the DAG to get the linear ordering of entity invocation.
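The two steps above can be sketched in plain Python (a simplified illustration, not sdataflow’s actual implementation) with Kahn’s algorithm, which yields a linear ordering and detects cyclic paths along the way:

```python
from collections import deque

def linearize(edges):
    # Build an adjacency list and in-degree table from (src, dst) pairs.
    nodes = {n for e in edges for n in e}
    adj = {n: [] for n in nodes}
    indegree = {n: 0 for n in nodes}
    for src, dst in edges:
        adj[src].append(dst)
        indegree[dst] += 1
    # Kahn's algorithm: repeatedly emit nodes with no pending inputs.
    queue = deque(n for n in nodes if indegree[n] == 0)
    order = []
    while queue:
        n = queue.popleft()
        order.append(n)
        for m in adj[n]:
            indegree[m] -= 1
            if indegree[m] == 0:
                queue.append(m)
    # Leftover nodes mean some edge forms a cycle.
    if len(order) != len(nodes):
        raise ValueError('cyclic path detected')
    return order

# For 'A --> B, A --> C, B --> D, C --> D', A comes first and D last.
print(linearize([('A', 'B'), ('A', 'C'), ('B', 'D'), ('C', 'D')]))
```

The resulting order is exactly the order in which a scheduler can safely invoke the entities.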

Lexical Rules

ARROW          : re.escape('-->')
DOUBLE_HYPHENS : re.escape('--')
BRACKET_LEFT   : re.escape('[')
BRACKET_RIGHT  : re.escape(']')
ID             : r'\w+'

The effect of the rules above is equivalent to passing them to Python’s re module with the UNICODE flag set.
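For illustration, the rules can be wired into a tiny tokenizer with Python’s re module (a sketch, not sdataflow’s lexer; note that ARROW must be tried before DOUBLE_HYPHENS, since '-->' starts with '--'):

```python
import re

# Order matters: try ARROW before DOUBLE_HYPHENS.
TOKEN_RULES = [
    ('ARROW', re.escape('-->')),
    ('DOUBLE_HYPHENS', re.escape('--')),
    ('BRACKET_LEFT', re.escape('[')),
    ('BRACKET_RIGHT', re.escape(']')),
    ('ID', r'\w+'),
    ('WS', r'\s+'),  # whitespace, skipped below.
]
MASTER = re.compile('|'.join('(?P<%s>%s)' % rule for rule in TOKEN_RULES),
                    re.UNICODE)

def tokenize(text):
    tokens = []
    for m in MASTER.finditer(text):
        if m.lastgroup != 'WS':
            tokens.append((m.lastgroup, m.group()))
    return tokens

print(tokenize('A --[type1]--> B'))
```

For 'A --[type1]--> B' this yields ID, DOUBLE_HYPHENS, BRACKET_LEFT, ID, BRACKET_RIGHT, ARROW, ID — the token stream the grammar below consumes.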


start : stats

stats : stats single_stat
      | empty

single_stat : entity_to_entity
            | entity_to_outcome
            | outcome_to_entity

entity_to_entity : ID general_arrow ID

general_arrow : ARROW
              | DOUBLE_HYPHENS outcome ARROW


entity_to_outcome : ID ARROW outcome

outcome_to_entity : outcome ARROW ID
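A rough sketch of matching one statement against these productions with regular expressions (illustrative only — the production names below are adapted, with the bracketed-arrow form split out of general_arrow for clarity):

```python
import re

# One pattern per statement form; tried in order.
PATTERNS = {
    'entity_to_entity': re.compile(r'^(\w+)\s*-->\s*(\w+)$'),
    'entity_to_entity_with_outcome':
        re.compile(r'^(\w+)\s*--\[(\w+)\]-->\s*(\w+)$'),
    'entity_to_outcome': re.compile(r'^(\w+)\s*-->\s*\[(\w+)\]$'),
    'outcome_to_entity': re.compile(r'^\[(\w+)\]\s*-->\s*(\w+)$'),
}

def classify(stat):
    # Return the name of the matching production and its captured parts.
    for name, pattern in PATTERNS.items():
        m = pattern.match(stat.strip())
        if m:
            return name, m.groups()
    raise SyntaxError('cannot parse: %r' % stat)

print(classify('A --> B'))
print(classify('A --[type1]--> B'))
print(classify('A --> [type1]'))
print(classify('[type1] --> B'))
```

Each form is unambiguous because brackets never appear inside an ID, so the patterns cannot overlap.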

Command-line program

After installing sdataflow through pip, the user can invoke a command-line program, sdataflow. Its synopsis is simple:

    sdataflow <file>

Pass the file path of a dataflow definition to sdataflow; the program will parse the file, analyse the dataflow and finally generate an HTML file. Use a browser to open that HTML file (based on the project mermaid), and you get a graphical representation of your dataflow!

An example is given for illustration:

$ cat
A --[odd]--> B
A --[even]--> C
B --> D
C --> D
$ sdataflow
$ ls

Use a browser to open example.html:

[Screenshot: mermaid rendering of the example dataflow (2015-04-28)]


Form of Callback

As mentioned above, an entity stands for a user-defined logic unit. Hence, after defining the relations of entities in the language discussed above, the user should define a set of callbacks, one for each entity in the definition.

A callback is a callable (function, generator, or bound method) that returns either None (e.g. a function with no return statement) or an iterable object whose elements are (key, value) tuples, with key as the name of an outcome and value as a user-defined object. The argument list of such a callable can be:

  1. An empty list, meaning that the callback accepts no data.

  2. A one-element list.

Code fragment for illustration:

# normal function returning `None`, with empty argument list.
def func1():
    pass

# normal function returning `None`, with one-element argument list.
def func2(items):
    for name_of_outcome, obj in items:
        # do something.
        pass

# normal function returning elements, with one-element argument list.
def func3(items):
    # ignore `items`.
    data = [('some outcome name', i) for i in range(10)]
    return data

# generator yielding elements, with one-element argument list.
def gen1(items):
    # ignore `items`.
    for i in range(10):
        yield 'some outcome name', i

class ExampleClass(object):

    @classmethod
    def method1(cls):
        pass

    @classmethod
    def method2(cls, items):
        pass

    def method3(self):
        pass

    def method4(self, items):
        pass

# class-bound method `ExampleClass.method1`, with empty argument list.
# class-bound method `ExampleClass.method2`, with one-element argument list.

example_instance = ExampleClass()
# bound method `example_instance.method3`, with empty argument list.
# bound method `example_instance.method4`, with one-element argument list.

Note that the name of an outcome is the string enclosed in brackets (not including the brackets).
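How a scheduler might tell the two argument forms apart can be sketched with the standard inspect module (an illustration, not necessarily how sdataflow’s scheduler does it; `invoke` and the sample callbacks are made-up names):

```python
import inspect

def invoke(callback, items):
    # Bound methods hide `self`/`cls` from `signature`, so counting the
    # remaining parameters distinguishes the two forms.
    num_params = len(inspect.signature(callback).parameters)
    if num_params == 0:
        result = callback()
    elif num_params == 1:
        result = callback(items)
    else:
        raise TypeError('callback must take zero or one argument')
    # `None` means no outcome; otherwise an iterable of (name, value).
    return list(result) if result is not None else []

def no_input():
    return [('greeting', 'hi')]

def with_input(items):
    for name, value in items:
        yield name, value * 2

print(invoke(no_input, []))
print(invoke(with_input, [('n', 3)]))
```

The same dispatch works for plain functions, generators and bound methods alike, which is exactly why all three are acceptable callback forms.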

All In One Interface

sdataflow provides a class sdataflow.DataflowHandler to parse doc (a string representing the relations of entities), register callbacks and schedule the execution of callbacks.

class DataflowHandler
    __init__(self, doc, name_callback_mapping=None)
        `doc`: unicode or utf-8 encoded binary data.
        `name_callback_mapping`: a dict of (`name`, `callback`) pairs. `name`
        could be unicode or utf-8 encoded binary data. `callback` is a function
        or generator. `name_callback_mapping` could be `None`, since callbacks
        can be registered by function decorator (see next section).

        Automatically executes all registered callbacks.


from sdataflow import DataflowHandler
from sdataflow.callback import create_data_wrapper

doc = ('A --[odd]--> B '
       'A --[even]--> C '
       'B --> D '
       'C --> D ')

def a():
    odd = create_data_wrapper('odd')
    even = create_data_wrapper('even')
    for i in range(1, 10):
        if i % 2 == 0:
            yield even(i)
        else:
            yield odd(i)

def b(items):
    default = create_data_wrapper('B')
    # remove 1.
    for outcome_name, number in items:
        if number == 1:
            continue
        yield default(number)

def c(items):
    default = create_data_wrapper('C')
    # remove 2.
    for outcome_name, number in items:
        if number == 2:
            continue
        yield default(number)

def d(items):
    numbers = {i for _, i in items}
    assert set(range(3, 10)) == numbers

name_callback_mapping = {
    'A': a,
    'B': b,
    'C': c,
    'D': d,
}

# parse `doc`, register `a`, `b`, `c`, `d`.
handler = DataflowHandler(doc, name_callback_mapping)

# execute callbacks.

In the example above, A generates the numbers 1 to 9; the odd numbers (1, 3, 5, 7, 9) are sent to B and the even numbers (2, 4, 6, 8) are sent to C. Then B removes the number 1 and sends the rest (3, 5, 7, 9) to D, while C removes the number 2 and sends the rest (4, 6, 8) to D. Finally, D receives the outcomes of both B and C, and makes sure the result is equal to set(range(3, 10)).

Use Decorator To Register Normal Function

sdataflow.callback.register_callback is a function decorator with signature:

register_callback(entity_name, *outcome_names)

where entity_name could be a unicode or utf-8 encoded binary string, indicating the entity to which the function should be registered. If outcome_names is given, the decorator injects data wrappers generated by sdataflow.callback.create_data_wrapper into the function being decorated.


@register_callback('A')
def zero_arg():
    return 0

@register_callback('B')
def one_arg(items):
    return 1


where zero_arg is registered to entity A and one_arg is registered to entity B. Note that, as mentioned above, the second parameter of DataflowHandler can be omitted.

When the name of a decorator-registered callback conflicts with a name in name_callback_mapping (the second parameter of DataflowHandler), the callback in name_callback_mapping is accepted and the callback registered by the function decorator is discarded. For example:

@register_callback('A')
def zero_arg():
    return 0

@register_callback('C')
def should_not_be_registered(items):
    return 1

def one_arg(items):
    return 42

DataflowHandler(doc, {'C': one_arg})

where one_arg will be registered instead of should_not_be_registered.

Example of function injection:

@register_callback('A', 'type1', 'type2')
def func():
    return func.type1(1), func.type2(2)

assert (
    ('type1', 1),
    ('type2', 2),
) == func()

Be careful when applying register_callback to things other than a plain function. Say you want to register a class method:

class Example(object):

    # wrong, `classmethod` is not bound.
    def func(cls):
        pass

# try following code instead.

Pure Interface of sdataflow Language

sdataflow.lang.parse can be used to parse a dataflow definition:

    input: `doc` with type of six.binary_type or six.text_type.
    output: linear ordering and root nodes of dataflow.

parse returns a 2-tuple whose first element is a list giving the linear ordering of the dataflow, and whose second element is a list of the root nodes of the forest.

Project details


Source distribution: sdataflow-0.3.tar.gz (259.1 kB)
