A general-purpose workflow manager
Pantarei
A general-purpose workflow manager - because everything flows
Quick start
Pantarei builds on three kinds of execution units:
- functions are stateless Python callables
- tasks are stateful wrapped functions that cache execution results
- jobs are stateful wrapped tasks for distributed-memory parallel environments
To see it in action, say you have a Python function
def f(x):
    import time
    time.sleep(2)
    return x
Wrap the function with a Task and call it with a range of arguments
from pantarei import *
task = Task(f)
for x in [1, 2]:
    task(x=x)
The task's results are cached: a subsequent call with the same arguments just fetches the stored results
results = task(x=1)
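To illustrate what caching by arguments means, here is a minimal, standard-library sketch of a caching wrapper. It is not Pantarei's implementation: the names `cached_task` and `slow_identity` are hypothetical, the cache lives only in memory, and argument values are assumed to be hashable.

```python
import functools
import time


def cached_task(func):
    """Wrap func so repeated calls with the same keyword arguments reuse the stored result."""
    cache = {}

    @functools.wraps(func)
    def wrapper(**kwargs):
        # Sort the items so the key does not depend on keyword order
        key = tuple(sorted(kwargs.items()))
        if key not in cache:
            cache[key] = func(**kwargs)
        return cache[key]

    # Expose the cache for inspection (hypothetical attribute)
    wrapper.cache = cache
    return wrapper


def slow_identity(x):
    time.sleep(0.1)
    return x


task_sketch = cached_task(slow_identity)
first = task_sketch(x=1)    # executes slow_identity
second = task_sketch(x=1)   # fetched from the in-memory cache
```

The second call returns without sleeping, because the key built from `x=1` is already present in the cache.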
We wrap the task with a Job and submit jobs to a local scheduler (like SLURM)
job = Job(task)
for x in [3, 4]:
    job(x=x)
Once the jobs are done, we can get the results (which are cached too)
job.scheduler.wait()
results = job(x=3)
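The submit, wait, then fetch pattern above has a rough standard-library analogue. In the sketch below a thread pool stands in for the scheduler. This is only an analogy under that assumption: it does not use Pantarei or SLURM, and the function `square` is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def square(x):
    return x * x


# The executor plays the role of the scheduler: submit() returns immediately
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = {x: executor.submit(square, x) for x in [3, 4]}
    # Block until every submitted call has finished
    wait(list(futures.values()))

# Collect the results once all the work is done
job_results = {x: future.result() for x, future in futures.items()}
```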
To see a summary of the jobs from the Python interpreter, add the following line at the end of the script
pantarei()
From the command line, you can check the state of the jobs by changing the execution mode ('safe', 'brave', 'timid') like this
pantarei=timid python script.py
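The `pantarei=timid` prefix is ordinary shell syntax that sets an environment variable for a single command. A script can read such a variable as sketched below. The helper `read_mode` and the default value `safe` are assumptions made for illustration, not Pantarei's actual code, and the sketch says nothing about what each mode does.

```python
import os

# Mode names as listed in the text above
VALID_MODES = ("safe", "brave", "timid")


def read_mode(environ=os.environ, default="safe"):
    """Return the execution mode named by the `pantarei` environment variable."""
    # The default is an assumption for this sketch
    mode = environ.get("pantarei", default)
    if mode not in VALID_MODES:
        raise ValueError(f"unknown execution mode: {mode!r}")
    return mode
```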
TODO
- parametrize scheduler commands other than slurm
- allow job submission within function
- add command line tool
- submit on remote cluster
- handle task dependencies
- add Workflow / Queue
- perhaps add signac-like view() or checkout() method to check out a view of cache as folders
Mockups
Handle task dependencies
def run(path):
    pass

def analyze(path):
    pass

# TODO: how to use results of dependent tasks?
run = Task(run, path='output.txt')
analyze = Task(analyze, depends=[run], path='output.txt')
for task in Workflow([run, analyze]):
    task()
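Since the mockup above is not implemented yet, the following sketch shows one way to order steps by their dependencies using only the standard library (`graphlib`, Python 3.9 or later). It does not use Pantarei. The `dependencies` mapping and the function bodies are hypothetical stand-ins for the `depends=[run]` argument and the empty functions of the mockup.

```python
from graphlib import TopologicalSorter


def run(path):
    return f"data written to {path}"


def analyze(path):
    return f"analysis of {path}"


# Map each step to the set of steps it depends on
dependencies = {"run": set(), "analyze": {"run"}}
steps = {"run": run, "analyze": analyze}

# static_order() yields every step after all of its dependencies
order = list(TopologicalSorter(dependencies).static_order())
outputs = [steps[name]("output.txt") for name in order]
```

Here `run` is always executed before `analyze`, whatever the order in which the steps are declared.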
Jobs inherit task dependencies
run = Job(run, wall_time=24, cores=8)
analyze = Job(analyze, wall_time=1, cores=1)
for job in Workflow([run, analyze]):
    job()
# Wait for analyze job to end
job.scheduler.wait(analyze.fully_qualified_name())
Remote scheduler
scheduler = Scheduler(host='login.m100.cineca.it', user='john_doe')
job = Job(f, scheduler=scheduler)
job(x=1)
job.scheduler.wait()
Documentation
Check out the tutorial for more examples and the public API for full details.
Installation
From PyPI
pip install pantarei
Contributing
Contributions to the project are welcome. If you wish to contribute, check out these guidelines.
Authors
- Daniele Coslovich