Marquez integration with Airflow
marquez-airflow
A library that integrates Airflow DAGs with Marquez for automatic metadata collection.
Features
Metadata
- Task lifecycle
- Task parameters
- Task runs linked to versioned code
- Task inputs / outputs
Lineage
- Track inter-DAG dependencies
Built-in
- SQL parser
- Link to code builder (e.g., GitHub)
- Metadata extractors
Requirements
Installation
$ pip3 install marquez-airflow
Note: You can also add marquez-airflow to your requirements.txt for Airflow.
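For example, to pin the version documented on this page, the requirements.txt entry would be:
marquez-airflow==0.15.2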
To install from source, run:
$ python3 setup.py install
Configuration
The library depends on a backend. A Backend is configurable and lets the library know where to write dataset, job, and run metadata.
Backends
- HTTP: Write metadata to Marquez
- FILE: Write metadata to a file (as json) under /tmp/marquez
- LOG: Simply logs the metadata to the console
By default, the HTTP backend will be used (see next sections on configuration). To override the default backend and write metadata to a file, use MARQUEZ_BACKEND:
MARQUEZ_BACKEND=FILE
Note: Metadata will be written to /tmp/marquez/client.requests.log, but the location can be overridden with MARQUEZ_FILE.
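To peek at what the FILE backend wrote, you can read the file back directly. This is only a sketch: it assumes each line is a standalone JSON record, which is not a documented guarantee:

import json

# Read back whatever the FILE backend wrote. The default path matches the note
# above; override it with MARQUEZ_FILE if you changed the location. Treating each
# line as a standalone JSON record is an assumption, not a documented guarantee.
with open("/tmp/marquez/client.requests.log") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            print("non-JSON line:", line[:80])
            continue
        print(sorted(record.keys()))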
HTTP Backend Authentication
The HTTP backend supports using API keys to authenticate requests via Bearer auth. To include a key when making an API request, use MARQUEZ_API_KEY:
MARQUEZ_BACKEND=HTTP
MARQUEZ_API_KEY=[YOUR_API_KEY]
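To sanity-check a key outside of Airflow, the same Bearer scheme can be reproduced by hand. This sketch assumes the requests package is installed and uses the namespace-listing endpoint only because it is a convenient read-only call:

import os
import requests

# Call the Marquez API the same way the HTTP backend authenticates:
# with an "Authorization: Bearer <key>" header.
marquez_url = os.environ.get("MARQUEZ_URL", "http://localhost:5000")
api_key = os.environ.get("MARQUEZ_API_KEY")

headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
response = requests.get(f"{marquez_url}/api/v1/namespaces", headers=headers)
response.raise_for_status()
print(response.json())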
HTTP Backend Environment Variables
marquez-airflow needs to know where to talk to the Marquez server API. You can set these using environment variables to be read by your Airflow service.
You will also need to set the namespace if you are using something other than the default namespace.
MARQUEZ_BACKEND=HTTP
MARQUEZ_URL=http://my_hosted_marquez.example.com:5000
MARQUEZ_NAMESPACE=my_special_ns
Extractors: Sending the correct data from your DAGs
If you do nothing, Marquez will receive the Job and the Run from your DAGs, but sources and datasets will not be sent.
marquez-airflow allows you to do more than that by building "Extractors". Extractors are in the process of changing right now, but they basically take a task and extract:
- Name: The name of the task
- Location: Location of the code for the task
- Inputs: List of input datasets
- Outputs: List of output datasets
- Context: The Airflow context for the task
It's important to understand that the inputs and outputs are lists and relate directly to the Dataset object in Marquez. Datasets also include a source, which relates directly to the Source object in Marquez.
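Because the extractor interface is still changing, the following is an illustrative sketch rather than the library's actual API: the class names and the metadata container are stand-ins, and the point is the shape of the five fields listed above:

from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ExtractedTaskMetadata:
    # Stand-in for the library's metadata type; holds the five fields listed above.
    name: str
    location: str
    inputs: List[str] = field(default_factory=list)   # input dataset names
    outputs: List[str] = field(default_factory=list)  # output dataset names
    context: Dict[str, Any] = field(default_factory=dict)


class MyPostgresExtractor:
    # Stand-in for an extractor; a real one would subclass the library's base class.
    def __init__(self, task):
        self.task = task

    def extract(self) -> ExtractedTaskMetadata:
        # A real extractor would parse the operator (e.g. its SQL) to discover
        # input and output tables; the values here are hard-coded for illustration.
        return ExtractedTaskMetadata(
            name=f"{self.task.dag_id}.{self.task.task_id}",
            location="https://github.com/my-org/my-repo/blob/main/dags/my_dag.py",
            inputs=["public.orders"],
            outputs=["analytics.daily_orders"],
            context={"sql": getattr(self.task, "sql", None)},
        )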
Usage
To begin collecting Airflow DAG metadata with Marquez, use:
- from airflow import DAG
+ from marquez_airflow import DAG
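For example, a minimal DAG that the library will track looks like any other Airflow DAG apart from the import; the DAG id, connection id, and SQL below are placeholders:

from datetime import timedelta

from marquez_airflow import DAG                      # the only change from a stock DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "datascience",
    "start_date": days_ago(1),
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}

dag = DAG(
    "etl_orders",                                    # placeholder DAG id
    schedule_interval="@daily",
    default_args=default_args,
    description="Loads newly placed orders.",
)

t1 = PostgresOperator(
    task_id="if_not_exists",
    postgres_conn_id="example_db",                   # placeholder connection id
    sql="""
    CREATE TABLE IF NOT EXISTS orders (
      id        SERIAL PRIMARY KEY,
      placed_on TIMESTAMP NOT NULL
    )
    """,
    dag=dag,
)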
When enabled, the library will:
- On DAG start, collect metadata for each task using an Extractor (the library defines a default extractor to use otherwise)
- Collect task input / output metadata (source, schema, etc.)
- Collect task run-level metadata (execution time, state, parameters, etc.)
- On DAG complete, also mark the task as complete in Marquez
To enable logging, set the environment variable MARQUEZ_LOG_LEVEL to DEBUG, INFO, or ERROR:
$ export MARQUEZ_LOG_LEVEL=INFO
Triggering Child Jobs
Commonly, Airflow DAGs will trigger processes on remote systems, such as an Apache Spark or Apache Beam job. Those systems may have their own OpenLineage integration and report their own job runs and dataset inputs/outputs. To propagate the job hierarchy, tasks must send their own run id so that the downstream process can report the ParentRunFacet with the proper run id.
The lineage_run_id macro exists to inject the run id of a given task into the arguments sent to a remote processing job's Airflow operator. The macro requires the DAG run_id and the task to access the generated run id for that task. For example, a Spark job can be triggered using the DataProcPySparkOperator with the correct parent run id using the following configuration:
t1 = DataProcPySparkOperator(
    task_id=job_name,
    # ...required pyspark configuration...
    job_name=job_name,
    dataproc_pyspark_properties={
        'spark.driver.extraJavaOptions':
            f"-javaagent:{jar}={os.environ.get('MARQUEZ_URL')}/api/v1/namespaces/{os.getenv('MARQUEZ_NAMESPACE', 'default')}/jobs/{job_name}/runs/{{{{lineage_run_id(run_id, task)}}}}?api_key={os.environ.get('MARQUEZ_API_KEY')}"
    },
    dag=dag)
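The same idea applies to any templated operator field. A simpler, purely illustrative sketch using a BashOperator (the script name and flag are placeholders, and dag is the surrounding DAG object, as above); it assumes lineage_run_id is available in the template context, as in the example above:

from airflow.operators.bash_operator import BashOperator

# Forward this task's lineage run id to a downstream process through a templated
# field; Airflow renders {{ lineage_run_id(run_id, task) }} at runtime.
t2 = BashOperator(
    task_id="submit_child_job",
    bash_command=(
        "spark-submit my_job.py "
        "--parent-run-id '{{ lineage_run_id(run_id, task) }}'"
    ),
    dag=dag,
)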
Development
To install all dependencies for local development:
# Bash
$ pip3 install -e .[dev]
# escape the brackets in zsh
$ pip3 install -e .\[dev\]
To run the entire test suite, you'll first want to initialize the Airflow database:
$ airflow initdb
Then, run the test suite with:
$ pytest
File details
Details for the file marquez-airflow-0.15.2.tar.gz.
File metadata
- Download URL: marquez-airflow-0.15.2.tar.gz
- Upload date:
- Size: 25.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/4.5.0 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.61.1 CPython/3.6.13
File hashes
Algorithm | Hash digest
---|---
SHA256 | 229d6a52b853e2d53431767573c89b2ec143aff1152fc46c57cdb189233a7e4b
MD5 | 6f8dd9fdecabb0b5d109061cf6f1ba62
BLAKE2b-256 | 55cf0b2e9426c7bb0e33bba34cbc922a6079a8aaeeb8df4464b8cf9276b95345
File details
Details for the file marquez_airflow-0.15.2-py3-none-any.whl.
File metadata
- Download URL: marquez_airflow-0.15.2-py3-none-any.whl
- Upload date:
- Size: 34.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/4.5.0 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.61.1 CPython/3.6.13
File hashes
Algorithm | Hash digest
---|---
SHA256 | 8190d3fa64075cb6d91707b429a527ab95f96b8dba47272dab9ed9276dc0d61a
MD5 | a8aafb4645db301c051e9cc22b15a5e9
BLAKE2b-256 | f15b5194d3c4ea98300a3fac118b25d7c788bb5f51d6a84c10f7e6d292d07416