
Create Dagster pipelines using reusable modules and YAML configuration


Dagster Factory Pipelines

This Python package lets you create Dagster pipelines from reusable modules and YAML configuration.

Features

  • Provides base classes for creating your own modules, triggers, and asset checks, plus a simple way to register new modules
  • Ships with predefined modules for HTTP requests, pandas operations, ArcGIS, and CSV storage out of the box
  • Supports templating with Jinja syntax
  • Configures data pipelines via YAML
  • Provides simplified definitions for job schedules, triggers, partitions, hooks, and I/O managers

How to use Dagster YAML pipelines in your project

pip install dagster-factory-pipelines

Define Dagster defs

from dagster_factory_pipelines import DagsterPipeline

pipeline = DagsterPipeline()
pipeline.run()

defs = pipeline.get_definition()

By default, DagsterPipeline looks for a main.yaml file in the root directory of the Python project. This can be changed to any other file if needed.
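
For example, to point the pipeline at a different configuration file, the constructor would presumably take a path. A minimal sketch; the parameter name here is an assumption, not confirmed by the package docs:

from dagster_factory_pipelines import DagsterPipeline

# Hypothetical: the exact argument name for the config path is an assumption
pipeline = DagsterPipeline(config_file="configs/my_pipeline.yaml")
pipeline.run()

defs = pipeline.get_definition()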

How to create your own module

Modules can be created in any Python file. The most important part is to make sure that the module is registered and importable during DagsterPipeline runtime.

Dagster asset module

There are two requirements for a module: it should inherit from the ModuleBase abstract class, and it should be registered and accessible to the pipeline during runtime.

The ModuleBase abstract class holds important information about the asset taken from the YAML configuration, such as the asset ins, dependencies, partition, group name, and asset name.

import pandas as pd
from dagster import AssetsDefinition, OpExecutionContext, asset

# Import paths within dagster_factory_pipelines are assumed here
from dagster_factory_pipelines import ModuleBase, Request, register_module


@register_module("questdb.api")
class QuestDbGet(ModuleBase):
    """
    Retrieves data from the QuestDB API and stores it as a pandas DataFrame.
    """

    endpoint: str
    query: str
    # auth stuff later

    def create_asset(self) -> AssetsDefinition:
        @asset(
            kinds=["python"],
            description="Gets data from QuestDB and returns it as a DataFrame",
            **self.asset_args
        )
        def get_quest_db_data(context: OpExecutionContext) -> pd.DataFrame:
            # Resolve partition keys for this run (provided by ModuleBase)
            self.create_pk(context)
            context.log.info(f"QuestDB endpoint: {self.endpoint}, query: {self.query}")
            # Inject partition keys into the query before sending the request
            res = Request(self.endpoint, [], None, params={"query": self.query.format(**self.pk)})
            data = res.get_data().json()
            # QuestDB returns column metadata and rows separately
            columns = [col["name"] for col in data["columns"]]
            return pd.DataFrame(data.get("dataset"), columns=columns)
        return get_quest_db_data

The create_asset method must be declared. Inside it we simply create a traditional Dagster asset, and all parameters captured from the YAML file are available via the **self.asset_args argument.

These are the required values for the module. Pydantic is used for the type checking here.

    endpoint: str
    query: str

As a result, we can start using this module directly in the configuration file.

- asset: quest_db_data
  module: questdb.api
  params:
    endpoint:
    query:

If any of the required parameters are missing, Dagster will notify the user about the missing values in the configuration.
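
A filled-in version of the configuration above might look like this (the endpoint and query values are illustrative only):

- asset: quest_db_data
  module: questdb.api
  params:
    endpoint: http://localhost:9000/exec   # illustrative QuestDB HTTP endpoint
    query: SELECT * FROM sensors           # illustrative query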

Dagster sensor module

Dagster asset check module

Dagster pipeline

The pipeline is structured in a way that is logical for Dagster:

environment:
  io_manager:
    s3:

resources:
  - resource: name of the registered resource
    name: name of the resource that will be used in dagster
    params: additional params for the defined resource

jobs:
  - template: template_path
    prefix: Optional prefix
    vars:
      var1: var1_value
      ....
      varN: varN_value

  - job:
      triggers:
        - trigger: registered trigger name
          params:
            param1: param1
            paramN: paramN

      hooks:
        - hook: registered hook name

      partition:
        type: type of the date partition
        start: start date
        end: end date
        elements: list of static elements for the partition

      schedule:
        m: execution minute
        h: hour of execution
        active: state of the schedule
        cron: cron-syntax schedule, required for non-partitioned jobs

      assets:
        - asset: name of the asset
          ins: name of the input asset (only one possible)
          deps: list of dependencies (names of upstream assets)
          group: name of the group
          module: module name
          params: additional params for module
            param1: param1
            paramN: paramN
          checks: asset checks; can be used for non-partitioned assets only
            - check: name of the registered check
              params:
                param1: param1
                paramN: paramN

        - template: path to asset template
          prefix: prefix for the template
          vars: variables defined in the template
            var1: var1
            varN: varN

Job

At the moment a job definition is always required; it is impossible to create an asset-only pipeline.

Jobs can be defined in two ways: from a separate file via a template, or directly in the main file.

jobs:
  - job:

  - template: jobs/template.yaml

Template

Templates support Jinja syntax. This allows you to create reusable templates for repetitive tasks.

Templates must start with jobs: followed by - job:, not with a bare job: key.

Correct syntax for the template

jobs:
  - job:

  - template:

| Parameter | Type | Required | Default Value | Description |
|---|---|---|---|---|
| prefix | String | No | None | Use a prefix when the same template is shared between many different jobs. The prefix is appended to the end of each asset name in the template. A root prefix overwrites all other defined prefixes if they are present, which keeps asset names consistent. |
| vars | Dict[String] | No | None | You can define your own variables in a template using Jinja syntax, making the template reusable across multiple pipelines. |
| template | String | Yes | None | Path to the template, relative to the code location. |

It is also possible to define a new template inside another template.
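
For instance, a minimal reusable template might look like the following (the file path, variable names, and values are illustrative; questdb.api is the module defined earlier):

jobs:
  - job:
      assets:
        - asset: "{{ source }}_data"
          module: questdb.api
          params:
            endpoint: "{{ endpoint }}"
            query: "SELECT * FROM {{ source }}"

It could then be included from the main file like this:

jobs:
  - template: jobs/ingest_template.yaml
    prefix: sensors
    vars:
      source: sensors
      endpoint: http://localhost:9000/exec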

Partition

A partition can be defined at the job level and is shared across all assets inside the job. Many modules provide additional features for partitioned assets; for example, the http module can use the start date and element, which allows partition keys to be injected dynamically into the API endpoint.

partition:
    type: "daily"
    start: "2024-07-20"
    end: "2024-07-21"
    elements:
    - "320669904"

| Parameter | Type | Required | Default Value | Description |
|---|---|---|---|---|
| type | String | Required for time-based partitions | None | Possible values are hourly, daily, weekly |
| start | String | Yes | None | Date in YYYY-mm-dd format |
| end | String | No | None | Set this if the partition will not be driven by a schedule; it defines the time boundary |
| elements | List[String] | No | None | Creates a category-based partition; can be combined with a date partition, in which case a multi-partition is created |
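
The questdb.api module defined earlier already illustrates this pattern: its query string is formatted with the partition keys (self.pk), so placeholders in the query can reference them. Assuming the partition key names match the placeholder names (an assumption), a partitioned query might look like:

- asset: quest_db_data
  module: questdb.api
  params:
    endpoint: http://localhost:9000/exec
    query: SELECT * FROM sensors WHERE day = '{start}'   # the '{start}' placeholder name is an assumption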

Triggers

Triggers allow a job to run when required conditions are met. Triggers are built on the Dagster Sensor component.

Triggers are allowed only at the job level. At the moment, a trigger simply starts another job if the previous one was successful.

  triggers:
    - trigger: on_job

| Parameter | Type | Required | Default Value | Description |
|---|---|---|---|---|
| trigger | String | Yes | None | The name of the registered trigger |
| params | Dict[String] | No | None | Required parameters for the selected trigger |
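
A trigger that takes parameters might be configured like this (on_job comes from the example above; the parameter name is hypothetical):

  triggers:
    - trigger: on_job
      params:
        monitored_job: upstream_job   # hypothetical parameter name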

Schedule

It is possible to define a schedule for a job, and it works with both partitioned and non-partitioned jobs. For a partition-based job, the following configuration should be used; for a non-partitioned job, use cron syntax.

| Parameter | Type | Required | Default Value | Description |
|---|---|---|---|---|
| m | Integer | Yes | None | Minute of the execution |
| h | Integer | Yes | None | Hour of the execution |
| active | Bool | No | None | Whether the schedule is active by default |
| cron | String | Yes (non-partitioned jobs) | None | Schedule defined via cron syntax |

Examples:

Partition job

schedule:
  m: 00
  h: 3
  active: true

At the moment, active does not work as it should.

Ordinary job

schedule:
  cron: "* * * * *"   # cron expression; this example runs every minute

Hooks

Hooks let you perform a certain action based on asset execution outcomes.

hooks:
  - hook: name of the hook

| Parameter | Type | Required | Default Value | Description |
|---|---|---|---|---|
| hook | String | Yes | None | Name of the registered hook |

Assets

All assets defined in the assets block belong to the job in which they are defined.

assets:
  - asset: name_1
  - template: path_1

Asset

Assets, like jobs, can be defined in two ways: via asset or via template. The template syntax is slightly different from the jobs template.

assets:
  - asset:

An asset template has the same variables as a job template.

| Parameter | Type | Required | Default Value | Description |
|---|---|---|---|---|
| asset | String | Yes | None | Name of the asset |
| ins | String | No | None | Name of the asset whose output this asset should consume |
| deps | List[String] | No | None | List of the assets on which the current asset depends |
| group | String | No | None | Name of the group the asset should belong to |
| module | String | Yes | None | Name of the registered module used for the asset logic |
| params | Dict[String] | No | None | Parameters required by the chosen module |
| checks | List[Dict] | No | None | Checks to run after asset execution; works for non-partitioned assets only |
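
Putting it together, a fully specified asset might look like this (the dependency and group names are illustrative; questdb.api is the module defined earlier):

assets:
  - asset: quest_db_data
    deps:
      - another_asset              # illustrative dependency
    group: ingestion               # illustrative group name
    module: questdb.api
    params:
      endpoint: http://localhost:9000/exec
      query: SELECT * FROM sensors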

Asset Checks

In some cases it is possible to use asset checks to validate data quality.

| Parameter | Type | Required | Default Value | Description |
|---|---|---|---|---|
| check | String | Yes | None | Name of the asset check |
| params | String | No | None | Required parameters for the asset check |
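
Attached to an asset, a check might be configured like this (the check name and its parameter are hypothetical):

checks:
  - check: not_empty               # hypothetical registered check name
    params:
      column: value                # hypothetical parameter name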

When to create a standalone module?

The answer: it depends. If the default modules become hard to use, for example when it is difficult to achieve the desired state with the original modules and several of them are involved just to perform a very simple operation, that is a good sign that your own module is required.
