A local testing framework for Airflow DAGs.

Airflow DAG invariant testing

Note: Contributions welcome. I hope this library will belong to the Airflow community.

Dagcheck is a framework for asserting DAG invariants. Users define the invariants they want to test as assertions, and dagcheck generates DAG run scenarios to verify that those invariants hold.

Dagcheck was created so that Airflow users could write tests for their DAGs with these characteristics:

  • They are easy to read through and understand
  • They do not orchestrate real infrastructure changes
  • They run on a local development environment
  • They run quickly as part of a developer's flow
  • They can be run in CI/CD and catch issues in the future

Dagcheck is especially useful for DAGs that are complex and that change over time. Its tests let you offload complex dependency checks from your head onto an automated test.

Examples

Consider this example of a complex DAG. This DAG has several possible execution paths, and when a task fails we may want to ensure that the DAG does not leak resources. For example, we may write a test that checks that if we create a resource successfully, we always clean it up, regardless of the failure scenario.

For example, if the task create_entry_group succeeds, then we check that the task delete_entry_group will always run, like so:

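from airflow.models import DagBag

# assert_that, given, task, succeeds and will_run come from dagcheck.
example_dag = DagBag().get_dag("example_complex")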

# First check: If we create an entry group, we want to make sure
# it will be cleaned up.
assert_that(
    given(example_dag)
    .when(task("create_entry_group"), succeeds())
    .then(task("delete_entry_group"), will_run())
)

By creating this test, and running it in CI, we can quickly make sure that our DAG will behave as expected, no matter how much it changes.

To see other examples of usage of the API, look at our unit tests and our small sample DAGs.

Configuring dagcheck

TODO(pabloem)

Caveats and pitfalls

Dagcheck works by simulating DAG execution scenarios.

DAGs that are dependent on side effects

Dagcheck simulates DAG executions, but it will not orchestrate any changes. If parts of your DAG execution depend on side effects from other operators, then Dagcheck will not know about this.

For example, consider a DAG that performs a database export operation, checks the output of those files, and uses them for something else. Something like:

(
  DatabaseExportOperator(
    'data_warehouse_export',
    destination='database_export_file',
    ...
  ) >>
  CheckFileExistsOperator(
    'check_export_went_well',
    filename='database_export_file'
  ) >>
  ArchiveFileInColdStorageOperator(
    'save_backup_to_storage',
    ...
  )
)

In the above code sample, the following statements are true:

  • If the database export runs properly, then the file existence check should succeed, and the archiving operator will run.
  • This is because there is an implicit assumption that if data_warehouse_export runs properly (i.e. succeeds), then check_export_went_well will succeed.

But the following dagcheck test will fail:

# Bad test example:
assert_that(
  given(the_dag)
  .when('data_warehouse_export', succeeds())
  .then('save_backup_to_storage', will_run())
)

This test fails because Dagcheck does not know about the implicit assumption, and assumes that the intermediate task between data_warehouse_export and save_backup_to_storage may still fail.
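The reasoning above can be illustrated with a small standalone sketch. This is not dagcheck's implementation: the `UPSTREAM` mapping, the `simulate` and `counterexamples` helpers, and the `"not_run"` label are all hypothetical names made up for this example. The sketch assumes every task only runs when all of its upstream tasks succeeded (similar to Airflow's default trigger rule), and it enumerates every combination of task outcomes to look for a scenario that breaks the invariant.

```python
from itertools import product

# Hypothetical stand-in for the DAG above: each task maps to its upstream tasks.
# The keys are listed in topological order.
UPSTREAM = {
    "data_warehouse_export": [],
    "check_export_went_well": ["data_warehouse_export"],
    "save_backup_to_storage": ["check_export_went_well"],
}


def simulate(would_succeed):
    """Return each task's final state for one scenario.

    would_succeed[task] says whether the task succeeds if it gets to run.
    A task runs only when every upstream task succeeded; otherwise it is
    labeled "not_run" (a label invented for this sketch).
    """
    states = {}
    for task, parents in UPSTREAM.items():
        if all(states[p] == "success" for p in parents):
            states[task] = "success" if would_succeed[task] else "failed"
        else:
            states[task] = "not_run"
    return states


def counterexamples(when_succeeds, then_runs):
    """List distinct scenarios where when_succeeds succeeded but then_runs never ran."""
    found = []
    tasks = list(UPSTREAM)
    for outcome in product([True, False], repeat=len(tasks)):
        states = simulate(dict(zip(tasks, outcome)))
        broken = states[when_succeeds] == "success" and states[then_runs] == "not_run"
        if broken and states not in found:
            found.append(states)
    return found


# The "bad" invariant skips over the intermediate task, which may fail.
bad = counterexamples("data_warehouse_export", "save_backup_to_storage")
# An invariant between directly connected tasks has no counterexample.
good = counterexamples("data_warehouse_export", "check_export_went_well")
print(bad)
print(good)
```

In this sketch the only counterexample for the first invariant is the scenario in which check_export_went_well fails, which is exactly the case that the implicit assumption rules out in practice but that a simulation cannot know about.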

There are a couple of ways to write this test so that it works well with dagcheck. Here's one of them, which checks each dependency step separately:

# Good test example:

assert_that(
  given(the_dag)
  .when('data_warehouse_export', succeeds())
  .then('check_export_went_well', will_run())
)

assert_that(
  given(the_dag)
  .when('check_export_went_well', succeeds())
  .then('save_backup_to_storage', will_run())
)

TODOs before first launch

  • Figure out the name of the library (dagcheck? dagtest? flowtest? ilikedags? flowcheck? assertflow?)
  • Figure out whether this belongs to airflow or is an independent library
  • Implement DAG-failure and DAG-assumption checkers.

Raw Development Notes

  • 2022/09/16: Picking up the development environment again

I started developing the library as part of airflow/, and later put it in the airflow_play/dagcheck/ directory. Because of this, a lot of import paths in the dagcheck/ directory are hacked up.

Currently, dagcheck tests require a running Airflow instance. To set up the local development environment for dagcheck, you need to run:

# From airflow_play/

# Activate your local virtualenv
. venv/bin/activate

# Run your standalone Airflow instance that runs beside the code
export AIRFLOW_HOME=~/codes/airflow_play/home/
airflow standalone

Once that is set up, you can run tests.
