Skip to main content

Snowflake offline store for Feast

Project description

Feast Snowflake Offline & Online Store Support

Quickstart

Install feast-snowflake

pip install feast-snowflake

Create a feature repository

feast init feature_repo
cd feature_repo

Edit feature_store.yaml

set offline_store type to be feast_snowflake.SnowflakeOfflineStore
set online_store type to be feast_snowflake.SnowflakeOnlineStore

project: ...
registry: ...
provider: local
offline_store:
    type: feast_snowflake.SnowflakeOfflineStore
    account: SNOWFLAKE_DEPLOYMENT_URL #drop .snowflakecomputing.com
    user: USERNAME
    password: PASSWORD
    role: ROLE_NAME #remember cap sensitive
    warehouse: WAREHOUSE_NAME #remember cap sensitive
    database: DATABASE_NAME #remember cap sensitive
online_store:
  type: feast_snowflake.SnowflakeOnlineStore
  account: SNOWFLAKE_DEPLOYMENT_URL #drop .snowflakecomputing.com
  user: USERNAME
  password: PASSWORD
  role: ROLE_NAME #remember cap sensitive
  warehouse: WAREHOUSE_NAME #remember cap sensitive
  database: DATABASE_NAME #remember cap sensitive

Upload sample data to Snowflake

from feast_snowflake.snowflake_utils import create_new_snowflake_table, get_snowflake_conn
from snowflake.connector.pandas_tools import write_pandas
from feast import FeatureStore
import pandas as pd

fs = FeatureStore(repo_path=".")

with get_snowflake_conn(fs.config.offline_store) as conn:

  create_new_snowflake_table(conn, pd.read_parquet('data/driver_stats.parquet'), 'DRIVER_STATS')
  write_pandas(conn, pd.read_parquet('data/driver_stats.parquet'), 'DRIVER_STATS')

Replace the current text in example.py with the following:

# This is an example feature definition file
from datetime import timedelta
from feast import Entity, Feature, FeatureView, ValueType
from feast_snowflake import SnowflakeSource
import yaml

# Read data from Snowflake table
# Here we use a Table to reuse the original parquet data,
# but you can replace to your own Table or Query.
database = yaml.safe_load(open("feature_store.yaml"))["offline_store"]["database"]

driver_hourly_stats = SnowflakeSource(
    table=f'"{database}"."PUBLIC"."DRIVER_STATS"',
    #query = """ """,
    event_timestamp_column="event_timestamp",
    created_timestamp_column="created",
)

# Define an entity for the driver.
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id", )

# Define FeatureView
driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    entities=["driver_id"],
    ttl=timedelta(weeks=4),
    features=[
        Feature(name="conv_rate", dtype=ValueType.FLOAT),
        Feature(name="acc_rate", dtype=ValueType.FLOAT),
        Feature(name="avg_daily_trips", dtype=ValueType.INT64),
    ],
    online=True,
    batch_source=driver_hourly_stats,
    tags={},
)

Work with your Offline & Online Snowflake Feature Store

from example import driver, driver_hourly_stats_view
from datetime import datetime, timedelta
import pandas as pd
from feast import FeatureStore

fs = FeatureStore(repo_path=".")

fs.apply([driver, driver_hourly_stats_view])

# Select features
features = ["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate", "driver_hourly_stats:avg_daily_trips"]

# Create an entity dataframe. This is the dataframe that will be enriched with historical features
entity_df = pd.DataFrame(
    {
        "event_timestamp": [
            pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
            for dt in pd.date_range(
                start=datetime.now() - timedelta(days=3),
                end=datetime.now(),
                periods=3,
            )
        ],
        "driver_id": [1001, 1002, 1003],
    }
)

# Retrieve historical features by joining the entity dataframe to the Snowflake table source
print("Retrieving training data...")
training_df = fs.get_historical_features(
    features=features, entity_df=entity_df
).to_df()
print(training_df)

print("Loading features into the online store...")
fs.materialize_incremental(end_date=datetime.now())

# Retrieve features from the online store
print("Retrieving online features...")
online_features = fs.get_online_features(
    features=features, entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}],
).to_dict()
print(online_features)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

feast-snowflake-0.1.7.tar.gz (20.9 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page