A dataflow based workflow framework
work in progress
Features
- Dataflow-like flow/task composing syntax inspired by nextflow's DSL2.
- Pure python: No DSL, Import/Compose/Modify Task/Flow python objects at will.
- Extensible and interactive due to dynamic nature of Python.
- Task Cache.
- ...
- Distributable: Uses Dask distributed as the Task executor and can be deployed locally, on a cluster, or in the cloud.
- Hybrid model.
- Build Flow in Local python or web UI.
- Schedule/Monitor flow execution in remote server through python or web UI.
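To make the "pure python, no DSL" point concrete, here is a minimal sketch of how pipe-style composition can be built with ordinary operator overloading. It is not flowsaber's implementation: the `Step` class and `step` decorator are hypothetical names, a plain list stands in for a channel, and `map_` is reimplemented here only to mirror the name used in the example further down.

```python
class Step:
    """Wrap a plain function so it can sit on the right side of `|`."""

    def __init__(self, func):
        self.func = func

    def __ror__(self, values):
        # `values | step`: lists do not define `|`, so Python falls back
        # to Step.__ror__, which applies the function to every item.
        return [self.func(v) for v in values]


def step(func):
    """Hypothetical decorator turning a function into a pipeable Step."""
    return Step(func)


def map_(func):
    """Build an anonymous Step from a lambda or function."""
    return Step(func)


@step
def add(num):
    return num + 1


# A plain list plays the role of a channel in this sketch.
result = [1, 2, 3] | add | map_(lambda x: x ** 2) | add
print(result)  # [5, 10, 17]
```

Because every stage is a regular Python object, stages can be imported, stored in variables, or swapped at runtime, which is the flexibility the feature list describes.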
Web UI
Install
pip install flowsaber
Example
- A minimal working example covering most features and usages of flowsaber.
from flowsaber.api import *

@task
def add(self, num):  # self is optional
    return num + 1

@task
def multiply(num1, num2):
    return num1 * num2

@shell
def write(num):
    """echo {num} > 1.txt"""
    return '*.txt'

@task
def read(f: File):
    return open(str(f)).readlines()

@flow
def sub_flow(num):
    return add(num) | map_(lambda x: x ** 2) | add

@flow
def my_flow(num):
    [sub_flow(num), sub_flow(num)] | multiply \
        | write | read | flatten \
        | map_(lambda x: int(x.strip())) \
        | view

num_ch = Channel.values(1, 2, 3, 4, 5, 6, 7, 8)
# resolve dependencies
workflow = my_flow(num=num_ch)
run(workflow)
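To follow what the example above computes, the sketch below traces the same arithmetic with plain Python functions and no flowsaber imports. It assumes that each channel item passes through the tasks element by element and that the two `sub_flow` outputs are paired item by item before `multiply`. The `workdir` parameter is a hypothetical stand-in for wherever the shell task writes its file, and the order in which a real run prints values may differ.

```python
import tempfile
from pathlib import Path


def add(num):
    return num + 1


def sub_flow(num):
    # add -> square -> add, mirroring sub_flow in the example
    return add(add(num) ** 2)


def my_flow(num, workdir):
    # [sub_flow(num), sub_flow(num)] | multiply
    product = sub_flow(num) * sub_flow(num)
    # write: stands in for the shell command `echo {num} > 1.txt`
    path = Path(workdir) / "1.txt"
    path.write_text(f"{product}\n")
    # read: same result as open(f).readlines()
    lines = path.read_text().splitlines(keepends=True)
    # flatten | map_(lambda x: int(x.strip()))
    return [int(line.strip()) for line in lines]


with tempfile.TemporaryDirectory() as workdir:
    results = [value for n in range(1, 9) for value in my_flow(n, workdir)]

print(results)  # [25, 100, 289, 676, 1369, 2500, 4225, 6724]
```

For the input `1`, the trace is `1 + 1 = 2`, `2 ** 2 = 4`, `4 + 1 = 5`, and `5 * 5 = 25`, which is then written to a file, read back as the line `"25\n"`, and converted to the integer `25`.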
Example of running remotely
Start server (API endpoint)
In bash shell.
flowsaber server
Start agent (Flow dispatcher)
In bash shell.
flowsaber agent --server "http://127.0.0.1:8000" --id test
Create flow and schedule for running
In python script or IPython console.
import sys

from flowsaber.api import *

@task
def add(num):
    print("This is a message sent by print to stdout in task")
    print("This is a message sent by print to stderr in task", file=sys.stderr)
    # busy loop so the task takes a noticeable amount of time
    a = 1
    for i in range(10000000):
        a += 1
    return num + 1

@flow
def myflow(num):
    return num | add | add | view | add | view

num_ch = Channel.values(*list(range(10)))
f = myflow(num_ch)
run(f, server_address="http://127.0.0.1:8000", agent_id="test")
Test
python -m pytest tests -s -o log_cli=True -vvvv
TODO
- PBS/Torque executor.
- More cache modes.
- Support running on cloud platforms.
- Run CWL script, Convert between CWL and flowsaber flow.