Package providing an Airflow operator that copies tables from an Oracle database to Snowflake.

Project description

airflow-oracle-snowflake-plugin

Steps to use the OracleToSnowflake operator from the plugin

  1. Install the plugin with pip install airflow-oracle-snowflake-plugin. You can add airflow-oracle-snowflake-plugin to your requirements.txt file for CI/CD operations. The plugin also installs the following dependencies if they are not already satisfied:

    • oracledb
    • apache-airflow-providers-oracle
    • apache-airflow-providers-snowflake
  2. Create config.py inside the dags/table_config directory. This file holds the necessary information about the source and destination database table specifications. It has the following structure:

CONFIG = [
    {
        'source_schema': 'ADMIN',
        'source_table': 'CUSTOMERS',
        'destination_schema': 'PUBLIC',
        'destination_table': 'CUSTOMERS',
        'columns': [
            ('ID', 'varchar'),
            ('FULL_NAME', 'varchar'),
            ('ADDRESS', 'varchar'),
            ('EMAIL', 'varchar'),
            ('PHONE_NUMBER', 'varchar'),
        ]
    },
]
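
Each table you want to copy gets its own entry in CONFIG. To migrate more tables, append additional dictionaries with the same keys to the list above; for example (ORDERS and its columns are hypothetical, shown only for illustration):

    {
        # Hypothetical second table, for illustration only.
        'source_schema': 'ADMIN',
        'source_table': 'ORDERS',
        'destination_schema': 'PUBLIC',
        'destination_table': 'ORDERS',
        'columns': [
            ('ID', 'varchar'),
            ('CUSTOMER_ID', 'varchar'),
            ('ORDER_DATE', 'varchar'),
        ]
    },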
  3. Import the operator, sql_utils, and the config in your DAG python file by including the following statements. The SnowflakeOperator used in the next step comes from the Snowflake provider that the plugin installs:
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow_oracle_snowflake_plugin.oracle_to_snowflake_operator import OracleToSnowflake
import airflow_oracle_snowflake_plugin.utils.sql_utils as sql_utils
from table_config.config import CONFIG
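
The loop in the next step also assumes a dag object is already defined in the same file. A minimal sketch, assuming Airflow 2.x (the dag_id, start date, and schedule below are placeholders, not part of the plugin; newer Airflow releases name the schedule_interval argument schedule):

from datetime import datetime

from airflow import DAG

# Placeholder DAG definition; adjust dag_id, start_date, and schedule to your environment.
dag = DAG(
    dag_id='oracle_to_snowflake',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
)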
  4. Implement a for loop to iterate over all the table configurations and create DAG tasks using the operator, as follows:
for config in CONFIG:
    create_table_statement = sql_utils.get_create_statement(
        table_name=config.get('destination_table'),
        columns_definition=config.get('columns')
    )
    create_table_if_not_exists = SnowflakeOperator(
        task_id='create_{}'.format(config.get('destination_table')),
        snowflake_conn_id='SNOWFLAKE',
        sql=create_table_statement,
        warehouse='LANDING',
        database='LANDING_DEV',
        role='ACCOUNTADMIN',
        schema=config.get('destination_schema'),
        dag=dag
    )

    fill_table_statement = sql_utils.get_select_statement(
        table_name=config.get('source_table'),
        schema_name=config.get('source_schema'),
        columns_definition=config.get('columns'),
        sql_server_syntax=False
    )

    oracle_to_snowflake_operator = OracleToSnowflake(
        task_id='recreate_{}'.format(config.get('destination_table')),
        dag=dag,
        warehouse='LANDING',
        database='LANDING_DEV',
        role='ACCOUNTADMIN',
        schema='PUBLIC',
        source_schema=config.get('source_schema'),
        source_table=config.get('source_table'),
        destination_schema=config.get('destination_schema'),
        destination_table=config.get('destination_table'),
        fill_table_statement=fill_table_statement,
        snowflake_conn_id='SNOWFLAKE',
        oracle_conn_id='ORACLE',
        recreate_table=True
    )
    create_table_if_not_exists >> oracle_to_snowflake_operator

This script creates two tasks for each Oracle table you want to migrate, as determined by the CONFIG list in config.py.

First Task

The first task creates the table in the Snowflake database, if it doesn't already exist, using the SnowflakeOperator. It requires:

  • An existing Airflow connection to your Snowflake account
  • Name of the warehouse to use ('LANDING' in the example above)
  • Name of the database to use ('LANDING_DEV' in the example above)
  • Name of the role to use ('ACCOUNTADMIN' in the example above)
  • An SQL statement, provided here as the create_table_statement generated by the sql_utils.get_create_statement method. The method uses CONFIG to extract the table name, columns, and their data types.
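
If you want to inspect the generated SQL before wiring it into a DAG, you can call the helper directly; a minimal sketch using the same arguments as in step 4 (the exact wording of the printed statement is whatever sql_utils produces, not shown here):

import airflow_oracle_snowflake_plugin.utils.sql_utils as sql_utils
from table_config.config import CONFIG

# Print the CREATE statement that the first task would run for each configured table.
for config in CONFIG:
    create_table_statement = sql_utils.get_create_statement(
        table_name=config.get('destination_table'),
        columns_definition=config.get('columns'),
    )
    print(create_table_statement)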

Second Task

The second task uses the OracleToSnowflake operator from the plugin. It selects rows from the source table into a temporary CSV file, uploads that file to a Snowflake stage, and finally copies it into the destination table in Snowflake. It requires:

  • Existing Airflow connection IDs for your Snowflake account and your Oracle database instance. The connection IDs default to SNOWFLAKE and ORACLE if not provided.
  • Inside the operator, a custom Snowflake hook is used to upload the CSV file to a Snowflake table. This hook requires:
    • Name of the warehouse to use (defaults to 'LANDING' if not provided)
    • Name of the database to use (defaults to 'LANDING_DEV' if not provided)
    • Name of the role to use (defaults to 'ACCOUNTADMIN' if not provided)
  • An SQL statement, provided here as the fill_table_statement generated by the sql_utils.get_select_statement method. The method uses CONFIG to extract the table name, schema, and columns.
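
Because of these defaults, a minimal instantiation inside the same for loop as in step 4 can omit the connection IDs, warehouse, database, and role. A sketch relying on the defaults listed above (illustrative only; the explicit version in step 4 remains the reference):

fill_table_statement = sql_utils.get_select_statement(
    table_name=config.get('source_table'),
    schema_name=config.get('source_schema'),
    columns_definition=config.get('columns'),
    sql_server_syntax=False
)

# Relies on the documented defaults: SNOWFLAKE and ORACLE connection IDs,
# LANDING warehouse, LANDING_DEV database, ACCOUNTADMIN role.
oracle_to_snowflake_operator = OracleToSnowflake(
    task_id='recreate_{}'.format(config.get('destination_table')),
    dag=dag,
    schema='PUBLIC',
    source_schema=config.get('source_schema'),
    source_table=config.get('source_table'),
    destination_schema=config.get('destination_schema'),
    destination_table=config.get('destination_table'),
    fill_table_statement=fill_table_statement,
    recreate_table=True
)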

Note

Tags have been added to facilitate version releases and CI/CD operations.

Download files

Download the file for your platform.

Source Distribution

airflow_oracle_snowflake_plugin-0.1.0.tar.gz (5.3 kB)

Built Distribution

airflow_oracle_snowflake_plugin-0.1.0-py3-none-any.whl
File details

Details for the file airflow_oracle_snowflake_plugin-0.1.0.tar.gz.

Hashes for airflow_oracle_snowflake_plugin-0.1.0.tar.gz:

  • SHA256: 1ee9d4dec9919a3c39784359cf1763b6448eef2e6778f7946c5c48c164ec281e
  • MD5: e83508361d0326eb6df53aca7130aaf4
  • BLAKE2b-256: 582b7f00e3cd3692d7056e71b1e07acaf6ff8eb3f9e134b65f27a0cdb9b46ef7

File details

Details for the file airflow_oracle_snowflake_plugin-0.1.0-py3-none-any.whl.

Hashes for airflow_oracle_snowflake_plugin-0.1.0-py3-none-any.whl:

  • SHA256: 856ec21d61847a2aeb156dc168327c956c01b76a6095da62f0dced038458dc86
  • MD5: 8a809bc71b6fdc28ba1d2b6e466cb6ed
  • BLAKE2b-256: a04a0a640ddf0512522c3b53fbe8d8b8f00f214af4afd457f58df40a50c43131
