
Data engineering & Data science Pipeline Framework

Project description

Snowflake Extract Load Transform framework

Why does L come before E in Snowflet? I really like the sound of Snowflet.

Environment variables

"PROJECT_ROOT": "${ProjectFolder}"             # REQUIRED
"ACCOUNT":  "gsXXXXX.west-europe.azure"        # REQUIRED
"USER": "user"                                 # REQUIRED
"PASSWORD": secret_password                    # REQUIRED
"DATABASE": "default_database"                 # OPTIONAL
"SCHEMA": "default_schema"                     # OPTIONAL
"WAREHOUSE": ""                                # OPTIONAL
"ROLE": ""                                     # OPTIONAL
"TIMEZONE": "europe/london"                    # OPTIONAL

class snowflet.db.DBExecutor()

Snowflake API wrapper

Methods

validate_connection() returns the Snowflake version

query_exec() executes the SQL query

Parameters:

  • file_query: path to the query file; either this or query must be passed; the file can contain {parameters}
  • query: SQL query to execute; can contain {parameters}
  • return_df: defaults to False; pass True for a SELECT query to get the result back as a pandas DataFrame (see the sketch after the example below)
  • kwargs: each {parameter} in the SQL is replaced by the corresponding keyword argument value
    """ example """
    newdb = db()
    newdb.query_exec(
            query="create database {db}",
            db=test     #  {db} is replaced by test in the sql query        
        ) # database test is created
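
Along the same lines, a SELECT can be returned as a pandas DataFrame via return_df; a hedged sketch following the same pattern (the table and parameter names below are illustrative, not from the package):

newdb = db()
df = newdb.query_exec(
        query="select * from {db}.{schema}.{table}",
        return_df=True,      # True for SELECT queries: result comes back as a pandas DataFrame
        db="test",           # illustrative names, replaced into the SQL
        schema="public",
        table="orders",
    )
newdb.close()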

Usage

db = db()   # initiates the snowflake connection using env variables
db.close()  # closes and dismisses the connection
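
A short end-to-end sketch, adding the validate_connection() call described above (assumes the environment variables are already set):

db = db()                          # initiates the snowflake connection using env variables
print(db.validate_connection())    # prints the snowflake version if the connection is healthy
db.query_exec(query="create database {db}", db="test")
db.close()                         # closes and dismisses the connection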

class snowflet.db.PipelineExecutor()

Ad hoc tool for executing pipelines in Snowflake. The tool reads a YAML file that describes the pipeline steps and provides methods to either run the pipeline or test it (unit and/or UAT).

Notes

All query files shall comply with the following (including CTEs used for mock data):

  • database and schema shall be explicit, i.e. "database"."schema"."table" or database.schema.table (a sketch of a compliant file follows below)
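
As a sketch of what a compliant query file might contain (shown here as a Python string purely for illustration; the table and column names are made up):

COMPLIANT_QUERY = """
with order_items as (              -- CTE that unit tests can swap for mock data
    select * from "dbtest"."sctest"."order_items"
)
select order_id, count(*) as item_count
from order_items
group by order_id
"""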

Methods

run() executes the pipeline

clone_prod() TBD: clones the prod database metadata

clone_clean() TBD: removes the cloned databases

Usage

  • for running the pipeline
from snowflet import PipelineExecutor
pipeline = PipelineExecutor(
    "path_to_pipeline_folder/pipeline_name.yaml")     # initiate PipelineExecutor for run
pipeline.run()                                        # run the pipeline
  • for CI/CD (testing)
from snowflet import PipelineExecutor
pipeline = PipelineExecutor(
    "path_to_pipeline_folder/pipeline_name.yaml",
    test=True
    )                                                 # initiate PipelineExecutor for testing
pipeline.run_unit_tests()                             # run all unit tests in parallel
try:
    pipeline.clone_prod()                             # copy tables' structure from prod
    pipeline.run()                                    # run the pipeline on empty tables (dry run)
finally:
    pipeline.clone_clean()                            # clean the dev/test environment

YAML definition

Structure:

desc: 
databases: 
batches:    
release:

databases

list of databases referenced in the pipeline

['database1', 'database2', 'database3']

release

list of files executed before the pipeline runs

example

release:
  date: "2020-05-07"
  desc: "change table schema and delete a table from prod"
  files:
    - path_to_file1

batches

  • contains the list of batches to execute
  • the batches are executed serially
  • the tasks within a batch run in parallel (see the sketch after the example below)
batches:
-   desc: creates table structure
    tasks:
-   desc: creates staging tables
    tasks:
-   desc: creates aggregated tables
    tasks:
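
A conceptual sketch of this execution model, not snowflet's actual implementation: batches run one after another while the tasks inside each batch are submitted in parallel (run_task here is a hypothetical callable):

from concurrent.futures import ThreadPoolExecutor

def run_batches(batches, run_task):
    # Batches execute serially; tasks within a batch run in parallel.
    for batch in batches:
        with ThreadPoolExecutor() as pool:
            futures = [pool.submit(run_task, task) for task in batch["tasks"]]
            for future in futures:
                future.result()    # surfaces any task failure before the next batch starts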

tasks:

-   desc: creates aggregated tables
    tasks:
    -   desc: use Database
        object: query_executor
        args:
        -   file_query: path_to_file.sql
    -   desc: create table1
        object: create_table
        args:
        -   file: path_to_sql_query_select_file.sql
            table_schema: path_to_schema_definition_file.sql
            database_id: dbtest
            schema_id: sctest
            table_id: tbtest
            mock_file: path_to_mock_file.sql
            output_table_name: staging.attr_order_items_pk 

type of objects

  • query_executor:

it is a wrapper around snowflet.db.query_exec, taking the same parameters

  • create_table:

it is a wrapper around snowflet.db.create_table, taking the same parameters (a sketch of how task entries map to these wrappers follows below)
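
A rough sketch of how a task entry could be dispatched to these wrappers (the dispatch function below is hypothetical and not part of snowflet; it simply mirrors the two object types listed above, and the location of create_table on DBExecutor is an assumption):

from snowflet.db import DBExecutor

def dispatch_task(task, executor: DBExecutor):
    # Hypothetical illustration: route the task's "object" field to the
    # corresponding wrapped call and pass each args entry through as kwargs.
    handlers = {
        "query_executor": executor.query_exec,
        "create_table": executor.create_table,   # assumed to live on DBExecutor
    }
    handler = handlers[task["object"]]
    for args in task["args"]:
        handler(**args)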

