
Dagster is an opinionated pipeline runner.


Dagster

.. docs-include

Dagster is a library that helps you organize, run, and test your data processing pipelines. Instead of writing ad-hoc data processing scripts, Dagster encourages separating data input, data processing, and data output, as well as splitting your processing into several steps.

The core abstraction in Dagster is a Solid, a logical unit of computation in a data pipeline. At its core a Solid is a coarse-grained pure function: it takes multiple inputs, performs a computation, and produces an output. Inputs can either be received from the outputs of upstream solids in the pipeline or from external sources (e.g. files). Likewise, outputs from the transform can either flow to downstream solids or be materialized (e.g. to a file in an object store) so that they can be inspected or accessed by an external system.

A Solid formally separates the notions of inputs, outputs, and the core transform. The goal is to cleanly and clearly separate the domain logic of each transform from its operational environment. This allows solids, or pipelines of solids, to be easily executed in a number of environments: unit tests, local development, integration tests, CI/CD, staging, production, and so on.

Alongside the core abstraction, Dagster provides helpers to create Solids that operate on Pandas dataframes and SQL databases.

Example

.. code-block:: python

import pandas as pd
import dagster.core
from dagster.core.definitions import (
    InputDefinition,
    OutputDefinition,
)
from dagster.core.decorators import (
    solid,
    source,
    materialization,
)

# Sources define how to get data. Let's define a source that
# reads a CSV file and returns a pandas dataframe
@source(
    # Name of the source
    name="CSV",
    # What arguments it should get from the environment
    argument_def_dict={'path': ArgumentDefinition(dagster.core.types.Path)},
)
def csv_to_dataframe_source(path):
    return pd.read_csv(path)

# Materializations define how to convert a transform result
# into an artifact, for example a file. Let's define one that
# outputs a CSV file. Note that materializations defined for
# solids are optional, and are only triggered if the
# environment says so
@materialization(
    # Name of the materialization
    name="CSV",
    # What arguments it should get from the environment
    argument_def_dict={'path': ArgumentDefinition(dagster.core.types.Path)},
)
def dataframe_to_csv_materialization(data, path):
    data.to_csv(path)

# Solids can be created by annotating a transform function with
# a decorator
@solid(
    # Solid inputs define the arguments passed to the transform function
    inputs=[
        InputDefinition(
            name='num',
            # Inputs can have sources. While inputs define
            # what the transform should receive, sources define
            # how that data should be retrieved
            sources=[csv_to_dataframe_source],
        )
    ],
    # The solid output determines what the solid should return.
    output=OutputDefinition(materializations=[
        dataframe_to_csv_materialization,
    ]),
)
def sum_solid(num):
    sum_df = num.copy()
    # Here we add a new column to the dataframe to sum up the
    # num1 and num2 columns
    sum_df['sum'] = sum_df['num1'] + sum_df['num2']
    return sum_df

@solid(
    inputs=[
        # This input is a dependency: it receives the result
        # of the previous solid as one of the inputs
        InputDefinition(name="sum", depends_on=sum_solid)
    ],
    output=OutputDefinition(materializations=[
        dataframe_to_csv_materialization,
    ]),
)
def sum_sq_solid(sum):
    sum_sq = sum.copy()
    sum_sq['sum_sq'] = sum['sum']**2
    return sum_sq

# After defining the solids, we group them into a pipeline
pipeline = dagster.core.pipeline(
    name='pandas_hello_world',
    solids=[
        sum_solid,
        sum_sq_solid,
    ],
)

You might notice that no actual CSV file is specified as an input. This is because such parameters are passed in the environment, which lets you customize them at runtime. To run the pipeline, we pass that environment to the execution function.

.. code-block:: python

environment = config.Environment(
    sources={
        'sum_solid': {
            'num': config.Source(name='CSV', args={'path': 'path/to/num.csv'})
        }
    }
)

pipeline_result = dagster.execute_pipeline(
    dagster.ExecutionContext(),
    pipeline,
    environment
)

We can simplify the above example by using built-in dagster pandas inputs and outputs.

.. code-block:: python

import dagster.core
from dagster import config
from dagster.core.decorators import solid, with_context
import dagster.pandas_kernel as dagster_pd

@solid(
    inputs=[
        # We are using a pre-made input that should be a dataframe
        dagster_pd.dataframe_input(
            'num',
            sources=[
                # A built-in pandas csv-dataframe source reads
                # a CSV file and produces a pandas dataframe
                dagster_pd.csv_dataframe_source()
            ]
        )
    ],
    # This built-in dataframe output knows how to materialize dataframes
    # out of the box
    output=dagster_pd.dataframe_output()
)
def sum_solid(num):
    sum_df = num.copy()
    # Here we add a new column to the dataframe to sum up the num1 and num2 columns
    sum_df['sum'] = sum_df['num1'] + sum_df['num2']
    return sum_df

@solid(
    inputs=[
        # This input will check that the source solid outputs a
        # dataframe
        dagster_pd.dataframe_dependency(name="sum", solid=sum_solid)
    ],
    output=dagster_pd.dataframe_output()
)
def sum_sq_solid(sum):
    sum_sq = sum.copy()
    sum_sq['sum_sq'] = sum['sum']**2
    return sum_sq

# After defining the solids, we group them into a pipeline
pipeline = dagster.core.pipeline(
    name='pandas_hello_world',
    solids=[
        sum_solid,
        sum_sq_solid,
    ],
)

We can specify materializations in order to get artifacts for the results. Output from any solid can be materialized, which is useful for checking whether intermediate results make sense.

.. code-block:: python

environment = config.Environment(
    sources={
      'sum' : {
        'num' : config.Source(name='CSV', args={'path': 'path/to/num.csv'})
      }
    }
)

materializations = [
    config.Materialization(
        solid='sum',
        name='CSV',
        args={'path': 'path/to/output.csv'},
    ),
    config.Materialization(
        solid='sum_sq',
        name='CSV',
        args={'path': 'path/to/output.csv'},
    )
]

pipeline_result = dagster.execute_pipeline(
    dagster.ExecutionContext(),
    pipeline,
    environment,
    materializations,
)

Dagster CLI

In addition to the programmatic API, you can also use the dagster CLI to run pipelines. In that case the environment is specified through YAML configuration files.

The folder structure should be as follows.

.. code-block::

pipeline_project_name/
  pipelines.yml
  pipeline_module_1/
    env.yml
  pipeline_module_2/
    env.yml

pipelines.yml specifies the pipelines that are present in the current project. env.yml specifies the environment for each particular pipeline.

.. code-block:: yaml

pipelines:
  - module: pipeline_project_name.pipeline_module_1.pipeline
    fn: define_pipeline
  - module: pipeline_project_name.pipeline_module_2.pipeline
    fn: define_pipeline

.. code-block:: yaml

environment:
  inputs:
    - input_name: num
      source: CSV
      args:
        path: "input/num.csv"

materializations:
  - solid: sum
    type: CSV
    args:
      path: 'sum.csv'
  - solid: sum_sq
    type: CSV
    args:
      path: 'sum_sq.csv'

.. code-block:: sh

pip install dagster
# List pipelines
dagster pipeline list
# Print info about pipeline solids
dagster pipeline print pipeline1
# Execute pipeline
dagster pipeline execute pipeline1
# Execute pipeline from intermediate step
dagster pipeline execute pipeline1 --from-solid SOLID_NAME

Concepts

Transform

This is the core, user-defined transform that performs the logical data computation. In this case the transform is hello_world_transform_fn, and it is passed as a parameter into the solid. It takes one or more inputs and produces an output. All other concepts in a solid are the metadata and structure that surround this core computation.
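
For illustration only, such a transform function might look like the following sketch (hello_world_transform_fn is a hypothetical name that does not appear in the example pipeline above; the dataframe column it adds is likewise assumed):

.. code-block:: python

import pandas as pd

# A hypothetical transform function. It contains only domain logic;
# how its input is sourced and how its output is materialized is
# described by the surrounding solid definition.
def hello_world_transform_fn(num_df):
    num_df['greeting'] = 'hello world'
    return num_df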

Inputs

For each argument to the transform function, there is one InputDefinition object, whose name must match a parameter of the transform function. An input definition defines a name, a dependency for the input (which upstream solid produces its value; see below), and a number of sources. An input definition must specify at least a dependency or a source, and it can have any number of sources.

Sources
^^^^^^^

Sources are the way that one can create an input to a transform from external resources. A source is a function that takes a set of arguments and produces a value that is passed to the transform for processing. In this case, a CSV file is the only source type. One can imagine adding other source types to create pandas dataframes from JSON, Parquet, and so forth. End users will typically rely on pre-existing libraries to specify sources.

Sources also declare what arguments they expect. These are inspectable and could be used to render information for web or command-line tools, to verify the validity of config files, and in other tooling contexts. When solids or pipelines of solids are executed, the framework verifies that the arguments abide by the declaration. These arguments are then sent to the source function in the arg_dict parameter.
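
As a sketch of what an additional source type might look like, here is a hypothetical JSON source written with the same @source decorator and argument declaration used in the example above (json_to_dataframe_source is an illustrative name, not part of the library):

.. code-block:: python

@source(
    name="JSON",
    # Arguments the source expects from the environment; the framework
    # checks the supplied args against this declaration before execution
    argument_def_dict={'path': ArgumentDefinition(dagster.core.types.Path)},
)
def json_to_dataframe_source(path):
    # Read a JSON file into a pandas dataframe for the transform to consume
    return pd.read_json(path)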

Output

The OutputDefinition represents the output of the transform function.
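
For example, the output definition from the first example above simply lists the materializations that may be applied to the transform's return value (materializations are optional and only triggered when the environment requests them):

.. code-block:: python

# Reuses the CSV materialization defined in the example above
output = OutputDefinition(
    materializations=[dataframe_to_csv_materialization],
)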

Materializations
^^^^^^^^^^^^^^^^

Materializations are the other end of a source. They specify the ways in which the output of a transform can be materialized. In this example, which uses pandas dataframes, the sources and materializations are symmetrical. In the above example we specified a single materialization, a CSV. One could expand this to include JSON, Parquet, or other materializations as appropriate.

However, in other contexts that might not be true. Take solids that operate on SQL-based data warehouses or similar. The core transform would be the SQL command, but the materialization would specify a CREATE TABLE AS, a table append, or a partition creation, for example.
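
Purely as a sketch of that idea, a table-creating materialization could be written with the same @materialization decorator shown earlier. Note the assumptions: dagster.core.types.String is assumed to exist alongside the Path type used above, and get_connection is a hypothetical helper standing in for however the warehouse connection is obtained.

.. code-block:: python

@materialization(
    name="CREATE_TABLE_AS",
    # Assumes a String argument type analogous to the Path type used above
    argument_def_dict={'table_name': ArgumentDefinition(dagster.core.types.String)},
)
def create_table_as_materialization(sql_text, table_name):
    # The transform produced SQL text; the materialization decides how
    # that result is persisted in the warehouse.
    # get_connection() is a hypothetical helper for this sketch.
    with get_connection() as conn:
        conn.execute('CREATE TABLE {} AS {}'.format(table_name, sql_text))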

Higher-level APIs

These definitions will typically be composed with higher level APIs. For example, the above solid could be expressed using APIs provided by the pandas kernel. (Note: the "kernel" terminology is not settled)

.. code-block:: python

import dagster
import dagster.pandas_kernel as dagster_pd

def sum_transform(num_df):
    num_df['sum'] = num_df['num1'] + num_df['num2']
    return num_df

sum_solid = SolidDefinition(
    name='sum',
    description='This computes the sum of two numbers.',
    inputs=[dagster_pd.dataframe_csv_input(name='num_df')],
    transform_fn=sum_transform,
    output=dagster_pd.dataframe_output(),
)

Execution

These definitions are useless without being able to execute them. In order to execute a solid, you need to package it up into a pipeline.

.. code-block:: python

pipeline = dagster.PipelineDefinition(name='hello_world', solids=[sum_solid])

Then you can execute it by providing an environment. You must provide enough source data to create all the inputs necessary for the pipeline.

.. code-block:: python

environment = config.Environment(
    sources={
      'sum' : {
        'num_df' : config.Source(name='CSV', args={'path': 'path/to/input.csv'})
      }
    }
)

pipeline_result = dagster.execute_pipeline(
    dagster.ExecutionContext(),
    pipeline,
    environment
)

print(pipeline_result.result_named('sum').transformed_value)

execute_pipeline does a purely in-memory transform, materializing nothing. This is useful in testing and CI/CD contexts.
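
For instance, a unit test could run the pipeline entirely in memory against a small fixture file and assert on the transformed value. This is only a sketch: the pytest-style test name and the fixture path are assumptions, while the calls themselves mirror the example above.

.. code-block:: python

def test_hello_world_pipeline():
    # Point the CSV source at a small fixture file (assumed to contain
    # num1 and num2 columns)
    environment = config.Environment(
        sources={
            'sum': {
                'num_df': config.Source(name='CSV', args={'path': 'fixtures/num.csv'})
            }
        }
    )

    pipeline_result = dagster.execute_pipeline(
        dagster.ExecutionContext(),
        pipeline,
        environment
    )

    # Nothing is materialized; inspect the in-memory result directly
    sum_df = pipeline_result.result_named('sum').transformed_value
    assert (sum_df['sum'] == sum_df['num1'] + sum_df['num2']).all()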

Materialization

In order to produce outputs that are available to external systems, you must materialize them. In this case, that means producing files. In addition to your environment, you must specify your materializations.

.. code-block:: python

materializations = [
    config.Materialization(
        solid='sum',
        name='CSV',
        args={'path': 'path/to/output.csv'},
    )
]

dagster.execute_pipeline(
    dagster.ExecutionContext(),
    pipeline,
    environment,
    materializations,
)

Dependencies

So far we have demonstrated a single-stage pipeline, which is obviously of limited value.

Imagine we wanted to add another stage which took the sum we produced and squared that value. (Fancy!)

.. code-block:: python

def sum_sq_transform(sum_df):
    sum_df['sum_sq'] = sum_df['sum'] * sum_df['sum']
    return sum_df

# Fully expanded InputDefinition. This should be wrapped in a higher-level
# API, but it is spelled out here as explanatory code.
sum_sq_input = InputDefinition(
    name='sum_df',
    sources=[
        SourceDefinition(
            source_type='CSV',
            argument_def_dict={'path': ArgumentDefinition(types.Path)},
            source_fn=lambda arg_dict: pd.read_csv(arg_dict['path']),
        ),
    ],
    depends_on=sum_solid,
)

sum_sq_solid = SolidDefinition(
    name='sum_sq',
    inputs=[sum_sq_input],
    transform_fn=sum_sq_transform,
    output=dagster_pd.dataframe_output(),
)

Note that the input specifies its dependency. This means that the input value passed to the transform can be generated either by an upstream dependency OR by an external source. This allows the solid to be executed in isolation or in the context of a pipeline.

.. code-block:: python

pipeline = dagster.PipelineDefinition(solids=[sum_solid, sum_sq_solid])

environment = config.Environment(
    sources={
      'sum' : {
          'num_df' : config.Source(name='CSV', args={'path': 'path/to/num.csv'})
      }
    }
)

pipeline_result = dagster.execute_pipeline(
    dagster.ExecutionContext(),
    pipeline,
    environment
)

The above executed both solids, even though only one input was provided. The input to sum_sq_solid was supplied by the output of the upstream sum_solid.

You can also execute subsets of the pipeline. Given the above pipeline, you could specify that you only want to execute the first solid:

.. code-block:: python

environment = config.Environment(
    sources={
      'sum' : {
          'num_df' : config.Source(name='CSV', args={'path': 'path/to/num.csv'})
      }
    }
)

pipeline_result = dagster.execute_pipeline(
    dagster.ExecutionContext(),
    pipeline,
    environment,
    through=['sum'],
)

Or you could specify just the second solid. In that case the environment would have to be changed.

.. code-block:: python

environment = config.Environment(
    sources={
      'sum_sq' : { 
          'sum_df' : config.Source(name='CSV', args={'path': 'path/to/sum.csv'})
      }
    }
)

pipeline_result = dagster.execute_pipeline(
    dagster.ExecutionContext(),
    pipeline,
    environment,
    from_solids=['sum_sq'],  # 'from' is a reserved word in Python; the exact keyword is assumed
    through=['sum_sq'],
)

Expectations

Expectations are another reason to introduce logical seams between data computations. They are a way to perform data quality tests or statistical process control on data pipelines.

TODO: Complete this section when the APIs and functionality are more fleshed out.
