
Prefect integrations for interacting with Snowflake


prefect-snowflake


Welcome!

The prefect-snowflake collection makes it easy to connect to a Snowflake database in your Prefect flows. Check out the examples below to get started!

Getting Started

Integrate with Prefect flows

Prefect works with Snowflake by providing dataflow automation for faster, more efficient data pipeline creation, execution, and monitoring.

This results in reduced errors, increased confidence in your data, and ultimately, faster insights.

The examples below create and populate a table with the `execute` and `execute_many` methods, then use the `fetch_many` method to retrieve the rows in batches until no rows remain.

By using the SnowflakeConnector as a context manager, you can make sure that the Snowflake connection and cursors are closed properly after you're done with them.
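Conceptually, the `with` statement guarantees cleanup even when a query raises an exception. The minimal sketch below shows that guarantee using a hypothetical `FakeConnector` stand-in (not part of prefect-snowflake) that only mimics close-on-exit behavior, so it runs without a Snowflake account:

```python
class FakeConnector:
    """Hypothetical stand-in that mimics a connector's close-on-exit behavior."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        # A real connector would close its cursors and connection here.
        self.closed = True

    def __enter__(self) -> "FakeConnector":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Runs on a normal exit *and* when an exception propagates out of the block.
        self.close()


connector = FakeConnector()
try:
    with connector:
        raise RuntimeError("query failed")
except RuntimeError:
    pass

print(connector.closed)  # True
```

Without the context manager, the equivalent is a `try`/`finally` that calls `close()` yourself.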

Be sure to install prefect-snowflake and save a SnowflakeConnector block (see "Saving credentials to block" below) before running the examples!

=== "Sync"

```python
from prefect import flow, task
from prefect_snowflake import SnowflakeConnector


@task
def setup_table(block_name: str) -> None:
    with SnowflakeConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )


@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SnowflakeConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)  # one list per batch of up to 2 rows
    return all_rows


@flow
def snowflake_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


# Pass the name of your saved SnowflakeConnector block
snowflake_flow("example")
```

=== "Async"

```python
import asyncio

from prefect import flow, task
from prefect_snowflake import SnowflakeConnector


@task
async def setup_table(block_name: str) -> None:
    with await SnowflakeConnector.load(block_name) as connector:
        await connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )


@task
async def fetch_data(block_name: str) -> list:
    all_rows = []
    with await SnowflakeConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = await connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)  # one list per batch of up to 2 rows
    return all_rows


@flow
async def snowflake_flow(block_name: str) -> list:
    await setup_table(block_name)
    all_rows = await fetch_data(block_name)
    return all_rows


# Pass the name of your saved SnowflakeConnector block
asyncio.run(snowflake_flow("example"))
```
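The `fetch_many` loop in both tabs follows the familiar DB-API pattern of pulling fixed-size batches until an empty batch comes back. The sketch below reproduces that pattern with Python's built-in `sqlite3` module as a stand-in so it runs without Snowflake. It is plain DB-API code, not the prefect-snowflake API, and sqlite3 uses `:name` placeholders rather than the `%(name)s` style shown above.

```python
import sqlite3

# In-memory SQLite database standing in for Snowflake (illustration only).
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);")
cursor.executemany(
    "INSERT INTO customers (name, address) VALUES (:name, :address);",
    [
        {"name": "Ford", "address": "Highway 42"},
        {"name": "Unknown", "address": "Space"},
        {"name": "Me", "address": "Myway 88"},
    ],
)

# Execute the query once, then fetch batches of 2 until an empty batch signals the end.
cursor.execute("SELECT * FROM customers ORDER BY rowid")
batches = []
while True:
    rows = cursor.fetchmany(2)
    if not rows:
        break
    batches.append(rows)
conn.close()

print(batches)
```

One difference worth noting: with a raw DB-API cursor you call `execute` once yourself, whereas the block's `fetch_many` takes the operation on every call and, as the comment in the examples says, skips re-executing when the operation is unchanged.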

Access underlying Snowflake connection

If the block's built-in methods don't meet your requirements, you can access the underlying Snowflake connection and use its methods directly. For example, the flow below uses `write_pandas` from `snowflake.connector.pandas_tools` to load a pandas DataFrame into a table.

```python
import pandas as pd
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector
from snowflake.connector.pandas_tools import write_pandas


@flow
def snowflake_write_pandas_flow():
    connector = SnowflakeConnector.load("my-block")
    with connector.get_connection() as connection:
        table_name = "TABLE_NAME"
        ddl = "NAME STRING, NUMBER INT"
        statement = f"CREATE TABLE IF NOT EXISTS {table_name} ({ddl})"
        with connection.cursor() as cursor:
            cursor.execute(statement)

        # Case sensitivity matters here: the DataFrame column names must match
        # the uppercase column names of the table created above
        df = pd.DataFrame([("Marvin", 42), ("Ford", 88)], columns=["NAME", "NUMBER"])
        success, num_chunks, num_rows, _ = write_pandas(
            conn=connection,
            df=df,
            table_name=table_name,
            database=connector.database,
            schema=connector.schema_,  # note the "_" suffix
        )


snowflake_write_pandas_flow()
```
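The example hard-codes the DDL and uses uppercase column names because Snowflake stores unquoted identifiers in uppercase, while `write_pandas` quotes the DataFrame's column names by default, which makes them case-sensitive. The sketch below uses a hypothetical helper, `build_create_table`, to derive both the DDL and the matching DataFrame column names from one mapping so the two cannot drift apart. It assumes only simple unquoted identifiers are wanted and does not validate the column type strings.

```python
import re

# Simple unquoted Snowflake identifier: a letter or underscore, then letters,
# digits, underscores, or dollar signs. Anything else is rejected.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def build_create_table(table_name: str, columns: dict) -> tuple:
    """Hypothetical helper: return (DDL statement, uppercase column names)."""
    for name in [table_name, *columns]:
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"unsupported identifier: {name!r}")
    # Uppercase names so they match what Snowflake stores for unquoted identifiers.
    upper = {name.upper(): col_type for name, col_type in columns.items()}
    if len(upper) != len(columns):
        raise ValueError("column names collide after uppercasing")
    ddl = ", ".join(f"{name} {col_type}" for name, col_type in upper.items())
    statement = f"CREATE TABLE IF NOT EXISTS {table_name.upper()} ({ddl})"
    return statement, list(upper)


statement, df_columns = build_create_table("table_name", {"name": "STRING", "number": "INT"})
print(statement)   # CREATE TABLE IF NOT EXISTS TABLE_NAME (NAME STRING, NUMBER INT)
print(df_columns)  # ['NAME', 'NUMBER']
```

The returned `df_columns` list could then be passed as the `columns=` argument when building the DataFrame. Validating names also keeps arbitrary text out of the f-string used to build the statement.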

Resources

For more tips on how to use tasks and flows in an integration, check out Using Collections!

Installation

Install prefect-snowflake with pip:

```bash
pip install prefect-snowflake
```

A list of available blocks in prefect-snowflake and their setup instructions can be found here.

Requires an installation of Python 3.8+.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.
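If a script or CI job should fail fast on an unsupported interpreter, a minimal sketch of a check against the Python 3.8+ requirement stated above might look like this (the helper name `python_is_supported` is hypothetical):

```python
import sys


def python_is_supported(version_info=sys.version_info, minimum=(3, 8)) -> bool:
    """Return True if the interpreter meets the minimum (major, minor) version."""
    # Compare only (major, minor); tuple comparison is lexicographic.
    return tuple(version_info[:2]) >= minimum


if not python_is_supported():
    raise SystemExit("prefect-snowflake requires Python 3.8+")
```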

Saving credentials to block

Note: to use the `load` method on a block, you must first save a block document, either through code or through the Prefect UI.

Below is a walkthrough on saving a SnowflakeCredentials block through code.

  1. Head over to https://app.snowflake.com/.
  2. Log in to your Snowflake account, e.g. nh12345.us-east-2.aws, with your username and password.
  3. Use those credentials to replace the placeholders below.
```python
from prefect_snowflake import SnowflakeCredentials

credentials = SnowflakeCredentials(
    account="ACCOUNT-PLACEHOLDER",  # resembles nh12345.us-east-2.aws
    user="USER-PLACEHOLDER",
    password="PASSWORD-PLACEHOLDER",
)
credentials.save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
```
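Hard-coding a password in a script makes it easy to leak. A minimal sketch that reads the three values from environment variables instead; the variable names `SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_USER`, and `SNOWFLAKE_PASSWORD` and the helper `credentials_kwargs_from_env` are hypothetical choices made for this sketch, not something prefect-snowflake is documented here to read on its own:

```python
import os


def credentials_kwargs_from_env(environ=os.environ) -> dict:
    """Hypothetical helper: collect SnowflakeCredentials keyword arguments from env vars."""
    # Maps each SnowflakeCredentials field to the environment variable holding its value.
    mapping = {
        "account": "SNOWFLAKE_ACCOUNT",
        "user": "SNOWFLAKE_USER",
        "password": "SNOWFLAKE_PASSWORD",
    }
    missing = [var for var in mapping.values() if not environ.get(var)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {field: environ[var] for field, var in mapping.items()}
```

With the variables set, the result can be unpacked into the constructor, e.g. `SnowflakeCredentials(**credentials_kwargs_from_env())`, before calling `.save(...)` as above.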

Then, to create a SnowflakeConnector block:

  1. After logging in, click on any worksheet.
  2. On the left side, select a database and schema.
  3. On the top right, select a warehouse.
  4. Create a short script, replacing the placeholders below.
```python
from prefect_snowflake import SnowflakeCredentials, SnowflakeConnector

credentials = SnowflakeCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")

connector = SnowflakeConnector(
    credentials=credentials,
    database="DATABASE-PLACEHOLDER",
    schema="SCHEMA-PLACEHOLDER",
    warehouse="COMPUTE_WH",
)
connector.save("CONNECTOR-BLOCK-NAME-PLACEHOLDER")
```
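A block saved with a leftover `...-PLACEHOLDER` value typically only fails later, when a connection is attempted. A minimal sketch of a pre-save check; the helper `unreplaced_placeholders` is hypothetical and simply scans string settings for the `PLACEHOLDER` marker used in these examples:

```python
def unreplaced_placeholders(settings: dict, marker: str = "PLACEHOLDER") -> list:
    """Return the sorted keys whose string values still contain the placeholder marker."""
    return sorted(
        key
        for key, value in settings.items()
        if isinstance(value, str) and marker in value
    )


settings = {
    "database": "DATABASE-PLACEHOLDER",
    "schema": "PUBLIC",
    "warehouse": "COMPUTE_WH",
}
print(unreplaced_placeholders(settings))  # ['database']
```

Running it on the keyword arguments you pass to `SnowflakeConnector(...)` and raising if the list is non-empty keeps an unfinished block from being saved.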

Congrats! You can now easily load the saved block, which holds your credentials and connection info:

```python
from prefect_snowflake import SnowflakeCredentials, SnowflakeConnector

SnowflakeCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
SnowflakeConnector.load("CONNECTOR-BLOCK-NAME-PLACEHOLDER")
```

!!! info "Registering blocks"

Register blocks in this module to view and edit them on Prefect Cloud:

```bash
prefect block register -m prefect_snowflake
```


Feedback

If you encounter any bugs while using prefect-snowflake, feel free to open an issue in the prefect-snowflake repository.

If you have any questions or issues while using prefect-snowflake, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to star or watch prefect-snowflake for updates too!

Contributing

If you'd like to help contribute to fix an issue or add a feature to prefect-snowflake, please propose changes through a pull request from a fork of the repository.

Here are the steps:

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies:
     ```bash
     pip install -e ".[dev]"
     ```
  4. Make desired changes
  5. Add tests
  6. Add an entry to CHANGELOG.md
  7. Install pre-commit to perform quality checks prior to commit:
     ```bash
     pre-commit install
     ```
  8. `git commit`, `git push`, and create a pull request


Download files


Source Distribution

prefect_snowflake-0.27.7.tar.gz (28.8 kB)

Built Distribution

prefect_snowflake-0.27.7-py3-none-any.whl (19.0 kB)

File details

Details for the file prefect_snowflake-0.27.7.tar.gz.

File metadata

  • Download URL: prefect_snowflake-0.27.7.tar.gz
  • Upload date:
  • Size: 28.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.0 CPython/3.12.4

File hashes

Hashes for prefect_snowflake-0.27.7.tar.gz

| Algorithm | Hash digest |
|---|---|
| SHA256 | 01ec7a37936d5a8e35aa7fc26854f6b769993d926b476ab7f49aa6b298b3a61f |
| MD5 | c7ba5f29ddd789f438efdb5d5f8a30f6 |
| BLAKE2b-256 | 3c6b3a32dc634666b430428c303824f145558f70ee614f85bdfd2287e130ef88 |

See more details on using hashes here.
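To check a downloaded file against the published SHA256 digest, a minimal sketch using the standard library's `hashlib`; the helper name `sha256_of_file` is hypothetical, and the expected value is copied from the table above:

```python
import hashlib


def sha256_of_file(path: str, chunk_size: int = 65536) -> str:
    """Return the hex SHA256 digest of a file, reading it in fixed-size chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        # iter() with a sentinel keeps reading until read() returns b"" (end of file).
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


EXPECTED_SDIST_SHA256 = "01ec7a37936d5a8e35aa7fc26854f6b769993d926b476ab7f49aa6b298b3a61f"

# Usage, assuming the sdist was downloaded to the current directory:
# assert sha256_of_file("prefect_snowflake-0.27.7.tar.gz") == EXPECTED_SDIST_SHA256
```

pip can enforce the same check at install time through its hash-checking mode (`--require-hashes`, with `--hash=sha256:...` entries in a requirements file).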

File details

Details for the file prefect_snowflake-0.27.7-py3-none-any.whl.

File metadata

File hashes

Hashes for prefect_snowflake-0.27.7-py3-none-any.whl

| Algorithm | Hash digest |
|---|---|
| SHA256 | b3b8f563cd41e1b89575c503b7b6d0e6bc97e193e1ca51059b9248cc1093414a |
| MD5 | 002e48f3f0ca5f1377e5a77bf4fe7d0c |
| BLAKE2b-256 | cd865498d03c208bb22e0c8e6eb1b3c8ed5fddb8a2df3e5eb013c43cd1d78e0a |

