
Package providing an Airflow operator that copies tables from an Oracle database to Snowflake.

Project description

airflow-oracle-snowflake-plugin

Steps to use the OracleToSnowflake from the plugin

  1. Install the plugin with pip install airflow-oracle-snowflake-plugin. You can add airflow-oracle-snowflake-plugin to your requirements.txt file for CI/CD operations. The plugin will also install the following dependencies if they are not already satisfied:

    • oracledb
    • apache-airflow-providers-oracle
    • apache-airflow-providers-snowflake
  2. Create config.py inside the dags/table_config directory. This file contains the necessary information about the source and destination database table specifications. It has the following structure:

CONFIG = [
    {
        'source_schema': 'ADMIN',
        'source_table': 'CUSTOMERS',
        'destination_schema': 'PUBLIC',
        'destination_table': 'CUSTOMERS',
        'columns': [
            ('ID', 'varchar'),
            ('FULL_NAME', 'varchar'),
            ('ADDRESS', 'varchar'),
            ('EMAIL', 'varchar'),
            ('PHONE_NUMBER', 'varchar'),
        ]
    },
]
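For illustration, here is a minimal sketch of how a CREATE TABLE statement could be derived from one of these entries. The helper below is hypothetical and only mirrors what sql_utils.get_create_statement plausibly produces; the plugin's actual implementation may differ:

```python
# Hypothetical helper mirroring sql_utils.get_create_statement;
# not the plugin's actual code.
def build_create_statement(table_name, columns_definition):
    # Render each (name, type) pair as "NAME type" and join with commas.
    cols = ', '.join(f'{name} {dtype}' for name, dtype in columns_definition)
    return f'CREATE TABLE IF NOT EXISTS {table_name} ({cols})'

entry = {
    'destination_table': 'CUSTOMERS',
    'columns': [('ID', 'varchar'), ('FULL_NAME', 'varchar')],
}
statement = build_create_statement(entry['destination_table'], entry['columns'])
# → 'CREATE TABLE IF NOT EXISTS CUSTOMERS (ID varchar, FULL_NAME varchar)'
```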
  3. Import the operator, sql_utils, and the config in your DAG python file by including the following statements:
from airflow_oracle_snowflake_plugin.oracle_to_snowflake_operator import OracleToSnowflake
import airflow_oracle_snowflake_plugin.utils.sql_utils as sql_utils
from table_config.config import CONFIG
  4. Implement a for loop to iterate over all the table configurations and create DAG tasks using the operator as follows:
# SnowflakeOperator ships with the apache-airflow-providers-snowflake package
# installed alongside the plugin; `dag` is the DAG object defined in this file.
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

for config in CONFIG:
    create_table_statement = sql_utils.get_create_statement(
        table_name=config.get('destination_table'),
        columns_definition=config.get('columns')
    )
    create_table_if_not_exists = SnowflakeOperator(
        task_id='create_{}'.format(config.get('destination_table')),
        snowflake_conn_id='SNOWFLAKE',
        sql=create_table_statement,
        warehouse='LANDING',
        database='LANDING_DEV',
        role='ACCOUNTADMIN',
        schema=config.get('destination_schema'),
        dag=dag
    )

    fill_table_statement = sql_utils.get_select_statement(
        table_name=config.get('source_table'),
        schema_name=config.get('source_schema'),
        columns_definition=config.get('columns'),
        sql_server_syntax=False
    )

    oracle_to_snowflake_operator = OracleToSnowflake(
        task_id='recreate_{}'.format(config.get('destination_table')),
        dag=dag,
        warehouse='LANDING',
        database='LANDING_DEV',
        role='ACCOUNTADMIN',
        schema='PUBLIC',
        source_schema=config.get('source_schema'),
        source_table=config.get('source_table'),
        destination_schema=config.get('destination_schema'),
        destination_table=config.get('destination_table'),
        fill_table_statement=fill_table_statement,
        snowflake_conn_id='SNOWFLAKE',
        oracle_conn_id='ORACLE',
        recreate_table=True
    )
    create_table_if_not_exists >> oracle_to_snowflake_operator
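The fill_table_statement is a plain SELECT over the configured columns. A hypothetical stand-in for sql_utils.get_select_statement shows the shape of the query it plausibly builds (the real helper also takes a sql_server_syntax flag, presumably toggling identifier quoting; this sketch is not the plugin's actual code):

```python
# Hypothetical stand-in for sql_utils.get_select_statement;
# the plugin's real implementation may differ.
def build_select_statement(table_name, schema_name, columns_definition):
    # Only column names matter here; the types are ignored for a SELECT.
    cols = ', '.join(name for name, _dtype in columns_definition)
    return f'SELECT {cols} FROM {schema_name}.{table_name}'

query = build_select_statement(
    'CUSTOMERS', 'ADMIN',
    [('ID', 'varchar'), ('FULL_NAME', 'varchar')],
)
# → 'SELECT ID, FULL_NAME FROM ADMIN.CUSTOMERS'
```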

This script will create two tasks for each Oracle table you want to migrate, as determined by the CONFIG list in config.py.

First Task

The first task creates the table in the Snowflake database, if it doesn't already exist, using the SnowflakeOperator. It requires:

  • An existing airflow connection to your Snowflake account
  • Name of the warehouse to use ('LANDING' in the example above)
  • Name of the database to use ('LANDING_DEV' in the example above)
  • Name of the role to use ('ACCOUNTADMIN' in the example above).
  • An SQL statement, provided here as the create_table_statement generated by the sql_utils.get_create_statement method. The method reads CONFIG and extracts the table name, columns, and their data types.

Second Task

The second task uses the OracleToSnowflake operator from the plugin. It selects rows from the source table into a temporary CSV file, uploads the file to a Snowflake stage, and finally loads it into the destination table in Snowflake. It requires:

  • Existing Airflow connection IDs for your Snowflake account as well as your Oracle database instance. The connection IDs default to SNOWFLAKE and ORACLE if not provided.
  • Inside the operator, a custom Snowflake hook is used to upload the CSV file to a Snowflake table. This hook requires:
    • Name of the warehouse to use (defaults to 'LANDING' if not provided)
    • Name of the database to use (defaults to 'LANDING_DEV' if not provided)
    • Name of the role to use (defaults to 'ACCOUNTADMIN' if not provided)
  • An SQL statement, provided here as the fill_table_statement generated by the sql_utils.get_select_statement method. The method reads CONFIG and extracts the table name, schema, and columns.
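The extract-and-stage flow can be sketched in outline: fetch rows, write them to a temporary CSV file, then hand the file to Snowflake. A stdlib-only illustration of the CSV step (the function name and signature are hypothetical; the operator's internals may differ):

```python
import csv
import tempfile

def rows_to_temp_csv(rows, column_names):
    """Write fetched rows to a temporary CSV file and return its path,
    roughly the intermediate step the operator performs before staging
    the file in Snowflake. Hypothetical sketch, not the plugin's code."""
    tmp = tempfile.NamedTemporaryFile(
        mode='w', newline='', suffix='.csv', delete=False
    )
    with tmp as f:
        writer = csv.writer(f)
        writer.writerow(column_names)   # header row
        writer.writerows(rows)          # one line per fetched row
    return tmp.name

path = rows_to_temp_csv(
    [(1, 'Ada'), (2, 'Grace')],
    ['ID', 'FULL_NAME'],
)
```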

Note

Tags have been added to facilitate version releases and CI/CD operations.

Source distribution: airflow_oracle_snowflake_plugin-0.1.2.tar.gz (5.5 kB)