
Package for airflow operator which copies tables from oracle database to snowflake.

airflow-oracle-snowflake-plugin

Steps to use the OracleToSnowflake operator from the plugin

  1. Install the plugin with pip install airflow-oracle-snowflake-plugin. For CI/CD operations, you can add airflow-oracle-snowflake-plugin to the requirements.txt file. Installing the plugin also installs the following dependencies if they are not already satisfied:

    • oracledb
    • apache-airflow-providers-oracle
    • apache-airflow-providers-snowflake
  2. Create config.py inside the dags/table_config directory. This file describes the source and destination database tables. It has the following structure:

  CONFIG = [
    {
        'source_schema': 'ADMIN',
        'source_table': 'CUSTOMERS',
        'destination_schema': 'PUBLIC',
        'destination_table': 'CUSTOMERS',
        'columns': [
            ('ID', 'varchar'),
            ('FULL_NAME', 'varchar'),
            ('ADDRESS', 'varchar'),
            ('EMAIL', 'varchar'),
            ('PHONE_NUMBER', 'varchar'),
        ]
    },
]
  3. Import the operator, sql_utils, and the config in your DAG Python file by including the following statements:
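# SnowflakeOperator is used by the task loop in the next step; this import path
# depends on the installed apache-airflow-providers-snowflake version.
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow_oracle_snowflake_plugin.oracle_to_snowflake_operator import OracleToSnowflake
import airflow_oracle_snowflake_plugin.utils.sql_utils as sql_utils
from table_config.config import CONFIG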
  4. Loop over the table configurations and create the DAG tasks with the operator as follows:
# `dag` is assumed to be the DAG object defined earlier in this file.
for config in CONFIG:
    create_table_statement = sql_utils.get_create_statement(
        table_name=config.get('destination_table'),
        columns_definition=config.get('columns')
    )
    create_table_if_not_exists = SnowflakeOperator(
        task_id='create_{}'.format(config.get('destination_table')),
        snowflake_conn_id='SNOWFLAKE',
        sql=create_table_statement,
        warehouse='LANDING',
        database='LANDING_DEV',
        role='ACCOUNTADMIN',
        schema=config.get('destination_schema'),
        dag=dag
    )

    fill_table_statement = sql_utils.get_select_statement(
        table_name=config.get('source_table'),
        schema_name=config.get('source_schema'),
        columns_definition=config.get('columns'),
        sql_server_syntax=False
    )

    oracle_to_snowflake_operator = OracleToSnowflake(
        task_id='recreate_{}'.format(config.get('destination_table')),
        dag=dag,
        warehouse='LANDING',
        database='LANDING_DEV',
        role='ACCOUNTADMIN',
        schema='PUBLIC',
        source_schema=config.get('source_schema'),
        source_table=config.get('source_table'),
        destination_schema=config.get('destination_schema'),
        destination_table=config.get('destination_table'),
        fill_table_statement=fill_table_statement,
        snowflake_conn_id='SNOWFLAKE',
        oracle_conn_id='ORACLE',
        recreate_table=True
    )
    create_table_if_not_exists >> oracle_to_snowflake_operator

This script creates two tasks for each Oracle table that you want to migrate. The tables are determined by the entries in the CONFIG list in config.py.
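Because the loop reads every value with config.get(...), a missing key would be passed on as None rather than failing early. The following is a minimal sketch of a check that could run before the loop. The validate_config helper and the REQUIRED_KEYS tuple are hypothetical names used for illustration and are not part of the plugin.

```python
# Hypothetical helper, not part of the plugin: checks CONFIG entries before
# any tasks are created from them.
REQUIRED_KEYS = (
    "source_schema",
    "source_table",
    "destination_schema",
    "destination_table",
    "columns",
)


def validate_config(config_list):
    """Raise ValueError on the first malformed entry; return the list otherwise."""
    for index, entry in enumerate(config_list):
        # A key that is absent or empty (for example an empty columns list) counts as missing.
        missing = [key for key in REQUIRED_KEYS if not entry.get(key)]
        if missing:
            raise ValueError(f"CONFIG[{index}] is missing: {', '.join(missing)}")
        for column in entry["columns"]:
            # Each column is expected to be a (name, type) pair of strings,
            # as in the example config.py above.
            is_pair = isinstance(column, (tuple, list)) and len(column) == 2
            if not is_pair or not all(isinstance(part, str) for part in column):
                raise ValueError(f"CONFIG[{index}] has a malformed column: {column!r}")
    return config_list


EXAMPLE_CONFIG = [
    {
        "source_schema": "ADMIN",
        "source_table": "CUSTOMERS",
        "destination_schema": "PUBLIC",
        "destination_table": "CUSTOMERS",
        "columns": [("ID", "varchar"), ("FULL_NAME", "varchar")],
    },
]

validate_config(EXAMPLE_CONFIG)
```

In a DAG file, such a check could be called once on the imported CONFIG before the for loop, so that a typo in config.py fails at DAG parse time instead of producing a task with a None table name.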

First Task

The first task uses the SnowflakeOperator to create the table in the Snowflake database if it does not already exist. It requires:

  • An existing Airflow connection to your Snowflake account
  • Name of the warehouse to use ('LANDING' in the example above)
  • Name of the database to use ('LANDING_DEV' in the example above)
  • Name of the role to use ('ACCOUNTADMIN' in the example above)
  • An SQL statement, provided here as the create_table_statement generated by the sql_utils.get_create_statement method. The method takes the table name, the columns, and their data types from CONFIG.
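To make the role of the columns list concrete, here is a rough sketch of how a list of (name, type) pairs can be turned into a CREATE TABLE IF NOT EXISTS statement. The build_create_statement function is a hypothetical stand-in written for illustration. It is not the plugin's sql_utils.get_create_statement, whose actual output may differ.

```python
# Hypothetical illustration only; the plugin's own get_create_statement
# may format its SQL differently.
def build_create_statement(table_name, columns_definition):
    """Build a CREATE TABLE IF NOT EXISTS statement from (name, type) pairs."""
    column_sql = ", ".join(
        f"{name} {data_type}" for name, data_type in columns_definition
    )
    return f"CREATE TABLE IF NOT EXISTS {table_name} ({column_sql})"


statement = build_create_statement(
    table_name="CUSTOMERS",
    columns_definition=[("ID", "varchar"), ("FULL_NAME", "varchar")],
)
print(statement)
```

The sketch interpolates identifiers directly into the SQL text, which is only reasonable when config.py is a trusted, version-controlled file.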

Second Task

The second task uses the OracleToSnowflake operator from the plugin. It selects the rows from the source table, writes them to a temporary CSV file, uploads that file to a Snowflake stage, and finally loads it into the destination table in Snowflake. It requires:

  • Existing Airflow connection IDs for your Snowflake account and your Oracle database instance. The connection IDs default to SNOWFLAKE and ORACLE if not provided.
  • Inside the operator, a custom Snowflake hook uploads the CSV file to a Snowflake table. This hook requires:
    • Name of the warehouse to use (defaults to 'LANDING' if not provided)
    • Name of the database to use (defaults to 'LANDING_DEV' if not provided)
    • Name of the role to use (defaults to 'ACCOUNTADMIN' if not provided)
  • An SQL statement, provided here as the fill_table_statement generated by the sql_utils.get_select_statement method. The method takes the table name, the schema, and the columns from CONFIG.
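The CSV-then-stage flow described for the second task can be sketched with the standard library alone. This is an illustration under stated assumptions, not the operator's code: the helper names are hypothetical, the rows are hard-coded in place of an Oracle query result, the table's own stage (@schema.%table) is assumed as the upload target, and the PUT and COPY INTO statements are only built as strings rather than executed.

```python
import csv
import os
import tempfile


def write_rows_to_temp_csv(rows):
    """Write an iterable of row tuples to a temporary CSV file and return its path."""
    handle = tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", newline="", delete=False, encoding="utf-8"
    )
    with handle:
        csv.writer(handle).writerows(rows)
    return handle.name


def build_load_statements(csv_path, schema, table):
    """Return PUT and COPY INTO statements targeting the table stage (an assumption)."""
    target = f"{schema}.{table}"
    stage = f"@{schema}.%{table}"
    # On Unix-like systems an absolute path yields a file:///... URI.
    put_sql = f"PUT file://{csv_path} {stage}"
    copy_sql = (
        f"COPY INTO {target} FROM {stage} "
        "FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '\"')"
    )
    return put_sql, copy_sql


# Stand-in for rows fetched from Oracle with the fill_table_statement.
rows = [("1", "Ada Lovelace", "12 Example St", "ada@example.com", "555-0100")]

path = write_rows_to_temp_csv(rows)
put_sql, copy_sql = build_load_statements(path, "PUBLIC", "CUSTOMERS")

# Read the file back to confirm what would be uploaded, then clean up.
with open(path, newline="", encoding="utf-8") as csv_file:
    round_trip = list(csv.reader(csv_file))
os.remove(path)

print(put_sql)
print(copy_sql)
```

Writing to a file and bulk-loading it from a stage is generally preferred over row-by-row inserts for Snowflake, which is presumably why the operator takes this route.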

Note

Added tags to facilitate version releasing and CI/CD operations
