
Apache Airflow integration for dbt


airflow-dbt

This is a collection of Airflow operators to provide easy integration with dbt.

from airflow import DAG
from airflow_dbt.operators.dbt_operator import (
    DbtSeedOperator,
    DbtSnapshotOperator,
    DbtRunOperator,
    DbtTestOperator
)
from airflow.utils.dates import days_ago

default_args = {
  'dir': '/srv/app/dbt',
  'start_date': days_ago(0)
}

with DAG(dag_id='dbt', default_args=default_args, schedule_interval='@daily') as dag:

  dbt_seed = DbtSeedOperator(
    task_id='dbt_seed',
  )

  dbt_snapshot = DbtSnapshotOperator(
    task_id='dbt_snapshot',
  )

  dbt_run = DbtRunOperator(
    task_id='dbt_run',
  )

  dbt_test = DbtTestOperator(
    task_id='dbt_test',
    retries=0,  # Failing tests would fail the task, and we don't want Airflow to try again
  )

  dbt_seed >> dbt_snapshot >> dbt_run >> dbt_test

Installation

Install from PyPI:

pip install airflow-dbt

It will also need access to the dbt CLI, which should either be on your PATH or can be set with the dbt_bin argument in each operator.
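
For example, if dbt is installed in a virtual environment rather than on your PATH, you can point an operator at the binary explicitly. A minimal sketch (the path is only illustrative, taken from the MWAA example further down):

from airflow_dbt.operators.dbt_operator import DbtRunOperator

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    dbt_bin='/usr/local/airflow/.local/bin/dbt',  # illustrative; use wherever your dbt executable lives
)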

Usage

There are five operators currently implemented.

Each of these operators accepts the arguments defined in dbt_command_config. The main ones are listed below; a short example follows the list:

  • profiles_dir
    • If set, passed as the --profiles-dir argument to the dbt command
  • target
    • If set, passed as the --target argument to the dbt command
  • dir
    • The directory to run the dbt command in
  • full_refresh
    • If set to True, passes --full-refresh
  • vars
    • If set, passed as the --vars argument to the dbt command. Should be set as a Python dictionary, as it will be passed to the dbt command as YAML
  • models
    • If set, passed as the --models argument to the dbt command
  • exclude
    • If set, passed as the --exclude argument to the dbt command
  • select
    • If set, passed as the --select argument to the dbt command
  • dbt_bin
    • The dbt CLI. Defaults to dbt, so assumes it's on your PATH
  • verbose
    • The operator will log verbosely to the Airflow logs
  • warn_error
    • If set to True, passes --warn-error argument to dbt command and will treat warnings as errors
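
For example, a run against a production target with a full refresh, a model selection and some variables might look like the sketch below (the paths, target name, model selector and variable values are made up):

from airflow_dbt.operators.dbt_operator import DbtRunOperator

dbt_run_staging = DbtRunOperator(
    task_id='dbt_run_staging',
    dir='/srv/app/dbt',               # directory to run the dbt command in
    profiles_dir='/srv/app/dbt',      # passed as --profiles-dir
    target='prod',                    # passed as --target
    full_refresh=True,                # passes --full-refresh
    models='staging.*',               # passed as --models
    vars={'run_date': '2021-01-01'},  # rendered as YAML and passed as --vars
)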

Typically you will want to use the DbtRunOperator, followed by the DbtTestOperator, as shown earlier.

You can also use the hook directly. Typically this is useful when you need to combine the dbt command with another task in the same operator, for example running dbt docs and uploading the docs to somewhere they can be served from.
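
For instance, a documentation task could call the CLI hook from a PythonOperator. This is only a sketch: it assumes a DbtCliHook with a run_cli method, as in the upstream airflow-dbt README, and leaves the upload step as a placeholder:

from airflow.operators.python_operator import PythonOperator
from airflow_dbt.hooks.dbt_hook import DbtCliHook

def generate_and_upload_docs():
    # Run `dbt docs generate` via the hook (hook name and run_cli signature assumed from upstream airflow-dbt)
    DbtCliHook(dir='/srv/app/dbt').run_cli('docs', 'generate')
    # ...then upload the generated target/ directory to wherever the docs are served from

dbt_docs_generate = PythonOperator(
    task_id='dbt_docs_generate',
    python_callable=generate_and_upload_docs,
)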

A more advanced example:

If you want to run your dbt project somewhere other than in the Airflow worker, you can use the DbtCloudBuildHook and pass it to the DbtBaseOperator, or simply use the provided DbtCloudBuildOperator:

from airflow_dbt.hooks import DbtCloudBuildHook
from airflow_dbt.operators import DbtBaseOperator, DbtCloudBuildOperator
DbtBaseOperator(
    task_id='provide_hook',
    command='run',
    use_colors=False,
    config={
        'profiles_dir': './jaffle-shop',
        'project_dir': './jaffle-shop',
    },
    dbt_hook=DbtCloudBuildHook(
        gcs_staging_location='gs://my-bucket/compressed-dbt-project.tar.gz'
    )
)

DbtCloudBuildOperator(
    task_id='default_hook_cloudbuild',
    gcs_staging_location='gs://my-bucket/compressed-dbt-project.tar.gz',
    command='run',
    use_colors=False,
    config={
        'profiles_dir': './jaffle-shop',
        'project_dir': './jaffle-shop',
    },
)

You can either define the dbt params/config/flags in the operator or you can group them into a config param. They both have validation, but only the config has templating. The following two tasks are equivalent:

from airflow_dbt.operators.dbt_operator import DbtBaseOperator

DbtBaseOperator(
    task_id='config_param',
    command='run',
    config={
        'profiles_dir': './jaffle-shop',
        'project_dir': './jaffle-shop',
        'dbt_bin': '/usr/local/airflow/.local/bin/dbt',
        'use_colors': False
    }
)

DbtBaseOperator(
    task_id='flat_config',
    command='run',
    profiles_dir='./jaffle-shop',
    project_dir='./jaffle-shop',
    dbt_bin='/usr/local/airflow/.local/bin/dbt',
    use_colors=False
)
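
Continuing the example above, and because only the config dict is templated, Airflow macros can be used inside it. A sketch, assuming config is a templated field whose nested values are rendered (the variable name is made up):

DbtBaseOperator(
    task_id='templated_config',
    command='run',
    config={
        'profiles_dir': './jaffle-shop',
        'project_dir': './jaffle-shop',
        # '{{ ds }}' would be rendered to the execution date before the dbt command is built,
        # assuming the config field is templated recursively
        'vars': {'run_date': '{{ ds }}'},
    },
)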

Building Locally

To install from the repository, it is recommended to first create a virtual environment:

python3 -m venv .venv

source .venv/bin/activate

Install using pip:

pip install .

Testing

To run tests locally, first create a virtual environment (see the Building Locally section).

Install dependencies:

pip install . pytest

Run the tests:

pytest tests/

Code style

This project uses flake8.

To check your code, first create a virtual environment (see the Building Locally section):

pip install flake8
flake8 airflow_dbt/ tests/ setup.py

Package management

If you use dbt's package manager, you should include all dependencies before deploying your dbt project.

For Docker users, packages specified in packages.yml should be included as part of your Docker image by calling dbt deps in your Dockerfile.

Amazon Managed Workflows for Apache Airflow (MWAA)

If you use MWAA, you just need to update the requirements.txt file and add airflow-dbt and dbt to it.

Then you can have your dbt code inside a folder {DBT_FOLDER} in the dags folder on S3 and configure the dbt task like below:

from airflow_dbt.operators.dbt_operator import DbtRunOperator 

dbt_run = DbtRunOperator(
  task_id='dbt_run',
  dbt_bin='/usr/local/airflow/.local/bin/dbt',
  profiles_dir='/usr/local/airflow/dags/{DBT_FOLDER}/',
  dir='/usr/local/airflow/dags/{DBT_FOLDER}/'
)

License & Contributing

GoCardless ♥ open source. If you do too, come join us.

Download files

Download the file for your platform.

Source Distribution

airflow_dbt_dinigo-0.5.10.tar.gz (27.3 kB)

Built Distribution

airflow_dbt_dinigo-0.5.10-py2.py3-none-any.whl (15.5 kB)

File details

Details for the file airflow_dbt_dinigo-0.5.10.tar.gz.

File metadata

  • Download URL: airflow_dbt_dinigo-0.5.10.tar.gz
  • Upload date:
  • Size: 27.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: python-httpx/0.19.0

File hashes

Hashes for airflow_dbt_dinigo-0.5.10.tar.gz

  • SHA256: 07686b3b5ac9a95abec14840487686e54b9afeb324faf23884e633c5fafe1295
  • MD5: 6f29802ed2a0055b1115b0d3712011e9
  • BLAKE2b-256: 33f7926edee4ac80b90ab1236daeac01f3b1cc3860fbce6044216e083cd4cd0a


File details

Details for the file airflow_dbt_dinigo-0.5.10-py2.py3-none-any.whl.

File hashes

Hashes for airflow_dbt_dinigo-0.5.10-py2.py3-none-any.whl

  • SHA256: cbc6837655d4f179e341574b1e9e3e7b49c2b65351cf8799d93fadc3f9babe71
  • MD5: 3fbc46b36807dc5251d7dcf8e1a859a3
  • BLAKE2b-256: 0ed307b854da4d7369daf54d0e7ea949a380f2c263e78ef00bcc526ed0e0458a

