# mkpipe-extractor-snowflake

Snowflake extractor for mkpipe.
Snowflake extractor plugin for MkPipe. Reads Snowflake tables using the native Snowflake Spark connector (spark-snowflake), which transfers data via internal staging (S3/Azure/GCS) — significantly faster than JDBC for large datasets.
## Documentation
For more detailed documentation, please visit the GitHub repository.
## License
This project is licensed under the Apache 2.0 License - see the LICENSE file for details.
## Connection Configuration

```yaml
connections:
  snowflake_source:
    variant: snowflake
    host: myaccount.snowflakecomputing.com
    port: 443
    database: MY_DATABASE
    schema: MY_SCHEMA
    user: myuser
    password: mypassword
    warehouse: MY_WAREHOUSE
```
With RSA key pair authentication:

```yaml
connections:
  snowflake_source:
    variant: snowflake
    host: myaccount.snowflakecomputing.com
    port: 443
    database: MY_DATABASE
    schema: MY_SCHEMA
    user: myuser
    warehouse: MY_WAREHOUSE
    private_key_file: /path/to/rsa_key.p8
    private_key_file_pwd: mypassphrase  # omit if key is unencrypted
```
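Key pair authentication also requires the matching public key to be registered on the Snowflake user. A minimal sketch using Snowflake's standard `ALTER USER` statement (the key value below is a truncated placeholder):

```sql
-- Paste the body of rsa_key.pub (without the BEGIN/END delimiter lines).
-- Must be run by a role with OWNERSHIP on the user (e.g. SECURITYADMIN).
ALTER USER myuser SET RSA_PUBLIC_KEY='MIIBIjANBgkq...';
```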
## Table Configuration

```yaml
pipelines:
  - name: snowflake_to_pg
    source: snowflake_source
    destination: pg_target
    tables:
      - name: MY_SCHEMA.EVENTS
        target_name: stg_events
        replication_method: full
        fetchsize: 100000
```
### Incremental Replication

```yaml
- name: MY_SCHEMA.EVENTS
  target_name: stg_events
  replication_method: incremental
  iterate_column: UPDATED_AT
  iterate_column_type: datetime
  partitions_column: ID
  partitions_count: 8
  fetchsize: 50000
```
### Custom SQL

```yaml
- name: MY_SCHEMA.EVENTS
  target_name: stg_events
  replication_method: full
  custom_query: "SELECT ID, USER_ID, EVENT_TYPE, CREATED_AT FROM MY_SCHEMA.EVENTS WHERE {query_filter}"
```
Use `{query_filter}` as a placeholder — it is replaced with the incremental `WHERE` clause on incremental runs, or with `WHERE 1=1` on full runs.
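For illustration, assuming the incremental clause is built from the configured `iterate_column` (the watermark literal below is made up), the query above would expand roughly as follows:

```sql
-- Incremental run: {query_filter} becomes the watermark predicate.
SELECT ID, USER_ID, EVENT_TYPE, CREATED_AT
FROM MY_SCHEMA.EVENTS
WHERE UPDATED_AT > '2024-01-15 00:00:00';

-- Full run: {query_filter} becomes the no-op predicate.
SELECT ID, USER_ID, EVENT_TYPE, CREATED_AT
FROM MY_SCHEMA.EVENTS
WHERE 1=1;
```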
## Read Parallelism

The Snowflake extractor uses the native Snowflake Spark connector with Spark's partition support. For large tables, set `partitions_column` and `partitions_count` to read in parallel:
```yaml
- name: MY_SCHEMA.EVENTS
  target_name: stg_events
  replication_method: incremental
  iterate_column: UPDATED_AT
  iterate_column_type: datetime
  partitions_column: ID  # numeric column to split on
  partitions_count: 8    # number of parallel Spark partitions
  fetchsize: 50000
```
### How it works

- Spark reads the min/max of `partitions_column` and divides the range into `partitions_count` equal slices (sketched below)
- Each slice is fetched by a separate Spark task via the Snowflake connector
- `fetchsize` controls how many rows are fetched per round-trip
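As a rough illustration rather than the connector's literal output, a read with `partitions_column: ID`, `partitions_count: 4`, and an observed `ID` range of 1 to 1,000,000 would issue slice queries along these lines, one per Spark task:

```sql
-- Boundaries are derived from MIN(ID)/MAX(ID); the values here are illustrative.
SELECT * FROM MY_SCHEMA.EVENTS WHERE ID < 250001;                   -- task 1
SELECT * FROM MY_SCHEMA.EVENTS WHERE ID >= 250001 AND ID < 500001;  -- task 2
SELECT * FROM MY_SCHEMA.EVENTS WHERE ID >= 500001 AND ID < 750001;  -- task 3
SELECT * FROM MY_SCHEMA.EVENTS WHERE ID >= 750001;                  -- task 4
```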
### Performance Notes

- Full replication: partitioning is not applied (it only works with `incremental`).
- `partitions_column` should be a numeric column with good distribution (e.g. a surrogate key).
- Snowflake warehouse size is the primary performance lever — a larger warehouse processes queries faster regardless of partitioning (resizing is a single statement; see below).
- Keep `fetchsize` high (50,000–200,000) to minimize round-trips; Snowflake handles large result sets efficiently.
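Resizing uses standard Snowflake DDL; the warehouse name here matches the connection example above:

```sql
-- Requires a role with the MODIFY privilege on the warehouse.
ALTER WAREHOUSE MY_WAREHOUSE SET WAREHOUSE_SIZE = 'LARGE';
```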
## All Table Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | string | required | Snowflake table name (include schema if needed) |
| `target_name` | string | required | Destination table name |
| `replication_method` | `full` / `incremental` | `full` | Replication strategy |
| `iterate_column` | string | — | Column used for incremental watermark |
| `iterate_column_type` | `int` / `datetime` | — | Type of `iterate_column` |
| `partitions_column` | string | same as `iterate_column` | Column to split Spark reads on |
| `partitions_count` | int | `10` | Number of parallel Spark partitions |
| `fetchsize` | int | `100000` | Rows per JDBC fetch |
| `custom_query` | string | — | Override SQL with `{query_filter}` placeholder |
| `custom_query_file` | string | — | Path to SQL file (relative to `sql/` dir) |
| `write_partitions` | int | — | Coalesce to N partitions before writing |
| `tags` | list | `[]` | Tags for selective pipeline execution |
| `pass_on_error` | bool | `false` | Skip table on error instead of failing |
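An illustrative entry combining several of the optional parameters above (the SQL file name and tag values are made up; the parameter names come from the table):

```yaml
- name: MY_SCHEMA.EVENTS
  target_name: stg_events
  replication_method: full
  custom_query_file: events_extract.sql  # resolved relative to the sql/ dir
  write_partitions: 4                    # coalesce output to 4 partitions before writing
  tags: [nightly, events]                # run only when these tags are selected
  pass_on_error: true                    # skip this table on error instead of failing the run
```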
## File details

Details for the file `mkpipe_extractor_snowflake-0.5.2.tar.gz`.

### File metadata

- Download URL: mkpipe_extractor_snowflake-0.5.2.tar.gz
- Upload date:
- Size: 8.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15

### File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `ca58ccf1301d0b0ed9e61629f776d86678d27b4c8bbbc3fd04ff7de6eb7d5579` |
| MD5 | `20466aa347d7a9fcfc1a5fe73e915290` |
| BLAKE2b-256 | `dcb5ba213e2f6f1d0096ef73c7238d77b51edec0d3b9a5083698670b5d501701` |
## File details

Details for the file `mkpipe_extractor_snowflake-0.5.2-py3-none-any.whl`.

### File metadata

- Download URL: mkpipe_extractor_snowflake-0.5.2-py3-none-any.whl
- Upload date:
- Size: 9.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15

### File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `dacf620cd76b56adfff5eee59142295724884a343e2ccaf840ac261844ed50f0` |
| MD5 | `b8926c8713f9792dba629a48b5de44a7` |
| BLAKE2b-256 | `d8beb2d1b4924326175566baca2b462b888601fc33b68efdf8eed69c21a7a23a` |