Macrometa target for loading data to Snowflake
Project description
macrometa-target-snowflake
This is a Macrometa compatible target connector.
Install
First, make sure Python 3 is installed on your system or follow these installation instructions for Mac or Ubuntu.
It's recommended to use a virtualenv:
python3 -m venv venv
pip install macrometa-target-snowflake
or
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .
To run
Like any other target that's following the singer specificiation:
some-singer-tap | macrometa-target-snowflake --config [config.json]
It's reading incoming messages from STDIN and using the properites in config.json
to upload data into Snowflake.
Note: To avoid version conflicts run tap
and targets
in separate virtual environments.
Pre-requirements
You need to create a few objects in snowflake in one schema before start using this target.
- Create a named file format. This will be used by the MERGE/COPY commands to parse the files correctly from S3. You can use CSV or Parquet file formats.
To use CSV files:
CREATE FILE FORMAT {database}.{schema}.{file_format_name}
TYPE = 'CSV' ESCAPE='\\' FIELD_OPTIONALLY_ENCLOSED_BY='"';
To use Parquet files (experimental):
CREATE FILE FORMAT {database}.{schema}.{file_format_name} TYPE = 'PARQUET';
Important: Parquet files are not supported with table stages. If you want to use Parquet files then you need to have an external stage in snowflake. Please read further for more details in point 4).
- Create a Role with all the required permissions:
CREATE OR REPLACE ROLE ppw_target_snowflake;
GRANT USAGE ON DATABASE {database} TO ROLE ppw_target_snowflake;
GRANT CREATE SCHEMA ON DATABASE {database} TO ROLE ppw_target_snowflake;
GRANT USAGE ON SCHEMA {database}.{schema} TO role ppw_target_snowflake;
GRANT USAGE ON STAGE {database}.{schema}.{stage_name} TO ROLE ppw_target_snowflake;
GRANT USAGE ON FILE FORMAT {database}.{schema}.{file_format_name} TO ROLE ppw_target_snowflake;
GRANT USAGE ON WAREHOUSE {warehouse} TO ROLE ppw_target_snowflake;
Replace database
, schema
, warehouse
, stage_name
and file_format_name
between {
and }
characters to the actual values from point 1 and 2.
- Create a user and grant permission to the role:
CREATE OR REPLACE USER {user}
PASSWORD = '{password}'
DEFAULT_ROLE = ppw_target_snowflake
DEFAULT_WAREHOUSE = '{warehouse}'
MUST_CHANGE_PASSWORD = FALSE;
GRANT ROLE ppw_target_snowflake TO USER {user};
Replace warehouse
between {
and }
characters to the actual values from point 3.
- Optional external stage:
By default table stages are used to load data into snowflake tables. If you want to use external stages with s3 or s3 compatible storage engines then you need to create a STAGE object:
CREATE STAGE {database}.{schema}.{stage_name}
url='s3://{s3_bucket}'
credentials=(AWS_KEY_ID='{aws_key_id}' AWS_SECRET_KEY='{aws_secret_key}')
encryption=(MASTER_KEY='{client_side_encryption_master_key}');
GRANT USAGE ON STAGE {database}.{schema}.{stage_name} TO ROLE ppw_target_snowflake;
Notes:
- To use external stages you need to define
s3_bucket
andstage
values inconfig.json
as well. - The
encryption
option is optional and used for client side encryption. - If you want client side encryption enabled you need to define the same master key in the target
config.json
. - Instead of
credentials
you can also use storage_integration
Further details below in the Configuration settings section.
Configuration settings
Running the the target connector requires a config.json
file. Example with the minimal settings:
{
"account": "rtxxxxx.eu-central-1",
"dbname": "database_name",
"user": "my_user",
"password": "password",
"warehouse": "my_virtual_warehouse",
"file_format": "snowflake_file_format_object_name",
"target_schema": "my_target_schema"
}
Full list of options in config.json
:
Property | Type | Required? | Description |
---|---|---|---|
account | String | Yes | Snowflake account name (i.e. rtXXXXX.eu-central-1) |
dbname | String | Yes | Snowflake Database name |
user | String | Yes | Snowflake User |
password | String | Yes | Snowflake Password |
warehouse | String | Yes | Snowflake virtual warehouse name |
role | String | No | Snowflake role to use. If not defined then the user's default role will be used |
aws_access_key_id | String | No | S3 Access Key Id. If not provided, AWS_ACCESS_KEY_ID environment variable or IAM role will be used |
aws_secret_access_key | String | No | S3 Secret Access Key. If not provided, AWS_SECRET_ACCESS_KEY environment variable or IAM role will be used |
aws_session_token | String | No | AWS Session token. If not provided, AWS_SESSION_TOKEN environment variable will be used |
aws_profile | String | No | AWS profile name for profile based authentication. If not provided, AWS_PROFILE environment variable will be used. |
s3_bucket | String | No | S3 Bucket name. Required if to use S3 External stage. When this is defined then stage has to be defined as well. |
s3_key_prefix | String | No | (Default: None) A static prefix before the generated S3 key names. Using prefixes you can upload files into specific directories in the S3 bucket. |
s3_endpoint_url | String | No | The complete URL to use for the constructed client. This is allowing to use non-native s3 account. |
s3_region_name | String | No | Default region when creating new connections |
s3_acl | String | No | S3 ACL name to set on the uploaded files |
stage | String | No | Named external stage name created at pre-requirements section. Has to be a fully qualified name including the schema name. If not specified, table internal stage are used. When this is defined then s3_bucket has to be defined as well. |
file_format | String | Yes | Named file format name created at pre-requirements section. Has to be a fully qualified name including the schema name. |
batch_size_rows | Integer | (Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Snowflake. | |
batch_wait_limit_seconds | Integer | (Default: None) Maximum time to wait for batch to reach batch_size_rows . |
|
flush_all_streams | Boolean | (Default: False) Flush and load every stream into Snowflake when one batch is full. Warning: This may trigger the COPY command to use files with low number of records, and may cause performance problems. | |
parallelism | Integer | (Default: 0) The number of threads used to flush tables. 0 will create a thread for each stream, up to parallelism_max. -1 will create a thread for each CPU core. Any other positive number will create that number of threads, up to parallelism_max. | |
parallelism_max | Integer | (Default: 16) Max number of parallel threads to use when flushing tables. | |
target_schema | String | Name of the schema where the tables will be created, without database prefix. | |
target_schema_select_permission | String | Grant USAGE privilege on newly created schemas and grant SELECT privilege on newly created tables to a specific role or a list of roles. |
| disable*table_cache | Boolean | | (Default: False) By default the connector caches the available table structures in Snowflake at startup. In this way it doesn't need to run additional queries when ingesting data to check if altering the target tables is required. With disable_table_cache
option you can turn off this caching. You will always see the most recent table structures but will cause an extra query runtime. |
| client_side_encryption_master_key | String | | (Default: None) When this is defined, Client-Side Encryption is enabled. The data in S3 will be encrypted, No third parties, including Amazon AWS and any ISPs, can see data in the clear. Snowflake COPY command will decrypt the data once it's in Snowflake. The master key must be 256-bit length and must be encoded as base64 string. |
| add_metadata_columns | Boolean | | (Default: False) Metadata columns add extra row level information about data ingestions, (i.e. when was the row read in source, when was inserted or deleted in snowflake etc.) Metadata columns are creating automatically by adding extra columns to the tables with a column prefix \_SDC*
. The column names are following the stitch naming conventions documented at https://www.stitchdata.com/docs/data-structure/integration-schemas#sdc-columns. Enabling metadata columns will flag the deleted rows by setting the _SDC_DELETED_AT
metadata column. Without theadd_metadata_columns
option the deleted rows from singer taps will not be recongisable in Snowflake. |
| hard_delete | Boolean | | (Default: False) Whenhard_delete
option is true then DELETE SQL commands will be performed in Snowflake to delete rows in tables. It's achieved by continuously checking the\_SDC_DELETED_AT
metadata column sent by the singer tap. Due to deleting rows requires metadata columns,hard_delete
option automatically enables theadd_metadata_columns
option as well. |
| data_flattening_max_level | Integer | | (Default: 0) Object type RECORD items from taps can be loaded into VARIANT columns as JSON (default) or we can flatten the schema by creating columns automatically.
When value is 0 (default) then flattening functionality is turned off. |
| primary_key_required | Boolean | | (Default: True) Log based and Incremental replications on tables with no Primary Key cause duplicates when merging UPDATE events. When set to true, stop loading data if no Primary Key is defined. |
| validate_records | Boolean | | (Default: False) Validate every single record message to the corresponding JSON schema. This option is disabled by default and invalid RECORD messages will fail only at load time by Snowflake. Enabling this option will detect invalid records earlier but could cause performance degradation. |
| temp_dir | String | | (Default: platform-dependent) Directory of temporary files with RECORD messages. |
| no_compression | Boolean | | (Default: False) Generate uncompressed files when loading to Snowflake. Normally, by default GZIP compressed files are generated. |
| query_tag | String | | (Default: None) Optional string to tag executed queries in Snowflake. Replaces tokens{{database}}
, {{schema}}
and{{table}}
with the appropriate values. The tags are displayed in the output of the SnowflakeQUERY_HISTORY
, QUERY_HISTORY_BY_\*
functions. |
| archive_load_files | Boolean | | (Default: False) When enabled, the files loaded to Snowflake will also be stored inarchive_load_files_s3_bucket
under the key/{archive_load_files_s3_prefix}/{schema_name}/{table_name}/
. All archived files will have tap
, schema
, table
andarchived-by
as S3 metadata keys. When incremental replication is used, the archived files will also have the following S3 metadata keys:incremental-key
, incremental-key-min
andincremental-key-max
. |
| archive_load_files_s3_prefix | String | | (Default: "archive") When archive_load_files
is enabled, the archived files will be placed in the archive S3 bucket under this prefix. |
| archive_load_files_s3_bucket | String | | (Default: Value ofs3_bucket
) When archive_load_files
is enabled, the archived files will be placed in this bucket. |
To run tests:
- Define the environment variables that are required to run the tests by creating a
.env
file intests/integration
, or by exporting the variables below.
export MACROMETA_TARGET_SNOWFLAKE_ACCOUNT=<snowflake-account-name>
export MACROMETA_TARGET_SNOWFLAKE_DBNAME=<snowflake-database-name>
export MACROMETA_TARGET_SNOWFLAKE_USER=<snowflake-user>
export MACROMETA_TARGET_SNOWFLAKE_PASSWORD=<snowflake-password>
export MACROMETA_TARGET_SNOWFLAKE_WAREHOUSE=<snowflake-warehouse>
export MACROMETA_TARGET_SNOWFLAKE_SCHEMA=<snowflake-schema>
export MACROMETA_TARGET_SNOWFLAKE_AWS_ACCESS_KEY=<aws-access-key-id>
export MACROMETA_TARGET_SNOWFLAKE_AWS_SECRET_ACCESS_KEY=<aws-access-secret-access-key>
export MACROMETA_TARGET_SNOWFLAKE_S3_ACL=<s3-target-acl>
export MACROMETA_TARGET_SNOWFLAKE_S3_BUCKET=<s3-external-bucket>
export MACROMETA_TARGET_SNOWFLAKE_S3_KEY_PREFIX=<bucket-directory>
export MACROMETA_TARGET_SNOWFLAKE_STAGE=<stage-object-with-schema-name>
export MACROMETA_TARGET_SNOWFLAKE_FILE_FORMAT_CSV=<file-format-csv-object-with-schema-name>
export MACROMETA_TARGET_SNOWFLAKE_FILE_FORMAT_PARQUET=<file-format-parquet-object-with-schema-name>
export CLIENT_SIDE_ENCRYPTION_MASTER_KEY=<client_side_encryption_master_key>
- Install python test dependencies in a virtual env and run unit and integration tests
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .[test]
- To run unit tests:
pytest tests/unit
- To run integration tests:
pytest tests/integration
To run pylint:
- Install python dependencies and run python linter
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .[test]
pylint target_snowflake
License
Apache License Version 2.0
See LICENSE to see the full text.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file macrometa-target-snowflake-1.0.0.tar.gz
.
File metadata
- Download URL: macrometa-target-snowflake-1.0.0.tar.gz
- Upload date:
- Size: 40.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.4
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | d360d380d39782bbbd0bb77fe5770b1befaa3eb71d904486cdf264b26e2098c8 |
|
MD5 | fdcddd1fdffe96589470b4e50a36dcfd |
|
BLAKE2b-256 | 8ea2f0bce33bbb368e610ea0b0fb166ed6e377f128d99d55ab7b63a02e4d636e |
File details
Details for the file macrometa_target_snowflake-1.0.0-py3-none-any.whl
.
File metadata
- Download URL: macrometa_target_snowflake-1.0.0-py3-none-any.whl
- Upload date:
- Size: 41.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.4
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 8a6a2e4e8b6f86815f5d7d5bb88cd79f9e9701119c63ed53462244cd932128bc |
|
MD5 | e338291a17bb11cd210e5ec9499a2690 |
|
BLAKE2b-256 | 180146eebfe3fd98a9a1d6d829376f942edafec1c92a045238a56aed1a6b1ddd |