depio

A simple task manager with slurm integration.

How to use

We start by setting up a Pipeline:

from depio.Pipeline import Pipeline
from depio.Executors import ParallelExecutor

defaultpipeline = Pipeline(depioExecutor=ParallelExecutor())

You can now add tasks to this pipeline object. There are two ways to do so: (1) via decorators and (2) via a functional interface. The pipeline setup shown above is the same for both.

(1) Use via decorators

To add tasks via decorators, you need to use the @task("datapipeline") decorator imported from depio.decorators:

import time
import pathlib
from typing import Annotated

from depio.Pipeline import Pipeline
from depio.Executors import ParallelExecutor
from depio.Task import Product, Dependency
from depio.decorators import task

defaultpipeline = Pipeline(depioExecutor=ParallelExecutor())

BLD = pathlib.Path("build")
BLD.mkdir(exist_ok=True)

print("Touching an initial file")
(BLD/"input.txt").touch()

@task("datapipeline")
def slowfunction(output: Annotated[pathlib.Path, Product],
            input: Annotated[pathlib.Path, Dependency] = None,
            sec:int = 0
            ):
    print(f"A function that is reading from {input} and writing to {output} in {sec} seconds.")
    time.sleep(sec)
    with open(output,'w') as f:
        f.write("Hallo from depio")

defaultpipeline.add_task(slowfunction(BLD/"output1.txt",input=BLD/"input.txt", sec=2))
defaultpipeline.add_task(slowfunction(BLD/"final1.txt",BLD/"output1.txt", sec=1))

exit(defaultpipeline.run())

First, we create a folder build in which we want to produce our artifacts. Then, we create an initial artifact build/input.txt via touch. Then comes the interesting part: we define a function slowfunction that takes a couple of seconds to produce an output file from a given input file. We annotate the function with the @task decorator and use the typing.Annotated type to tell depio which arguments are dependencies and which are products of the function. depio parses these annotations and sets up the dependencies between the tasks. Finally, we add the function calls to the pipeline via add_task and run the pipeline.
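To make the annotation mechanism more concrete, here is a minimal sketch of how markers inside typing.Annotated can be read back from a function signature using only the standard library. The Product and Dependency classes below are hypothetical stand-ins defined for illustration, not depio's own classes, and classify_arguments is an assumed helper name; depio's actual parsing may differ.

```python
import inspect
import pathlib
from typing import Annotated, get_args, get_origin


class Product:
    """Hypothetical marker: the argument is a file the function writes."""


class Dependency:
    """Hypothetical marker: the argument is a file the function reads."""


def classify_arguments(func, *args, **kwargs):
    """Bind the call arguments and sort them into products and dependencies."""
    sig = inspect.signature(func)
    bound = sig.bind(*args, **kwargs)
    bound.apply_defaults()

    products, dependencies = [], []
    for name, value in bound.arguments.items():
        annotation = sig.parameters[name].annotation
        if get_origin(annotation) is not Annotated:
            continue  # plain parameters such as `sec` carry no marker
        markers = get_args(annotation)[1:]  # first entry is the underlying type
        if Product in markers:
            products.append(value)
        elif Dependency in markers:
            dependencies.append(value)
    return products, dependencies


def slowfunction(output: Annotated[pathlib.Path, Product],
                 input: Annotated[pathlib.Path, Dependency] = None,
                 sec: int = 0):
    pass  # body omitted; only the signature matters for this sketch


BLD = pathlib.Path("build")
products, dependencies = classify_arguments(
    slowfunction, BLD / "output1.txt", input=BLD / "input.txt", sec=2)
print("products:", products)
print("dependencies:", dependencies)
```

The sketch only classifies arguments; it does not run the function. Binding the call through inspect.signature means positional and keyword arguments are handled the same way.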

(2) Use via the functional interface

import time
import pathlib
from typing import Annotated

from depio.Pipeline import Pipeline
from depio.Executors import ParallelExecutor
from depio.Task import Product, Dependency
from depio.Task import Task

defaultpipeline = Pipeline(depioExecutor=ParallelExecutor())

BLD = pathlib.Path("build")
BLD.mkdir(exist_ok=True)

print("Touching an initial file")
(BLD/"input.txt").touch()

def slowfunction(output: Annotated[pathlib.Path, Product],
            input: Annotated[pathlib.Path, Dependency] = None,
            sec:int = 0
            ):
    print(f"A function that is reading from {input} and writing to {output} in {sec} seconds.")
    time.sleep(sec)
    with open(output,'w') as f:
        f.write("Hallo from depio")


# Positional arguments are passed as a list in the order (output, input, sec).
t1 = defaultpipeline.add_task(Task("functionaldemo1", slowfunction, [BLD/"output1.txt", BLD/"input.txt", 2]))
t2 = defaultpipeline.add_task(Task("functionaldemo2", slowfunction, [BLD/"final1.txt", BLD/"output1.txt", 1]))

exit(defaultpipeline.run())
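When arguments are annotated as products and dependencies, the ordering between tasks can follow from the file paths alone: a task that reads a file must run after the task that writes it. As a rough sketch of that idea (not depio's actual implementation), edges can be derived by matching each task's dependencies against the products of the other tasks. TaskSpec and infer_edges are hypothetical names introduced for illustration.

```python
import pathlib
from dataclasses import dataclass, field


@dataclass
class TaskSpec:
    """Hypothetical record of a task's name and the files it writes and reads."""
    name: str
    products: list = field(default_factory=list)
    dependencies: list = field(default_factory=list)


def infer_edges(tasks):
    """Return (upstream, downstream) name pairs implied by shared file paths."""
    producer_of = {path: t.name for t in tasks for path in t.products}
    edges = []
    for t in tasks:
        for path in t.dependencies:
            upstream = producer_of.get(path)
            # Files without a producer (e.g. the touched input) add no edge.
            if upstream is not None and upstream != t.name:
                edges.append((upstream, t.name))
    return edges


BLD = pathlib.Path("build")
tasks = [
    TaskSpec("first", products=[BLD / "output1.txt"], dependencies=[BLD / "input.txt"]),
    TaskSpec("second", products=[BLD / "final1.txt"], dependencies=[BLD / "output1.txt"]),
]
print(infer_edges(tasks))  # [('first', 'second')]
```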

However, you can also define the DAG yourself by passing depends_on:

import time
import pathlib

from depio.Pipeline import Pipeline
from depio.Executors import ParallelExecutor
from depio.Task import Task

defaultpipeline = Pipeline(depioExecutor=ParallelExecutor())

def slowfunction(sec:int = 0):
    print(f"A function that is doing something for {sec} seconds.")
    time.sleep(sec)


t1 = defaultpipeline.add_task(Task("functionaldemo1", slowfunction, [1]))
t2 = defaultpipeline.add_task(Task("functionaldemo2", slowfunction, [2], depends_on=[t1]))
t3 = defaultpipeline.add_task(Task("functionaldemo3", slowfunction, [3], depends_on=[t2]))

exit(defaultpipeline.run())
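To illustrate what an explicit depends_on chain implies for execution order, the following sketch uses the standard library's graphlib.TopologicalSorter (Python 3.9+). It is only an illustration of the ordering constraint; it does not use depio and makes no claim about how depio schedules tasks internally. The dictionary mirrors the task names from the example above.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each task name to the set of task names it depends on.
depends_on = {
    "functionaldemo1": set(),
    "functionaldemo2": {"functionaldemo1"},
    "functionaldemo3": {"functionaldemo2"},
}

# A valid execution order: every task appears after its dependencies.
order = list(TopologicalSorter(depends_on).static_order())
print(order)  # ['functionaldemo1', 'functionaldemo2', 'functionaldemo3']

# Group tasks into batches whose members could run at the same time.
sorter = TopologicalSorter(depends_on)
sorter.prepare()
batches = []
while sorter.is_active():
    ready = sorter.get_ready()
    batches.append(sorted(ready))
    sorter.done(*ready)
print(batches)  # [['functionaldemo1'], ['functionaldemo2'], ['functionaldemo3']]
```

Because the three tasks form a chain, each batch holds a single task; tasks without a dependency between them would land in the same batch.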

How to develop

Install the package in editable (development) mode:

pip install -e .

How to test

Run

pytest
