A dataflow based workflow framework
Work in progress.
Features
- Dataflow-like flow/task composition syntax inspired by Nextflow's DSL2.
- Pure Python: no DSL; import, compose, and modify Task/Flow Python objects at will.
- Extensible and interactive thanks to the dynamic nature of Python.
- Task Cache.
- ...
- Distributable: uses Dask distributed as the task executor and can be deployed locally, on a cluster, or in the cloud.
- Supports a hybrid model inspired by Prefect.
- Build flows in local Python or in a web UI (in progress).
- Schedule and monitor flow execution on a remote server through Python or a web UI (in progress).
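To give a feel for the pipe-style composition mentioned in the feature list, here is a minimal plain-Python sketch of how an `items | step` syntax can be built with `__ror__`. This is not flowsaber's implementation: the `Step` class and the `square` helper are hypothetical names used only for illustration, and a plain list stands in for a channel.

```python
class Step:
    """Hypothetical stand-in for a task: applies a function to each item."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, items):
        # Apply the wrapped function to every item and return a new list.
        return [self.fn(item) for item in items]

    def __ror__(self, items):
        # Python calls this for `items | step` because list defines no `|`.
        return self(items)


@Step
def add(num):
    return num + 1


@Step
def square(num):
    return num ** 2


# Each `|` feeds the output of the left side into the step on the right.
result = [1, 2, 3] | add | square | add
print(result)  # [5, 10, 17]
```

Because every step returns a plain list, the chain can be extended with further steps in the same way.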
Install
pip install flowsaber
Example
- A minimal working example covering most features and usages of flowsaber.
from flowsaber.api import *


@task
def add(self, num):  # self is optional
    return num + 1


@task
def multiply(num1, num2):
    return num1 * num2


@shell
def write(num):
    """echo {num} > 1.txt"""
    return '*.txt'


@task
def read(f: File):
    return open(str(f)).readlines()


@flow
def sub_flow(num):
    return add(num) | map_(lambda x: x ** 2) | add


@flow
def my_flow(num):
    [sub_flow(num), sub_flow(num)] | multiply \
        | write | read | flatten \
        | map_(lambda x: int(x.strip())) \
        | view


num_ch = Channel.values(1, 2, 3, 4, 5, 6, 7, 8)
# resolve dependencies
workflow = my_flow(num=num_ch)
run(workflow)
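As a rough check of what the example above should display through `view`, the arithmetic can be traced in plain Python without flowsaber. This sketch assumes that the two `sub_flow` outputs are paired item by item before `multiply`, and that each value written to `1.txt` is read back as a single line; the helper names `sub_flow_value` and `my_flow_value` are hypothetical. The order in which a real run prints the values may differ.

```python
def sub_flow_value(num):
    # Mirrors sub_flow: add 1, square, then add 1 again.
    return (num + 1) ** 2 + 1


def my_flow_value(num):
    # Mirrors my_flow: multiply the results of the two identical sub-flows.
    # The write/read/int round trip is assumed to leave the value unchanged.
    return sub_flow_value(num) * sub_flow_value(num)


expected = [my_flow_value(n) for n in range(1, 9)]
print(expected)  # [25, 100, 289, 676, 1369, 2500, 4225, 6724]
```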
Example: running on a remote server
Start the server (API endpoint)
In a bash shell:
flowsaber server
Start the agent (flow dispatcher)
In a bash shell:
flowsaber agent --server "http://127.0.0.1:8000" --id test
Create a flow and schedule it to run
In a Python script or IPython console:
import sys

from flowsaber.api import *


@task
def add(num):
    print("This is a message sent by print to stdout in task")
    print("This is a message sent by print to stderr in task", file=sys.stderr)
    # Busy loop that keeps the task running for a while
    a = 1
    for i in range(10000000):
        a += 1
    return num + 1


@flow
def myflow(num):
    return num | add | add | view | add | view


num_ch = Channel.values(*list(range(10)))
f = myflow(num_ch)
run(f, server_address="http://127.0.0.1:8000", agent_id="test")
Test
python -m pytest tests -s -o log_cli=True -vvvv
TODO
- Web interface.
- PBS/Torque executor.
- More cache modes.
- Support running on cloud platforms.
- Run CWL scripts; convert between CWL and flowsaber flows.