DR Solution for Amazon Managed Workflows for Apache Airflow (MWAA)
Project description
MWAA Disaster Recovery
The mwaa-dr PyPI package is part of the larger MWAA DR solution, covering the use case where exporting and importing the metadata store needs to be performed independently of the broader DR solution. As of the time of writing, access to the MWAA metadata store is only available through DAGs. This package simplifies the process of creating backup, restore, and cleanup DAGs by providing a reusable Python library.
Installation
You can install the mwaa-dr package by including the latest version in your MWAA requirements.txt file.
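For example, a requirements.txt entry pinning the release documented on this page (pinning dependency versions is generally recommended for MWAA so environment builds stay reproducible):

requirements.txt:
mwaa-dr==2.0.1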
Simple Use Case
Let's look at creating metadata backup and restore DAGs:
Metadata Backup DAG
Let's assume your environment version is 2.8.1. You can create a metadata backup DAG by adding a Python file to your MWAA dags folder as follows:

backup_metadata.py:
# Importing DAG is necessary for DAG detection
from airflow import DAG
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8
factory = DRFactory_2_8(
    dag_id='backup',
    path_prefix='data',
    storage_type='S3'
)
# Assigning the returned dag to a global variable is necessary for DAG detection
dag: DAG = factory.create_backup_dag()
To run backup and restore on your Amazon MWAA environment on AWS, you need to do the following:
- Ensure that you have an S3 bucket created to store the backup.
- Ensure that your MWAA execution role has read and write permissions on the bucket.
- Create an Airflow variable with the key DR_BACKUP_BUCKET and, as its value, the name (not ARN) of the S3 bucket; see the sketch after this list.
- You are all set to manually trigger the backup and restore DAGs at any point. The metadata backup will be stored under <backup S3 bucket>/<path_prefix>.
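As a minimal sketch, you can create this variable from the Airflow UI (Admin → Variables) or programmatically via Airflow's Variable API, for example inside a one-off task; the bucket name below is a placeholder:

from airflow.models import Variable

# Run this inside a task (e.g., a PythonOperator callable), not at
# DAG-parse time; replace the placeholder with your backup bucket name.
Variable.set('DR_BACKUP_BUCKET', 'my-mwaa-backup-bucket')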
If you want to use the solution with aws-mwaa-local-runner, change the storage_type argument from S3 to LOCAL_FS, as shown below. The backup will be located in the dags/data folder, or more generally in the dags/<path_prefix> folder, of the local runner project.
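For example, the backup factory from above would then look like this; only the storage_type argument changes:

factory = DRFactory_2_8(
    dag_id='backup',
    path_prefix='data',
    storage_type='LOCAL_FS'
)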
[Screenshot: a sample run of the backup workflow]
Metadata Restore DAG
You can create a metadata restore DAG by adding a Python file to your MWAA dags folder as follows:

restore_metadata.py:
from airflow import DAG
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8
factory = DRFactory_2_8(
    dag_id='restore',
    path_prefix='data',
    storage_type='S3'
)
dag: DAG = factory.create_restore_dag()
[Screenshot: a sample run of the restore workflow]
Please note that the variable and connection tables are handled specially during the restore process. You can specify a restore strategy for these two tables by setting the DR_VARIABLE_RESTORE_STRATEGY and DR_CONNECTION_RESTORE_STRATEGY Airflow variables. These variables can take one of the following values:
- DO_NOTHING: As the name suggests, this strategy will not restore the variable and connection tables from the backup. This strategy is particularly useful if your MWAA environments have been configured to use AWS Secrets Manager for storing variables and connections.
- APPEND: With this strategy, the restore workflow will not overwrite existing entries of the variable and connection tables and will only add missing entries from the backup.
- REPLACE: This strategy will overwrite existing variables and connections with entries from the backup.
Note that these two Airflow variables are treated specially and are unaffected by the restore process of the variable table. In the absence of these variables, the default value of APPEND is used for both variable and connection restores.
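For instance, if your environments keep variables and connections in AWS Secrets Manager, you could opt out of restoring both tables by setting the two strategy variables to DO_NOTHING. A minimal sketch using Airflow's Variable API (equally doable from the Airflow UI):

from airflow.models import Variable

# Run inside a task rather than at DAG-parse time; both keys and the
# DO_NOTHING value are documented above.
Variable.set('DR_VARIABLE_RESTORE_STRATEGY', 'DO_NOTHING')
Variable.set('DR_CONNECTION_RESTORE_STRATEGY', 'DO_NOTHING')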
Note that you will need an empty database for the restore to work. To clean up the database before a restore, please use the cleanup_metadata DAG discussed next.
Metadata Cleanup DAG
You can create a metadata cleanup DAG by adding a Python file to your MWAA dags folder as follows:

cleanup_metadata.py:
from airflow import DAG
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8
factory = DRFactory_2_8(
    dag_id='cleanup',
    path_prefix='data',
    storage_type='S3'
)
dag: DAG = factory.create_cleanup_dag()
Advanced Use Cases
You may have some advanced use cases, such as a need to exclude tables from or include additional tables in your backup, or to update the SQL scripts for specific tables. Note that, by default, the solution backs up only the variable, connection, slot_pool, log, job, dag_run, trigger, task_instance, task_fail, and xcom tables. The majority of the other tables are auto-generated by the scheduler or the webserver and are thus excluded from the list of tables to be backed up.
To add or remove tables from the backup, or to customize any other aspect of the solution, derive from the appropriate factory class and override its methods. To see this more concretely, let's assume you want to exclude the variable and connection tables from the backup and restore operations. Please follow the implementation guidelines below.
First, find the appropriate factory class for your environment version among the supported versions in the mwaa_dr package. Let's assume your environment version is 2.7.2. You will therefore pick DRFactory_2_7 as your base class and override its setup_tables method.
Note that the majority of the functionality of the DR framework is implemented in either BaseDRFactory or DRFactory_2_5. The other factories simply form a chain of inheritance from these classes and override specific methods to implement version differences.
Here is a sample implementation of the derived class, which you will need to create in your dags folder. Let's name the file custom_dr_factory_2_7.py:

custom_dr_factory_2_7.py:
from mwaa_dr.framework.model.base_table import BaseTable
from mwaa_dr.framework.model.dependency_model import DependencyModel
from mwaa_dr.v_2_7.dr_factory import DRFactory_2_7
class CustomDRFactory_2_7(DRFactory_2_7):
    def setup_tables(self, model: DependencyModel[BaseTable]) -> list[BaseTable]:
        # Create the needed tables; all extend the BaseTable class
        active_dag = self.active_dag(model)

        # Comment out variable and connection from the base implementation
        # variable = self.variable(model)
        # connection = self.connection(model)

        slot_pool = self.slot_pool(model)
        log = self.log(model)
        job = self.job(model)
        dag_run = self.dag_run(model)
        trigger = self.trigger(model)
        task_instance = self.task_instance(model)
        task_fail = self.task_fail(model)

        # Specify dependencies
        task_instance << [job, trigger, dag_run]
        task_fail << [task_instance, dag_run]
        active_dag << [
            # variable,
            # connection,
            slot_pool,
            log,
            job,
            dag_run,
            trigger,
            task_instance,
            task_fail,
        ]

        # Return the list of tables to be included in backup and restore
        return [
            # variable,
            # connection,
            slot_pool,
            log,
            job,
            dag_run,
            trigger,
            task_instance,
            task_fail,
            active_dag,
        ]
Here is your metadata backup DAG that uses the custom factory (also in the dags folder):

backup.py:
from airflow import DAG
from custom_dr_factory_2_7 import CustomDRFactory_2_7
factory = CustomDRFactory_2_7(
    dag_id='backup',
    path_prefix='data',
    storage_type='S3'
)
dag: DAG = factory.create_backup_dag()
Similarly, here is your metadata restore DAG (also in the dags folder):

restore.py:
from airflow import DAG
from custom_dr_factory_2_7 import CustomDRFactory_2_7
factory = CustomDRFactory_2_7(
    dag_id='restore',
    path_prefix='data',
    storage_type='S3'
)
dag: DAG = factory.create_restore_dag()
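If you also want a cleanup DAG wired to the custom factory, the same pattern applies. Here is a sketch mirroring the cleanup example above (also in the dags folder), assuming create_cleanup_dag is available on the 2.7 factory as it is on the 2.8 factory shown earlier:

cleanup.py:
from airflow import DAG
from custom_dr_factory_2_7 import CustomDRFactory_2_7

factory = CustomDRFactory_2_7(
    dag_id='cleanup',
    path_prefix='data',
    storage_type='S3'
)
dag: DAG = factory.create_cleanup_dag()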
For additional details, please visit the project homepage.
Download files
Download the file for your platform.
Source Distribution
Built Distribution
File details
Details for the file mwaa_dr-2.0.1.tar.gz.
File metadata
- Download URL: mwaa_dr-2.0.1.tar.gz
- Upload date:
- Size: 38.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.0 CPython/3.12.5
File hashes
Algorithm | Hash digest
---|---
SHA256 | 30dab4b5179cbeb952fc512e97a893e7319add7bd7a168768c67e2c3d1a4938e
MD5 | 0ba3ed92aa4228d4f864c9bd6104c6db
BLAKE2b-256 | 80150acd4f8f11a13c1de749a7c92dd682faa07c31a440aaf23f6d444789d4d5
File details
Details for the file mwaa_dr-2.0.1-py3-none-any.whl.
File metadata
- Download URL: mwaa_dr-2.0.1-py3-none-any.whl
- Upload date:
- Size: 37.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.0 CPython/3.12.5
File hashes
Algorithm | Hash digest
---|---
SHA256 | 6da3203a4222d521c31edf39ad3f7256807f58156ea0d1d12e5eaa7744ed81bf
MD5 | aa39f75cd8362d454a1470ee381f9d2d
BLAKE2b-256 | 781769c6fff1a4be2f85c2d7cf50a775f0cb60c9c9c0ac107895bf0eeafeee2c
BLAKE2b-256 | 781769c6fff1a4be2f85c2d7cf50a775f0cb60c9c9c0ac107895bf0eeafeee2c |