Skip to main content

A lightweight workflow management system and task graph

Project description

Model tasks

An opinionated and lightweight workflow management system and task graph. Born from the desire to have a reusable code skeleton for geoprocessing and data pipelines projects this package offers:

  • Write cleaner code code in separate tasks
  • Formulate task dependencies and automatically resolve those as a direct acyclical graph (DAG)
  • Display your models
  • Parallelize concurrent tasks
  • Task configuration
  • Caching and invalidation of intermediate task results based on configuration
  • Easily generate a task graph documentation

Quickstart

A model consists of individual tasks which formulate their dependence on other tasks. Together they build one or more acyclical directed graphs, which do not allow loops or task repetition. Tasks are implement as subclasses of the ModelTask class. They can be either all defined within one file or within a folder of Python modules. The latter is more practical if the model grows and consists of many tasks.

Create a one-file model

model.py

from modeltasks import Model
from modeltasks.task import ModelTask

my_model = Model(title='My Model', model_tasks=__file__)

class TaskA(ModelTask):
    def run (self, logger, workspace):
        logger.info(f'Running an A task in {workspace}')
        
class TaskB(ModelTask):
    def run (self, logger, workspace):
        logger.info(f'Running a B task in {workspace}')

Create a model with task modules

Prepare project structure

touch model.py
mkdir task_modules
touch task_modules/a_tasks.py
touch task_modules/b_tasks.py

model.py

from modeltasks import Model

my_model = Model(title='My Model', model_tasks='task_modules')

a_tasks.py

from modeltasks.task import ModelTask

class TaskA(ModelTask):
    def run (self, logger, workspace):
        logger.info(f'Running an A task in {workspace}')

b_tasks.py

from modeltasks.task import ModelTask

class TaskB(ModelTask):
    def run (self, logger, workspace):
        logger.info(f'Running a B task in {workspace}')

The above code creates a model with two simple tasks. But not a task graph yet because both tasks have not defined:

  • inputs (dependencies)
  • outputs (results)
  • configuration

Putting tasks into a workflow graph is achieved by defining task inputs and outputs. Let's look at the two example tasks we just created and assume that TaskB requires TaskA to run first and then use its output.

a_tasks.py (With output)

from modeltasks.task import ModelTask
from modeltasks.io import VariableOutput

class TaskA(ModelTask):

    a_output: VariableOutput

    def run (self, logger, workspace):
        logger.info(f'Running an A task in {workspace}')
        self.a_output = 'First I ran task A.'

b_tasks.py (With dependency and output)

from modeltasks.task import ModelTask
from modeltasks.io import VariableInput, VariableOutput

class TaskB(ModelTask):

    a_input: VariableInput = 'a_tasks.TaskA'
    b_output: VariableOutput

    def run (self, logger, workspace):
        logger.info(f'Running an B task in {workspace}')
        self.b_output = f'{self.a_input} Then I ran task B.'

Run a model

To run a model, we need to specify an entry task. This is the task that will be run at the end after all of its required task dependencies have been resolved and their output gathered.

python3 model.py --run --task b_tasks.TaskB

Documentation

To learn more about supported input and output types, dependency definition, task schedulers, result caching, etc. head over to the package documentation

Contribution

Please leave feedback, questions, suggestions on the project's issue tracker.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

modeltasks-0.1.8-py3-none-any.whl (37.9 kB view details)

Uploaded Python 3

File details

Details for the file modeltasks-0.1.8-py3-none-any.whl.

File metadata

  • Download URL: modeltasks-0.1.8-py3-none-any.whl
  • Upload date:
  • Size: 37.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.0 CPython/3.10.4

File hashes

Hashes for modeltasks-0.1.8-py3-none-any.whl
Algorithm Hash digest
SHA256 764fd2bc2e75c2d9a3d5701ef710d78bcf28e1e212ccd1037f37dbda3ab71a0b
MD5 6ba07f63e3334e83293921faaa648d6a
BLAKE2b-256 4b47fb470f40c7fc7bf2b5caec1f729d4890948b3038c704d1b91d4256968b88

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page