depio
A simple task manager with slurm integration.
How to use
We start by setting up a Pipeline:
from depio.Pipeline import Pipeline
from depio.Executors import ParallelExecutor
defaultpipeline = Pipeline(depioExecutor=ParallelExecutor())
You can now add tasks to this pipeline object. There are two ways to do so: (1) via decorators and (2) via a functional interface. Both start from the pipeline setup shown above.
(1) Use via decorators
To add tasks via decorators, use the @task("datapipeline") decorator from depio.decorators:
import time
import pathlib
from typing import Annotated
from depio.Pipeline import Pipeline
from depio.Executors import ParallelExecutor
from depio.Task import Product, Dependency
from depio.decorators import task
defaultpipeline = Pipeline(depioExecutor=ParallelExecutor())
BLD = pathlib.Path("build")
BLD.mkdir(exist_ok=True)
print("Touching an initial file")
(BLD/"input.txt").touch()
@task("datapipeline")
def slowfunction(output: Annotated[pathlib.Path, Product],
                 input: Annotated[pathlib.Path, Dependency] = None,
                 sec: int = 0
                 ):
    print(f"A function that is reading from {input} and writing to {output} in {sec} seconds.")
    time.sleep(sec)
    with open(output, 'w') as f:
        f.write("Hallo from depio")
defaultpipeline.add_task(slowfunction(BLD/"output1.txt",input=BLD/"input.txt", sec=2))
defaultpipeline.add_task(slowfunction(BLD/"final1.txt",BLD/"output1.txt", sec=1))
exit(defaultpipeline.run())
First, we create a folder build in which we want to produce our artifacts. Then, we create an initial artifact build/input.txt via touch.
The interesting part follows: we define a function slowfunction that takes a few seconds to produce an output file from a given input file. We annotate the function with the @task decorator and use the typing.Annotated type to tell depio which arguments are dependencies and which are products of the function. depio parses these annotations and sets up the dependencies between the tasks.
Finally, we add the function calls to the pipeline via add_task and run the pipeline.
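To make the annotation mechanism more concrete, here is a minimal, self-contained sketch of how Annotated markers can be read from a function signature and turned into task dependencies. It does not use depio and is not depio's implementation: the Product and Dependency classes below are local stand-ins, and classify_arguments, infer_edges, and the task names "first" and "second" are hypothetical names chosen for illustration.

```python
import inspect
import pathlib
from typing import Annotated, get_args, get_origin


class Product:
    """Local stand-in marker (assumption): a file that the task writes."""


class Dependency:
    """Local stand-in marker (assumption): a file that the task reads."""


def slowfunction(
    output: Annotated[pathlib.Path, Product],
    input: Annotated[pathlib.Path, Dependency] = None,
    sec: int = 0,
):
    """Body omitted; only the signature matters for this sketch."""


def classify_arguments(func):
    """Return (products, dependencies): parameter names carrying each marker."""
    products, dependencies = [], []
    for name, param in inspect.signature(func).parameters.items():
        annotation = param.annotation
        if get_origin(annotation) is not Annotated:
            continue  # plain annotations such as `int` carry no marker
        _, *metadata = get_args(annotation)
        if Product in metadata:
            products.append(name)
        if Dependency in metadata:
            dependencies.append(name)
    return products, dependencies


def infer_edges(calls):
    """Link each task to the task that produces one of its dependencies.

    `calls` maps a task name to (function, keyword arguments).
    Returns a set of (upstream, downstream) task-name pairs.
    """
    producers = {}
    for task_name, (func, kwargs) in calls.items():
        products, _ = classify_arguments(func)
        for arg in products:
            producers[kwargs[arg]] = task_name

    edges = set()
    for task_name, (func, kwargs) in calls.items():
        _, dependencies = classify_arguments(func)
        for arg in dependencies:
            upstream = producers.get(kwargs.get(arg))
            if upstream is not None:
                edges.add((upstream, task_name))
    return edges


BLD = pathlib.Path("build")
CALLS = {
    "first": (slowfunction, {"output": BLD / "output1.txt", "input": BLD / "input.txt", "sec": 2}),
    "second": (slowfunction, {"output": BLD / "final1.txt", "input": BLD / "output1.txt", "sec": 1}),
}
EDGES = infer_edges(CALLS)
print(EDGES)
```

In this sketch, build/input.txt has no producing task, so it creates no edge; the only edge runs from the task that writes build/output1.txt to the task that reads it.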
(2) Use via the functional interface
import time
import pathlib
from typing import Annotated
from depio.Pipeline import Pipeline
from depio.Executors import ParallelExecutor
from depio.Task import Product, Dependency
from depio.Task import Task
defaultpipeline = Pipeline(depioExecutor=ParallelExecutor())
BLD = pathlib.Path("build")
BLD.mkdir(exist_ok=True)
print("Touching an initial file")
(BLD/"input.txt").touch()
def slowfunction(output: Annotated[pathlib.Path, Product],
                 input: Annotated[pathlib.Path, Dependency] = None,
                 sec: int = 0
                 ):
    print(f"A function that is reading from {input} and writing to {output} in {sec} seconds.")
    time.sleep(sec)
    with open(output, 'w') as f:
        f.write("Hallo from depio")
t1 = defaultpipeline.add_task(Task("functionaldemo1", slowfunction, [1]))
t2 = defaultpipeline.add_task(Task("functionaldemo2", slowfunction, [2], depends_on=[t1]))
t3 = defaultpipeline.add_task(Task("functionaldemo3", slowfunction, [3], depends_on=[t2]))
exit(defaultpipeline.run())
However, you can also define the DAG yourself:
import time
import pathlib
from depio.Pipeline import Pipeline
from depio.Executors import ParallelExecutor
from depio.Task import Task
defaultpipeline = Pipeline(depioExecutor=ParallelExecutor())
def slowfunction(sec: int = 0):
    print(f"A function that is doing something for {sec} seconds.")
    time.sleep(sec)
t1 = defaultpipeline.add_task(Task("functionaldemo1", slowfunction, [1]))
t2 = defaultpipeline.add_task(Task("functionaldemo2", slowfunction, [2], depends_on=[t1]))
t3 = defaultpipeline.add_task(Task("functionaldemo3", slowfunction, [3], depends_on=[t2]))
exit(defaultpipeline.run())
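The depends_on chain above describes a small DAG. As an illustration of what such a graph implies for execution order, the following sketch uses Python's standard-library graphlib module (Python 3.9+) rather than depio; how depio itself schedules tasks is not described here, and the ready_batches helper is a hypothetical name.

```python
from graphlib import TopologicalSorter

# Each key maps to the tasks it depends on, mirroring depends_on=[...] above.
DEPENDS_ON = {
    "functionaldemo1": [],
    "functionaldemo2": ["functionaldemo1"],
    "functionaldemo3": ["functionaldemo2"],
}


def ready_batches(graph):
    """Group tasks into batches whose members could run at the same time."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)  # mark the batch finished to unlock successors
    return batches


ORDER = list(TopologicalSorter(DEPENDS_ON).static_order())
BATCHES = ready_batches(DEPENDS_ON)
print(ORDER)
print(BATCHES)
```

Because the three tasks form a strict chain, every batch contains exactly one task; a parallel executor only gains from graphs in which several tasks share the same upstream dependencies.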
How to develop
Install the package in editable (development) mode:
pip install -e .
How to test
Run
pytest