A local testing framework for Airflow DAGs.
Airflow DAG invariant testing
Note: Contributions welcome. I hope this library will belong to the Airflow community.
Dagcheck is a framework for asserting DAG invariants. Users of dagcheck define the invariants they want to test as assertions, and dagcheck generates DAG run scenarios that verify those invariants.
Dagcheck was created so that Airflow users could write tests for their DAGs with these characteristics:
- They are easy to read through and understand
- They do not orchestrate real infrastructure changes
- They run on a local development environment
- They run quickly as part of a developer's flow
- They can be run in CI/CD and catch issues in the future
dagcheck is especially useful for DAGs that are complex and that change over time. Tests from dagcheck allow you to offload complex dependency checks from your head onto an automatic test.
Examples
Consider this example of a complex DAG. This DAG has several possible execution paths, and in case of failures we may want to ensure that it will not leak resources. For example, we may write a test that checks that if we create a resource successfully, we will always clean it up, regardless of the failure scenario.
Concretely, if the task create_entry_group succeeds, then we check that the task delete_entry_group will always run, like so:
example_dag = DagBag().get_dag("example_complex")
# First check: If we create an entry group, we want to make sure
# it will be cleaned up.
assert_that(
    given(example_dag)
    .when(task("create_entry_group"), succeeds())
    .then(task("delete_entry_group"), will_run())
)
By creating this test, and running it in CI, we can quickly make sure that our DAG will behave as expected, no matter how much it changes.
To see other examples of usage of the API, look at our unit tests and our small sample DAGs.
Configuring dagcheck
TODO(pabloem)
Caveats and pitfalls
Dagcheck works by simulating DAG execution scenarios.
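To make the idea of simulated scenarios concrete, here is a minimal toy sketch in plain Python. It is not dagcheck's implementation and does not use its API: the DAG dictionary, the intermediate task use_entry_group, and the helpers simulate, scenarios and holds are all hypothetical names chosen for illustration. The sketch assumes only two trigger rules, named after Airflow's all_success and all_done, and enumerates every combination of task outcomes to check an invariant.

```python
from itertools import product

# Toy model (an assumption for illustration, not dagcheck's internals):
# each task lists its upstream tasks and a trigger rule.
DAG = {
    "create_entry_group": {"upstream": [], "rule": "all_success"},
    "use_entry_group": {"upstream": ["create_entry_group"], "rule": "all_success"},
    "delete_entry_group": {"upstream": ["use_entry_group"], "rule": "all_done"},
}


def simulate(dag, outcomes):
    """Return each task's final state, given whether it would succeed if run."""
    states = {}
    for name, spec in dag.items():  # dict order is assumed to be topological
        upstream = [states[u] for u in spec["upstream"]]
        if spec["rule"] == "all_success":
            runs = all(state == "success" for state in upstream)
        else:  # "all_done": runs once upstream tasks have finished, in any state
            runs = True
        if runs:
            states[name] = "success" if outcomes[name] else "failed"
        else:
            states[name] = "upstream_failed"
    return states


def scenarios(dag):
    """Yield the task states for every combination of task outcomes."""
    names = list(dag)
    for combo in product([True, False], repeat=len(names)):
        yield simulate(dag, dict(zip(names, combo)))


def holds(dag, when_task, then_task):
    """True if then_task runs in every scenario where when_task succeeds."""
    return all(
        states[then_task] in ("success", "failed")
        for states in scenarios(dag)
        if states[when_task] == "success"
    )


print(holds(DAG, "create_entry_group", "delete_entry_group"))
```

In this toy model the invariant holds only because delete_entry_group uses the all_done rule. Switching it to all_success produces a scenario where use_entry_group fails and the cleanup never runs, which is the kind of regression such a check is meant to catch. Nothing is executed against real infrastructure; only task states are computed.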
DAGs that are dependent on side effects
Dagcheck simulates DAG executions, but it will not orchestrate any changes. If parts of your DAG execution depend on side effects from other operators, then Dagcheck will not know about this.
For example, consider a DAG that performs a database export operation, checks the output of those files, and uses them for something else. Something like:
(
    DatabaseExportOperator(
        'data_warehouse_export',
        destination='database_export_file',
        ...
    ) >>
    CheckFileExistsOperator(
        'check_export_went_well',
        filename='database_export_file'
    ) >>
    ArchiveFileInColdStorageOperator(
        'save_backup_to_storage',
        ...
    )
)
In the above code sample, the following statement is true:
- If the database export runs properly, then the file existence check should succeed, and the archiving operator will run.
- This is because there is an implicit assumption that if data_warehouse_export runs properly (i.e. succeeds), then check_export_went_well will succeed.
But the following dagcheck test will fail:
# Bad test example:
assert_that(
    given(the_dag)
    .when('data_warehouse_export', succeeds())
    .then('save_backup_to_storage', will_run())
)
This test fails because Dagcheck does not know about the implicit assumption, and assumes that the intermediate task between data_warehouse_export and save_backup_to_storage may still fail.
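The failing scenario can be reproduced with a small self-contained sketch. This is a hypothetical toy model of the three-task chain, not dagcheck's code: CHAIN, run_chain and counterexamples are names invented here, and the model assumes each task runs only if the previous one succeeded.

```python
from itertools import product

# Hypothetical toy model of the linear chain; not part of dagcheck.
CHAIN = [
    "data_warehouse_export",
    "check_export_went_well",
    "save_backup_to_storage",
]


def run_chain(outcomes):
    """Map each task to 'success', 'failed' or 'not_run' for one scenario."""
    states = {}
    blocked = False
    for name, ok in zip(CHAIN, outcomes):
        if blocked:
            states[name] = "not_run"
        else:
            states[name] = "success" if ok else "failed"
            blocked = not ok  # a failure stops everything downstream
    return states


def counterexamples(when_task, then_task):
    """Distinct scenarios where when_task succeeds but then_task never runs."""
    found = []
    for outcomes in product([True, False], repeat=len(CHAIN)):
        states = run_chain(outcomes)
        violated = states[when_task] == "success" and states[then_task] == "not_run"
        if violated and states not in found:
            found.append(states)
    return found


# The end-to-end claim has a counterexample: the intermediate check fails.
print(counterexamples("data_warehouse_export", "save_backup_to_storage"))
# Each single-hop claim has none.
print(counterexamples("data_warehouse_export", "check_export_went_well"))
print(counterexamples("check_export_went_well", "save_backup_to_storage"))
```

The only counterexample to the end-to-end claim is the scenario where check_export_went_well fails. A simulator that cannot see the exported file has no reason to rule that scenario out, while each single-hop claim holds in every scenario.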
There are a couple of ways to write this test so that it works well with dagcheck. One of them is to split it into two assertions, each covering a single step of the chain:
# Good test example:
assert_that(
    given(the_dag)
    .when('data_warehouse_export', succeeds())
    .then('check_export_went_well', will_run())
)
assert_that(
    given(the_dag)
    .when('check_export_went_well', succeeds())
    .then('save_backup_to_storage', will_run())
)
TODOs before first launch
- Figure out the name of the library (dagcheck? dagtest? flowtest? ilikedags? flowcheck? assertflow?)
- Figure out whether this belongs to airflow or is an independent library
- Implement DAG-failure and DAG-assumption checkers.
Raw Development Notes
- 2022/09/16: Picking up the development environment again
I started developing the library as part of airflow/, and later put it in the airflow_play/dagcheck/ directory. Because of this, a lot of import paths in the dagcheck/ directory are hacked up.
Currently, dagcheck tests require a running Airflow instance. To set up the local development environment for dagcheck, you need to run:
# From airflow_play/
# Activate your local virtualenv
. venv/bin/activate
# Run your standalone Airflow instance that runs beside the code
export AIRFLOW_HOME=~/codes/airflow_play/home/
airflow standalone
Once that is set up, you can run tests.