Pantarei
A general-purpose workflow manager - because everything flows
Quick start
Pantarei builds on three kinds of execution units:
- functions are stateless, Python callables
- tasks are stateful wrapped functions that cache execution results
- jobs are stateful wrapped tasks for distributed-memory parallel environments
To see it in action, say you have a Python function
def f(x):
    import time
    time.sleep(2)
    return x
Wrap the function with a Task and call it with a range of arguments
from pantarei import *
task = Task(f)
for x in [1, 2]:
    task(x=x)
The task's results are cached: a subsequent call with the same arguments just fetches the stored results
results = task(x=1)
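Caching means that a call with previously seen arguments returns the stored result instead of re-running the function. A minimal in-memory sketch of this general pattern (illustrative only, not pantarei's actual implementation, which stores results persistently):

```python
import functools

def make_task(func):
    """In-memory sketch of a result-caching wrapper: the general idea
    behind Task (illustrative; pantarei persists results instead)."""
    cache = {}
    @functools.wraps(func)
    def wrapper(**kwargs):
        # Build a hashable key from the keyword arguments
        key = tuple(sorted(kwargs.items()))
        if key not in cache:
            cache[key] = func(**kwargs)
        return cache[key]
    return wrapper

@make_task
def f(x):
    return 2 * x

f(x=1)  # computed on the first call
f(x=1)  # fetched from the cache afterwards
```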
We then wrap the task with a Job to submit it to a local scheduler (such as SLURM)
job = Job(task)
for x in [3, 4]:
    job(x=x)
Once the jobs are done, we can get the results (which are cached too)
job.scheduler.wait()
results = job(x=3)
To see a summary of the jobs from the Python interpreter, add the following line at the end
pantarei()
From the command line, you can check the state of the jobs by setting the execution mode ('safe', 'brave' or 'timid'), like this
pantarei=timid python script.py
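A minimal sketch of how such an environment-variable switch could be read (illustrative only, not pantarei's actual code):

```python
import os

# Sketch: how a mode switch like `pantarei=timid python script.py` could
# be picked up from the environment (names are illustrative).
MODES = ('safe', 'brave', 'timid')

def execution_mode(default='safe'):
    mode = os.environ.get('pantarei', default)
    if mode not in MODES:
        raise ValueError(f'unknown execution mode: {mode!r}')
    return mode
```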
TODO
- parametrize scheduler commands other than SLURM
- add command line tool
- handle task dependencies
- add Workflow / Queue
- allow job submission within function
- submit on remote cluster
- perhaps add signac-like view() or checkout() method to check out a view of cache as folders
- handle same function name from multiple modules/scripts
Mockups
Handle task dependencies
def run(path):
    pass

def analyze(path):
    pass
# TODO: how to use results of dependent tasks?
run = Task(run, path='output.txt')
analyze = Task(analyze, depends=[run], path='output.txt')
for task in Workflow([run, analyze]):
    task()
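Running a workflow that respects dependencies boils down to executing tasks in topological order. A minimal sketch of that ordering step, using Kahn's algorithm (illustrative; not pantarei's implementation):

```python
from collections import deque

def toposort(tasks, depends):
    """Order tasks so each runs after its dependencies (Kahn's algorithm).
    `depends[t]` lists the tasks t depends on; names are illustrative."""
    indeg = {t: len(depends.get(t, [])) for t in tasks}
    users = {t: [] for t in tasks}
    for t, deps in depends.items():
        for d in deps:
            users[d].append(t)
    # Start from tasks with no pending dependencies
    queue = deque(t for t in tasks if indeg[t] == 0)
    order = []
    while queue:
        t = queue.popleft()
        order.append(t)
        # Release tasks whose last dependency just completed
        for u in users[t]:
            indeg[u] -= 1
            if indeg[u] == 0:
                queue.append(u)
    return order

toposort(['run', 'analyze'], {'analyze': ['run']})  # ['run', 'analyze']
```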
Jobs inherit task dependencies
run = Job(run, wall_time=24, cores=8)
analyze = Job(analyze, wall_time=1, cores=1)
for job in Workflow([run, analyze]):
    job()
# Wait for analyze job to end
job.scheduler.wait(analyze.fully_qualified_name())
Remote scheduler
scheduler = Scheduler(host='login.m100.cineca.it', user='john_doe')
job = Job(f, scheduler=scheduler)
job(x=1)
job.scheduler.wait()
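Submitting on a remote cluster would presumably go through SSH. A minimal sketch of building (not executing) such a command, with a hypothetical helper that is not part of pantarei's API:

```python
import shlex

def remote_submit_command(host, user, script):
    """Build the argument list for submitting `script` on a remote SLURM
    cluster over SSH (illustrative sketch; `sbatch` on the remote side
    is an assumption)."""
    remote = f'sbatch {shlex.quote(script)}'
    return ['ssh', f'{user}@{host}', remote]

# The list could then be passed to subprocess.run() for execution
remote_submit_command('login.m100.cineca.it', 'john_doe', 'job.sh')
```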
Documentation
Check out the tutorial for more examples and the public API for full details.
Installation
From pypi
pip install pantarei
Contributing
Contributions to the project are welcome. If you wish to contribute, check out these guidelines.
Authors
- Daniele Coslovich