
A library to build Dagster pipelines from standalone notebooks

Project description

Dsml4s8e

Dsml4s8e is a Python library that extends Dagster to help manage the ML workflow around Jupyter notebook development.

Dsml4s8e addresses the following issues:

  1. Building pipelines from standalone notebooks
  2. Standardizing the structure of ML/DS pipeline projects so they are easy to share and continuously improve
  3. Managing constantly evolving pipelines

Dsml4s8e is designed to support the following workflow:

  1. Define a project structure and the structure of the pipeline data catalog by using the StorageCatalogABC class
  2. Develop standalone Jupyter notebooks that meet the specification requirements
  3. Define a pipeline -- a sequence of notebooks -- and deploy the pipeline in various environments (experimental/test/prod)
  4. Execute pipelines many times with different configurations in various environments and on various infrastructure

A job is a set of notebooks arranged into a DAG. Dsml4s8e simplifies defining jobs from a set of Jupyter notebooks.

# dag.py

from dsml4s8e.define_job import define_job
from dagstermill import local_output_notebook_io_manager
from dagster import job

from pathlib import Path


@job(
    name='simple_pipeline',
    tags={"cdlc_stage": "dev"},
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
    }
)
def dagstermill_pipeline():
    module_path = Path(__file__)
    define_job(
        root_path=module_path.parent.parent,
        nbs_sequence=[
            "data_load/nb_0.ipynb",
            "data_load/nb_1.ipynb",
            "data_load/nb_2.ipynb"
        ]
    )

In this code block, we use define_job to automate the dagstermill job definition. As a result, a Dagster pipeline is built from standalone notebooks:

Simple Pipeline

Dsml4s8e is well suited for building a cloud-agnostic DSML platform.

You can play with a demo pipeline skeleton project.

Installation of local dev container

# Create a work directory to clone a git repository. The work directory will be mounted to the container
mkdir work
# Step into the work directory
cd work
# Clone the repository
git clone https://github.com/dsml4/dsml4s8e.git
# Step into the directory with the Dockerfile
cd dsml4s8e/images/dev
# Build an image
docker build -t dsml4s8e .
# Go back to the work directory so that the pwd command in the next docker run instruction resolves correctly
cd ../../../
# Create and run a container staying in the work directory.
docker run --rm --name my_dag -p 3000:3000 -p 8888:8888 -v $(pwd):/home/jovyan/work -e DAGSTER_HOME=/home/jovyan/work/daghome dsml4s8e bash setup_pipeline.sh

Open JupyterLab in a browser: http://localhost:8888/lab

Open Dagster in a browser: http://localhost:3000/

simple_pipeline_Overview

The standalone notebook specification

A standalone notebook is the main building block in our pipeline development flow.

To transform a standalone notebook into a Dagster op (a pipeline step), we need to add 4 specific cells to the notebook. Next, we will discuss what concerns each of these cells addresses and which library classes are responsible for each one.

notebook_4_cell_specification

Cell 1: Op parameters definition

A cell with the tag op_parameters defines the parameters that will be transformed and passed to define_dagstermill_op as arguments at the job definition stage. At that stage the cell is executed, and the parameters needed to define the op are taken from the cell's 'op' variable and passed to the define_dagstermill_op function from the dagstermill library. These parameters are then available in the Dagster Launchpad so the run configuration can be edited at launch.

A definition of a Dagster op in a standalone notebook in JupyterLab:

op_parameters
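As a rough sketch (the NbOp import path and the config parameter used here are assumptions, not part of the documented API), the op_parameters cell of the first notebook, nb_0.ipynb, could look like this:

# cell tagged "op_parameters"
from dagster import Field
from dsml4s8e import NbOp  # assumed import path; adjust to your installation

# nb_0 is the first notebook in the pipeline: it declares no ins,
# only the data entity it produces
op = NbOp(
    config_schema={
        "n_rows": Field(
            int,
            description="how many rows to load",
            default_value=100
        )
    },
    outs=['data0']
)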

Configure the op in the Dagster Launchpad:

simple_pipeline

Cell 2: Op context initialization

Define a variable named 'context' in a cell with the tag 'parameters'.

The get_context method passes the default values from the config_schema in the op_parameters cell to the context variable.

context = op.get_context()

Now, when a notebook is executed by Dagster as an op, the parameters cell is replaced with an injected-parameters cell. Thus, the context variable can be used in the notebook body both when the notebook is executed in standalone mode and when it is executed as a Dagster op.
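For example, the notebook body can read its run configuration through this variable in both execution modes. A minimal sketch, assuming the context returned by op.get_context() exposes the op configuration as op_config, like a regular Dagster op context:

# notebook body: 'b' comes from the config_schema defaults in standalone mode,
# or from the Launchpad run config when the notebook is executed by Dagster
b = context.op_config["b"]
print(f"running with b = {b}")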

To be clear, let's look at one of the notebooks executed by Dagster:

runs open_path

Notice that the injected-parameters cell in your output notebook defines a variable called context.

out_nb_2

Cell 3: Data catalog initialization

A data catalog is a set of paths. A catalog path is the unique name of a data entity in the pipeline's namespace. Catalog paths are used to link ops into a pipeline.

op = NbOp(
    config_schema={
        "b": Field(
            int,
            description='this is the parameter description',
            default_value=20
        )
    },
    ins={
        'simple_pipeline.data_load.nb_0.data0': 'path_nb_0_data0',
        'simple_pipeline.data_load.nb_1.data1': 'path_nb_1_data1',
        'simple_pipeline.data_load.nb_1.data3': 'path_nb_1_data3',
    },
    outs=['data2']
)

In this code block, we use the ins argument to link outputs produced by nb_0 and nb_1 with the inputs of nb_2. The keys of the ins dict are catalog paths, and the values are the names of the path variables used in the notebook. Catalog paths and path variable names are generated automatically from the outs list; the NbOp class is responsible for generating them.

The name of a path variable only has to be unique within the namespace of the notebook where it is used, so it can be shorter than the corresponding catalog path.
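As a hypothetical illustration (the pandas calls and the parquet format are assumptions; the path variables themselves are resolved by the catalog initialization described next), the notebook body of nb_2 might then use these variables like this:

import pandas as pd

# path_nb_0_data0 and path_nb_1_data1 are declared in the ins mapping above;
# path_nb_2_data2 corresponds to the 'data2' entry in outs
df0 = pd.read_parquet(path_nb_0_data0)
df1 = pd.read_parquet(path_nb_1_data1)

result = pd.concat([df0, df1])
result.to_parquet(path_nb_2_data2)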

The get_catalog() method uses 'DagsterStorageCatalog', derived from StorageCatalogABC, to create paths in a file system (storage) catalog. Creating a file system path for each data entity name in the outs list must be implemented in the get_outs_data_paths method.

from abc import ABC, abstractmethod
from typing import Dict

# DataCatalogPaths is a dsml4s8e type; its import is omitted here


class StorageCatalogABC(ABC):

    @abstractmethod
    def __init__(self, runid):
        self.runid = runid

    @abstractmethod
    def is_valid(self) -> bool:
        ...

    @abstractmethod
    def get_outs_data_paths(
        self,
        catalog: DataCatalogPaths
    ) -> Dict[str, str]:
        """
        A map of catalog paths to storage paths
        """
        ...

The catalog structure in the file system is fully customizable via StorageCatalogABC, as the sketch below illustrates.
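As a rough illustration only (the class name, the base directory, and the catalog.outs attribute are assumptions, not the library's actual DagsterStorageCatalog), a simple local file system catalog could be sketched like this:

import os
from typing import Dict


class LocalStorageCatalog(StorageCatalogABC):

    def __init__(self, runid, base_dir="/data"):
        self.runid = runid
        self.base_dir = base_dir

    def is_valid(self) -> bool:
        # consider the catalog valid if the base directory is writable
        return os.access(self.base_dir, os.W_OK)

    def get_outs_data_paths(self, catalog) -> Dict[str, str]:
        # map each catalog path (e.g. "simple_pipeline.data_load.nb_0.data0")
        # to a run-scoped directory in the local file system
        return {
            catalog_path: os.path.join(
                self.base_dir, self.runid, *catalog_path.split(".")
            )
            for catalog_path in catalog.outs  # assumed attribute holding the out catalog paths
        }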

Initialize_catalog_outs

Cell 4: Passing notebook outputs to the next pipeline steps

In the last cell we inform the next steps of the pipeline where the data produced by the current notebook is stored. Now we can link this notebook (step) with the next notebooks representing pipeline steps. To do this we need to declare this notebook's outputs as inputs of the next notebooks.

op.pass_outs_to_next_step()

The format of the output message returned by pass_outs_to_next_step() allows us to copy strings from the cell output and paste them into the NbOp declarations in the op_parameters cells of the notebooks for the next pipeline steps. Look at the illustration below:

copy_passte_ins_variables
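To make the convention concrete (illustrative only; the exact output format of pass_outs_to_next_step() may differ), if this notebook is nb_2 with outs=['data2'], the NbOp declaration of a following notebook would gain an ins entry like:

op = NbOp(
    ins={
        # catalog path produced by nb_2, mapped to a local path variable name
        'simple_pipeline.data_load.nb_2.data2': 'path_nb_2_data2',
    },
    outs=['data4']  # hypothetical output of this next notebook
)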

