
A lightweight workflow management system and task graph

Project description

Model tasks

An opinionated and lightweight workflow management system and task graph. Born from the desire to have a reusable code skeleton for geoprocessing and data pipeline projects, this package offers:

  • Write cleaner code in separate tasks
  • Formulate task dependencies and automatically resolve them as a directed acyclic graph (DAG)
  • Display your models
  • Parallelize concurrent tasks
  • Task configuration
  • Caching and invalidation of intermediate task results based on configuration
  • Easily generate task graph documentation
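
The caching feature listed above invalidates intermediate results based on configuration. How the package does this is covered in its documentation, not here. As a rough illustration of the general idea only, a cache key can be derived from a hash of the task name and its configuration, so that any configuration change produces a new key. The helper `cache_key` and the configuration values below are hypothetical and are not part of the modeltasks API.

```python
import hashlib
import json


def cache_key(task_name: str, config: dict) -> str:
    """Derive a stable key from a task name and its configuration (hypothetical helper)."""
    # Sorting keys makes the serialization independent of dict insertion order.
    payload = json.dumps({'task': task_name, 'config': config}, sort_keys=True)
    return hashlib.sha256(payload.encode('utf-8')).hexdigest()


key_a = cache_key('TaskA', {'resolution': 10, 'region': 'north'})
key_b = cache_key('TaskA', {'region': 'north', 'resolution': 10})
key_c = cache_key('TaskA', {'resolution': 20, 'region': 'north'})

print(key_a == key_b)  # same configuration, so the cached result could be reused
print(key_a == key_c)  # changed configuration, so the cached result is stale
```

A result stored under the old key is simply never looked up again once the configuration changes, which is one simple way to get invalidation without tracking timestamps.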

Quickstart

A model consists of individual tasks that declare their dependencies on other tasks. Together they form one or more directed acyclic graphs, which do not allow loops or task repetition. Tasks are implemented as subclasses of the ModelTask class. They can either all be defined within one file or within a folder of Python modules. The latter is more practical as the model grows and consists of many tasks.
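
To make the "no loops" rule concrete, here is a minimal sketch using Python's standard `graphlib` module (Python 3.9+). It is not how modeltasks resolves tasks internally; the dependency dictionaries are hypothetical and only illustrate what ordering a directed acyclic graph means.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical dependency map: each task name maps to the tasks it depends on.
dependencies = {
    'TaskA': set(),
    'TaskB': {'TaskA'},
    'TaskC': {'TaskA', 'TaskB'},
}

# static_order() yields every task after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)

# A loop between tasks cannot be ordered and is rejected.
looped = {'TaskA': {'TaskB'}, 'TaskB': {'TaskA'}}
try:
    list(TopologicalSorter(looped).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```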

Create a one-file model

model.py

from modeltasks import Model, ModelTask


my_model = Model(title='My Model', model_tasks=__file__)


class TaskA(ModelTask):
    def run(self, logger, workspace):
        logger.info(f'Running an A task in {workspace}')


class TaskB(ModelTask):
    def run(self, logger, workspace):
        logger.info(f'Running a B task in {workspace}')

Create a model with task modules

Prepare project structure

touch model.py
mkdir task_modules
touch task_modules/a_tasks.py
touch task_modules/b_tasks.py

model.py

from modeltasks import Model


my_model = Model(title='My Model', model_tasks='task_modules')

a_tasks.py

from modeltasks import ModelTask


class TaskA(ModelTask):
    def run(self, logger, workspace):
        logger.info(f'Running an A task in {workspace}')

b_tasks.py

from modeltasks import ModelTask


class TaskB(ModelTask):
    def run(self, logger, workspace):
        logger.info(f'Running a B task in {workspace}')

The above code creates a model with two simple tasks, but not yet a task graph, because neither task defines:

  • inputs (dependencies)
  • outputs (results)
  • configuration

Tasks are put into a workflow graph by defining their inputs and outputs. Let's look at the two example tasks we just created and assume that TaskB requires TaskA to run first and then uses its output.

a_tasks.py (With output)

from modeltasks import ModelTask
from modeltasks.data import VariableOutput


class TaskA(ModelTask):

    a_output: VariableOutput

    def run(self, logger, workspace):
        logger.info(f'Running an A task in {workspace}')
        self.a_output = 'First I ran task A.'

b_tasks.py (With dependency and output)

from modeltasks import ModelTask
from modeltasks.data import VariableInput, VariableOutput


class TaskB(ModelTask):

    a_input: VariableInput = 'a_tasks.TaskA'
    b_output: VariableOutput

    def run(self, logger, workspace):
        logger.info(f'Running a B task in {workspace}')
        self.b_output = f'{self.a_input} Then I ran task B.'
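
The tasks above declare inputs and outputs as annotated class attributes. As a hedged sketch of how such declarations can be discovered in plain Python, the example below reads class annotations. `InputMarker`, `OutputMarker`, `ExampleTask`, and both helper functions are hypothetical stand-ins and do not reflect the actual modeltasks implementation.

```python
class InputMarker:
    """Hypothetical stand-in for an input type such as VariableInput."""


class OutputMarker:
    """Hypothetical stand-in for an output type such as VariableOutput."""


class ExampleTask:
    # An input names the task it depends on; an output is only declared.
    a_input: InputMarker = 'a_tasks.TaskA'
    b_output: OutputMarker


def declared_inputs(task_class) -> dict:
    """Map each input attribute to the task reference assigned to it."""
    annotations = getattr(task_class, '__annotations__', {})
    return {
        name: getattr(task_class, name, None)
        for name, kind in annotations.items()
        if kind is InputMarker
    }


def declared_outputs(task_class) -> list:
    """List the attribute names annotated as outputs."""
    annotations = getattr(task_class, '__annotations__', {})
    return [name for name, kind in annotations.items() if kind is OutputMarker]


print(declared_inputs(ExampleTask))
print(declared_outputs(ExampleTask))
```

The input mapping is what turns a set of independent classes into graph edges: every value is a reference to an upstream task.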

Run a model

To run a model, we need to specify an entry task. This is the task that will be run at the end after all of its required task dependencies have been resolved and their output gathered.

python3 model.py --run --task b_tasks.TaskB
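
Conceptually, running an entry task means running only the tasks it depends on, each one once, and then the entry task itself. The sketch below shows that idea with plain functions. The `resolve` helper and the `dependencies` and `runners` dictionaries are hypothetical, the graph is assumed to be acyclic, and the package's own scheduler may work differently.

```python
def resolve(task: str, dependencies: dict, runners: dict, results: dict) -> object:
    """Run `task` after its dependencies, reusing results already computed."""
    if task not in results:
        # Resolve upstream tasks first and pass their outputs as inputs.
        inputs = [resolve(dep, dependencies, runners, results) for dep in dependencies[task]]
        results[task] = runners[task](*inputs)
    return results[task]


dependencies = {'TaskA': [], 'TaskB': ['TaskA'], 'TaskC': []}
runners = {
    'TaskA': lambda: 'First I ran task A.',
    'TaskB': lambda a: f'{a} Then I ran task B.',
    'TaskC': lambda: 'Task C is not needed by TaskB.',
}

results = {}
final = resolve('TaskB', dependencies, runners, results)
print(final)
print(sorted(results))  # TaskC never ran because TaskB does not depend on it
```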

Model visualization

Sometimes it is helpful to see a visual representation of all the task interdependencies. To render such a visual task graph, call your model with:

python3 model.py --graph --output=mermaid.md  # Mermaid file
python3 model.py --graph --output=graph.png   # Image file
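
For readers unfamiliar with Mermaid, the sketch below shows how a dependency map can be written as a Mermaid flowchart definition. The `to_mermaid` function is a hypothetical illustration; the file that the package generates may be structured differently.

```python
def to_mermaid(dependencies: dict) -> str:
    """Render a task dependency map as a top-down Mermaid flowchart."""
    lines = ['graph TD']
    for task in sorted(dependencies):
        upstream_tasks = sorted(dependencies[task])
        if not upstream_tasks:
            # Tasks without dependencies are listed as standalone nodes.
            lines.append(f'    {task}')
        for upstream in upstream_tasks:
            # An arrow points from the dependency to the task that needs it.
            lines.append(f'    {upstream} --> {task}')
    return '\n'.join(lines)


diagram = to_mermaid({'TaskA': [], 'TaskB': ['TaskA']})
print(diagram)
```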

Documentation

To learn more about supported input and output types, dependency definition, task schedulers, result caching, etc., head over to the package documentation.

Contribution

Please leave feedback, questions, and suggestions on the project's issue tracker.

Download files

Source Distribution

modeltasks-0.1.19.tar.gz (36.2 kB)

Built Distribution

modeltasks-0.1.19-py3-none-any.whl (40.5 kB, Python 3)
