
OpenLineage Airflow Integration

A library that integrates Airflow DAGs with OpenLineage for automatic metadata collection.

Features

Metadata

  • Task lifecycle
  • Task parameters
  • Task runs linked to versioned code
  • Task inputs / outputs

Lineage

  • Track inter-DAG dependencies

Built-in

  • SQL parser
  • Link to code builder (ex: GitHub)
  • Metadata extractors

Requirements

  • Python 3.6+
  • Airflow 1.10+

Installation

$ pip3 install openlineage-airflow

Note: You can also add openlineage-airflow to your requirements.txt for Airflow.
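For example, a requirements.txt entry pinning the release documented here:

```
openlineage-airflow==0.4.0
```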

To install from source, run:

$ python3 setup.py install

Usage

Airflow 1.10+

To begin collecting Airflow DAG metadata with OpenLineage, use:

- from airflow import DAG
+ from openlineage.airflow import DAG

When enabled, the library will:

  1. On DAG start, collect metadata for each task using an Extractor, if one exists for the given operator.
  2. Collect task input / output metadata (source, schema, etc.)
  3. Collect task run-level metadata (execution time, state, parameters, etc.)
  4. On DAG completion, also mark the task as complete in OpenLineage.

Airflow 2.1+ (experimental)

Set your LineageBackend in your airflow.cfg, or via the environment variable AIRFLOW__LINEAGE__BACKEND, to

openlineage.lineage_backend.OpenLineageBackend
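For example, the same setting expressed as an environment variable (equivalent to the [lineage] backend option in airflow.cfg):

```shell
# Equivalent environment-variable form of the lineage backend setting:
export AIRFLOW__LINEAGE__BACKEND=openlineage.lineage_backend.OpenLineageBackend
```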

In contrast to the integration via subclassing DAG, the LineageBackend-based approach collects all metadata for a task on each task completion.

The OpenLineageBackend does not take manually configured inlets and outlets into account.

Support for Airflow 2.1+ is currently experimental and has some caveats: it does not support tracking failed jobs, and job starts are registered only when the job ends.

Configuration

HTTP Backend Environment Variables

openlineage-airflow uses the OpenLineage client to push data to an OpenLineage backend.

The OpenLineage client depends on the following environment variables:

  • OPENLINEAGE_URL - points to the service that will consume OpenLineage events
  • OPENLINEAGE_API_KEY - set if the consumer of OpenLineage events requires a Bearer authentication key
  • OPENLINEAGE_NAMESPACE - set if you are using something other than the default job namespace
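For example, a shell configuration using these variables (the URL, key, and namespace values below are placeholders):

```shell
# Placeholder values; point these at your own OpenLineage backend.
export OPENLINEAGE_URL=http://localhost:5000
export OPENLINEAGE_API_KEY=secret-token
export OPENLINEAGE_NAMESPACE=my_namespace
```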

For backwards compatibility, openlineage-airflow also supports configuration via the MARQUEZ_URL, MARQUEZ_NAMESPACE, and MARQUEZ_API_KEY variables.

MARQUEZ_URL=http://my_hosted_marquez.example.com:5000
MARQUEZ_NAMESPACE=my_special_ns

Extractors : Sending the correct data from your DAGs

If you do nothing, the OpenLineage backend will receive the Job and the Run from your DAGs, but unless you use one of the few operators for which this integration provides an extractor, input and output metadata will not be sent.

openlineage-airflow allows you to do more than that by building "Extractors". An extractor is an object suited to extracting metadata from a particular operator (or operators). Each extractor provides:

  1. Name : The name of the task
  2. Inputs : List of input datasets
  3. Outputs : List of output datasets
  4. Context : The Airflow context for the task

Bundled Extractors

openlineage-airflow provides extractors for:

  • PostgresOperator
  • BigQueryOperator
  • SnowflakeOperator
  • GreatExpectationsOperator

Custom Extractors

If your DAGs contain additional operators from which you want to extract lineage data, fear not - you can always provide custom extractors. They should derive from BaseExtractor.

There are two ways to register them for use in openlineage-airflow.

The first is to provide an environment variable following the pattern

OPENLINEAGE_EXTRACTOR_<operator>=full.path.to.ExtractorClass

For example:

OPENLINEAGE_EXTRACTOR_PostgresOperator=openlineage.airflow.extractors.postgres_extractor.PostgresExtractor

The second, which works in Airflow 1.10.x only, is to register all additional operator-extractor pairings via the lineage_custom_extractors argument to openlineage.airflow.DAG.
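A custom extractor might look roughly like the sketch below. Note that the BaseExtractor here is a minimal stand-in for the real openlineage.airflow extractor base class so the example stays self-contained, and the operator and attribute names ('MyOperator', 'input_tables', 'output_tables') are hypothetical:

```python
# Illustrative sketch of a custom extractor. BaseExtractor below is a
# simplified stand-in for the class shipped with openlineage-airflow;
# the operator and its attributes are hypothetical.

class BaseExtractor:
    """Stand-in for the real BaseExtractor in openlineage-airflow."""

    def __init__(self, operator):
        self.operator = operator

    def extract(self):
        raise NotImplementedError


class MyOperatorExtractor(BaseExtractor):
    """Extracts lineage metadata from a hypothetical MyOperator."""

    def extract(self):
        # Return the task name plus input/output dataset names; the real
        # integration wraps these into OpenLineage dataset objects.
        return {
            'name': self.operator.task_id,
            'inputs': list(getattr(self.operator, 'input_tables', [])),
            'outputs': list(getattr(self.operator, 'output_tables', [])),
        }
```

Such an extractor would then be registered with, e.g., OPENLINEAGE_EXTRACTOR_MyOperator=my_package.extractors.MyOperatorExtractor.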

Great Expectations

The Great Expectations integration works by providing OpenLineageValidationAction. You need to include it in your action_list in great_expectations.yml.

The following example illustrates the change to the default configuration:

validation_operators:
  action_list_operator:
    # To learn how to configure sending Slack notifications during evaluation
    # (and other customizations), read: https://docs.greatexpectations.io/en/latest/autoapi/great_expectations/validation_operators/index.html#great_expectations.validation_operators.ActionListValidationOperator
    class_name: ActionListValidationOperator
    action_list:
      - name: store_validation_result
        action:
          class_name: StoreValidationResultAction
      - name: store_evaluation_params
        action:
          class_name: StoreEvaluationParametersAction
      - name: update_data_docs
        action:
          class_name: UpdateDataDocsAction
+     - name: openlineage
+       action:
+         class_name: OpenLineageValidationAction
+         module_name: openlineage.common.provider.great_expectations.action
      # - name: send_slack_notification_on_validation_result
      #   action:
      #     class_name: SlackNotificationAction
      #     # put the actual webhook URL in the uncommitted/config_variables.yml file
      #     slack_webhook: ${validation_notification_slack_webhook}
      #     notify_on: all # possible values: "all", "failure", "success"
      #     renderer:
      #       module_name: great_expectations.render.renderer.slack_renderer
      #       class_name: SlackRenderer

If you're using GreatExpectationsOperator, you need to set validation_operator_name to an operator that includes OpenLineageValidationAction. Setting it in the great_expectations.yml file isn't enough - the operator overrides it with the default name if it's not provided.

For an example of a working configuration, see the DAG and Great Expectations configuration in the integration tests.

Triggering Child Jobs

Commonly, Airflow DAGs will trigger processes on remote systems, such as an Apache Spark or Apache Beam job. Those systems may have their own OpenLineage integration and report their own job runs and dataset inputs/outputs. To propagate the job hierarchy, tasks must send their own run id so that the downstream process can report the ParentRunFacet with the proper run id.

The lineage_run_id macro exists to inject the run id of a given task into the arguments sent to a remote processing job's Airflow operator. The macro requires the DAG run_id and the task to access the generated run id for that task. For example, a Spark job can be triggered using the DataProcPySparkOperator with the correct parent run id using the following configuration:

t1 = DataProcPySparkOperator(
    task_id=job_name,
    # required pyspark configuration
    job_name=job_name,
    dataproc_pyspark_properties={
        'spark.driver.extraJavaOptions':
            f"-javaagent:{jar}={os.environ.get('OPENLINEAGE_URL')}/api/v1/namespaces/{os.getenv('OPENLINEAGE_NAMESPACE', 'default')}/jobs/{job_name}/runs/{{{{lineage_run_id(run_id, task)}}}}?api_key={os.environ.get('OPENLINEAGE_API_KEY')}"
    },
    dag=dag)
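The templated javaagent argument above is ordinary string formatting plus an Airflow template. A self-contained sketch, where the jar path, job name, and URL are placeholders and the lineage_run_id macro call is left as a literal template string for Airflow to render at runtime:

```python
import os

# Placeholder values; in a real DAG these come from your environment.
os.environ.setdefault('OPENLINEAGE_URL', 'http://localhost:5000')
os.environ.setdefault('OPENLINEAGE_NAMESPACE', 'default')

jar = '/opt/openlineage-spark.jar'                      # hypothetical agent jar
job_name = 'example_job'
run_id_template = '{{ lineage_run_id(run_id, task) }}'  # rendered by Airflow

# Build the -javaagent argument that points the Spark agent at the
# OpenLineage run endpoint for this task's run id.
javaagent = (
    f"-javaagent:{jar}={os.environ['OPENLINEAGE_URL']}"
    f"/api/v1/namespaces/{os.environ['OPENLINEAGE_NAMESPACE']}"
    f"/jobs/{job_name}/runs/{run_id_template}"
)
```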

Development

To install all dependencies for local development:

# Bash
$ pip3 install -e .[dev]
# escape the brackets in zsh
$ pip3 install -e .\[dev\]

To run the entire test suite, you'll first want to initialize the Airflow database:

$ airflow initdb

Then, run the test suite with:

$ pytest

