Marquez integration with Airflow
Project description
marquez-airflow
A library that integrates Airflow DAGs
with Marquez for automatic metadata collection.
Status
This library is under active development at Datakin.
Requirements
Installation
$ pip3 install marquez-airflow
To install from source, run:
$ python3 setup.py install
Settings
Pointing to your Marquez service
marquez-airflow
needs to know where to talk to the Marquez server API. You can set these using environment variables to be read by your Airflow service.
You will also need to set the namespace if you are using something other than the default
namespace.
MARQUEZ_URL=http://my_hosted_marquez.example.com:5000
MARQUEZ_NAMESPACE=my_special_ns
NOTE: In the latest version of marquez-python
, the constructor requires a url
parameter for the host and port. Presumably, the logic to read the env vars will move into this library
Extractors : Sending the correct data from your DAGs
If you do nothing, Marquez will receive the Job
and the Run
from your DAGs, but sources and datasets will not be sent.
marquez-airflow
allows you to do more than that by building "Extractors". Extractors are in the process of changing right now, but they basically take a task and extract:
- Name : The name of the task
- Location : Location of the code for the task
- Inputs : List of input datasets
- Outputs : List of output datasets
- Context : The Airflow context for the task
It's important to understand the inputs and outputs are lists and relate directly to the Dataset
object in Marquez. Datasets also include a source which relates directly to the Source
object in Marquez.
A PostgresExtractor is currently in progress. When that's merged, it will represent a good example of how to write custom extractors
Usage
from datetime import datetime
from marquez_airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'datascience',
'depends_on_past': False,
'start_date': days_ago(1),
'email_on_failure': False,
'email_on_retry': False,
'email': ['datascience@datakin.com']
}
dag = DAG(
'orders_popular_day_of_week',
schedule_interval='@weekly',
default_args=default_args,
description='Determines the popular day of week orders are placed.'
)
t1 = PostgresOperator(
task_id='if_not_exists',
postgres_conn_id='food_delivery_db',
sql='''
CREATE TABLE IF NOT EXISTS popular_orders_day_of_week (
order_day_of_week VARCHAR(64) NOT NULL,
order_placed_on TIMESTAMP NOT NULL,
orders_placed INTEGER NOT NULL
);''',
dag=dag
)
t2 = PostgresOperator(
task_id='insert',
postgres_conn_id='food_delivery_db',
sql='''
INSERT INTO popular_orders_day_of_week (order_day_of_week, order_placed_on, orders_placed)
SELECT EXTRACT(ISODOW FROM order_placed_on) AS order_day_of_week,
order_placed_on,
COUNT(*) AS orders_placed
FROM top_delivery_times
GROUP BY order_placed_on;
''',
dag=dag
)
t1 >> t2
Contributing
See CONTRIBUTING.md for more details about how to contribute.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file marquez-airflow-0.2.2.tar.gz
.
File metadata
- Download URL: marquez-airflow-0.2.2.tar.gz
- Upload date:
- Size: 11.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/50.0.1 requests-toolbelt/0.8.0 tqdm/4.48.2 CPython/3.6.12
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 8a0e5e9cc12366aa1cdf47a8c1e99d67ebe3dd7e13220917e3c5104e70cb6a94 |
|
MD5 | 8cbf847625b4d520f2465ad77c1c7bcc |
|
BLAKE2b-256 | 61e18efd7460973b07a8545f6ae32412bc328e083986ca8d35ca3c272f464fee |