
Like a makefile, but in Python: a stripped-down version of Airflow or Luigi


scriptflow

Small library that allows scheduling scripts asynchronously on different platforms. Think of it as Make, but where you can write the dependencies as Python code, and that can run locally, on an HPC, or in the cloud (cloud support is not implemented just yet).

The status is very experimental. I will likely be changing the interface as I go.

Goals:

  • works on Windows / OSX / Linux
  • describe dependencies as python code (using await/async)
  • describe scripts with input/output as code
  • clean terminal feedback (using rich)
  • task retry
  • check that output was generated
  • notifications
  • send status to central web service
  • resume flows
  • clean output
  • named runs
  • store run information
  • output diagnostic / reporting (tracing how files were created)
  • simpler interface with default head executor and awaitable tasks
  • skip computation based on timestamps of inputs and outputs
  • load and store tasks results
  • executors:
    • local executor using subprocess
    • HPC executor (monitoring qsub)
    • Docker executor
    • AWS executor (probably using Ray)
    • Dask executor
  • cache flows in addition to caching tasks (avoid same task getting scheduled from 2 places)
  • add check on qsub return values
  • select flow by name from terminal
  • allow for glob output/input
  • ? scripts can create tasks, not sure how to await them.
  • reporting:
    • input and output hashes
    • start and end datetimes

Simple flow example:

Create a file sflow.py with:

import scriptflow as sf

# set main maestro
cr = sf.CommandRunner(3)
sf.set_main_maestro(cr)

def combine_file():
    # read the two intermediate results and write their sum
    with open('test_1.txt') as f:
        a = int(f.readlines()[0])
    with open('test_2.txt') as f:
        b = int(f.readlines()[0])
    with open('final.txt', 'w') as f:
        f.write("{}\n".format(a + b))

# define a flow called sleepit
async def flow_sleepit():
    i = 1
    t1 = sf.Task(["python", "-c", f"import time; time.sleep(5); open('test_{i}.txt','w').write('5');"])
    t1.output(f"test_{i}.txt").uid(f"solve-{i}")

    i = 2
    t2 = sf.Task(["python", "-c", f"import time; time.sleep(5); open('test_{i}.txt','w').write('4');"])
    t2.output(f"test_{i}.txt").uid(f"solve-{i}")

    # run the two solve tasks in parallel
    await sf.bag(t1, t2)

    # the final task calls back into this file to combine the two outputs
    tfinal = sf.Task(["python", "-c", "import sflow; sflow.combine_file()"])
    tfinal.output("final.txt").uid("final").add_deps([t1.output_file, t2.output_file])
    await tfinal
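
Since tasks are plain Python objects, the two solve tasks above can also be built in a loop and bagged together. A minimal sketch, assuming sf.bag accepts tasks as varargs (as in sf.bag(t1, t2) above) so a list can be unpacked into it:

async def flow_sleepit_loop():
    tasks = []
    for i in [1, 2]:
        t = sf.Task(["python", "-c", f"import time; time.sleep(5); open('test_{i}.txt','w').write('5');"])
        t.output(f"test_{i}.txt").uid(f"solve-{i}")
        tasks.append(t)
    # assumption: bag takes tasks as positional arguments, so the list can be unpacked
    await sf.bag(*tasks)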

Then create a local environment, activate it, install scriptflow, and run the flow:

python3 -m venv env
source env/bin/activate
pip install scriptflow
scriptflow run sleepit

Life cycle of a task

  1. The task object is created. All properties can be edited.
  2. The task is sent to an executor. At this point, the properties of the task are frozen: they can be read and copied, but not changed. A unique ID is created for the task from its command and its inputs. The task can be sent explicitly with the start() method, or it will be sent automatically when awaited.
  3. The task is awaited, which blocks execution until the task is finished. Nothing can be done at that stage. Again, the task is sent automatically at this stage if it has not been sent before. Also note that several tasks can be awaited in parallel by bagging them with sf.bag(...), as in the sketch below.
  4. The task is completed and the await returns. The task now has its output attached to it, and it can be used in the creation of other tasks.
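
A minimal sketch of this life cycle, using only the calls shown in the example above (the command and file names here are illustrative):

import scriptflow as sf

async def flow_lifecycle_demo():
    # 1. created: all properties can still be edited
    t = sf.Task(["python", "-c", "open('out.txt','w').write('done')"])
    t.output("out.txt").uid("demo")

    # 2. sent to the executor with start(); properties are frozen from here on
    t.start()

    # 3. awaited: the flow blocks until the task finishes
    #    (awaiting also sends the task automatically if start() was skipped)
    await t

    # 4. completed: the output is attached and can feed downstream tasks
    t2 = sf.Task(["python", "-c", "open('out2.txt','w').write(open('out.txt').read())"])
    t2.output("out2.txt").uid("demo-2").add_deps([t.output_file])
    await t2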

Inspiration / Alternatives

I have tried three alternatives (nextflow, snakemake, and pydoit), which are all truly excellent!

There were use cases that I could not implement cleanly in the dataflow model of nextflow. I didn't like that snakemake relied on file names to trigger rules; I was constantly juggling complicated file names. Pydoit is really great, but I couldn't figure out how to extend it to build my own executor, and I always found myself confused when writing new tasks and dealing with dependencies.

Developing

The package is managed using poetry; install poetry first, then:

poetry install
cd examples/simple-local
poetry run scriptflow run sleepit
