Apache Airflow integration for dbt
Project description
airflow-dbt
This is a collection of Airflow operators to provide easy integration with dbt.
from airflow import DAG
from airflow_dbt.operators.dbt_operator import (
    DbtSeedOperator,
    DbtSnapshotOperator,
    DbtRunOperator,
    DbtTestOperator
)
from airflow.utils.dates import days_ago

default_args = {
    'dir': '/srv/app/dbt',
    'start_date': days_ago(0)
}

with DAG(dag_id='dbt', default_args=default_args, schedule_interval='@daily') as dag:

    dbt_seed = DbtSeedOperator(
        task_id='dbt_seed',
    )

    dbt_snapshot = DbtSnapshotOperator(
        task_id='dbt_snapshot',
    )

    dbt_run = DbtRunOperator(
        task_id='dbt_run',
    )

    dbt_test = DbtTestOperator(
        task_id='dbt_test',
        retries=0,  # Failing tests would fail the task, and we don't want Airflow to try again
    )

    dbt_seed >> dbt_snapshot >> dbt_run >> dbt_test
Installation
Install from PyPI:
pip install airflow-dbt
The operators will also need access to the dbt CLI, which should either be on your PATH or can be set with the dbt_bin argument in each operator.
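For example, if dbt is installed in a virtual environment rather than on the PATH, you can point an operator at the executable directly. The path below is purely illustrative:

from airflow_dbt.operators.dbt_operator import DbtRunOperator

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    dbt_bin='/home/airflow/.venv/bin/dbt',  # illustrative path to the dbt executable
    dir='/srv/app/dbt',
)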
Usage
There are six operators currently implemented:

- DbtDocsGenerateOperator - Calls dbt docs generate
- DbtDepsOperator - Calls dbt deps
- DbtSeedOperator - Calls dbt seed
- DbtSnapshotOperator - Calls dbt snapshot
- DbtRunOperator - Calls dbt run
- DbtTestOperator - Calls dbt test
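The example DAG above uses four of these operators. A minimal sketch of the other two, assuming they are importable from the same module as the rest (the dag_id and project directory are illustrative):

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt.operators.dbt_operator import (
    DbtDepsOperator,
    DbtDocsGenerateOperator
)

with DAG(dag_id='dbt_docs', start_date=days_ago(0), schedule_interval='@daily') as dag:

    # Install packages from packages.yml before generating the docs
    dbt_deps = DbtDepsOperator(
        task_id='dbt_deps',
        dir='/srv/app/dbt',
    )

    dbt_docs_generate = DbtDocsGenerateOperator(
        task_id='dbt_docs_generate',
        dir='/srv/app/dbt',
    )

    dbt_deps >> dbt_docs_generate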
Each of the above operators accepts the arguments defined in dbt_command_config. The main ones are:
- profiles_dir - If set, passed as the --profiles-dir argument to the dbt command
- target - If set, passed as the --target argument to the dbt command
- dir - The directory to run the dbt command in
- full_refresh - If set to True, passes --full-refresh
- vars - If set, passed as the --vars argument to the dbt command. Should be set as a Python dictionary, as it will be passed to the dbt command as YAML
- models - If set, passed as the --models argument to the dbt command
- exclude - If set, passed as the --exclude argument to the dbt command
- select - If set, passed as the --select argument to the dbt command
- dbt_bin - The dbt CLI. Defaults to dbt, so assumes it's on your PATH
- verbose - The operator will log verbosely to the Airflow logs
- warn_error - If set to True, passes --warn-error to the dbt command and will treat warnings as errors
Typically you will want to use the DbtRunOperator, followed by the DbtTestOperator, as shown earlier.
You can also use the hook directly. Typically this is useful when you need to combine the dbt command with another task in the same operator, for example running dbt docs and uploading the docs to somewhere they can be served from.
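A minimal sketch of that pattern, assuming DbtCliHook keeps the upstream interface (a dir argument and a run_cli method) and is exported from airflow_dbt.hooks; upload_docs is a hypothetical helper for the publishing step:

from airflow.operators.python import PythonOperator
from airflow_dbt.hooks import DbtCliHook


def generate_and_upload_docs():
    # Run `dbt docs generate` through the hook
    DbtCliHook(dir='/srv/app/dbt').run_cli('docs', 'generate')
    # upload_docs is a hypothetical helper, e.g. copying ./target
    # to a bucket or static file server
    upload_docs('/srv/app/dbt/target')


dbt_docs = PythonOperator(
    task_id='dbt_docs',
    python_callable=generate_and_upload_docs,
)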
A more advanced example: if you want to run your dbt project somewhere other than in the Airflow worker, you can use the DbtCloudBuildHook and apply it to the DbtBaseOperator, or simply use the provided DbtCloudBuildOperator:
from airflow_dbt.hooks import DbtCloudBuildHook
from airflow_dbt.operators import DbtBaseOperator, DbtCloudBuildOperator

DbtBaseOperator(
    task_id='provide_hook',
    command='run',
    use_colors=False,
    config={
        'profiles_dir': './jaffle-shop',
        'project_dir': './jaffle-shop',
    },
    dbt_hook=DbtCloudBuildHook(
        gcs_staging_location='gs://my-bucket/compressed-dbt-project.tar.gz'
    )
)

DbtCloudBuildOperator(
    task_id='default_hook_cloudbuild',
    gcs_staging_location='gs://my-bucket/compressed-dbt-project.tar.gz',
    command='run',
    use_colors=False,
    config={
        'profiles_dir': './jaffle-shop',
        'project_dir': './jaffle-shop',
    },
)
You can either define the dbt params/config/flags in the operator, or you can group them into a config param. They both have validation, but only the config has templating. The following two tasks are equivalent:
from airflow_dbt.operators.dbt_operator import DbtBaseOperator

DbtBaseOperator(
    task_id='config_param',
    command='run',
    config={
        'profiles_dir': './jaffle-shop',
        'project_dir': './jaffle-shop',
        'dbt_bin': '/usr/local/airflow/.local/bin/dbt',
        'use_colors': False
    }
)

DbtBaseOperator(
    task_id='flat_config',
    command='run',
    profiles_dir='./jaffle-shop',
    project_dir='./jaffle-shop',
    dbt_bin='/usr/local/airflow/.local/bin/dbt',
    use_colors=False
)
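Because only config is templated, Airflow macros can be injected through it, for example to pass the execution date to dbt as a variable. A sketch under that assumption (the variable name is illustrative, and it assumes vars is accepted inside config like the other dbt flags):

from airflow_dbt.operators.dbt_operator import DbtBaseOperator

DbtBaseOperator(
    task_id='templated_config',
    command='run',
    config={
        'profiles_dir': './jaffle-shop',
        'project_dir': './jaffle-shop',
        # Rendered by Airflow templating before dbt is invoked
        'vars': {'run_date': '{{ ds }}'},
    },
)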
Building Locally
To install from the repository, it's recommended to first create a virtual environment:
python3 -m venv .venv
source .venv/bin/activate
Install using pip:
pip install .
Testing
To run tests locally, first create a virtual environment (see the Building Locally section).
Install dependencies:
pip install . pytest
Run the tests:
pytest tests/
Code style
This project uses flake8.
To check your code, first create a virtual environment (see Building Locally section):
pip install flake8
flake8 airflow_dbt/ tests/ setup.py
Package management
If you use dbt's package manager you should include all dependencies before deploying your dbt project.
For Docker users, packages specified in packages.yml should be included as part of your Docker image by calling dbt deps in your Dockerfile.
Amazon Managed Workflows for Apache Airflow (MWAA)
If you use MWAA, you just need to update the requirements.txt file and add airflow-dbt and dbt to it.

Then you can have your dbt code inside a folder {DBT_FOLDER} in the dags folder on S3 and configure the dbt task like below:
from airflow_dbt.operators.dbt_operator import DbtRunOperator

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    dbt_bin='/usr/local/airflow/.local/bin/dbt',
    profiles_dir='/usr/local/airflow/dags/{DBT_FOLDER}/',
    dir='/usr/local/airflow/dags/{DBT_FOLDER}/'
)
License & Contributing
- This is available as open source under the terms of the MIT License.
- Bug reports and pull requests are welcome on GitHub at https://github.com/gocardless/airflow-dbt.
GoCardless ♥ open source. If you do too, come join us.