A simple language to describe dataflow between entities, implemented in Python.
Project description
A small and simple language within the project sblog.
Install
pip install sdataflow
Concepts
sdataflow provides:
A small and simple language to define the relations of entities. An entity is a logic unit defined by the user (e.g. a data processing function); it generates some kind of outcome in response to some kind of input outcome (which might be generated by another entity). The relations of entities form a dataflow.
A command-line program sdataflow that generates an HTML file for debugging.
A scheduler that automatically runs entities and ships each outcome to its destination.
Language
Tutorial
Let’s start with the simplest case (a one-to-one relation):
A --> B
where entity B accepts the outcome of A as its input.
To define a one-to-many or many-to-one relation:
# one-to-many
A --> B
A --> C
A --> D

# many-to-one
B --> A
C --> A
D --> A
where in the one-to-many case, copies of the outcome of A are passed to B, C and D. In the many-to-one case, the outcomes of B, C and D are passed to A.
And here is the form of outcome dispatching, that is, a mechanism for sending different kinds of outcome of an entity to different destinations. For instance, entity A generates two kinds of outcome, say [type1] and [type2], and passes outcomes of [type1] to B and outcomes of [type2] to C:
# one way.
A --> [type1]
A --> [type2]
[type1] --> B
[type2] --> C

# another way.
A --[type1]--> B
A --[type2]--> C
where an identifier enclosed in brackets (e.g. [type1]) represents the name of an outcome. In contrast to the form of outcome dispatching, A --> B simply passes the outcome of A, with the default name A (the name of the entity that generates the outcome), to entity B. Essentially, the form above (a statement containing brackets) overrides the name of the outcome and acts as a filter for outcome dispatching.
Outcomes can be used to define one-to-many and many-to-one relations as well, in the same way as discussed above:
# one-to-many example.
A --> [type1]
A --> [type2]
[type1] --> B
[type1] --> C
[type2] --> D
[type2] --> E

# many-to-one example.
A --> [type1]
B --> [type1]
[type1] --> C
After loading the user-defined dataflow, two analysis steps are applied:
Build a DAG of the dataflow. Abort if an error occurs (e.g. a syntax error or a cyclic path).
Apply topological sort to the DAG to get the linear ordering of entity invocation (see the sketch after this list).
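For illustration only, here is a minimal sketch of what these two steps amount to, using Kahn's algorithm on a plain adjacency-list graph. This is not sdataflow's internal implementation; the graph at the bottom is just the earlier example written out by hand.

from collections import deque

def linearize(edges):
    """Topologically sort a dataflow given as {node: [successor, ...]}."""
    nodes = set(edges) | {dst for dsts in edges.values() for dst in dsts}
    in_degree = {node: 0 for node in nodes}
    for dsts in edges.values():
        for dst in dsts:
            in_degree[dst] += 1
    # Nodes with no predecessor are the roots of the forest.
    queue = deque(node for node in nodes if in_degree[node] == 0)
    ordering = []
    while queue:
        node = queue.popleft()
        ordering.append(node)
        for dst in edges.get(node, ()):
            in_degree[dst] -= 1
            if in_degree[dst] == 0:
                queue.append(dst)
    if len(ordering) != len(nodes):
        raise ValueError('cyclic path detected')
    return ordering

# 'A --> B', 'A --> C', 'B --> D', 'C --> D' as an adjacency list.
print(linearize({'A': ['B', 'C'], 'B': ['D'], 'C': ['D']}))
# e.g. ['A', 'B', 'C', 'D'] (one valid linear ordering)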
Lexical Rules
ARROW          : re.escape('-->')
DOUBLE_HYPHENS : re.escape('--')
BRACKET_LEFT   : re.escape('[')
BRACKET_RIGHT  : re.escape(']')
ID             : r'\w+'
The effect of the rules above is equivalent to passing them to Python’s re module with the UNICODE flag set.
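As a rough illustration (not the library's actual lexer), the rules can be combined into a single pattern for the re module, with group names mirroring the token names above:

import re

# '-->' must be tried before '--' so the longer token wins.
TOKEN_PATTERN = re.compile(
    r'(?P<ARROW>-->)'
    r'|(?P<DOUBLE_HYPHENS>--)'
    r'|(?P<BRACKET_LEFT>\[)'
    r'|(?P<BRACKET_RIGHT>\])'
    r'|(?P<ID>\w+)'
    r'|(?P<WS>\s+)',
    re.UNICODE,
)

def tokenize(doc):
    for match in TOKEN_PATTERN.finditer(doc):
        if match.lastgroup != 'WS':
            yield match.lastgroup, match.group()

print(list(tokenize('A --[type1]--> B')))
# [('ID', 'A'), ('DOUBLE_HYPHENS', '--'), ('BRACKET_LEFT', '['), ('ID', 'type1'),
#  ('BRACKET_RIGHT', ']'), ('ARROW', '-->'), ('ID', 'B')]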
CFGs
start             : stats

stats             : stats single_stat
                  | empty

single_stat       : entity_to_entity
                  | entity_to_outcome
                  | outcome_to_entity

entity_to_entity  : ID general_arrow ID

general_arrow     : ARROW
                  | DOUBLE_HYPHENS outcome ARROW

outcome           : BRACKET_LEFT ID BRACKET_RIGHT

entity_to_outcome : ID ARROW outcome

outcome_to_entity : outcome ARROW ID
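The grammar is small enough for a hand-rolled recursive-descent parser. The sketch below is only an illustration of how the productions map onto code, not sdataflow's own parser; it reuses the tokenize helper from the previous fragment and returns each statement as a tuple.

def parse_statements(doc):
    tokens = list(tokenize(doc)) + [('EOF', '')]
    pos = 0

    def peek():
        return tokens[pos][0]

    def eat(expected):
        nonlocal pos
        kind, value = tokens[pos]
        if kind != expected:
            raise SyntaxError('expected {}, got {}'.format(expected, kind))
        pos += 1
        return value

    def outcome():
        # outcome : BRACKET_LEFT ID BRACKET_RIGHT
        eat('BRACKET_LEFT')
        name = eat('ID')
        eat('BRACKET_RIGHT')
        return name

    statements = []
    while peek() != 'EOF':
        if peek() == 'BRACKET_LEFT':
            # outcome_to_entity : outcome ARROW ID
            name = outcome()
            eat('ARROW')
            statements.append(('outcome_to_entity', name, eat('ID')))
            continue
        src = eat('ID')
        if peek() == 'DOUBLE_HYPHENS':
            # entity_to_entity : ID DOUBLE_HYPHENS outcome ARROW ID
            eat('DOUBLE_HYPHENS')
            name = outcome()
            eat('ARROW')
            statements.append(('entity_to_entity', src, eat('ID'), name))
        else:
            eat('ARROW')
            if peek() == 'BRACKET_LEFT':
                # entity_to_outcome : ID ARROW outcome
                statements.append(('entity_to_outcome', src, outcome()))
            else:
                # entity_to_entity : ID ARROW ID
                statements.append(('entity_to_entity', src, eat('ID'), None))
    return statements

print(parse_statements('A --[type1]--> B  [type1] --> C'))
# [('entity_to_entity', 'A', 'B', 'type1'), ('outcome_to_entity', 'type1', 'C')]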
Command-line program
After installing sdataflow through pip, the user can invoke the command-line program sdataflow. Its synopsis is simple:
Usage: sdataflow <file>
The user passes the file path of a dataflow definition to sdataflow; the program then parses the file, analyses the dataflow and finally generates an HTML file. Open that HTML file (based on the project mermaid) in a browser and you get a graphic representation of your dataflow!
An example is given for illustration:
$ cat example.sd
A --[odd]--> B
A --[even]--> C
B --> D
C --> D
$ sdataflow example.sd
$ ls
example.html  example.sd
Use a browser to open example.html to see the rendered graph.
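The exact markup sdataflow writes into the HTML file is not reproduced here, but since the page is based on mermaid, the rendered graph corresponds roughly to a flowchart definition like the one this hypothetical snippet prints:

# Hypothetical: build a mermaid flowchart definition for example.sd by hand.
edges = [('A', 'B', 'odd'), ('A', 'C', 'even'), ('B', 'D', None), ('C', 'D', None)]
lines = ['graph TD;']
for src, dst, outcome in edges:
    label = '|{}|'.format(outcome) if outcome else ''
    lines.append('    {}-->{}{};'.format(src, label, dst))
print('\n'.join(lines))
# graph TD;
#     A-->|odd|B;
#     A-->|even|C;
#     B-->D;
#     C-->D;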
API
Form of Callback
As mentioned above, an entity stands for a user-defined logic unit. Hence, after defining the relations of entities in the language discussed above, the user should define a set of callbacks, one corresponding to each entity in the definition.
A callback is a callable (function, generator, bound method) that either returns None (i.e. a function with no return statement) or returns an iterable object whose elements are (key, value) tuples, with key being the name of an outcome and value being a user-defined object. The argument list of such a callable can be:
An empty list, meaning that the callback accepts no data.
A one-element list, which receives the outcomes shipped to the entity.
Code fragment for illustration:
# normal function returns `None`, with empty argument list.
def func1():
pass
# normal function returns `None`, with one-element argument list.
def func2(items):
    for name_of_outcome, obj in items:
        # do something.
        pass
# normal function returns elements, with one-element argument list.
def func3(items):
# ignore `items`
data = [('some outcome name', i) for i in range(10)]
return data
# generator yields elements, with one-element argument list.
def gen1(items):
# ignore `items`
for i in range(10):
yield 'some outcome name', i
class ExampleClass(object):
@classmethod
def method1(cls):
pass
@classmethod
def method2(cls, items):
pass
def method3(self):
pass
def method4(self, items):
pass
# class bound method, with empty argument list.
ExampleClass.method1
# class bound method, with one-element argument list.
ExampleClass.method2
example_instance = ExampleClass()
# instance bound method, with empty argument list.
example_instance.method3
# instance bound method, with one-element argument list.
example_instance.method4
Note that the name of an outcome is the string enclosed in brackets (not including the brackets).
All In One Interface
sdataflow provides a class sdataflow.DataflowHandler to parse doc (a string representing the relations of entities), register callbacks and schedule the execution of callbacks.
class DataflowHandler

    __init__(self, doc, name_callback_mapping=None)
        `doc`: unicode or utf-8 encoded binary data.
        `name_callback_mapping`: a dict of (`name`, `callback`) pairs. `name` could be
        unicode or utf-8 encoded binary data. `callback` is a function or generator.
        `name_callback_mapping` could be `None`, since callbacks can be registered by
        function decorator (see next section).

    run(self)
        Automatically execute all registered callbacks.
Example:
from sdataflow import DataflowHandler
from sdataflow.callback import create_data_wrapper
doc = ('A --[odd]--> B '
'A --[even]--> C '
'B --> D '
'C --> D ')
def a():
odd = create_data_wrapper('odd')
even = create_data_wrapper('even')
for i in range(1, 10):
if i % 2 == 0:
yield even(i)
else:
yield odd(i)
def b(items):
default = create_data_wrapper('B')
# remove 1.
for outcome_name, number in items:
if number == 1:
continue
yield default(number)
def c(items):
default = create_data_wrapper('C')
# remove 2.
for outcome_name, number in items:
if number == 2:
continue
yield default(number)
def d(items):
numbers = {i for _, i in items}
assert set(range(3, 10)) == numbers
name_callback_mapping = {
'A': a,
'B': b,
'C': c,
'D': d,
}
# parse `doc`, register `a`, `b`, `c`, `d`.
handler = DataflowHandler(doc, name_callback_mapping)
# execute callbacks.
handler.run()
In the above example, A generates the numbers 1 to 9, of which the odd numbers (1, 3, 5, 7, 9) are sent to B and the even numbers (2, 4, 6, 8) are sent to C. Then B removes the number 1 and sends the rest (3, 5, 7, 9) to D, while C removes the number 2 and sends the rest (4, 6, 8) to D. Finally, D receives the outcomes of both B and C, and checks that the resulting set equals set(range(3, 10)).
Use Decorator To Register Normal Function
sdataflow.callback.register_callback is a function decorator with signature:
register_callback(entity_name, *outcome_names)
where entity_name can be a unicode or utf-8 encoded binary string, indicating the entity to which the function should be registered. If outcome_names is given, the decorator injects data wrappers generated by sdataflow.callback.create_data_wrapper into the function being decorated.
Example:
@register_callback('A')
def zero_arg():
return 0
@register_callback('C')
def one_arg(items):
return 1
DataflowHandler(doc)
where zero_arg is registered to entity A and one_arg is registered to entity C. Note that, as mentioned above, the second parameter of DataflowHandler can be omitted.
When the names of decorator-registered callbacks conflict with the names in name_callback_mapping (the second parameter of DataflowHandler), the callbacks in name_callback_mapping are accepted and the callbacks registered by the function decorator are discarded. For example:
@register_callback('A')
def zero_arg():
return 0
@register_callback('C')
def should_not_be_registered(items):
return 1
def one_arg(items):
return 42
DataflowHandler(doc, {'C': one_arg})
where one_arg will be registered instead of should_not_be_registered.
Example of function injection:
@register_callback('A', 'type1', 'type2')
def func():
return func.type1(1), func.type2(2)
assert (
('type1', 1),
('type2', 2),
) == func()
Be careful when applying register_callback to things other than plain functions; say you want to register a class method:
class Example(object):
# wrong, `classmethod` is not bound.
@register_callback('A')
@classmethod
def func(cls):
pass
# try the following code instead.
register_callback('A')(Example.func)
Pure Interface of sdataflow Language
sdataflow.lang.parse can be used to parse the definition of dataflow:
parse(doc)
    input: `doc` with type of six.binary_type or six.text_type.
    output: linear ordering and root nodes of dataflow.
parse returns a 2-tuple: the first element is a list giving the linear ordering of the dataflow, and the second element is a list of the root nodes of the forest.
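A minimal usage sketch (what the individual elements of the two returned lists look like is not documented here, so they are treated as opaque):

from sdataflow.lang import parse

doc = ('A --[odd]--> B '
       'A --[even]--> C '
       'B --> D '
       'C --> D ')

linear_ordering, root_nodes = parse(doc)
# `linear_ordering`: nodes of the dataflow in the order they should run.
# `root_nodes`: roots of the forest (here, the node corresponding to entity A).
for node in linear_ordering:
    print(node)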