A package providing an Airflow operator that copies tables from an Oracle database to Snowflake.
airflow-oracle-snowflake-plugin
Steps to use the OracleToSnowflake operator from the plugin
- Install the plugin by running pip install airflow-oracle-snowflake-plugin. You can also put airflow-oracle-snowflake-plugin in the requirements.txt file for CI/CD operations. The plugin will also install the following dependencies if they are not already satisfied:
  - oracledb
  - apache-airflow-providers-oracle
  - apache-airflow-providers-snowflake
- Create config.py inside the dags/table_config directory. This file holds the source and destination table specifications. It has the following structure:
    CONFIG = [
        {
            'source_schema': 'ADMIN',
            'source_table': 'CUSTOMERS',
            'destination_schema': 'PUBLIC',
            'destination_table': 'CUSTOMERS',
            'columns': [
                ('ID', 'varchar'),
                ('FULL_NAME', 'varchar'),
                ('ADDRESS', 'varchar'),
                ('EMAIL', 'varchar'),
                ('PHONE_NUMBER', 'varchar'),
            ]
        },
    ]
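- Import the operator, sql_utils, and the config in your DAG Python file by including the following statements:
    from airflow_oracle_snowflake_plugin.oracle_to_snowflake_operator import OracleToSnowflake
    import airflow_oracle_snowflake_plugin.utils.sql_utils as sql_utils
    from table_config.config import CONFIG
    # Needed by the loop in the next step; this import path is an assumption and
    # depends on your installed apache-airflow-providers-snowflake version
    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator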
- Implement a for loop to iterate over all the table configurations and create DAG tasks using the operator as follows:
    # `dag` is the DAG object defined earlier in your DAG file
    for config in CONFIG:
        # Build the CREATE TABLE statement for the destination table
        create_table_statement = sql_utils.get_create_statement(
            table_name=config.get('destination_table'),
            columns_definition=config.get('columns')
        )
        create_table_if_not_exists = SnowflakeOperator(
            task_id='create_{}'.format(config.get('destination_table')),
            snowflake_conn_id='SNOWFLAKE',
            sql=create_table_statement,
            warehouse='LANDING',
            database='LANDING_DEV',
            role='ACCOUNTADMIN',
            schema=config.get('destination_schema'),
            dag=dag
        )
        # Build the SELECT statement that reads the rows from the Oracle source table
        fill_table_statement = sql_utils.get_select_statement(
            table_name=config.get('source_table'),
            schema_name=config.get('source_schema'),
            columns_definition=config.get('columns'),
            sql_server_syntax=False
        )
        oracle_to_snowflake_operator = OracleToSnowflake(
            task_id='recreate_{}'.format(config.get('destination_table')),
            dag=dag,
            warehouse='LANDING',
            database='LANDING_DEV',
            role='ACCOUNTADMIN',
            schema='PUBLIC',
            source_schema=config.get('source_schema'),
            source_table=config.get('source_table'),
            destination_schema=config.get('destination_schema'),
            destination_table=config.get('destination_table'),
            fill_table_statement=fill_table_statement,
            snowflake_conn_id='SNOWFLAKE',
            oracle_conn_id='ORACLE',
            recreate_table=True
        )
        # Create the table first, then copy the data into it
        create_table_if_not_exists >> oracle_to_snowflake_operator
This script creates two tasks for each table in the Oracle database that you want to migrate. The tables are determined by the CONFIG array in config.py.
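Because every task is derived from an entry in CONFIG, a malformed entry only surfaces when the DAG is parsed or run. The following is a minimal sketch of a pre-flight check, assuming the entry layout shown above. The helper validate_config and the REQUIRED_KEYS tuple are hypothetical and are not part of the plugin.

```python
# Hypothetical helper, not part of the plugin: checks the shape of CONFIG entries.
REQUIRED_KEYS = (
    'source_schema',
    'source_table',
    'destination_schema',
    'destination_table',
    'columns',
)


def validate_config(config_list):
    """Return a list of human-readable problems; an empty list means none were found."""
    problems = []
    for index, entry in enumerate(config_list):
        # Every key used by the DAG loop must be present and non-empty.
        for key in REQUIRED_KEYS:
            if not entry.get(key):
                problems.append('entry {}: missing or empty {!r}'.format(index, key))
        # Each column is expected to be a (name, type) pair, as in the example config.
        for column in entry.get('columns') or []:
            if not (isinstance(column, (tuple, list)) and len(column) == 2):
                problems.append('entry {}: bad column definition {!r}'.format(index, column))
    return problems
```

Calling validate_config(CONFIG) at the top of the DAG file and raising an error when the returned list is non-empty would make a typo in config.py fail early, before any task is created.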
First Task
The first task creates the table in the Snowflake database if it doesn't already exist, using the SnowflakeOperator. It requires:
- An existing Airflow connection to your Snowflake account
- Name of the warehouse to use ('LANDING' in the example above)
- Name of the database to use ('LANDING_DEV' in the example above)
- Name of the role to use ('ACCOUNTADMIN' in the example above)
- An SQL statement, provided here as create_table_statement and generated by the sql_utils.get_create_statement method. The method uses CONFIG and extracts the table name, columns, and their data types.
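To illustrate the kind of statement the first task runs, here is a minimal sketch of how a CREATE TABLE statement could be assembled from a table name and a list of (name, type) pairs. The function build_create_statement is a hypothetical stand-in written for this example; the plugin's own sql_utils.get_create_statement may produce different SQL.

```python
def build_create_statement(table_name, columns_definition):
    """Build a CREATE TABLE IF NOT EXISTS statement from (name, type) pairs.

    Hypothetical illustration only; not the plugin's implementation.
    """
    columns_sql = ', '.join(
        '{} {}'.format(name, data_type) for name, data_type in columns_definition
    )
    return 'CREATE TABLE IF NOT EXISTS {} ({})'.format(table_name, columns_sql)


statement = build_create_statement(
    'CUSTOMERS', [('ID', 'varchar'), ('EMAIL', 'varchar')]
)
print(statement)
# prints: CREATE TABLE IF NOT EXISTS CUSTOMERS (ID varchar, EMAIL varchar)
```

The sketch interpolates identifiers directly into the SQL string, which is only reasonable because the names come from a config file you control.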
Second Task
The second task uses the OracleToSnowflake operator from the plugin. It selects the rows from the source table, writes them to a temporary CSV file, uploads the file to a Snowflake stage, and finally loads it into the destination table in Snowflake. It requires:
- Existing Airflow connection IDs for your Snowflake account and your Oracle database instance. The connection IDs default to SNOWFLAKE and ORACLE if not provided.
- Inside the operator, a custom Snowflake hook uploads the CSV file to a Snowflake table. This hook requires:
  - Name of the warehouse to use (defaults to 'LANDING' if not provided)
  - Name of the database to use (defaults to 'LANDING_DEV' if not provided)
  - Name of the role to use (defaults to 'ACCOUNTADMIN' if not provided)
- An SQL statement, provided here as fill_table_statement and generated by the sql_utils.get_select_statement method. The method uses CONFIG and extracts the table name, schema, and the columns.
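The select, CSV, stage, and load flow described above can be sketched without a database connection. The sketch below only builds the SQL strings and writes a temporary CSV file. All three functions are hypothetical, the sample rows are made up, and the use of the destination table's own table stage (@schema.%table) with PUT and COPY INTO is an assumption; the plugin's hook may use a different stage or file format.

```python
import csv
import os
import tempfile


def build_select_statement(table_name, schema_name, columns_definition):
    """Build a SELECT over the configured columns (hypothetical illustration)."""
    column_names = ', '.join(name for name, _ in columns_definition)
    return 'SELECT {} FROM {}.{}'.format(column_names, schema_name, table_name)


def write_rows_to_temp_csv(rows):
    """Write rows to a temporary CSV file and return its path."""
    handle = tempfile.NamedTemporaryFile(
        mode='w', suffix='.csv', newline='', delete=False
    )
    with handle:
        csv.writer(handle).writerows(rows)
    return handle.name


def build_load_statements(csv_path, destination_schema, destination_table):
    """Build PUT and COPY INTO statements, assuming the table stage is used."""
    target = '{}.{}'.format(destination_schema, destination_table)
    stage = '@{}.%{}'.format(destination_schema, destination_table)
    put_sql = 'PUT file://{} {}'.format(csv_path, stage)
    copy_sql = (
        "COPY INTO {} FROM {} "
        "FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '\"')"
    ).format(target, stage)
    return put_sql, copy_sql


# Made-up sample rows standing in for the result of the Oracle query.
rows = [('1', 'Jane Doe'), ('2', 'John Roe')]
select_sql = build_select_statement(
    'CUSTOMERS', 'ADMIN', [('ID', 'varchar'), ('FULL_NAME', 'varchar')]
)
path = write_rows_to_temp_csv(rows)
put_sql, copy_sql = build_load_statements(path, 'PUBLIC', 'CUSTOMERS')

# Read the file back to show what would be staged, then clean up.
with open(path, newline='') as csv_file:
    written = list(csv.reader(csv_file))
os.remove(path)
```

In a real task, select_sql would run against Oracle, and put_sql and copy_sql would run through a Snowflake connection; the temporary file should be removed whether or not the load succeeds.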
Note
Added tags to facilitate version releasing and CI/CD operations