Package for an Airflow operator which copies tables from an Oracle database to Snowflake.

Project description

airflow-oracle-snowflake-plugin

Steps to use the OracleToSnowflake operator from the plugin

  1. Install the plugin with pip install airflow-oracle-snowflake-plugin. You can also put airflow-oracle-snowflake-plugin in your requirements.txt file for CI/CD operations. The plugin will install the following dependencies if they are not already satisfied:

    • oracledb
    • apache-airflow-providers-oracle
    • apache-airflow-providers-snowflake
  2. Create config.py inside the dags/table_config directory. This file contains the necessary information about the source and destination table specifications. It has the following structure:

CONFIG = [
    {
        'source_schema': 'ADMIN',
        'source_table': 'CUSTOMERS',
        'destination_schema': 'PUBLIC',
        'destination_table': 'CUSTOMERS',
        'columns': [
            ('ID', 'varchar'),
            ('FULL_NAME', 'varchar'),
            ('ADDRESS', 'varchar'),
            ('EMAIL', 'varchar'),
            ('PHONE_NUMBER', 'varchar'),
        ]
    },
]
  3. Import the operator, sql_utils, and the config in your DAG python file by including the following statements. The SnowflakeOperator import is also needed for the create-table task in the next step:
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow_oracle_snowflake_plugin.oracle_to_snowflake_operator import OracleToSnowflake
import airflow_oracle_snowflake_plugin.utils.sql_utils as sql_utils
from table_config.config import CONFIG
  4. Implement a for loop to iterate over all the table configurations and create the DAG tasks using the operator, as follows:
for config in CONFIG:
    create_table_statement = sql_utils.get_create_statement(
        table_name=config.get('destination_table'),
        columns_definition=config.get('columns')
    )
    create_table_if_not_exists = SnowflakeOperator(
        task_id='create_{}'.format(config.get('destination_table')),
        snowflake_conn_id='SNOWFLAKE',
        sql=create_table_statement,
        warehouse='LANDING',
        database='LANDING_DEV',
        role='ACCOUNTADMIN',
        schema=config.get('destination_schema'),
        dag=dag
    )

    fill_table_statement = sql_utils.get_select_statement(
        table_name=config.get('source_table'),
        schema_name=config.get('source_schema'),
        columns_definition=config.get('columns'),
        sql_server_syntax=False
    )

    oracle_to_snowflake_operator = OracleToSnowflake(
        task_id='recreate_{}'.format(config.get('destination_table')),
        dag=dag,
        warehouse='LANDING',
        database='LANDING_DEV',
        role='ACCOUNTADMIN',
        schema='PUBLIC',
        source_schema=config.get('source_schema'),
        source_table=config.get('source_table'),
        destination_schema=config.get('destination_schema'),
        destination_table=config.get('destination_table'),
        fill_table_statement=fill_table_statement,
        snowflake_conn_id='SNOWFLAKE',
        oracle_conn_id='ORACLE',
        recreate_table=True
    )
    create_table_if_not_exists >> oracle_to_snowflake_operator

This script creates two tasks for each Oracle table that you want to migrate, as determined by the CONFIG list in config.py.
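The loop above references a dag object that is not defined in the snippet. A minimal sketch of the surrounding DAG definition is shown below; the dag_id, schedule, and start date are illustrative assumptions, not something prescribed by the plugin.

from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

import airflow_oracle_snowflake_plugin.utils.sql_utils as sql_utils
from airflow_oracle_snowflake_plugin.oracle_to_snowflake_operator import OracleToSnowflake
from table_config.config import CONFIG

# Hypothetical DAG settings; adjust dag_id, schedule_interval, and start_date to your environment.
dag = DAG(
    dag_id='oracle_to_snowflake',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
)

# The for loop from step 4 goes here and attaches both tasks to this dag.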

First Task

The first task creates the table in the Snowflake database, if it doesn't already exist, using the SnowflakeOperator. It requires:

  • An existing Airflow connection to your Snowflake account
  • The name of the warehouse to use ('LANDING' in the example above)
  • The name of the database to use ('LANDING_DEV' in the example above)
  • The name of the role to use ('ACCOUNTADMIN' in the example above)
  • A SQL statement, provided here as the create_table_statement generated by the sql_utils.get_create_statement method. The method reads CONFIG and extracts the table name, the columns, and their data types (see the sketch after this list).
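As an illustration of what the first task receives, the call below reproduces the CUSTOMERS entry from the example CONFIG; the commented-out SQL shows only the rough shape of the generated statement, since the exact text emitted by get_create_statement may differ.

create_table_statement = sql_utils.get_create_statement(
    table_name='CUSTOMERS',
    columns_definition=[
        ('ID', 'varchar'),
        ('FULL_NAME', 'varchar'),
        ('ADDRESS', 'varchar'),
        ('EMAIL', 'varchar'),
        ('PHONE_NUMBER', 'varchar'),
    ],
)

# Roughly equivalent SQL (illustrative only; actual output may differ in formatting):
# CREATE TABLE IF NOT EXISTS CUSTOMERS (
#     ID varchar,
#     FULL_NAME varchar,
#     ADDRESS varchar,
#     EMAIL varchar,
#     PHONE_NUMBER varchar
# );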

Second Task

The second task uses the OracleToSnowflake operator from the plugin. It selects rows from the source table into a temporary CSV file, uploads that file to a Snowflake stage, and finally loads it into the destination table in Snowflake. It requires:

  • Existing Airflow connection IDs for your Snowflake account and your Oracle database instance. The connection IDs default to SNOWFLAKE and ORACLE if not provided.
  • Inside the operator, a custom Snowflake hook is used to upload the CSV file to a Snowflake table. This hook requires:
    • The name of the warehouse to use (defaults to 'LANDING' if not provided)
    • The name of the database to use (defaults to 'LANDING_DEV' if not provided)
    • The name of the role to use (defaults to 'ACCOUNTADMIN' if not provided)
  • A SQL statement, provided here as the fill_table_statement generated by the sql_utils.get_select_statement method. The method reads CONFIG and extracts the table name, schema, and columns (see the sketch after this list).
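Similarly, for the example CONFIG entry the second task's fill_table_statement can be generated as below; the commented-out SQL shows only the expected shape, since the exact text emitted by get_select_statement may differ.

fill_table_statement = sql_utils.get_select_statement(
    table_name='CUSTOMERS',
    schema_name='ADMIN',
    columns_definition=[
        ('ID', 'varchar'),
        ('FULL_NAME', 'varchar'),
        ('ADDRESS', 'varchar'),
        ('EMAIL', 'varchar'),
        ('PHONE_NUMBER', 'varchar'),
    ],
    sql_server_syntax=False,
)

# Roughly equivalent SQL (illustrative only; actual output may differ in formatting):
# SELECT ID, FULL_NAME, ADDRESS, EMAIL, PHONE_NUMBER FROM ADMIN.CUSTOMERS;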

Note

Tags have been added to facilitate version releases and CI/CD operations.
