OpenLineage integration with Airflow
OpenLineage Airflow Integration
A library that integrates Airflow DAGs with OpenLineage for automatic metadata collection.
Features
Metadata
- Task lifecycle
- Task parameters
- Task runs linked to versioned code
- Task inputs / outputs
Lineage
- Track inter-DAG dependencies
Built-in
- SQL parser
- Link to code builder (ex: GitHub)
- Metadata extractors
Requirements
- Python 3.7
- Airflow 1.10.12+
- (experimental) Airflow 2.1+
Installation
$ pip3 install openlineage-airflow
Note: You can also add openlineage-airflow to your requirements.txt for Airflow.
To install from source, run:
$ python3 setup.py install
Setup
Airflow 2.3+
The integration registers itself automatically on Airflow 2.3+ when it is installed in the Airflow worker's Python environment. No further setup is needed beyond the settings described in the Configuration section.
Airflow 2.1 - 2.2
This method has limited support: it does not track failed jobs, and job starts are registered only when the job ends.
Set your LineageBackend in your airflow.cfg, or via the environment variable AIRFLOW__LINEAGE__BACKEND, to
openlineage.lineage_backend.OpenLineageBackend
In contrast to integration via subclassing DAG, the LineageBackend-based approach collects all metadata
for a task on each task completion.
OpenLineageBackend does not take into account manually configured inlets and outlets.
Airflow 1.10+
To begin collecting Airflow DAG metadata with OpenLineage, use:
- from airflow import DAG
+ from openlineage.airflow import DAG
When enabled, the library will:
- On DAG start, collect metadata for each task using an Extractor if it exists for the given operator.
- Collect task input / output metadata (source, schema, etc)
- Collect task run-level metadata (execution time, state, parameters, etc)
- On DAG complete, also mark the task as complete in OpenLineage
Configuration
HTTP Backend Environment Variables
openlineage-airflow uses the OpenLineage client to push data to the OpenLineage backend.
The OpenLineage client is configured through environment variables:
- OPENLINEAGE_URL - points to the service that will consume OpenLineage events
- OPENLINEAGE_API_KEY - set if the consumer of OpenLineage events requires a Bearer authentication key
- OPENLINEAGE_NAMESPACE - set if you are using something other than the default namespace for the job namespace
- OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE - set to False if you want the source code of callables provided in PythonOperator to be sent in OpenLineage events
For backwards compatibility, openlineage-airflow also supports configuration via the
MARQUEZ_URL, MARQUEZ_NAMESPACE and MARQUEZ_API_KEY variables.
MARQUEZ_URL=http://my_hosted_marquez.example.com:5000
MARQUEZ_NAMESPACE=my_special_ns
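As a rough illustration of how these variables fit together, the sketch below resolves the settings from an environment mapping, preferring the OPENLINEAGE_* names and falling back to the MARQUEZ_* ones. The function name resolve_settings and the precedence order are assumptions made for this example; it is not the client's actual code.

```python
import os
from typing import Mapping, Optional


def resolve_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Illustrative only: read OpenLineage settings with MARQUEZ_* fallbacks."""

    def pick(name: str, default: Optional[str] = None) -> Optional[str]:
        # Assumed precedence: OPENLINEAGE_<name> wins over MARQUEZ_<name>.
        return env.get(f"OPENLINEAGE_{name}", env.get(f"MARQUEZ_{name}", default))

    return {
        "url": pick("URL"),
        "api_key": pick("API_KEY"),
        # The namespace falls back to "default" when nothing is set.
        "namespace": pick("NAMESPACE", "default"),
    }


# Resolve settings from the legacy variables shown above.
settings = resolve_settings({
    "MARQUEZ_URL": "http://my_hosted_marquez.example.com:5000",
    "MARQUEZ_NAMESPACE": "my_special_ns",
})
print(settings)
```

Passing the mapping explicitly keeps the example independent of whatever is set in the real process environment.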
Extractors : Sending the correct data from your DAGs
If you do nothing, the OpenLineage backend will receive the Job and the Run from your DAGs, but
input and output metadata will not be sent unless you use one of the few operators for which this integration provides an extractor.
openlineage-airflow allows you to do more than that by building "Extractors". An extractor is an object
suited to extracting metadata from a particular operator (or operators).
- Name : The name of the task
- Inputs : List of input datasets
- Outputs : List of output datasets
- Context : The Airflow context for the task
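To make the four pieces of metadata above concrete, here is a minimal sketch that uses stand-in classes so it runs without Airflow installed. TaskInfo, CopyTableExtractor, and the operator attributes source_table and target_table are all hypothetical names invented for this example; the library's real base class and return type may look different.

```python
from dataclasses import dataclass, field
from types import SimpleNamespace
from typing import Any, Dict, List


@dataclass
class TaskInfo:  # hypothetical container, not a library class
    name: str
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)
    context: Dict[str, Any] = field(default_factory=dict)


class CopyTableExtractor:  # hypothetical extractor for a hypothetical operator
    def __init__(self, operator: Any):
        self.operator = operator

    def extract(self, context: Dict[str, Any]) -> TaskInfo:
        # Map operator attributes onto name / inputs / outputs / context.
        return TaskInfo(
            name=self.operator.task_id,
            inputs=[self.operator.source_table],
            outputs=[self.operator.target_table],
            context=context,
        )


# A SimpleNamespace stands in for an Airflow operator instance.
op = SimpleNamespace(
    task_id="copy_orders",
    source_table="raw.orders",
    target_table="clean.orders",
)
info = CopyTableExtractor(op).extract({"run_id": "manual__1"})
print(info)
```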
Bundled Extractors
openlineage-airflow provides extractors for:
- PostgresOperator
- MySqlOperator
- BigQueryOperator
- SnowflakeOperator
- GreatExpectationsOperator
- PythonOperator
SQL operators use a SQL parser. There is also an experimental SQL parser, activated if you install openlineage-sql on your Airflow worker.
Custom Extractors
If your DAGs contain additional operators from which you want to extract lineage data, you can
provide custom extractors. They should derive from BaseExtractor.
There are two ways to register them for use in openlineage-airflow.
The first is to add them to the OPENLINEAGE_EXTRACTORS environment variable, separated by semicolons (;):
OPENLINEAGE_EXTRACTORS=full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
The second, which works in Airflow 1.10.x only, is to register all additional operator-extractor pairings by
providing the lineage_custom_extractors argument to openlineage.airflow.DAG.
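The first option relies on a semicolon-separated list of fully qualified class paths. The sketch below shows one plausible way such a value could be turned into classes with importlib. The helper name load_classes is an assumption, and standard-library classes stand in for real extractors so the example runs anywhere; the integration's own loading code may differ.

```python
import importlib
from typing import List


def load_classes(spec: str) -> List[type]:
    """Import each semicolon-separated 'package.module.ClassName' entry."""
    classes = []
    for path in spec.split(";"):
        path = path.strip()
        if not path:
            continue  # tolerate empty entries such as a trailing ';'
        # Split "pkg.module.ClassName" into module path and class name.
        module_name, _, class_name = path.rpartition(".")
        module = importlib.import_module(module_name)
        classes.append(getattr(module, class_name))
    return classes


# Standard-library classes stand in for real extractor classes here.
loaded = load_classes("collections.OrderedDict;decimal.Decimal")
print([cls.__name__ for cls in loaded])
```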
Great Expectations
The Great Expectations integration works by providing an OpenLineageValidationAction. You need to include it in your action_list in great_expectations.yml.
The following example shows the change to the default configuration:
validation_operators:
  action_list_operator:
    # To learn how to configure sending Slack notifications during evaluation
    # (and other customizations), read: https://docs.greatexpectations.io/en/latest/autoapi/great_expectations/validation_operators/index.html#great_expectations.validation_operators.ActionListValidationOperator
    class_name: ActionListValidationOperator
    action_list:
      - name: store_validation_result
        action:
          class_name: StoreValidationResultAction
      - name: store_evaluation_params
        action:
          class_name: StoreEvaluationParametersAction
      - name: update_data_docs
        action:
          class_name: UpdateDataDocsAction
+     - name: openlineage
+       action:
+         class_name: OpenLineageValidationAction
+         module_name: openlineage.common.provider.great_expectations.action
      # - name: send_slack_notification_on_validation_result
      #   action:
      #     class_name: SlackNotificationAction
      #     # put the actual webhook URL in the uncommitted/config_variables.yml file
      #     slack_webhook: ${validation_notification_slack_webhook}
      #     notify_on: all # possible values: "all", "failure", "success"
      #     renderer:
      #       module_name: great_expectations.render.renderer.slack_renderer
      #       class_name: SlackRenderer
If you're using GreatExpectationsOperator, you need to set validation_operator_name to an operator that includes OpenLineageValidationAction.
Setting it in great_expectations.yml alone isn't enough: the operator overrides it with the default name if it's not provided.
For an example of a working configuration, see the DAG and Great Expectations configuration in the integration tests.
Triggering Child Jobs
Commonly, Airflow DAGs will trigger processes on remote systems, such as an Apache Spark or Apache Beam job. Those systems may have their own OpenLineage integration and report their own job runs and dataset inputs/outputs. To propagate the job hierarchy, tasks must send their own run id so that the downstream process can report the ParentRunFacet with the proper run id.
The lineage_run_id and lineage_parent_id macros exist to inject the run id, or the whole parent run information,
of a given task into the arguments sent to a remote processing job's Airflow operator. The macros require the
DAG run_id and the task to access the generated run id for that task. For example, a Spark job can be triggered
using the DataProcPySparkOperator with the correct parent run id using the following configuration:
Airflow 1.10:
t1 = DataProcPySparkOperator(
    task_id=job_name,
    # required pyspark configuration
    job_name=job_name,
    dataproc_pyspark_properties={
        'spark.driver.extraJavaOptions':
            f"-javaagent:{jar}={os.environ.get('OPENLINEAGE_URL')}/api/v1/namespaces/{os.getenv('OPENLINEAGE_NAMESPACE', 'default')}/jobs/{job_name}/runs/{{{{lineage_run_id(run_id, task)}}}}?api_key={os.environ.get('OPENLINEAGE_API_KEY')}"
    },
    dag=dag)
Airflow 2.0+:
t1 = DataProcPySparkOperator(
    task_id=job_name,
    # required pyspark configuration
    job_name=job_name,
    dataproc_pyspark_properties={
        'spark.driver.extraJavaOptions':
            f"-javaagent:{jar}={os.environ.get('OPENLINEAGE_URL')}/api/v1/namespaces/{os.getenv('OPENLINEAGE_NAMESPACE', 'default')}/jobs/{job_name}/runs/{{{{macros.OpenLineagePlugin.lineage_run_id(run_id, task)}}}}?api_key={os.environ.get('OPENLINEAGE_API_KEY')}"
    },
    dag=dag)
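The quadruple braces in the snippets above are easy to misread. Inside a Python f-string, {{ renders as a single literal {, so {{{{ ... }}}} leaves a {{ ... }} Jinja expression in the final string for Airflow to render at run time. The standalone sketch below shows just that string-building step; the jar name, job name, and fallback URL are hypothetical values chosen for illustration, and the api_key query parameter is left out for brevity.

```python
import os

jar = "openlineage-spark.jar"  # hypothetical agent jar name
job_name = "my_spark_job"      # hypothetical job name

url = os.environ.get("OPENLINEAGE_URL", "http://localhost:5000")  # hypothetical fallback
namespace = os.getenv("OPENLINEAGE_NAMESPACE", "default")

java_options = (
    f"-javaagent:{jar}={url}/api/v1/namespaces/{namespace}"
    f"/jobs/{job_name}/runs/"
    # '{{{{' renders as a literal '{{', leaving a Jinja expression for Airflow.
    f"{{{{macros.OpenLineagePlugin.lineage_run_id(run_id, task)}}}}"
)
print(java_options)
```

The Python interpolation happens when the DAG file is parsed, while the remaining Jinja expression is only resolved when the task is rendered.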
Secrets redaction
The integration uses Airflow's SecretsMasker to hide secrets in the metadata events it produces. Because not all fields in the metadata should be redacted, RedactMixin is used to indicate which fields should be skipped.
Typically you should subclass RedactMixin and set the _skip_redact attribute to a list of the field names to skip.
However, all facets inheriting from BaseFacet should use the _additional_skip_redact attribute instead, which adds to the common list of ['_producer', '_schemaURL'].
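The idea of a skip list can be sketched in isolation. The example below uses a stand-in RedactMixin and a deliberately simplistic masking rule (every string attribute is masked unless its name is listed), so it runs without Airflow. The ConnectionInfo class, the redact function, and the mask value are hypothetical, and the real SecretsMasker decides what to mask in its own way.

```python
from typing import Any, List

MASK = "***"  # hypothetical mask value


class RedactMixin:
    """Stand-in for illustration; not the library's class."""
    _skip_redact: List[str] = []


class ConnectionInfo(RedactMixin):  # hypothetical metadata object
    _skip_redact = ["host"]

    def __init__(self, host: str, password: str):
        self.host = host
        self.password = password


def redact(obj: Any) -> Any:
    """Mask string attributes unless their name is listed in _skip_redact."""
    skip = getattr(obj, "_skip_redact", [])
    # Copy the items first so attributes can be reassigned while looping.
    for name, value in list(vars(obj).items()):
        if name not in skip and isinstance(value, str):
            setattr(obj, name, MASK)
    return obj


info = redact(ConnectionInfo("db.example.com", "hunter2"))
print(info.host, info.password)
```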
Development
The Airflow integration depends on openlineage.sql, openlineage.common and openlineage.client.python. Install them first independently, or install all dependencies for local development with one of the following commands:
Airflow 1.10:
$ pip install -r dev-requirements-1.x.txt
Airflow 2.0+:
$ pip install -r dev-requirements-2.x.txt
There is also a bash script that can run an arbitrary Airflow image with the OpenLineage integration built from the current branch. Run it as
$ AIRFLOW_IMAGE=<airflow_image_with_tag> ./scripts/run-dev-airflow.sh [--rebuild]
The --rebuild option forces the Docker images to be rebuilt.
Unit tests
To run the entire unit test suite, use:
$ tox
or choose one of the environments, e.g.:
$ tox -e py-airflow214
You can also skip using tox and run pytest on your own dev environment.
Integration tests
Integration tests require Docker Compose. Scripts are provided to make building images and running tests easier.
Airflow 1.10:
$ ./tests/integration/docker/up.sh
Airflow 2.0+:
$ AIRFLOW_IMAGE=<name-of-airflow-image> ./tests/integration/docker/up-2.sh
e.g.
$ AIRFLOW_IMAGE=apache/airflow:2.3.1-python3.7 ./tests/integration/docker/up-2.sh
File details
Details for the file openlineage-airflow-0.13.0.tar.gz.
File metadata
- Download URL: openlineage-airflow-0.13.0.tar.gz
- Upload date:
- Size: 33.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.7.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 48653e08f5cdff50409fa7414a19e28246c490af14b472b1772cf941ea4d4ecf |
| MD5 | 713a48ff513de9d67d44443e2492478f |
| BLAKE2b-256 | 4be4d81c3a0e957ad50e211d3531d374ceddeb85ab9eef8c7c0957d46b262f3a |
File details
Details for the file openlineage_airflow-0.13.0-py3-none-any.whl.
File metadata
- Download URL: openlineage_airflow-0.13.0-py3-none-any.whl
- Upload date:
- Size: 41.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.7.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 13fc9f354e8e08717c9d0da811c0e0ab6a8bd48f56fdb658571085acc5e2b08b |
| MD5 | 26e496f696767cbf9e4fbb2050f9906b |
| BLAKE2b-256 | 26ec90367ee5c9bd90600e69b50175071d75c1354ae0d76b8abbdd3b63d43182 |