
Pantarei


A general-purpose workflow manager - because everything flows

Quick start

Pantarei builds on three kinds of execution units:

  • functions are stateless Python callables
  • tasks are stateful wrapped functions that cache their execution results
  • jobs are stateful wrapped tasks for distributed-memory parallel environments

To see it in action, say you have a Python function

def f(x):
    import time
    time.sleep(2)
    return x

Wrap the function with a Task and call it with a range of arguments

from pantarei import *

task = Task(f)
for x in [1, 2]:
    task(x=x)

The task's results are cached: a subsequent execution will simply fetch the results

results = task(x=1)
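The caching idea can be pictured with a minimal sketch. This is only an illustration of the concept: pantarei stores results on disk, whereas `SimpleTask` below is a hypothetical in-memory version keyed on the call's keyword arguments.

```python
import time

class SimpleTask:
    """Toy illustration of a result-caching task wrapper."""

    def __init__(self, func):
        self.func = func
        self._cache = {}

    def __call__(self, **kwargs):
        # Key the cache on the sorted keyword arguments
        key = tuple(sorted(kwargs.items()))
        if key not in self._cache:
            self._cache[key] = self.func(**kwargs)
        return self._cache[key]

def f(x):
    time.sleep(0.1)
    return x

task = SimpleTask(f)
task(x=1)              # runs f and stores the result
assert task(x=1) == 1  # fetched from the cache, no sleep
```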

We wrap the task with a Job and submit jobs to a local scheduler (like SLURM)

job = Job(task)
for x in [3, 4]:
    job(x=x)

Once the jobs are done, we can get the results (which are cached too)

job.scheduler.wait()
results = job(x=3)
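Conceptually, wait() blocks until all submitted jobs have finished. A toy sketch of that pattern (not pantarei's implementation, which talks to a real batch system):

```python
class ToyScheduler:
    """Toy scheduler that queues jobs and runs them on wait()."""

    def __init__(self):
        self.queue = []

    def submit(self, func, **kwargs):
        self.queue.append((func, kwargs))

    def wait(self):
        # Drain the queue, running each job in turn; a real scheduler
        # would instead poll the batch system (e.g. squeue for SLURM)
        results = []
        while self.queue:
            func, kwargs = self.queue.pop(0)
            results.append(func(**kwargs))
        return results

sched = ToyScheduler()
sched.submit(lambda x: x * 2, x=3)
assert sched.wait() == [6]
```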

To see a summary of the jobs from the Python interpreter, add the following line at the end of the script

pantarei()

From the command line, you can check the state of the jobs by changing the execution mode ('safe', 'brave', 'timid') like this

pantarei=timid python script.py
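The `pantarei=timid` prefix is just an environment variable set for that invocation. A sketch of how a script could read such a mode (an assumption about the mechanism, not pantarei's internal code):

```python
import os

# Read the execution mode from the environment, defaulting to 'safe'
mode = os.environ.get('pantarei', 'safe')
if mode not in ('safe', 'brave', 'timid'):
    raise ValueError(f'unknown execution mode: {mode}')
```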

TODO

  • parametrize scheduler commands other than slurm
  • allow job submission within function
  • add command line tool
  • submit on remote cluster
  • handle task dependencies
  • add Workflow / Queue
  • perhaps add a signac-like view() or checkout() method to expose a view of the cache as folders

Mockups

Handle task dependencies

def run(path):
    pass
def analyze(path):
    pass

# TODO: how to use results of dependent tasks?
run = Task(run, path='output.txt')
analyze = Task(analyze, depends=[run], path='output.txt')

for task in Workflow([run, analyze]):
    task()

Jobs inherit task dependencies

run = Job(run, wall_time=24, cores=8)
analyze = Job(analyze, wall_time=1, cores=1)

for job in Workflow([run, analyze]):
    job()

# Wait for analyze job to end
job.scheduler.wait(analyze.fully_qualified_name())
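Workflow is still a mockup, but running tasks in dependency order amounts to a topological sort over the depends lists. A minimal sketch, where only the `depends` attribute mirrors the mockup above and everything else is hypothetical:

```python
def toposort(tasks):
    """Return tasks ordered so that each one follows its dependencies."""
    done = set()
    order = []

    def visit(task):
        # Visit dependencies first, then emit the task itself
        for dep in getattr(task, 'depends', []):
            if id(dep) not in done:
                visit(dep)
        if id(task) not in done:
            done.add(id(task))
            order.append(task)

    for task in tasks:
        visit(task)
    return order

class T:
    """Stand-in for a task with a depends attribute."""
    def __init__(self, name, depends=()):
        self.name = name
        self.depends = list(depends)

run = T('run')
analyze = T('analyze', depends=[run])
order = toposort([analyze, run])
assert [t.name for t in order] == ['run', 'analyze']
```

Note the sketch does not detect dependency cycles, which a real Workflow would have to reject.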

Remote scheduler

scheduler = Scheduler(host='login.m100.cineca.it', user='john_doe')
job = Job(f, scheduler=scheduler)
job(x=1)
job.scheduler.wait()

Documentation

Check out the tutorial for more examples and the public API for full details.

Installation

From PyPI

pip install pantarei

Contributing

Contributions to the project are welcome. If you wish to contribute, check out these guidelines.

Authors

  • Daniele Coslovich
