DR Solution for Amazon Managed Workflows for Apache Airflow (MWAA)

MWAA Disaster Recovery

The mwaa-dr PyPI package is part of the larger MWAA DR solution. It covers the use case where exporting and importing the metadata store needs to be performed independently of the broader DR solution. At the time of writing, the MWAA metadata store is accessible only through DAGs. This package simplifies creating the backup and restore DAGs by providing a reusable Python library.

Installation

You can install the mwaa-dr package by including the latest version in your MWAA requirements.txt file.

Simple Use Case

Let's look at creating the metadata backup and restore DAGs, in that order:

Metadata Backup DAG

Let's assume your environment version is 2.8.1. You can create a metadata backup DAG by adding a Python file to your MWAA dags folder as follows:

backup_metadata.py:

# Importing DAG is necessary for DAG detection
from airflow import DAG
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8

factory = DRFactory_2_8(
    dag_id='backup',
    path_prefix='data',
    storage_type='S3'
)

# Assigning the returned dag to a global variable is necessary for DAG detection
dag: DAG = factory.create_backup_dag()

To run backup and restore in your Amazon MWAA environment on AWS, do the following:

  1. Ensure you have an S3 bucket created to store the backup.
  2. Ensure that your MWAA execution role has read and write permissions on the bucket.
  3. Create an Airflow variable with the key named DR_BACKUP_BUCKET and the value containing the name (not ARN) of the S3 bucket.
  4. You are all set to manually trigger the backup and restore DAGs at any point. The metadata backup will be stored in <backup S3 bucket>/<path_prefix>.

If you want to use the solution with aws-mwaa-local-runner, change the storage_type argument from S3 to LOCAL_FS. The backup will be located in the dags/data folder or more generally at the dags/<path_prefix> folder of the local runner project.
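
The storage locations described above can be summarized in a small sketch. The helper `expected_backup_location` is hypothetical and not part of mwaa-dr; it only restates where the text says the backup lands for the two `storage_type` values mentioned here, and the bucket name is a made-up example.

```python
def expected_backup_location(storage_type: str, path_prefix: str, bucket: str = "") -> str:
    """Return where the backup is expected to land (illustrative helper only)."""
    if storage_type == "S3":
        if not bucket:
            raise ValueError("An S3 backup needs the bucket name from DR_BACKUP_BUCKET")
        if bucket.startswith("arn:"):
            # The Airflow variable must hold the bucket name, not its ARN
            raise ValueError("DR_BACKUP_BUCKET must hold the bucket name, not the ARN")
        return f"s3://{bucket}/{path_prefix}"
    if storage_type == "LOCAL_FS":
        # aws-mwaa-local-runner: relative to the local runner project
        return f"dags/{path_prefix}"
    raise ValueError(f"Unsupported storage_type: {storage_type!r}")


# "my-backup-bucket" is an assumed example value for DR_BACKUP_BUCKET
s3_location = expected_backup_location("S3", "data", bucket="my-backup-bucket")
local_location = expected_backup_location("LOCAL_FS", "data")
print(s3_location)
print(local_location)
```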

Here is a sample run of the backup workflow:

[Image: Backup Workflow]

Metadata Restore DAG

You can create a metadata restore DAG by adding a Python file to your MWAA dags folder as follows:

restore_metadata.py:

from airflow import DAG
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8

factory = DRFactory_2_8(
    dag_id='restore',
    path_prefix='data',
    storage_type='S3'
)

dag: DAG = factory.create_restore_dag()

Here is a sample run of the restore workflow:

[Image: Restore Workflow]

Please note that the variable and connection tables are handled specially during the restore process. You can specify the restore strategy to apply to these two tables by setting the DR_VARIABLE_RESTORE_STRATEGY and DR_CONNECTION_RESTORE_STRATEGY Airflow variables. Each variable can take one of the following values:

  1. DO_NOTHING: As the name suggests, this strategy will not restore the variable and connection tables from the backup. This strategy is particularly useful if your MWAA environments have been configured to use AWS Secrets Manager for storing variables and connections.

  2. APPEND: With this strategy, the restore workflow will not overwrite existing entries of the variable and connection tables and only add missing entries from the backup.

  3. REPLACE: This strategy will overwrite existing variables and connections with the entries from the backup.

Note that these two Airflow variables are treated specially and are unaffected by the restore process of the variable table. In the absence of these variables, the default value of APPEND is used for both variable and connection restores.
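
To make the three strategies concrete, here is a minimal sketch that models a table as a plain dictionary. It is an illustration only and not the mwaa-dr implementation: the function `restore_entries` and the sample data are hypothetical, and the sketch assumes that REPLACE keeps entries that exist only in the target environment, which the text above does not state.

```python
# Illustrative model only: this is not the mwaa_dr implementation.
STRATEGIES = {"DO_NOTHING", "APPEND", "REPLACE"}
PROTECTED_KEYS = {"DR_VARIABLE_RESTORE_STRATEGY", "DR_CONNECTION_RESTORE_STRATEGY"}


def restore_entries(existing, backup, strategy="APPEND"):
    """Return the table contents after a restore under the given strategy."""
    if strategy not in STRATEGIES:
        raise ValueError(f"Unknown restore strategy: {strategy!r}")
    result = dict(existing)
    if strategy == "DO_NOTHING":
        return result  # the backup is ignored entirely
    for key, value in backup.items():
        if key in PROTECTED_KEYS:
            continue  # the strategy variables themselves are left untouched
        if strategy == "APPEND":
            result.setdefault(key, value)  # add only missing entries
        else:  # REPLACE
            result[key] = value  # the backup value wins (assumed semantics)
    return result


existing = {"env": "secondary", "DR_VARIABLE_RESTORE_STRATEGY": "REPLACE"}
backup = {"env": "primary", "team": "data", "DR_VARIABLE_RESTORE_STRATEGY": "APPEND"}

appended = restore_entries(existing, backup, "APPEND")
replaced = restore_entries(existing, backup, "REPLACE")
untouched = restore_entries(existing, backup, "DO_NOTHING")
```

With this sample data, APPEND keeps the existing `env` value and adds `team`, REPLACE takes `env` from the backup, and DO_NOTHING leaves the existing entries as they were. In all three cases the strategy variable keeps its existing value.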

Note that the restore requires an empty database. To clean up the database before a restore, use the cleanup_metadata DAG discussed next.

Metadata Cleanup DAG

You can create a metadata cleanup DAG by adding a Python file to your MWAA dags folder as follows:

cleanup_metadata.py:

from airflow import DAG
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8

factory = DRFactory_2_8(
    dag_id='cleanup',
    path_prefix='data',
    storage_type='S3'
)

dag: DAG = factory.create_cleanup_dag()

Advanced Use Case

You may have advanced use cases, such as a need to exclude tables from the backup, include additional tables in it, or update the SQL scripts for specific tables. Note that, by default, the solution backs up only the variable, connection, slot_pool, log, job, dag_run, trigger, task_instance, task_fail, and xcom tables. Most other tables are auto-generated by the scheduler or the webserver and are therefore excluded from the list of tables to be backed up.

To add/remove tables from the backup or customize any aspects of the solution, you will derive from an appropriate factory class and override its methods. To see this more concretely, let's assume you want to exclude variable and connection tables from the backup and restore operations. Please follow the subsequent implementation guidelines.

First, find the appropriate factory class from the supported versions in the mwaa_dr package. Let's assume your environment version is 2.7.2. So, you will pick DRFactory_2_7 as your base class for inheritance and override its setup_tables method.
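
The import paths used in this document follow a visible naming pattern (`mwaa_dr.v_2_8.dr_factory.DRFactory_2_8`, `mwaa_dr.v_2_7.dr_factory.DRFactory_2_7`). The sketch below derives the module and class name from an environment version under the assumption that this pattern holds; the helper `factory_location` is hypothetical, and whether a given version is actually supported must still be checked against the mwaa_dr package.

```python
def factory_location(airflow_version: str) -> tuple[str, str]:
    """Map a version such as '2.7.2' to the (module, class) naming pattern.

    Assumes the pattern seen in this document; it does not verify that
    the module exists in the installed mwaa_dr package.
    """
    major, minor, *_ = airflow_version.split(".")
    suffix = f"{major}_{minor}"
    return f"mwaa_dr.v_{suffix}.dr_factory", f"DRFactory_{suffix}"


module_name, class_name = factory_location("2.7.2")
print(f"from {module_name} import {class_name}")
```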

Note that the majority of the DR framework's functionality is implemented either in BaseDRFactory or in DRFactory_2_5. The other factories form a chain of inheritance from these classes and override specific methods to implement version differences.

Here is a sample implementation of the derived class, which you create in your dags folder. Let's name the file custom_dr_factory_2_7.py:

custom_dr_factory_2_7.py:

from mwaa_dr.framework.model.base_table import BaseTable
from mwaa_dr.framework.model.dependency_model import DependencyModel
from mwaa_dr.v_2_7.dr_factory import DRFactory_2_7

class CustomDRFactory_2_7(DRFactory_2_7):
    def setup_tables(self, model: DependencyModel[BaseTable]) -> list[BaseTable]:
        # Create needed tables, all extend from the BaseTable class
        active_dag = self.active_dag(model)

        # Comment out variable and connection from the previous implementation
        # variable = self.variable(model)
        # connection = self.connection(model)
        slot_pool = self.slot_pool(model)

        log = self.log(model)
        job = self.job(model)
        dag_run = self.dag_run(model)
        trigger = self.trigger(model)

        task_instance = self.task_instance(model)
        task_fail = self.task_fail(model)

        # Specify dependencies
        task_instance << [job, trigger, dag_run]
        task_fail << [task_instance, dag_run]
        active_dag << [
            # variable,
            # connection,
            slot_pool,
            log,
            job,
            dag_run,
            trigger,
            task_instance,
            task_fail,
        ]

        # Return the list of tables to be included in backup and restore
        return [
            # variable,
            # connection,
            slot_pool,
            log,
            job,
            dag_run,
            trigger,
            task_instance,
            task_fail,
            active_dag,
        ]
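
The `<<` lines in the class above declare dependencies between tables. As a rough illustration of the idea, the toy sketch below overloads `<<` on a stand-in class and derives a processing order from the recorded dependencies. `ToyTable` is hypothetical and is not the mwaa_dr `BaseTable`; the sketch assumes that `a << [b, c]` means "a depends on b and c", and it uses the standard library's `graphlib` (Python 3.9+) rather than anything from mwaa-dr.

```python
from graphlib import TopologicalSorter


class ToyTable:
    """Hypothetical stand-in for a table object; not the mwaa_dr BaseTable."""

    def __init__(self, name: str, graph: dict):
        self.name = name
        self.graph = graph
        graph.setdefault(name, set())

    def __lshift__(self, others: list) -> "ToyTable":
        # Record that this table depends on every table in `others`
        for other in others:
            self.graph[self.name].add(other.name)
        return self


graph: dict = {}  # maps each table name to the names it depends on
job, trigger, dag_run, task_instance, task_fail = (
    ToyTable(name, graph)
    for name in ["job", "trigger", "dag_run", "task_instance", "task_fail"]
)

# Same dependency declarations as in the custom factory above
task_instance << [job, trigger, dag_run]
task_fail << [task_instance, dag_run]

# Dependencies come before the tables that depend on them
order = list(TopologicalSorter(graph).static_order())
print(order)
```

In this toy model, job, trigger, and dag_run always come before task_instance, which in turn comes before task_fail. The exact order among independent tables is not fixed.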

Here is your metadata backup DAG, which uses your custom factory (also in the dags folder):

backup.py:

from airflow import DAG
from custom_dr_factory_2_7 import CustomDRFactory_2_7

factory = CustomDRFactory_2_7(
    dag_id='backup',
    path_prefix='data',
    storage_type='S3'
)

dag: DAG = factory.create_backup_dag()

And finally, here is your metadata restore DAG (also in the dags folder):

restore.py:

from airflow import DAG
from custom_dr_factory_2_7 import CustomDRFactory_2_7

factory = CustomDRFactory_2_7(
    dag_id='restore',
    path_prefix='data',
    storage_type='S3'
)

dag: DAG = factory.create_restore_dag()

For additional details, please visit the project homepage.
