Data Engineering & Data Science Pipeline Framework
Project description
Snowflake Extract Load Transform framework
Why does L come before E in Snowflet? Because I really like the sound of Snowflet.
Environment variables (required unless marked OPTIONAL)
"PROJECT_ROOT": "${ProjectFolder}" # REQUIRED
"ACCOUNT": "gsXXXXX.west-europe.azure" # REQUIRED
"USER": "user" # REQUIRED
"PASSWORD": "secret_password" # REQUIRED
"DATABASE": "default_database" # OPTIONAL
"SCHEMA": "default_schema" # OPTIONAL
"WAREHOUSE": "" # OPTIONAL
"ROLE": "" # OPTIONAL
"TIMEZONE": "europe/london" # OPTIONAL
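For local development, these variables can be set from Python before initializing the connection. A minimal sketch; all values below are placeholders, not real credentials:

```python
import os

# Placeholder values -- substitute your own Snowflake account details.
os.environ["PROJECT_ROOT"] = "/path/to/project"      # REQUIRED
os.environ["ACCOUNT"] = "gsXXXXX.west-europe.azure"  # REQUIRED
os.environ["USER"] = "user"                          # REQUIRED
os.environ["PASSWORD"] = "secret_password"           # REQUIRED
os.environ["DATABASE"] = "default_database"          # OPTIONAL
os.environ["SCHEMA"] = "default_schema"              # OPTIONAL
os.environ["TIMEZONE"] = "europe/london"             # OPTIONAL
```

In CI/CD, the same variables would typically be injected by the runner's secret store instead.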
class snowflet.db.DBExecutor()
Snowflake API wrapper
Methods
validate_connection() returns the Snowflake version
query_exec() executes the SQL query
Parameters:
- file_query: path to the query file; either this or query must be passed; may contain {parameters}
- query: SQL query to execute; may contain {parameters}
- return_df: defaults to False; pass True for a SELECT query to get the result back as a pandas DataFrame
- kwargs: each {parameter} in the SQL is replaced with the corresponding kwarg value
""" example """
newdb = DBExecutor()
newdb.query_exec(
    query="create database {db}",
    db="test"  # {db} is replaced by test in the SQL query
)  # database test is created
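The {parameters} substitution presumably behaves like Python's `str.format`: each placeholder in the SQL text is replaced by the matching keyword argument before the query is sent to Snowflake. A standalone sketch of that idea (an assumption about the mechanism, not the library's actual internals):

```python
def render_query(query: str, **kwargs) -> str:
    """Replace {placeholders} in a SQL template with keyword-argument values."""
    return query.format(**kwargs)

# Same template as the example above, rendered without touching Snowflake.
sql = render_query("create database {db}", db="test")
# sql is now "create database test"
```

Because placeholders are plain string substitution, values are not escaped; only trusted inputs should be passed this way.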
Usage
db = DBExecutor()  # initiates the Snowflake connection using the environment variables
db.close()  # closes and dismisses the connection
class snowflet.db.PipelineExecutor()
Ad hoc tool for executing pipelines in Snowflake. It reads a YAML file describing the pipeline steps and provides methods to either run the pipeline or test it (unit and/or UAT).
Notes
All query files must comply with the following (including CTEs for mock data):
- database and schema must be explicit, i.e. "database"."schema"."table" or database.schema.table
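For example, a compliant query file might look like this (table and column names are illustrative only):

```sql
-- Database and schema are fully qualified, as the framework requires
SELECT order_id, order_date
FROM "analytics"."staging"."orders";
```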
Methods
run() executes the pipeline
clone_prod() TBD: clones the prod database metadata
clone_clean() TBD: removes the cloned databases
Usage
- for running the pipeline
from snowflet import PipelineExecutor
pipeline = PipelineExecutor(
"path_to_pipeline_folder/pipeline_name.yaml") # initiate PipelineExecutor for Run
pipeline.run() # run the pipeline
- for CI/CD (testing)
from snowflet import PipelineExecutor
pipeline = PipelineExecutor(
"path_to_pipeline_folder/pipeline_name.yaml",
test=True
) # initiate PipelineExecutor for testing
pipeline.run_unit_tests() # run all unit tests in parallel
try:
pipeline.clone_prod() # copy tables' structure from prod
pipeline.run() # run the pipeline on empty tables (dry_run)
finally:
pipeline.clone_clean() # cleans the dev/test environment
YAML definition
Structure:
desc:
databases:
batches:
release:
databases
list of databases referenced in the pipeline
['database1', 'database2', 'database3']
release
list of files executed before the pipeline runs
example
release:
date: "2020-05-07"
desc: "change table schema and delete a table from prod"
files:
- path_to_file1
batches
- contains the list of batches to execute
- batches are executed serially
- tasks within a batch run in parallel
batches:
- desc: creates table structure
tasks:
- desc: creates staging tables
tasks:
- desc: creates aggregated tables
tasks:
tasks:
- desc: creates aggregated tables
tasks:
- desc: use Database
object: query_executor
args:
- file_query: path_to_file.sql
- desc: create table1
object: create_table
args:
- file: path_to_sql_query_select_file.sql
table_schema: path_to_schema_definition_file.sql
database_id: dbtest
schema_id: sctest
table_id: tbtest
mock_file: path_to_mock_file.sql
output_table_name: staging.attr_order_items_pk
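Putting the pieces together, a minimal complete pipeline file might look like this. All database names and file paths are illustrative; the arg layout mirrors the structure shown above:

```yaml
desc: example pipeline
databases:
  ['dbtest']
release:
  date: "2020-05-07"
  desc: "schema changes applied before the run"
  files:
    - release/001_alter_table.sql
batches:
  - desc: creates table structure
    tasks:
      - desc: use Database
        object: query_executor
        args:
        - file_query: sql/use_database.sql
      - desc: create table1
        object: create_table
        args:
        - file: sql/select_table1.sql
          table_schema: sql/schema_table1.sql
          database_id: dbtest
          schema_id: sctest
          table_id: tbtest
```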
Types of objects
- query_executor:
a wrapper of snowflet.db.query_exec; takes the same parameters
- create_table:
a wrapper of snowflet.db.create_table; takes the same parameters
Project details
Download files
Source Distribution
Built Distribution
File details
Details for the file snowflet-0.0.2.tar.gz
File metadata
- Download URL: snowflet-0.0.2.tar.gz
- Upload date:
- Size: 12.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.24.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.46.1 CPython/3.8.3
File hashes
Algorithm | Hash digest
---|---
SHA256 | 10d6cd43f601a0049206fc1fe5094cd0fdc5fe55ec4680a31c1a088029c88178
MD5 | 1459d029087cc9b906ba308f7314fd3e
BLAKE2b-256 | 64da8143c13f56cea531efda24569b852aba426887a4402e3eeddbca40d3d512
File details
Details for the file snowflet-0.0.2-py3-none-any.whl
File metadata
- Download URL: snowflet-0.0.2-py3-none-any.whl
- Upload date:
- Size: 12.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.24.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.46.1 CPython/3.8.3
File hashes
Algorithm | Hash digest
---|---
SHA256 | 6c455d5fb9312b98b47a63555bada01f91749342a0800d36036f514c02511d80
MD5 | b53b0476cbd996ca5d8ae6e113089e68
BLAKE2b-256 | 0c80bc2136928cee76443930a124fe84e26f74527f883b6caf00139642505ac5