OpenLineage integration with Dagster
Project description
OpenLineage Dagster Integration
A library that integrates Dagster with OpenLineage for automatic metadata collection. It provides an OpenLineage sensor, a Dagster sensor that tails Dagster event logs for tracking metadata. On each sensor evaluation, the function processes a batch of event logs, converts Dagster events into OpenLineage events, and emits them to an OpenLineage backend.
Features
Metadata
- Dagster job & op lifecycle
Requirements
Installation
$ python -m pip install openlineage-dagster
Usage
OpenLineage Sensor & Event Log Storage Requirements
Single OpenLineage sensor per Dagster instance
As it processes all event logs for a given Dagster instance, define and enable only a single OpenLineage sensor per instance.
Running multiple will result in emitting duplicate OpenLineage job runs for Dagster steps with different OpenLineage run ids that are dynamically generated during sensor runs.
Non-sharded Event Log Storage
For the sensor to handle all event logs across runs, use non-sharded event log storage.
If an event log storage sharded by run (i.e. default SqliteEventLogStorage
) is used, cursor that tracks the last processed event storage id may not update properly.
OpenLineage Sensor Setup
Get OpenLineage sensor definition from openlineage_sensor()
factory function and add it to your Dagster repository.
from dagster import repository
from openlineage.dagster.sensor import openlineage_sensor
@repository
def my_repository():
openlineage_sensor_def = openlineage_sensor()
return other_defs + [openlineage_sensor_def]
With parallel sensor run not supported at the time of writing, some tuning may be necessary to avoid affecting other sensors' performance.
See Dagster's documentation on Evaluation Interval
for more detail on minimum_interval_seconds
, which defaults to 30 seconds.
record_filter_limit
is the maximum number of event logs to process on each sensor evaluation, and it defaults to 30 records per evaluation.
Default values can be overridden as below.
@repository
def my_repository():
openlineage_sensor_def = openlineage_sensor(
minimum_interval_seconds=60,
record_filter_limit=60,
)
return other_defs + [openlineage_sensor_def]
OpenLineage sensor handles event logs in ascending order of storage id, and by default, starts with the first log.
Optionally, after_storage_id
can be specified to customize the starting point.
This is only applicable when cursor is undefined or has been deleted.
@repository
def my_repository():
openlineage_sensor_def = openlineage_sensor(
after_storage_id=100
)
return other_defs + [openlineage_sensor_def]
OpenLineage Adapter & Client Configuration
The sensor uses OpenLineage adapter and client to convert and push data to an OpenLineage backend, and they depend on the following environment variables.
If using User Repository Deployments, add the variables to the repository where the sensor is defined. Otherwise, add the variables to Dagster Daemon.
OPENLINEAGE_URL
- point to service which will consume OpenLineage eventsOPENLINEAGE_API_KEY
- set if consumer of OpenLineage events requiresBearer
authentication keyOPENLINEAGE_NAMESPACE
- set if you are using something other than thedefault
as the default namespace when Dagster repository is undefined
OpenLineage Namespace & Dagster Repository
For Dagster jobs organized in repositories, Dagster keeps track of the repository name for each pipeline run.
When the repository name is present, it is always used as the OpenLineage namespace name.
OPENLINEAGE_NAMESPACE
option is a way to fall back and provide some other static default value.
Development
To install all dependencies for local development:
$ python -m pip install -e .[dev] # or python -m pip install -e .\[dev\] in zsh
To run test suite:
$ pytest
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for openlineage-dagster-0.7.0.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | ca27079dc691232e43d1f8f1b77130aded748c7da2f627c4e353687705531fdb |
|
MD5 | 45ecfd4f1197270b203757e25d29b9c9 |
|
BLAKE2b-256 | 3f89a7243f68ba1b6e0736151577fff3386a0d4d52b61d8bd9cccb4131b6c6f1 |
Hashes for openlineage_dagster-0.7.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | b7fb7a4de2598fcd3897aea24122739d84061233c14ac0df6b1d41ee7361da96 |
|
MD5 | 3faf7a45ac3bcde5426161fa1acc035e |
|
BLAKE2b-256 | 2b18cf83adb31e276425f6b1db9e14fa2f9da92b72b69024647d80a490b34c6b |