
dbt-airflow

A Python package that helps Data and Analytics engineers render dbt projects on Apache Airflow in such a way that every dbt resource (model, test, seed and snapshot) is represented by its own Airflow task.

Features

  • Render a dbt project as an Airflow TaskGroup consisting of individual Airflow Tasks
  • Every model, seed and snapshot resource that has at least one test will also get a corresponding test task as a downstream dependency
  • Add extra tasks before or after the whole dbt project
  • Add extra tasks after specific dbt tasks

How does it work

The library essentially builds on top of the metadata generated by dbt-core and stored in the target/manifest.json file of your dbt project directory.
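
The manifest is (re)generated whenever the dbt project is compiled. A minimal sketch, assuming dbt-core and the required adapter are already installed:

# Compile the dbt project to (re)generate <dbt-project-dir>/target/manifest.json
dbt compile --project-dir <dbt-project-dir>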

Domain Requirements

  • Every dbt project, when compiled, will generate a metadata file under <dbt-project-dir>/target/manifest.json
  • The manifest file contains information about the interdependencies of the project's data models
  • dbt-airflow aims to extract these dependencies such that every dbt entity (snapshot, model, test and seed) has its own task in an Airflow DAG, while entity dependencies are preserved (see the sketch after this list)
  • Snapshots are never an upstream dependency of any task
  • Creating snapshots on top of seeds does not make sense and is therefore not handled (it is not even clear whether this is possible on the dbt side)
  • Models may have tests
  • Snapshots may have tests
  • Seeds may have tests
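
To illustrate what dbt-airflow reads from the manifest, here is a heavily simplified sketch of a single node entry; the real file contains many more fields, and the upstream model stg_customers below is purely hypothetical:

# Simplified sketch of one node in target/manifest.json (shown as a Python dict);
# "stg_customers" is a hypothetical upstream model, included only for illustration
node = {
    "unique_id": "model.example_dbt_project.int_customers_per_store",
    "resource_type": "model",
    "depends_on": {
        "nodes": ["model.example_dbt_project.stg_customers"],
    },
}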

Installation

The package is available on PyPI and can be installed through pip:

pip install dbt-airflow

dbt needs to connect to your target environment (database, warehouse, etc.) and it does so through adapters, each dedicated to a different technology (such as Postgres or BigQuery). Therefore, before running dbt-airflow you also need to ensure that the required adapter(s) are installed in your environment.

For the full list of available adapters please refer to the official dbt documentation.
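
For instance, if your target environment is Postgres, the corresponding adapter can be installed alongside dbt-airflow (dbt-postgres is shown here purely as an example; pick the adapter that matches your own target):

# Example only: install the adapter matching your target environment
pip install dbt-postgres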


Usage

dbt-airflow can be used either as a normal Python package, or through the command line interface.

Since there are many different ways of deploying Airflow and automating data workflows that involve Airflow, dbt and potentially other tools, we wanted to offer more flexibility by providing different approaches for using dbt-airflow.

Building an Airflow DAG using dbt-airflow

from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator

from dbt_airflow.core.task_group import DbtTaskGroup
from dbt_airflow.core.task import ExtraTask


with DAG(
    dag_id='test_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

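    # Extra Airflow tasks to weave into the rendered dbt task graph; each task declares its
    # position via upstream/downstream dbt task ids of the form <resource_type>.<project>.<name>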
    extra_tasks = [
        ExtraTask(
            task_id='test_task',
            operator=PythonOperator,
            operator_args={
                'python_callable': lambda: print('Hello world'),
            },
            upstream_task_ids={
                'model.example_dbt_project.int_customers_per_store',
                'model.example_dbt_project.int_revenue_by_date'
            }
        ),
        ExtraTask(
            task_id='another_test_task',
            operator=PythonOperator,
            operator_args={
                'python_callable': lambda: print('Hello world 2!'),
            },
            upstream_task_ids={
                'test.example_dbt_project.int_customers_per_store',
            },
            downstream_task_ids={
                'snapshot.example_dbt_project.int_customers_per_store_snapshot',
            }
        ),
        ExtraTask(
            task_id='test_task_3',
            operator=PythonOperator,
            operator_args={
                'python_callable': lambda: print('Hello world 3!'),
            },
            downstream_task_ids={
                'snapshot.example_dbt_project.int_customers_per_store_snapshot',
            },
            upstream_task_ids={
                'model.example_dbt_project.int_revenue_by_date',
            },
        )
    ]

    t1 = DummyOperator(task_id='dummy_1')
    t2 = DummyOperator(task_id='dummy_2')

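    # Render every dbt model, test, seed and snapshot from manifest.json as its own task
    # within this task group, preserving the dependencies defined in the dbt project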
    tg = DbtTaskGroup(
        group_id='dbt-company',
        dbt_manifest_path=Path('/opt/airflow/example_dbt_project/target/manifest.json'),
        dbt_target='dev',
        dbt_project_path=Path('/opt/airflow/example_dbt_project/'),
        dbt_profile_path=Path('/opt/airflow/example_dbt_project/profiles'),
        extra_tasks=extra_tasks,
        create_sub_task_groups=True,
    )

    t1 >> tg >> t2

Contributing

If you would like to contribute to the dbt-airflow project, follow the steps outlined below:

  1. Create a fork of the repository
  2. Set up the development environment on your local machine (see the detailed guide below)
  3. Write and test your contribution
  4. Create a Pull Request

Setting up your local development environment

TO BE FINALISED

docker-compose build

docker-compose up

# Access the postgres db (using port 5433, given that there is an additional postgres instance for Airflow itself)
docker ps  # get the container id of postgres-sakila
docker exec -it <container-id> /bin/bash
psql -U postgres -p 5433 
# Install poetry
pip install poetry

# Install dependencies in poetry venv
poetry install 

# Run tests
poetry run tests -rP -vv

# Run specific test(s)
poetry run tests -k "test_name_or_prefix" -rP -vv
