dbt-airflow
A Python package that helps Data and Analytics engineers render dbt projects on Apache Airflow in a way that every single dbt resource (model, test, seed and snapshot) is represented by a single Airflow task.
Features
- Render a dbt project as a TaskGroup consisting of Airflow Tasks
- Every model, seed and snapshot resource that has at least one test will also have a corresponding test task created as a downstream dependency
- Add extra tasks before or after the whole dbt project
- Add extra tasks after specific dbt tasks
How does it work
The library essentially builds on top of the metadata generated by dbt-core, which is stored in the target/manifest.json file in your dbt project directory.
Domain Requirements
- Every dbt project, when compiled, will generate a metadata file under <dbt-project-dir>/target/manifest.json
- The manifest file contains information about the interdependencies of the project's data models
- dbt-airflow aims to extract these dependencies such that every dbt entity (snapshot, model, test and seed) has its own task in an Airflow DAG, while the entity dependencies are persisted (a short sketch of how these dependencies appear in the manifest follows this list)
- Snapshots are never an upstream dependency of any task
- The creation of snapshots on seeds does not make sense and is therefore not handled (it is not even clear whether this is possible on the dbt side)
- Models may have tests
- Snapshots may have tests
- Seeds may have tests
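To make the above more concrete, here is a minimal, illustrative sketch (not part of the package's API) that reads a compiled manifest.json and prints every dbt node together with its upstream dependencies. The target/manifest.json path and the nodes / depends_on layout are standard dbt-core behaviour.

import json
from pathlib import Path

# Load the metadata file produced when the dbt project is compiled.
manifest = json.loads(Path('target/manifest.json').read_text())

# Each entry in "nodes" is keyed like "model.<project>.<name>", "seed.<project>.<name>", etc.
for node_id, node in manifest['nodes'].items():
    if node['resource_type'] in ('model', 'seed', 'snapshot', 'test'):
        upstream = node['depends_on'].get('nodes', [])
        print(f'{node_id} depends on: {upstream}')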
Installation
The package is available on PyPI and can be installed through pip:
pip install dbt-airflow
dbt needs to connect to your target environment (database, warehouse etc.) and in order to do so, it makes use of different adapters, each dedicated to a different technology (such as Postgres or BigQuery). Therefore, before running dbt-airflow you also need to ensure that the required adapter(s) are installed in your environment.
For the full list of available adapters please refer to the official dbt documentation.
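For example, assuming your target environment is Postgres, you could install the official Postgres adapter alongside the package (swap dbt-postgres for the adapter matching your own warehouse):

pip install dbt-airflow dbt-postgres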
Usage
dbt-airflow can be used either as a normal Python package, or through the command line interface.
Given that there are possibly many different ways for deploying Airflow and automating different aspects of data workflows that involve Airflow, dbt and potentially other tools as well, we wanted to offer more flexibility by providing different approaches for using dbt-airflow.
Building an Airflow DAG using dbt-airflow
from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator

from dbt_airflow.core.task_group import DbtTaskGroup
from dbt_airflow.core.task import ExtraTask


with DAG(
    dag_id='test_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    # Extra tasks to be inserted around specific dbt tasks, referenced by
    # their manifest node names (e.g. model.<project>.<name>).
    extra_tasks = [
        ExtraTask(
            task_id='test_task',
            operator=PythonOperator,
            operator_args={
                'python_callable': lambda: print('Hello world'),
            },
            upstream_task_ids={
                'model.example_dbt_project.int_customers_per_store',
                'model.example_dbt_project.int_revenue_by_date',
            },
        ),
        ExtraTask(
            task_id='another_test_task',
            operator=PythonOperator,
            operator_args={
                'python_callable': lambda: print('Hello world 2!'),
            },
            upstream_task_ids={
                'test.example_dbt_project.int_customers_per_store',
            },
            downstream_task_ids={
                'snapshot.example_dbt_project.int_customers_per_store_snapshot',
            },
        ),
        ExtraTask(
            task_id='test_task_3',
            operator=PythonOperator,
            operator_args={
                'python_callable': lambda: print('Hello world 3!'),
            },
            downstream_task_ids={
                'snapshot.example_dbt_project.int_customers_per_store_snapshot',
            },
            upstream_task_ids={
                'model.example_dbt_project.int_revenue_by_date',
            },
        ),
    ]

    t1 = DummyOperator(task_id='dummy_1')
    t2 = DummyOperator(task_id='dummy_2')

    # Render the dbt project as a task group between the two dummy tasks.
    tg = DbtTaskGroup(
        group_id='dbt-company',
        dbt_manifest_path=Path('/opt/airflow/example_dbt_project/target/manifest.json'),
        dbt_target='dev',
        dbt_project_path=Path('/opt/airflow/example_dbt_project/'),
        dbt_profile_path=Path('/opt/airflow/example_dbt_project/profiles'),
        extra_tasks=extra_tasks,
        create_sub_task_groups=True,
    )

    t1 >> tg >> t2
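If you do not need any extra tasks, a stripped-down version of the same DAG could look like the sketch below. This sketch reuses only arguments already shown above and assumes that the extra_tasks and create_sub_task_groups arguments are optional and can simply be omitted.

from datetime import datetime
from pathlib import Path

from airflow import DAG

from dbt_airflow.core.task_group import DbtTaskGroup


with DAG(
    dag_id='dbt_only_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    # Render the whole dbt project as a single task group.
    DbtTaskGroup(
        group_id='dbt-company',
        dbt_manifest_path=Path('/opt/airflow/example_dbt_project/target/manifest.json'),
        dbt_target='dev',
        dbt_project_path=Path('/opt/airflow/example_dbt_project/'),
        dbt_profile_path=Path('/opt/airflow/example_dbt_project/profiles'),
    )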
Contributing
If you would like to contribute to the dbt-airflow project, you will essentially need to follow the steps outlined below:
- Create a fork of the repository
- Set up the development environment on your local machine (see the detailed guide below)
- Write and test your contribution
- Create a Pull Request
Setting up your local development environment
TO BE FINALISED
docker-compose build
docker-compose up
# Access postgres db (changed to port 5433 given that we have an additional postgres instance for Airflow itself)
docker ps # get the container id of postgres-sakila
docker exec -it <container-id> /bin/bash
psql -U postgres -p 5433
# Install poetry
pip install poetry
# Install dependencies in poetry venv
poetry install
# Run tests
poetry run tests -rP -vv
# Run specific test(s)
poetry run tests -k "test_name_or_prefix" -rP -vv