
A dataflow-based workflow framework


Work in progress.

Features

  • Dataflow-like flow/task composition syntax inspired by Nextflow's DSL2.
  • Pure Python: no DSL; import, compose, and modify Task/Flow Python objects at will.
    • Extensible and interactive thanks to the dynamic nature of Python.
      • Task caching.
      • ...
  • Distributable: uses Dask distributed as the task executor; can be deployed locally, on a cluster, or in the cloud.
  • Hybrid model:
    • Build flows locally in Python or in the web UI.
    • Schedule and monitor flow execution on a remote server through Python or the web UI.

Web UI

sabermap

Install

pip install flowsaber
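
A quick sanity check after installing (a minimal sketch; it only verifies that the package imports in the current environment):

python -c "import flowsaber"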

Example

  • A minimal working example covering most features and usages of flowsaber:
from flowsaber.api import *

@task
def add(self, num):  # self is optional
    return num + 1

@task
def multiply(num1, num2):
    return num1 * num2

# For a @shell task, the docstring is the shell command template ({num} is filled
# from the argument) and the returned glob selects the task's output file(s).
@shell
def write(num):
    """echo {num} > {num}.txt"""
    return '*.txt'

@task
def read(f: File):
    return open(str(f)).readlines()

@flow
def sub_flow(num):
    return add(num) | map_(lambda x: x ** 2) | add

@flow
def my_flow(num):
    [sub_flow(num), sub_flow(num)] | multiply \
    | write | read | flatten \
    | map_(lambda x: int(x.strip())) \
    | view

num_ch = Channel.values(1, 2, 3, 4, 5, 6, 7, 8)
# resolve dependencies
workflow = my_flow(num=num_ch)
run(workflow)
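
To make the dataflow explicit, below is a plain-Python trace of what the pipeline computes for a single channel item (an illustration only, not flowsaber's execution model; flowsaber runs each task asynchronously over every item of the channel, here 1 through 8):

# plain-Python trace for the channel item 1 (illustration only)
def plain_add(num):
    return num + 1

def plain_sub_flow(num):
    return plain_add(plain_add(num) ** 2)    # add -> square -> add: 1 -> 2 -> 4 -> 5

product = plain_sub_flow(1) * plain_sub_flow(1)   # multiply: 5 * 5 = 25
# write then echoes 25 into "25.txt", read loads the file back, flatten/map_
# turn the stripped lines into ints, and view prints the result.
print(product)                                    # prints 25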

Example: running on a remote server

Both the server and the agent need to be running in the background before flowruns are submitted.

Start the server (API endpoint)

In a bash shell:

flowsaber server
Start the agent (flow dispatcher)

In a bash shell:

flowsaber agent --server "http://127.0.0.1:8000" --id test
Create a flow and schedule it for running

In a Python script or IPython console:

import sys

from flowsaber.api import *

@task
def add(num):
    print("This is a message sent by print to stdout in the task")
    print("This is a message sent by print to stderr in the task", file=sys.stderr)
    a = 1
    for i in range(10000000):  # simulate some work
        a += 1
    return num + 1

@flow
def myflow(num):
    return num | add | add | view | add | view

num_ch = Channel.values(*list(range(10)))
f = myflow(num_ch)

# submit the flowrun to the server; the agent registered with id "test" picks it up and executes it
run(f, server_address="http://127.0.0.1:8000", agent_id="test")

Test

python -m pytest tests -s -o log_cli=True -vvvv
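
The command above assumes pytest is installed in the same environment as flowsaber; if it is not:

pip install pytest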

TODO

  • PBS/Torque executor.
  • More cache modes.
  • Support running on cloud platforms.
  • Run CWL scripts; convert between CWL and flowsaber flows.

Reference
