OpenLineage integration with Dagster

Note
New integration maintainers are needed! Please open an issue to get started.

OpenLineage Dagster Integration

A library that integrates Dagster with OpenLineage for automatic metadata collection. It provides an OpenLineage sensor, a Dagster sensor that tails Dagster event logs to track metadata. On each evaluation, the sensor processes a batch of event logs, converts the Dagster events into OpenLineage events, and emits them to an OpenLineage backend.

Features

Metadata

  • Dagster job & op lifecycle

Requirements

Installation

$ python -m pip install openlineage-dagster

Usage

OpenLineage Sensor & Event Log Storage Requirements

Single OpenLineage sensor per Dagster instance
Because the OpenLineage sensor processes all event logs for a given Dagster instance, define and enable only one sensor per instance. OpenLineage run IDs are generated dynamically during sensor runs, so multiple sensors would each emit their own job runs for the same Dagster steps, producing duplicates with different OpenLineage run IDs.
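
The following minimal sketch illustrates why a second sensor produces duplicates. It assumes each sensor assigns a fresh random UUID as the run ID for every Dagster step it processes; the function and variable names are hypothetical and are not part of the library.

```python
import uuid


def assign_run_ids(step_keys):
    """Map each Dagster step key to a newly generated run ID.

    Hypothetical stand-in for what a single sensor does; not the library's API.
    """
    return {step_key: str(uuid.uuid4()) for step_key in step_keys}


steps = ["extract", "transform", "load"]

# Two sensors on the same instance read the same event logs...
sensor_a_runs = assign_run_ids(steps)
sensor_b_runs = assign_run_ids(steps)

# ...but each generates its own IDs, so the backend sees two runs per step.
duplicated_steps = [s for s in steps if sensor_a_runs[s] != sensor_b_runs[s]]
print(duplicated_steps)
```

Under that assumption, every step ends up with two unrelated run IDs, and the backend has no way to tell that they describe the same execution.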

Non-sharded Event Log Storage
For the sensor to handle all event logs across runs, use non-sharded event log storage. If the event log storage is sharded by run (as the default SqliteEventLogStorage is), the cursor that tracks the last processed event storage ID may not update properly.

OpenLineage Sensor Setup

Get an OpenLineage sensor definition from the openlineage_sensor() factory function and add it to your Dagster repository.

from dagster import repository
from openlineage.dagster.sensor import openlineage_sensor


@repository
def my_repository():
    # Placeholder for the jobs, schedules, and sensors already in your repository.
    other_defs = []
    openlineage_sensor_def = openlineage_sensor()
    return other_defs + [openlineage_sensor_def]

Given that parallel sensor runs are not supported at the time of writing, some tuning may be necessary to avoid affecting other sensors' performance.

Two parameters control this tuning. minimum_interval_seconds defaults to 30 seconds; see Dagster's documentation on Evaluation Interval for more detail. record_filter_limit is the maximum number of event logs to process on each sensor evaluation and defaults to 30 records. Both defaults can be overridden:

@repository
def my_repository():
    openlineage_sensor_def = openlineage_sensor(
        minimum_interval_seconds=60,
        record_filter_limit=60,
    )
    return other_defs + [openlineage_sensor_def]
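
To reason about these two settings together, the sketch below estimates an upper bound on throughput. It assumes the sensor is evaluated exactly once per minimum interval and always fills its batch; the helper names and the backlog size are hypothetical.

```python
import math


def max_records_per_second(record_filter_limit=30, minimum_interval_seconds=30):
    """Upper bound on event logs processed per second by one sensor."""
    return record_filter_limit / minimum_interval_seconds


def evaluations_to_drain(backlog, record_filter_limit=30):
    """Number of sensor evaluations needed to work through a backlog."""
    return math.ceil(backlog / record_filter_limit)


# Defaults: 30 records every 30 seconds.
default_rate = max_records_per_second()
# Overrides from the example above: 60 records every 60 seconds.
tuned_rate = max_records_per_second(record_filter_limit=60, minimum_interval_seconds=60)

# A hypothetical backlog of 1,000 unprocessed event logs.
default_evals = evaluations_to_drain(1000)
tuned_evals = evaluations_to_drain(1000, record_filter_limit=60)
```

Under these assumptions both configurations top out at the same rate, but the overridden one needs half as many evaluations to clear the backlog, which may leave more room for other sensors. Raising record_filter_limit without raising the interval is what increases throughput.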

The OpenLineage sensor handles event logs in ascending order of storage ID and starts with the first log by default. Optionally, after_storage_id can be specified to customize the starting point. It only takes effect when the cursor is undefined or has been deleted.

@repository
def my_repository():
    openlineage_sensor_def = openlineage_sensor(
        after_storage_id=100
    )
    return other_defs + [openlineage_sensor_def]
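
The interaction between the cursor, after_storage_id, and record_filter_limit can be sketched with a small simulation. This is not the library's implementation: evaluate_sensor and its arguments are hypothetical, and storage IDs are assumed to be positive integers.

```python
def evaluate_sensor(event_logs, cursor=None, after_storage_id=None, record_filter_limit=30):
    """Simulate one sensor evaluation and return (batch, new_cursor).

    event_logs is a list of (storage_id, event) tuples; cursor is the storage
    ID of the last processed record, or None if no cursor exists yet.
    """
    if cursor is None:
        # after_storage_id is only consulted when there is no cursor.
        start_after = after_storage_id if after_storage_id is not None else 0
    else:
        start_after = cursor

    # Process records in ascending storage ID order, at most one batch at a time.
    pending = sorted(
        (log for log in event_logs if log[0] > start_after),
        key=lambda log: log[0],
    )
    batch = pending[:record_filter_limit]
    new_cursor = batch[-1][0] if batch else cursor
    return batch, new_cursor


logs = [(i, f"event-{i}") for i in range(1, 201)]

# First evaluation: no cursor yet, so after_storage_id=100 sets the start.
batch1, cursor = evaluate_sensor(logs, after_storage_id=100, record_filter_limit=60)

# Second evaluation: the cursor now wins and after_storage_id is ignored.
batch2, cursor = evaluate_sensor(
    logs, cursor=cursor, after_storage_id=100, record_filter_limit=60
)
```

In this simulation the first batch covers storage IDs 101 to 160 and the second covers 161 to 200, after which the cursor rests at the last processed record.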

OpenLineage Adapter & Client Configuration

The sensor uses an OpenLineage adapter and client to convert and push data to an OpenLineage backend. These depend on environment variables.

If using User Repository Deployments, add the below variables to the repository where the sensor is defined. Otherwise, add the variables to the Dagster Daemon.

  • OPENLINEAGE_URL - points to the service that will consume OpenLineage events.
  • OPENLINEAGE_API_KEY - set if the consumer of OpenLineage events requires a Bearer authentication key.
  • OPENLINEAGE_NAMESPACE - set to override the default namespace, which is used when a Dagster repository is undefined.
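
Before starting the daemon, it can help to check what the adapter and client will see. The sketch below only inspects environment variables; it does not call the library. The helper name, the example URL and key, and the fallback namespace value "default" are assumptions made for illustration.

```python
import os


def read_openlineage_settings(environ=os.environ):
    """Collect the OpenLineage settings listed above from an environment mapping."""
    url = environ.get("OPENLINEAGE_URL")
    api_key = environ.get("OPENLINEAGE_API_KEY")
    # "default" is an assumed fallback value, used here for illustration only.
    namespace = environ.get("OPENLINEAGE_NAMESPACE", "default")
    # A Bearer key, when present, is sent as an Authorization header.
    headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
    return {"url": url, "headers": headers, "namespace": namespace}


settings = read_openlineage_settings(
    {
        "OPENLINEAGE_URL": "http://localhost:5000",  # hypothetical backend address
        "OPENLINEAGE_API_KEY": "example-key",  # hypothetical key
    }
)
print(settings)
```

Calling read_openlineage_settings() with no argument inspects the real process environment, which is a quick way to confirm the variables reached the repository or daemon process.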

OpenLineage Namespace & Dagster Repository

For Dagster jobs organized in repositories, Dagster keeps track of the repository name for each pipeline run. When the repository name is present, it is always used as the OpenLineage namespace name. The OPENLINEAGE_NAMESPACE option provides a static fallback value for runs that have no repository name.
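
The precedence described above can be written as a short function. This is a sketch of the rule, not the library's code: resolve_namespace is a hypothetical name, and the final fallback value "default" is an assumption.

```python
def resolve_namespace(repository_name, environ):
    """Pick the OpenLineage namespace for a pipeline run."""
    # A repository name, when present, always wins.
    if repository_name:
        return repository_name
    # Otherwise fall back to the environment, then to an assumed built-in default.
    return environ.get("OPENLINEAGE_NAMESPACE", "default")


env = {"OPENLINEAGE_NAMESPACE": "analytics"}

with_repo = resolve_namespace("my_repository", env)
without_repo = resolve_namespace(None, env)
unset = resolve_namespace(None, {})
```

Here with_repo is "my_repository" even though the variable is set, while without_repo picks up "analytics" from the environment.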

Development

To install all dependencies for local development:

$ python -m pip install -e .[dev]  # or python -m pip install -e .\[dev\] in zsh 

To run the test suite:

$ pytest

SPDX-License-Identifier: Apache-2.0
Copyright 2018-2023 contributors to the OpenLineage project
