Dagster integration for OpenLineage
dagster-openlineage
Asset-centric OpenLineage emission for Dagster. Emits schema, column-lineage, data-quality-assertion, and partition-nominal-time facets alongside existing run/step events. Two emission mechanisms are available — pick exactly one per deployment.
Features
- Asset-centric emission: materializations, observations, check evaluations, and synthesized failures
- Schema + column lineage facets from dagster/column_schema and dagster/column_lineage metadata
- Data quality assertions placed on InputDataset (spec-conformant)
- Partition → nominal time heuristic (ISO date or date-hour)
- Multi-tenant namespaces via string templates ({namespace}, {tag:KEY})
- Bounded emit: synchronous, 2s default timeout, retries disabled, failures swallowed
- Pipeline / step events preserved (v0.1 surface unchanged)
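The partition → nominal time heuristic listed above can be pictured with a small sketch. This is not the library's implementation: the function name `nominal_time_window`, the two accepted key formats (Dagster's default daily and hourly partition key shapes), and the choice of UTC are all assumptions made for illustration. The returned pair corresponds to the start and end times that an OpenLineage nominal-time run facet carries.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

# Assumed key formats: "2024-05-01" (ISO date) and "2024-05-01-13:00" (date-hour).
# The formats the library actually recognizes may differ.
_FORMATS = (
    ("%Y-%m-%d", timedelta(days=1)),
    ("%Y-%m-%d-%H:%M", timedelta(hours=1)),
)


def nominal_time_window(partition_key: str) -> Optional[Tuple[datetime, datetime]]:
    """Return (start, end) for a date-like partition key, or None otherwise.

    Hypothetical helper; timestamps are assumed to be UTC.
    """
    for fmt, width in _FORMATS:
        try:
            start = datetime.strptime(partition_key, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue  # key does not match this format, try the next one
        return start, start + width
    return None  # not a time-like key (e.g. a static partition such as "us-east")
```

Keys that match neither format yield `None`, which is the natural point at which an emitter would skip the nominal-time facet rather than guess.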
Requirements
- Python 3.10+
- Dagster >=1.11.6
Installation
pip install dagster-openlineage
Compatibility matrix
| Environment | Mechanism A (storage wrapper) | Mechanism B (sensor) |
|---|---|---|
| OSS Dagster (self-hosted) | ✅ | ✅ |
| OSS Dagster, dagster dev | ✅ | ✅ |
| Dagster+ Hybrid | ❌ (operator doesn't control instance.yaml) | ✅ |
| Dagster+ Serverless | ❌ | ✅ |
| Dagster+ Branch Deployments | ❌ | ✅ |
Which mechanism do I want?
- You control instance.yaml → Mechanism A (push). Every event hits OpenLineage as it is persisted. No daemon dependency. Process-local failure synthesis (see Known limitations).
- You run on Dagster+ Hybrid / Serverless / Branch Deployments → Mechanism B (polled). The sensor tails the event log and converts asset events into OpenLineage emissions.
- Both at once → don't. See Pick exactly one mechanism below.
Mechanism A — storage wrapper (push)
Configure OpenLineageEventLogStorage as your event log storage. It composes any inner EventLogStorage class and intercepts store_event / store_event_batch to emit OpenLineage events.
# instance.yaml
event_log_storage:
  module: dagster_openlineage
  class: OpenLineageEventLogStorage
  config:
    wrapped:
      module: dagster_postgres.event_log
      class: PostgresEventLogStorage
      config:
        postgres_url:
          env: DAGSTER_PG_URL
    namespace: my-company
    # Optional:
    # namespace_template: "{namespace}/{tag:tenant}"
    # timeout: 2.0
    # strict_assertion_mapping: false
Set OPENLINEAGE_URL (and optionally OPENLINEAGE_API_KEY) in the environment of any process that writes Dagster events — typically the run worker and the daemon.
Mechanism B — sensor (polled)
Add openlineage_sensor(include_asset_events=True) to your Definitions. v0.2 keeps include_asset_events=False as the default (v0.1 parity); v0.3 will flip it.
from dagster import Definitions
from dagster_openlineage import openlineage_sensor

defs = Definitions(
    assets=[...],
    sensors=[openlineage_sensor(include_asset_events=True)],
)
Environment variables go on the process that runs the Dagster daemon:
- OPENLINEAGE_URL (required)
- OPENLINEAGE_API_KEY (optional)
- OPENLINEAGE_NAMESPACE (optional, default dagster)
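A preflight check for these variables might look like the sketch below. It only mirrors the required/optional/default rules stated above; `OpenLineageEnv` and `read_openlineage_env` are hypothetical names, not part of dagster-openlineage, and the library performs its own lookup.

```python
import os
from typing import Mapping, NamedTuple, Optional


class OpenLineageEnv(NamedTuple):
    """Hypothetical container for the three variables listed above."""

    url: str
    api_key: Optional[str]
    namespace: str


def read_openlineage_env(environ: Mapping[str, str] = os.environ) -> OpenLineageEnv:
    """Read the OpenLineage variables, failing fast if the required one is missing."""
    url = environ.get("OPENLINEAGE_URL")
    if not url:
        raise RuntimeError("OPENLINEAGE_URL is required")
    return OpenLineageEnv(
        url=url,
        api_key=environ.get("OPENLINEAGE_API_KEY"),  # optional, may be None
        namespace=environ.get("OPENLINEAGE_NAMESPACE", "dagster"),  # documented default
    )
```

Running a check like this at daemon startup surfaces a missing URL immediately, rather than leaving it to be discovered later through silently swallowed emit failures.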
Namespace templates
Set namespace_template in the storage wrapper config, or omit it and let the adapter/sensor fall back to the default namespace. v0.2 supports two token families:
- {namespace}: the configured default namespace
- {tag:KEY}: the run tag named KEY, empty if unset
Adjacent slashes collapse; trailing slashes strip. Unknown tokens raise NamespaceTemplateError at construction. {code_location} and {repository} are deferred to v0.3 — they do not reliably reach store_event time.
Example:
# Template
"{namespace}/{tag:tenant}"
# Run tags -> resolved namespace
{"tenant": "acme"} -> "dagster/acme"
{} -> "dagster" # tag unresolved, trailing slash stripped
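The resolution rules can be reproduced in a few lines. The sketch below is a reimplementation for illustration only, assuming the behavior described in this section (two token families, slash collapsing, trailing-slash stripping, unknown tokens rejected). `resolve_namespace` and `validate_template` are hypothetical names, and the `NamespaceTemplateError` class here is a local stand-in for the library's error of the same name.

```python
import re
from typing import Mapping


class NamespaceTemplateError(ValueError):
    """Local stand-in for the library's error of the same name."""


_TOKEN = re.compile(r"\{([^{}]*)\}")


def validate_template(template: str) -> None:
    """Reject any token other than {namespace} or {tag:KEY}."""
    for token in _TOKEN.findall(template):
        is_tag = token.startswith("tag:") and len(token) > len("tag:")
        if token != "namespace" and not is_tag:
            raise NamespaceTemplateError(f"unknown token: {{{token}}}")


def resolve_namespace(template: str, namespace: str, tags: Mapping[str, str]) -> str:
    """Substitute tokens, collapse repeated slashes, strip trailing slashes."""
    validate_template(template)

    def substitute(match: re.Match) -> str:
        token = match.group(1)
        if token == "namespace":
            return namespace
        return tags.get(token[len("tag:"):], "")  # unset tag resolves to empty

    resolved = _TOKEN.sub(substitute, template)
    resolved = re.sub(r"/{2,}", "/", resolved)
    return resolved.rstrip("/")
```

In this sketch validation runs on every call for simplicity; the library is described as validating once, at construction, which keeps template mistakes out of the per-event path.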
Emit path
The adapter emits synchronously and swallows Exception; BaseException (shutdown signals, SystemExit, KeyboardInterrupt, MemoryError) propagates. The default transport disables retries to keep the per-event wall clock bounded by timeout (default 2s). Configure retry/async behavior via OpenLineage's own openlineage.yml or OPENLINEAGE_CONFIG if you need it.
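The swallow-versus-propagate split described above comes down to catching `Exception` rather than `BaseException`. The following is a minimal sketch of that pattern, not the adapter's code: `safe_emit` and the logger name are hypothetical, and the timeout itself is assumed to be enforced by the transport passed in as `emit`.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("openlineage_emit_sketch")  # hypothetical logger name


def safe_emit(emit: Callable[[Any], None], event: Any) -> bool:
    """Call emit(event) synchronously; swallow Exception, let BaseException through.

    Returns True on success and False if the emit failed and was swallowed.
    """
    try:
        emit(event)
        return True
    except Exception:
        # Lineage is best-effort: log and continue so the Dagster run is unaffected.
        logger.warning("OpenLineage emit failed; continuing", exc_info=True)
        return False
    # No `except BaseException`: KeyboardInterrupt and SystemExit propagate.
```

One caveat about this sketch: in CPython, `MemoryError` derives from `Exception`, so a plain `except Exception` like the one here would swallow it. Letting it propagate requires an explicit re-raise.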
Pick exactly one mechanism
Configuring Mechanism A and Mechanism B simultaneously produces duplicate OpenLineage events. OpenLineage has no client-side idempotency primitive and backend dedup is not spec-defined (Marquez dedupes on (runId, eventType, eventTime); DataHub and OpenMetadata do not). This is a deployment-time contract — the library does not enforce it at runtime. Mechanism B logs a WARN at sensor construction when opted in, reminding operators of this.
Migration from v0.1
- Pin dagster>=1.11.6.
- Namespace default is flat (dagster). v0.1 used repository_name as the namespace when reachable, so existing OL-backend lineage keyed under that old namespace may need a one-time rename. Set OPENLINEAGE_NAMESPACE or configure namespace_template to match your prior layout.
- If you relied on the openlineage_sensor in v0.1, it still works unchanged. Asset events are now supported via the new include_asset_events=True flag (default remains False).
- OpenLineageEventListener is removed; it was a dead stub with no call sites.
Sensor Configuration
| Option | Default | Description |
|---|---|---|
| minimum_interval_seconds | 300 | Minimum seconds between sensor evaluations |
| record_filter_limit | 30 | Max number of event logs to process per evaluation |
| after_storage_id | 0 | Starting storage ID for event processing |
| include_asset_events | False (v0.2) | Opt in to asset-level emission |
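The first two options together cap how fast the sensor can drain the event log. As a rough back-of-the-envelope sketch, assuming at most one evaluation per interval and at most record_filter_limit records per evaluation (real tick cadence may be slower), the ceiling works out as follows. `max_events_per_hour` is a hypothetical helper, not a library function.

```python
def max_events_per_hour(
    minimum_interval_seconds: int = 300,
    record_filter_limit: int = 30,
) -> float:
    """Upper bound on event-log records the sensor can process per hour."""
    evaluations_per_hour = 3600 / minimum_interval_seconds
    return evaluations_per_hour * record_filter_limit


# Defaults: 12 evaluations/hour * 30 records = 360 records/hour.
default_ceiling = max_events_per_hour()
# Tighter polling, e.g. every 60s with 100 records per tick: 6000 records/hour.
tuned_ceiling = max_events_per_hour(minimum_interval_seconds=60, record_filter_limit=100)
```

If a deployment writes events faster than this ceiling, the sensor falls progressively behind, so busy instances will likely want a shorter interval or a larger limit.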
Not in v0.2
- Dagster+ Insights bridge / Catalog UI mirroring
- IO manager enrichment (LOADED_INPUT / HANDLED_OUTPUT)
- Spark / engine-specific facets
- SQL parsing or column-graph inference (reads dagster/column_lineage metadata if present)
- Python-callable naming_fn (string templating only; callable form deferred to v0.3)
- {code_location} and {repository} namespace tokens (deferred to v0.3)
- Shipped JSON schema file for the custom dagster_asset_check run facet (deferred to v0.2.1)
- Wrapper-side synthesis reconciliation across process restarts (deferred to v0.2.1; use Mechanism B if you need crash-tolerant failure reporting)
Version Compatibility
This library supports Dagster >=1.11.6. End users get the constraint from pyproject.toml; CI and reproducible builds use the committed uv.lock.
Development
uv sync
make test
make ruff
make check