Package for an Airflow operator that copies tables from an Oracle database to Snowflake.
airflow-oracle-snowflake-plugin
Steps to use the OracleToSnowflake operator from the plugin
- Install the plugin with `pip install airflow-oracle-snowflake-plugin`. You can also add `airflow-oracle-snowflake-plugin` to your requirements.txt file for CI/CD operations. The plugin also installs the following dependencies if they are not already satisfied:
  - oracledb
  - apache-airflow-providers-oracle
  - apache-airflow-providers-snowflake
- Create a `config.py` file inside the `dags/table_config` directory. This file holds the source and destination table specifications and has the following structure:
```python
CONFIG = [
    {
        'source_schema': 'ADMIN',
        'source_table': 'CUSTOMERS',
        'destination_schema': 'PUBLIC',
        'destination_table': 'CUSTOMERS',
        'columns': [
            ('ID', 'varchar'),
            ('FULL_NAME', 'varchar'),
            ('ADDRESS', 'varchar'),
            ('EMAIL', 'varchar'),
            ('PHONE_NUMBER', 'varchar'),
        ]
    },
]
```
- Import the operator, `sql_utils`, the config, and the Snowflake provider's `SnowflakeOperator` in your DAG Python file with the following statements:
```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow_oracle_snowflake_plugin.oracle_to_snowflake_operator import OracleToSnowflake
import airflow_oracle_snowflake_plugin.utils.sql_utils as sql_utils
from table_config.config import CONFIG
```
- Iterate over all the table configurations with a `for` loop and create the DAG tasks as follows:
```python
# `dag` is assumed to be the DAG object defined earlier in this file.
for config in CONFIG:
    # Task 1: create the destination table in Snowflake if it does not exist.
    create_table_statement = sql_utils.get_create_statement(
        table_name=config.get('destination_table'),
        columns_definition=config.get('columns')
    )
    create_table_if_not_exists = SnowflakeOperator(
        task_id='create_{}'.format(config.get('destination_table')),
        snowflake_conn_id='SNOWFLAKE',
        sql=create_table_statement,
        warehouse='LANDING',
        database='LANDING_DEV',
        role='ACCOUNTADMIN',
        schema=config.get('destination_schema'),
        dag=dag
    )

    # Task 2: copy the rows from the Oracle source table into Snowflake.
    fill_table_statement = sql_utils.get_select_statement(
        table_name=config.get('source_table'),
        schema_name=config.get('source_schema'),
        columns_definition=config.get('columns'),
        sql_server_syntax=False
    )
    oracle_to_snowflake_operator = OracleToSnowflake(
        task_id='recreate_{}'.format(config.get('destination_table')),
        dag=dag,
        warehouse='LANDING',
        database='LANDING_DEV',
        role='ACCOUNTADMIN',
        schema='PUBLIC',
        source_schema=config.get('source_schema'),
        source_table=config.get('source_table'),
        destination_schema=config.get('destination_schema'),
        destination_table=config.get('destination_table'),
        fill_table_statement=fill_table_statement,
        snowflake_conn_id='SNOWFLAKE',
        oracle_conn_id='ORACLE',
        recreate_table=True
    )

    # Make sure the table exists before it is filled.
    create_table_if_not_exists >> oracle_to_snowflake_operator
```
This script creates two tasks for each Oracle table you want to migrate. The set of tables is determined by the `CONFIG` list in `config.py`.
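The fan-out can be checked without Airflow. The plain-Python sketch below derives, for each `CONFIG` entry, the pair of task IDs that the loop above creates and the order in which they run. `plan_tasks` is a hypothetical helper, and the `ORDERS` entry is a made-up second table included only to show the fan-out.

```python
from typing import Any, Dict, List, Tuple


def plan_tasks(config: List[Dict[str, Any]]) -> List[Tuple[str, str]]:
    """Return (upstream, downstream) task-ID pairs, one pair per table.

    Hypothetical helper: it mirrors the task_id naming used in the DAG loop,
    where 'create_<table>' runs before 'recreate_<table>'.
    """
    pairs = []
    for entry in config:
        table = entry["destination_table"]
        pairs.append((f"create_{table}", f"recreate_{table}"))
    return pairs


CONFIG = [
    {"source_schema": "ADMIN", "source_table": "CUSTOMERS",
     "destination_schema": "PUBLIC", "destination_table": "CUSTOMERS",
     "columns": [("ID", "varchar")]},
    # Made-up second table, only to illustrate one task pair per entry.
    {"source_schema": "ADMIN", "source_table": "ORDERS",
     "destination_schema": "PUBLIC", "destination_table": "ORDERS",
     "columns": [("ID", "varchar")]},
]

if __name__ == "__main__":
    for upstream, downstream in plan_tasks(CONFIG):
        print(f"{upstream} >> {downstream}")
```

Because Airflow requires task IDs to be unique within a DAG, each `destination_table` should appear only once in `CONFIG`; a duplicate entry would produce two tasks with the same name, such as `create_CUSTOMERS`.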
First Task
The first task uses the SnowflakeOperator to create the table in the Snowflake database if it does not already exist. It requires:
- An existing Airflow connection to your Snowflake account ('SNOWFLAKE' in the example above)
- The name of the warehouse to use ('LANDING' in the example above)
- The name of the database to use ('LANDING_DEV' in the example above)
- The name of the role to use ('ACCOUNTADMIN' in the example above)
- An SQL statement, provided here as `create_table_statement`, which is generated by the `sql_utils.get_create_statement` method from the destination table name and the column names and data types of each `CONFIG` entry.
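The exact SQL produced by `sql_utils.get_create_statement` is not documented here, so the following is only a sketch of the idea, assuming a `CREATE TABLE IF NOT EXISTS` statement assembled from the destination table name and the `(column, type)` pairs in `CONFIG`. `build_create_statement` is a hypothetical stand-in, not the plugin's function.

```python
from typing import Sequence, Tuple


def build_create_statement(table_name: str,
                           columns_definition: Sequence[Tuple[str, str]]) -> str:
    """Hypothetical stand-in for sql_utils.get_create_statement.

    Joins each (column_name, data_type) pair into one column definition list.
    The table name is left unqualified because the SnowflakeOperator in the
    example receives the schema separately.
    """
    if not columns_definition:
        raise ValueError("at least one column is required")
    columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in columns_definition)
    return f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_sql})"


if __name__ == "__main__":
    columns = [("ID", "varchar"), ("FULL_NAME", "varchar"), ("EMAIL", "varchar")]
    print(build_create_statement("CUSTOMERS", columns))
    # CREATE TABLE IF NOT EXISTS CUSTOMERS (ID varchar, FULL_NAME varchar, EMAIL varchar)
```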
Second Task
The second task uses the `OracleToSnowflake` operator from the plugin. It selects the rows from the source table, writes them to a temporary CSV file, uploads the file to a Snowflake stage, and finally loads it into the destination table in Snowflake. It requires:
- Existing Airflow connection IDs for your Snowflake account and your Oracle database instance. The connection IDs default to `SNOWFLAKE` and `ORACLE` if not provided.
- Inside the operator, a custom Snowflake hook loads the CSV file into a Snowflake table. This hook requires:
  - The name of the warehouse to use (defaults to 'LANDING' if not provided)
  - The name of the database to use (defaults to 'LANDING_DEV' if not provided)
  - The name of the role to use (defaults to 'ACCOUNTADMIN' if not provided)
- An SQL statement, provided here as `fill_table_statement`, which is generated by the `sql_utils.get_select_statement` method from the source table name, schema, and columns of each `CONFIG` entry.
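To make the second task's steps more concrete, here is a minimal sketch of the pieces it chains together, under several assumptions: `build_select_statement` is a hypothetical stand-in for `sql_utils.get_select_statement`, rows are assumed to arrive as tuples (as a DB-API cursor returns them), and the file is assumed to go through the destination table's internal stage (`@%<table>`) via Snowflake's `PUT` and `COPY INTO` commands. The plugin's actual stage, file format, and options may differ, and the sketch only builds SQL text; it does not connect to Oracle or Snowflake.

```python
import csv
import os
import tempfile
from typing import Iterable, List, Sequence, Tuple


def build_select_statement(schema_name: str, table_name: str,
                           columns_definition: Sequence[Tuple[str, str]]) -> str:
    """Hypothetical stand-in for sql_utils.get_select_statement.

    Selects only the configured columns; their data types are not needed here.
    """
    column_list = ", ".join(name for name, _ in columns_definition)
    return f"SELECT {column_list} FROM {schema_name}.{table_name}"


def write_rows_to_csv(rows: Iterable[Sequence[object]], path: str) -> int:
    """Write rows to a headerless CSV file and return how many rows were written.

    The csv module quotes fields that contain commas and writes None as "".
    """
    count = 0
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        for row in rows:
            writer.writerow(row)
            count += 1
    return count


def build_load_statements(csv_path: str, destination_table: str) -> List[str]:
    """Return PUT and COPY INTO commands that load one CSV file into a table.

    Assumption: the table's own internal stage (@%<table>) is used, and the
    staged file is purged after a successful load.
    """
    stage = f"@%{destination_table}"
    return [
        f"PUT 'file://{csv_path}' {stage} OVERWRITE = TRUE",
        f"COPY INTO {destination_table} FROM {stage} "
        "FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '\"') PURGE = TRUE",
    ]


if __name__ == "__main__":
    columns = [("ID", "varchar"), ("FULL_NAME", "varchar")]
    print(build_select_statement("ADMIN", "CUSTOMERS", columns))
    # In the real task these rows would come from the Oracle query; these are made-up samples.
    sample_rows = [("1", "Doe, Jane"), ("2", None)]
    with tempfile.TemporaryDirectory() as tmp_dir:
        csv_path = os.path.join(tmp_dir, "CUSTOMERS.csv")
        print(write_rows_to_csv(sample_rows, csv_path))
        for statement in build_load_statements(csv_path, "CUSTOMERS"):
            print(statement)
```

Writing the CSV without a header keeps the `COPY INTO` file format simple (no `SKIP_HEADER`), and `FIELD_OPTIONALLY_ENCLOSED_BY` lets a value such as `Doe, Jane`, which the `csv` module wraps in quotes, load as a single field.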
Note
Tags have been added to facilitate version releases and CI/CD operations.
Hashes for airflow_oracle_snowflake_plugin-0.1.2.tar.gz (source distribution)

| Algorithm | Hash digest |
|---|---|
| SHA256 | 10b4447a91f753a65ad8fe611821cb3c11ac862c1afc95326a35854f2dd8f030 |
| MD5 | cba8e7e0a5bfaca4b2d594284834b049 |
| BLAKE2b-256 | 012d2e667ac533ebdf84eb8f48051e3b56356f51e7b53c67becbd8f1f56bc06f |

Hashes for airflow_oracle_snowflake_plugin-0.1.2-py3-none-any.whl (built distribution)

| Algorithm | Hash digest |
|---|---|
| SHA256 | 3be725d989130581541258f35dc3c26fc3012e3d5fdbfd1259d68b01f337ccd2 |
| MD5 | c85148ddd6aa314297de11555f9f3a4f |
| BLAKE2b-256 | d52736e95ba49a1ee6b2d015313714b907c9bc0cd45ad03b77ab8aad938fc98d |
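The SHA256 digests listed above can be used to check a downloaded file before installing it. The sketch below uses only the standard library `hashlib`; the helper names are hypothetical, and the file is read in chunks so memory use stays small regardless of file size.

```python
import hashlib
from typing import Iterable, Iterator


def iter_file_chunks(path: str, chunk_size: int = 65536) -> Iterator[bytes]:
    """Yield a file's contents in fixed-size chunks."""
    with open(path, "rb") as handle:
        while chunk := handle.read(chunk_size):
            yield chunk


def sha256_hex(chunks: Iterable[bytes]) -> str:
    """Return the hex SHA256 digest of the concatenated chunks."""
    digest = hashlib.sha256()
    for chunk in chunks:
        digest.update(chunk)
    return digest.hexdigest()


def file_matches_sha256(path: str, expected_hex: str) -> bool:
    """Compare a file's SHA256 digest with a published hex digest (case-insensitive)."""
    return sha256_hex(iter_file_chunks(path)) == expected_hex.strip().lower()
```

For example, after `pip download airflow-oracle-snowflake-plugin==0.1.2 --no-deps`, calling `file_matches_sha256("airflow_oracle_snowflake_plugin-0.1.2-py3-none-any.whl", "3be725d989130581541258f35dc3c26fc3012e3d5fdbfd1259d68b01f337ccd2")` should return `True` for an untampered wheel. Alternatively, pip can enforce the digest itself through a requirements line such as `airflow-oracle-snowflake-plugin==0.1.2 --hash=sha256:<digest>`; note that pip's hash-checking mode is all-or-nothing, so every dependency must then be pinned with a hash as well.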