
Tools to work with luigi.

Luigi-tools

This package extends and adds new features to the luigi package. Here are a few examples of these features:

  • add a new BoolParameter that automatically switches to explicit parsing when the default value is True (otherwise it is not possible to set it to False using the CLI).
  • add several types of optional parameters.
  • add an OutputLocalTarget class to help build an output tree.
  • add a mixin that adds a --rerun parameter, which forces a given task to run again even if its targets exist and also reruns all the tasks that depend on it.
  • add a mixin that removes the output of failed tasks, which is likely to be corrupted or incomplete. This mirrors the default behaviour of a Snakemake rule (the counterpart of a Luigi Task).
  • add a new @copy_params mechanism to copy parameters from one task to another (@inherits gives the same object to all the inheriting tasks, while @copy_params only copies the definition of the parameter, so each inheriting task can be given a different value).
  • add functions to get and display the dependency graph of a given task.
  • add a mechanism to set up templates for luigi.cfg files, so the user only has to update specific values instead of copying the entire luigi.cfg.

Please read the complete API documentation for more details.

Installation

This package should be installed using pip:

pip install luigi-tools

Usage

The Luigi package describes itself as follows:

Luigi is a Python package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, handling failures, command line integration, and much more.

The luigi-tools package aims to make Luigi easier to use for developers. The following sections present a few examples of its main features.

Boolean parameter

The luigi.BoolParameter can be parsed in two ways: implicit or explicit. Explicit parsing requires the user to enter a value: True or False. Implicit parsing, on the contrary, takes no value and just sets the parameter to True if it is given. Implicit parsing is not compatible with a default value of True, because in that case it is not possible to set the value to False using the CLI.

To switch to explicit parsing automatically when the default value is True, use the BoolParameter provided by luigi-tools:

from luigi.task import Task
from luigi_tools import BoolParameter

class MyTask(Task):

    a_boolean_parameter = BoolParameter(default=True)

    def run(self):
        pass
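
The difference between the two parsing modes can be illustrated with the standard argparse module alone. This is only a sketch of the idea, not the code used by luigi or luigi-tools; the parse_flag helper and the --flag option are hypothetical names introduced for the illustration.

```python
import argparse


def parse_flag(argv, default, explicit):
    """Parse a --flag option either implicitly or explicitly."""
    parser = argparse.ArgumentParser()
    if explicit:
        # Explicit parsing: the user must pass a value, e.g. "--flag false".
        parser.add_argument(
            "--flag",
            type=lambda text: text.lower() == "true",
            default=default,
        )
    else:
        # Implicit parsing: the mere presence of "--flag" means True.
        parser.add_argument("--flag", action="store_true", default=default)
    return parser.parse_args(argv).flag


# Implicit parsing with default=True can never yield False.
implicit_absent = parse_flag([], default=True, explicit=False)
implicit_present = parse_flag(["--flag"], default=True, explicit=False)

# Explicit parsing lets the user override a True default.
explicit_false = parse_flag(["--flag", "false"], default=True, explicit=True)
explicit_absent = parse_flag([], default=True, explicit=True)
```

With implicit parsing both calls return True, so the default can never be overridden, while explicit parsing returns False when the user asks for it.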

Target with prefix

Luigi workflows are based on Target objects that represent the state of a workflow step. These targets can be anything but are often files in a result directory tree. To avoid specifying the result directory for each target, one can use the OutputLocalTarget class and give it a prefix, so that all targets based on this class are located in the same directory.

import luigi
from luigi.task import Task
from luigi_tools.target import OutputLocalTarget

class MyTask(Task):

    def run(self):
        pass

    def output(self):
        # The target will point to the file result_directory/filename.ext
        return OutputLocalTarget("filename.ext")

# Set the default prefix (it could also be called inside another Task)
OutputLocalTarget.set_default_prefix("result_directory")

# Run the task (the task can also be called with the CLI as usual)
luigi.build([MyTask()], local_scheduler=True)
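
The idea of a prefix shared by all targets can be sketched with pathlib only. The PrefixedTarget class below is a hypothetical stand-in written for this illustration; it is not the OutputLocalTarget implementation, and resolving the prefix lazily is a design choice of the sketch.

```python
from pathlib import Path


class PrefixedTarget:
    """Hypothetical stand-in for a target that shares a class-level prefix."""

    _default_prefix = None  # shared by every instance of the class

    def __init__(self, relative_path):
        self.relative_path = Path(relative_path)

    @classmethod
    def set_default_prefix(cls, prefix):
        cls._default_prefix = None if prefix is None else Path(prefix)

    @property
    def path(self):
        # In this sketch the prefix is resolved on access, so it also applies
        # to targets created before set_default_prefix() was called.
        if self._default_prefix is None:
            return self.relative_path
        return self._default_prefix / self.relative_path


target = PrefixedTarget("filename.ext")
PrefixedTarget.set_default_prefix("result_directory")
resolved = target.path.as_posix()
```

Here resolved is "result_directory/filename.ext", even though the target was created with the bare file name.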

Rerunnable task

In Luigi, the states of the tasks are deduced from their targets. If the targets exist, the task is assumed to have already been completed and is thus skipped if the workflow is run again. This behavior is usually desirable because it avoids repeating computations that are already completed. Nevertheless, it is sometimes useful to overwrite a former result, especially during the development process. For this reason, luigi-tools provides a mixin that adds a --rerun parameter to a task. When this parameter is set to True, all the targets of this task are deleted, as well as the targets of the tasks that depend on it, so all the tasks related to this task will run again. As for any mixin, it must go to the left of the Task class in the inheritance list.

from luigi.task import Task
from luigi_tools.task import RerunMixin

class MyTask(RerunMixin, Task):

    def run(self):
        pass

Now the task MyTask has a boolean parameter --rerun, which can be set from the CLI:

luigi --module my_module MyTask --rerun
luigi --module my_module another_task_that_depends_on_mytask --MyTask-rerun
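
Which tasks are affected by --rerun can be pictured as a walk through the dependency graph: the task itself, plus every task that depends on it directly or indirectly. The following is a minimal sketch of that selection over a plain dictionary; the tasks_to_rerun function and the workflow below are hypothetical and do not reflect how RerunMixin is implemented.

```python
def tasks_to_rerun(requires, rerun_task):
    """Return rerun_task plus every task that depends on it, directly or not.

    `requires` maps each task name to the names of the tasks it requires.
    """
    # Invert the mapping: for each task, which tasks depend on it?
    dependents = {name: set() for name in requires}
    for task, upstream_tasks in requires.items():
        for upstream in upstream_tasks:
            dependents.setdefault(upstream, set()).add(task)

    to_rerun = set()
    stack = [rerun_task]
    while stack:
        current = stack.pop()
        if current in to_rerun:
            continue
        to_rerun.add(current)
        stack.extend(dependents.get(current, ()))
    return to_rerun


# Hypothetical workflow: Report requires Clean, which requires Download.
workflow = {
    "Download": [],
    "Clean": ["Download"],
    "Report": ["Clean"],
    "Unrelated": [],
}
invalidated = tasks_to_rerun(workflow, "Clean")
```

In this example, rerunning Clean invalidates Clean and Report, while Download and Unrelated keep their existing targets.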

Clear the output of failed tasks

When a task fails unexpectedly, it may leave an incomplete or corrupted output that leads to wrong results in downstream tasks. With the RemoveCorruptedOutputMixin, Luigi automatically removes the output targets of the tasks that failed. This is the default behaviour in other workflow management systems such as Snakemake.

import luigi
from luigi_tools.task import RemoveCorruptedOutputMixin

class TaskA(RemoveCorruptedOutputMixin, luigi.Task):
    """TaskA can remove its output upon failure."""
    pass

The clean_failed parameter is false by default and must be set to true explicitly. This lets users leave it disabled to inspect the output of a failed task when debugging, without changing the code.

luigi --module my_module TaskA --clean-failed true
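
The behaviour described above amounts to catching the failure, deleting the output, and letting the error propagate. Here is a minimal sketch using only the standard library; run_and_clean_on_failure, failing_run and output_survives are hypothetical helpers written for this illustration, not part of luigi-tools.

```python
import os
import tempfile


def run_and_clean_on_failure(run, output_path, clean_failed=True):
    """Call run(output_path); on failure, optionally delete the output."""
    try:
        run(output_path)
    except Exception:
        if clean_failed and os.path.exists(output_path):
            os.remove(output_path)
        raise  # the failure itself is still reported to the caller


def failing_run(output_path):
    # Write a partial result, then fail before the output is complete.
    with open(output_path, "w") as stream:
        stream.write("partial result")
    raise RuntimeError("simulated failure")


def output_survives(clean_failed):
    """Return True if the partial output still exists after the failure."""
    with tempfile.TemporaryDirectory() as directory:
        output_path = os.path.join(directory, "result.txt")
        try:
            run_and_clean_on_failure(failing_run, output_path, clean_failed)
        except RuntimeError:
            pass
        return os.path.exists(output_path)


kept_when_disabled = output_survives(clean_failed=False)
kept_when_enabled = output_survives(clean_failed=True)
```

With cleaning disabled the partial file stays on disk, so a later run could mistake it for a valid result; with cleaning enabled it is removed.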

Copy parameters

In some situations, several tasks have a few parameters in common. Declaring them again in each task is tedious and error-prone, and Luigi provides dedicated tools to deal with this, as described in its documentation. Nevertheless, the tools provided by Luigi have a major drawback: all the tasks with the inherited parameter will have the same value for this parameter. In some situations, one wants to be able to give different values to a task with an inherited parameter, especially during the development process. This is possible with the @copy_params decorator:

import luigi
from luigi.task import Task
from luigi_tools.task import ParamRef
from luigi_tools.task import copy_params

class TaskA(Task):
    a = luigi.Parameter(default="default_value_a")

@copy_params(
    a=ParamRef(TaskA)
)
class TaskB(Task):
    b = luigi.Parameter(default="b")

Here the class TaskB has two parameters:

  • a with default_value_a as default value.
  • b with b as default value.

It is also possible to change the name of the parameter or to change the default value:

import luigi
from luigi.task import Task
from luigi_tools.task import ParamRef
from luigi_tools.task import copy_params

class TaskA(Task):
    a = luigi.Parameter(default="default_value_a")

@copy_params(
    a=ParamRef(TaskA),
    aa=ParamRef(TaskA, "a"),
    a_default=ParamRef(TaskA, "a", "given_default_value"),
    a_none=ParamRef(TaskA, "a", None),
)
class TaskB(Task):
    b = luigi.Parameter(default="b")

In this case the class TaskB has five parameters:

  • a with default_value_a as default value.
  • aa with default_value_a as default value (it is copied from the parameter a of TaskA).
  • a_default with given_default_value as default value.
  • a_none with None as default value.
  • b with b as default value.

Note that the second parameter of ParamRef is the name of the inherited parameter in the parent class. If it is not given, the parameter is assumed to have the same name in both the inheriting and the parent classes.

Global parameters

In addition to the @copy_params decorator, it is possible to use the GlobalParamMixin mixin. A task with this mixin has a new feature for the parameters inherited using @copy_params: if the default value is not changed in ParamRef and if no specific value is given for the task, then the task takes the value given to the parameter it was copied from. This combination of the @copy_params decorator and GlobalParamMixin allows many ways of dealing with the parameters.

import luigi
from luigi.task import Task
from luigi_tools.task import GlobalParamMixin
from luigi_tools.task import ParamRef
from luigi_tools.task import copy_params

class TaskA(Task):
    a = luigi.Parameter(default="default_value_a")

@copy_params(
    a=ParamRef(TaskA)
)
class TaskB(GlobalParamMixin, Task):
    b = luigi.Parameter(default="b")

Now if TaskB is called with the following configuration:

[TaskA]
a = "value for a"

[TaskB]
b = "value for b"

then the parameter a of TaskB has the value "value for a". If TaskB did not inherit from GlobalParamMixin, then it would have the value "default_value_a".

Dependency graph

The luigi-tools package provides several functions to get the dependency graph of a task and to render it using GraphViz. This can be very useful to show how the tasks of a workflow are organized.
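
To give an idea of what such a graph contains, the sketch below walks the requires() method of a few stand-in objects and writes the result as GraphViz DOT text. It does not use the functions of luigi-tools, whose names and signatures are documented in the API reference; Node, dependency_edges and to_dot are hypothetical names used for this illustration only.

```python
class Node:
    """Hypothetical stand-in for a task: a name and a requires() method."""

    def __init__(self, name, *required):
        self.name = name
        self._required = list(required)

    def requires(self):
        return self._required


def dependency_edges(task):
    """Collect (task, required_task) name pairs reachable from `task`."""
    edges = []
    seen = set()
    stack = [task]
    while stack:
        current = stack.pop()
        if current.name in seen:
            continue  # each task is expanded only once
        seen.add(current.name)
        for required in current.requires():
            edges.append((current.name, required.name))
            stack.append(required)
    return sorted(edges)


def to_dot(edges):
    """Render the edges as GraphViz DOT source text."""
    lines = ["digraph dependencies {"]
    lines += [f'    "{parent}" -> "{child}";' for parent, child in edges]
    lines.append("}")
    return "\n".join(lines)


download = Node("Download")
clean = Node("Clean", download)
report = Node("Report", clean, download)

edges = dependency_edges(report)
dot_source = to_dot(edges)
```

The resulting dot_source string can be passed to any GraphViz renderer to draw the workflow.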

Funding & Acknowledgment

The development of this software was supported by funding to the Blue Brain Project, a research center of the École polytechnique fédérale de Lausanne (EPFL), from the Swiss government’s ETH Board of the Swiss Federal Institutes of Technology.

For license and authors, see LICENSE.txt and AUTHORS.md respectively.

Copyright © 2021-2022 Blue Brain Project/EPFL
