A lightweight workflow management system and task graph

Model tasks

An opinionated and lightweight workflow management system and task graph. Born from the desire to have a reusable code skeleton for geoprocessing and data pipeline projects, this package offers:

  • Write cleaner code in separate tasks
  • Formulate task dependencies and automatically resolve them as a directed acyclic graph (DAG)
  • Display your models
  • Parallelize concurrent tasks
  • Task configuration
  • Caching and invalidation of intermediate task results based on configuration
  • Easily generate a task graph documentation
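
To picture what "caching and invalidation based on configuration" can mean, here is a minimal standard-library sketch. It does not use the modeltasks API; `cache_key`, `run_cached`, and the `Buffer` task are hypothetical names chosen for illustration. The idea is that a result is stored under a hash of the task name and its configuration, so a changed configuration produces a new key and forces a recomputation.

```python
import hashlib
import json

_cache = {}  # hypothetical in-memory store: cache key -> task result


def cache_key(task_name, config):
    """Derive a stable key from the task name and its configuration."""
    payload = json.dumps({"task": task_name, "config": config}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_cached(task_name, config, compute):
    """Return the stored result; recompute only for an unseen configuration."""
    key = cache_key(task_name, config)
    if key not in _cache:
        _cache[key] = compute(config)
    return _cache[key]


calls = []  # records every real computation


def buffer_area(config):
    calls.append(config["distance"])
    return config["distance"] ** 2


run_cached("Buffer", {"distance": 10}, buffer_area)
run_cached("Buffer", {"distance": 10}, buffer_area)  # served from the cache
run_cached("Buffer", {"distance": 25}, buffer_area)  # new configuration, recomputed
print(calls)  # [10, 25]
```

Sorting the keys before hashing keeps the cache key independent of the order in which configuration values were written.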

Quickstart

A model consists of individual tasks, each of which declares its dependencies on other tasks. Together they form one or more directed acyclic graphs, which do not allow loops or task repetition. Tasks are implemented as subclasses of the ModelTask class. They can be defined either all within one file or within a folder of Python modules. The latter is more practical as the model grows and comes to consist of many tasks.
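
The "no loops" rule can be illustrated with Python's standard `graphlib` module. This is a conceptual sketch only, not how modeltasks resolves its graph internally; the `dependencies` mapping and the `execution_order` helper are assumptions made for the example.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "TaskA": set(),
    "TaskB": {"TaskA"},
    "TaskC": {"TaskA", "TaskB"},
}


def execution_order(graph):
    """Return a valid run order, or raise ValueError if the graph has a loop."""
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as error:
        # args[1] holds the list of nodes that form the detected cycle.
        raise ValueError(f"Task graph contains a loop: {error.args[1]}") from error


order = execution_order(dependencies)
print(order)  # ['TaskA', 'TaskB', 'TaskC']
```

A mapping such as `{"X": {"Y"}, "Y": {"X"}}` would raise the `ValueError`, because neither task could ever run first.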

Create a one-file model

model.py

from modeltasks import Model, ModelTask


my_model = Model(title='My Model', model_tasks=__file__)


class TaskA(ModelTask):
    def run(self, logger, workspace):
        logger.info(f'Running an A task in {workspace}')


class TaskB(ModelTask):
    def run(self, logger, workspace):
        logger.info(f'Running a B task in {workspace}')

Create a model with task modules

Prepare project structure

touch model.py
mkdir task_modules
touch task_modules/a_tasks.py
touch task_modules/b_tasks.py

model.py

from modeltasks import Model


my_model = Model(title='My Model', model_tasks='task_modules')

a_tasks.py

from modeltasks import ModelTask


class TaskA(ModelTask):
    def run(self, logger, workspace):
        logger.info(f'Running an A task in {workspace}')

b_tasks.py

from modeltasks import ModelTask


class TaskB(ModelTask):
    def run(self, logger, workspace):
        logger.info(f'Running a B task in {workspace}')

The above code creates a model with two simple tasks, but not yet a task graph, because neither task defines:

  • inputs (dependencies)
  • outputs (results)
  • configuration

Putting tasks into a workflow graph is achieved by defining task inputs and outputs. Let's look at the two example tasks we just created and assume that TaskB requires TaskA to run first and then uses its output.

a_tasks.py (With output)

from modeltasks import ModelTask
from modeltasks.data import VariableOutput


class TaskA(ModelTask):

    a_output: VariableOutput

    def run(self, logger, workspace):
        logger.info(f'Running an A task in {workspace}')
        self.a_output = 'First I ran task A.'

b_tasks.py (With dependency and output)

from modeltasks import ModelTask
from modeltasks.data import VariableInput, VariableOutput


class TaskB(ModelTask):

    a_input: VariableInput = 'a_tasks.TaskA'
    b_output: VariableOutput

    def run(self, logger, workspace):
        logger.info(f'Running a B task in {workspace}')
        self.b_output = f'{self.a_input} Then I ran task B.'

Run a model

To run a model, we need to specify an entry task. This is the task that will be run at the end after all of its required task dependencies have been resolved and their output gathered.

python3 model.py --run --task b_tasks.TaskB
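
What "resolving the dependencies of an entry task" involves can be sketched in plain Python. The snippet below is an illustration under stated assumptions, not the modeltasks implementation: the `TASKS` registry, `required_tasks`, and `run` are hypothetical, and tasks are reduced to simple functions that receive the outputs of their upstream tasks.

```python
from graphlib import TopologicalSorter

# Hypothetical registry: task name -> (names of upstream tasks, function).
# Each function receives the outputs of its upstream tasks, in order.
TASKS = {
    "a_tasks.TaskA": ((), lambda: "First I ran task A."),
    "b_tasks.TaskB": (("a_tasks.TaskA",), lambda a: f"{a} Then I ran task B."),
    "c_tasks.TaskC": ((), lambda: "Unrelated task C."),
}


def required_tasks(entry):
    """Collect the entry task and everything it depends on, transitively."""
    needed, stack = {}, [entry]
    while stack:
        name = stack.pop()
        if name not in needed:
            upstream = TASKS[name][0]
            needed[name] = set(upstream)
            stack.extend(upstream)
    return needed


def run(entry):
    """Run the dependencies first and the entry task last; return its output."""
    outputs = {}
    for name in TopologicalSorter(required_tasks(entry)).static_order():
        upstream, function = TASKS[name]
        outputs[name] = function(*(outputs[dep] for dep in upstream))
    return outputs[entry]


result = run("b_tasks.TaskB")
print(result)  # First I ran task A. Then I ran task B.
```

Only the tasks reachable from the entry task are executed, so the unrelated `c_tasks.TaskC` never runs in this sketch.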

Model visualization

Sometimes it is helpful to see a visual representation of all the task interdependencies. To render such a task graph, call your model with:

python3 model.py --graph --output=mermaid.md  # Mermaid file
python3 model.py --graph --output=graph.png   # Image file
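
For readers unfamiliar with Mermaid, the following sketch shows how a dependency mapping can be turned into Mermaid flowchart text. It is not the output format of modeltasks, which may differ; `to_mermaid` and the task names are hypothetical.

```python
def to_mermaid(dependencies):
    """Render a mapping of task -> upstream tasks as Mermaid flowchart text."""
    lines = ["graph TD"]
    for task in sorted(dependencies):
        upstream = sorted(dependencies[task])
        if not upstream:
            # A task without dependencies still appears as a standalone node.
            lines.append(f"    {task}")
        for dep in upstream:
            # The arrow points from the dependency to the task that needs it.
            lines.append(f"    {dep} --> {task}")
    return "\n".join(lines)


diagram = to_mermaid({"TaskA": set(), "TaskB": {"TaskA"}})
print(diagram)
```

Pasting the printed text into any Mermaid renderer draws an arrow from TaskA to TaskB.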

Documentation

To learn more about supported input and output types, dependency definition, task schedulers, result caching, and more, head over to the package documentation.

Contribution

Please leave feedback, questions, and suggestions on the project's issue tracker.

