
Common utility functions for Crumbl Data Team


  .oooooo.                                           .o8       oooo  ooooooooo.               
 d8P'  `Y8b                                         "888       `888  `888   `Y88.             
888          oooo d8b oooo  oooo  ooo. .oo.  .oo.    888oooo.   888   888   .d88' oooo    ooo 
888          `888""8P `888  `888  `888P"Y88bP"Y88b   d88' `88b  888   888ooo88P'   `88.  .8'  
888           888      888   888   888   888   888   888   888  888   888           `88..8'   
`88b    ooo   888      888   888   888   888   888   888   888  888   888            `888'    
 `Y8bood8P'  d888b     `V88V"V8P' o888o o888o o888o  `Y8bod8P' o888o o888o            .8'     
                                                                                  .o..P'      
                                                                                  `Y8P'       

CrumblPy

Powered by CDT

Overview

CrumblPy is a Python package designed to simplify complex data operations and streamline Crumbl data workflows. It offers a comprehensive set of tools and utilities that integrate seamlessly with Python projects, letting you focus on building and analyzing without unnecessary overhead.


Installation

You can install CrumblPy using pip:

pip install crumblpy

Features

CrumblPy provides four main modules:

  • Email Module: Send emails with attachments through Gmail API
  • Snowflake Module: Connect to and interact with Snowflake databases
  • Slack Module: Send messages and files to Slack channels
  • AWS Module: Read from and write to Amazon S3 buckets with compressed JSON data, and scan DynamoDB tables

Quickstart

import crumblpy

# Email functionality
from crumblpy import send_gmail, generate_token

# Snowflake functionality
from crumblpy import SnowflakeToolKit

# Slack functionality
from crumblpy import SlackToolKit

# AWS functionality (S3 and DynamoDB)
from crumblpy import AWSToolKit

Email Module

The email module provides Gmail API integration for sending emails with attachments.

Functions

send_gmail(sender, recipient, subject, body, token, html_body=False, image_paths=None, attachment_paths=None)

Sends an email using the Gmail API.

Parameters:

  • sender (str): The email address of the sender
  • recipient (str): The email address of the recipient
  • subject (str): The subject of the email
  • body (str): The body of the email
  • token (dict): The token data for authentication
  • html_body (bool, optional): Whether the body is HTML or plain text. Defaults to False
  • image_paths (List[str], optional): List of paths to images to attach
  • attachment_paths (List[str], optional): List of paths to files to attach

Example:

import json
from crumblpy import send_gmail

# Load your token (generated using generate_token).
with open('token.json') as f:
    token = json.load(f)

send_gmail(
    sender='your-email@gmail.com',
    recipient='recipient@example.com',
    subject='Test Email',
    body='This is a test email',
    token=token,
    html_body=True,
    attachment_paths=['report.pdf', 'data.csv']
)

⚠️ Security Warning: The above example is for local development only. In production environments, use Doppler or Prefect blocks to securely manage credentials instead of storing them in JSON files.

generate_token(credential, scopes=['https://www.googleapis.com/auth/gmail.send'], write_to_file=False)

Generates an authentication token for Gmail API access.

Parameters:

  • credential (dict): The credential data from Google Cloud Console
  • scopes (list, optional): List of OAuth scopes. Defaults to Gmail send scope
  • write_to_file (bool, optional): Whether to write token to file. Defaults to False

Note: This function requires manual browser authorization.

Example:

import json
from crumblpy import generate_token

# Load your credentials from Google Cloud Console
with open('credentials.json') as f:
    credentials = json.load(f)

generate_token(credentials, write_to_file=True)

⚠️ Security Warning: This example shows local development usage. In production, manage credentials securely using Doppler or Prefect blocks rather than storing them in JSON files.


Snowflake Module

The Snowflake module provides a toolkit for connecting to and interacting with Snowflake databases.

SnowflakeToolKit Class

__init__(prefect=False, user=None, password=None, role=None, schema='DATA_SCIENCE', warehouse='DATA_SCIENCE_TEAM')

Initialize the Snowflake connection.

Parameters:

  • prefect (bool, optional): Use Prefect secrets for authentication. Defaults to False
  • user (str, optional): Snowflake username
  • password (str, optional): Snowflake password
  • role (str, optional): Snowflake role
  • schema (str, optional): Default schema. Defaults to 'DATA_SCIENCE'
  • warehouse (str, optional): Snowflake warehouse. Defaults to 'DATA_SCIENCE_TEAM'

Methods

connect()

Establishes connection to Snowflake.

fetch_data(sql_query)

Fetch data from Snowflake using a SQL query.

Parameters:

  • sql_query (str): SQL query to execute

Returns:

  • pandas.DataFrame: Query results as a DataFrame

insert_data(df, table_name, auto_create_table=False)

Insert pandas DataFrame into Snowflake table.

Parameters:

  • df (pandas.DataFrame): DataFrame to insert
  • table_name (str): Target table name
  • auto_create_table (bool, optional): Whether to auto-create table. Defaults to False

execute_query(sql_query)

Execute a SQL query in Snowflake (useful for DML queries).

Parameters:

  • sql_query (str): SQL query to execute

Example:

from crumblpy import SnowflakeToolKit
import pandas as pd

# Initialize with environment variables.
sf = SnowflakeToolKit()

# Or initialize with explicit credentials (local development only)
sf = SnowflakeToolKit(
    user='your_username',
    password='your_password',
    role='your_role'
)

# For production, use Prefect blocks
sf = SnowflakeToolKit(prefect=True)

# Fetch data
df = sf.fetch_data("SELECT * FROM your_table LIMIT 100")

# Insert data
new_data = pd.DataFrame({'col1': [1, 2, 3], 'col2': ['a', 'b', 'c']})
sf.insert_data(new_data, 'your_target_table', auto_create_table=True)

# Execute query
sf.execute_query("UPDATE your_table SET col1 = 0 WHERE col2 = 'a'")

⚠️ Security Warning: The explicit credentials shown above are for local experimentation only. In production environments, use the prefect=True parameter to leverage Prefect blocks, or use Doppler for secure credential management.


Slack Module

The Slack module provides integration with Slack for sending messages and files.

SlackToolKit Class

__init__(prefect=False, token=None, default_channel='U04RAQM788L')

Initialize the Slack client.

Parameters:

  • prefect (bool, optional): Use Prefect secrets for authentication. Defaults to False
  • token (str, optional): Slack bot token
  • default_channel (str, optional): Default channel ID. Defaults to 'U04RAQM788L'

Methods

post_message(message=None, channel=None, thread_id=None, blocks=None)

Send a message to a Slack channel.

Parameters:

  • message (str, optional): Message text
  • channel (str, optional): Channel ID or user ID
  • thread_id (str, optional): Thread timestamp for threaded messages
  • blocks (list, optional): Slack Block Kit blocks
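
The blocks parameter accepts standard Slack Block Kit payloads (plain lists of dicts). A minimal sketch of assembling one is shown below; the section/mrkdwn structure comes from Slack's Block Kit, not from CrumblPy, and the channel ID is a placeholder:

```python
# A two-block message: a section with mrkdwn text, then a divider
blocks = [
    {
        "type": "section",
        "text": {"type": "mrkdwn", "text": "*Daily pipeline run:* success :white_check_mark:"},
    },
    {"type": "divider"},
]

# Pass the payload to the toolkit (requires a valid token, so shown commented out):
# slack = SlackToolKit()
# slack.post_message(blocks=blocks, channel='your-channel-id')
```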

post_file(file_path, message, channel=None, thread_id=None)

Upload a file to Slack channel.

Parameters:

  • file_path (str): Path to the file to upload
  • message (str): Message to accompany the file
  • channel (str, optional): Channel ID or user ID
  • thread_id (str, optional): Thread timestamp

Note: This method automatically deletes the file after upload.

get_thread_id(channel)

Get the timestamp of the most recent message in a channel.

Parameters:

  • channel (str): Channel ID

Returns:

  • str: Thread timestamp

push_notification(project=None, channel=None, e=None)

Send a notification about project status.

Parameters:

  • project (str, optional): Project name
  • channel (str, optional): Channel ID
  • e (Exception, optional): Exception object if there was an error

Example:

from crumblpy import SlackToolKit

# Initialize with environment variable
slack = SlackToolKit()

# Or initialize with explicit token (local development only)
slack = SlackToolKit(token='your-slack-token')

# For production, use Prefect blocks
slack = SlackToolKit(prefect=True)

# Send a message
slack.post_message("Hello from CrumblPy!", channel='your-channel-id')

# Send a file
slack.post_file('report.pdf', 'Here is the daily report', channel='your-channel-id')

# Send notification
slack.push_notification(project='Data Pipeline', channel='your-channel-id')

# Send error notification
try:
    # Some operation that might fail
    pass
except Exception as e:
    slack.push_notification(project='Data Pipeline', channel='#alerts', e=e)

⚠️ Security Warning: Examples showing explicit tokens are for local experimentation only. In production environments, use the prefect=True parameter to leverage Prefect blocks, or use Doppler for secure credential management.


AWS Module

The AWS module provides integration with Amazon S3 for reading and writing compressed JSON data, and DynamoDB for scanning tables.

AWSToolKit Class

__init__(aws_access_key_id=None, aws_secret_access_key=None, prefect=False)

Initialize the AWS clients.

Parameters:

  • aws_access_key_id (str, optional): AWS access key ID
  • aws_secret_access_key (str, optional): AWS secret access key
  • prefect (bool, optional): Use Prefect secrets for authentication. Defaults to False

Methods

write_to_s3(df, bucket_name, key)

Write pandas DataFrame to S3 as compressed JSON.

Parameters:

  • df (pandas.DataFrame): DataFrame to write
  • bucket_name (str): S3 bucket name
  • key (str): S3 object key/path

Note: Data is automatically compressed using gzip and stored in JSON Lines format.

read_from_s3(bucket_name, key)

Read compressed JSON data from S3 and return as pandas DataFrame.

Parameters:

  • bucket_name (str): S3 bucket name
  • key (str): S3 object key/path

Returns:

  • pandas.DataFrame: Data from S3 as a DataFrame
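
Because the stored objects are plain gzipped JSON Lines (per the note under write_to_s3), the same format can also be produced and read with pandas directly. A local sketch of the format, without touching S3:

```python
import os
import tempfile

import pandas as pd

df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})

# Same on-disk format the toolkit uses: one JSON object per line, gzip-compressed
path = os.path.join(tempfile.mkdtemp(), "output.json.gz")
df.to_json(path, orient="records", lines=True, compression="gzip")

roundtrip = pd.read_json(path, lines=True, compression="gzip")
```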

scan_dynamodb_table(table_name, filter_expression=None, expression_attribute_values=None, projection_expression=None, expression_attribute_names=None)

Scans the entire DynamoDB table using pagination and returns all items as a pandas DataFrame.

Parameters:

  • table_name (str): DynamoDB table name
  • filter_expression (str, optional): Filter expression for the scan
  • expression_attribute_values (dict, optional): Expression attribute values
  • projection_expression (str, optional): Projection expression to specify attributes to retrieve
  • expression_attribute_names (dict, optional): Expression attribute names for reserved keywords

Returns:

  • pandas.DataFrame: All items from DynamoDB table as a DataFrame

Note: This method automatically handles pagination using LastEvaluatedKey to retrieve all records.

DynamoDB Parameter Guide:

  • filter_expression: Use placeholders like :value for values and #attr for attribute names
  • expression_attribute_values: Dictionary mapping placeholders (:key) to actual values
  • expression_attribute_names: Dictionary mapping placeholders (#key) to actual attribute names (required for reserved keywords)
  • projection_expression: Comma-separated list of attributes to retrieve (use #attr for reserved keywords)

Common Filter Expression Operators:

  • Equality: attribute = :value
  • Comparison: attribute > :value, attribute < :value, attribute >= :value, attribute <= :value
  • Between: attribute BETWEEN :low AND :high
  • Contains: contains(attribute, :value)
  • Multiple conditions: Use AND, OR, NOT
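
For instance, a BETWEEN range combined with a contains() check could be assembled like this (the table name, attributes, and values here are hypothetical, and the final call is shown commented out since it requires AWS credentials):

```python
# Keyword arguments for a range-plus-substring scan
scan_kwargs = {
    "table_name": "Order_prod",  # hypothetical table
    "filter_expression": "createdAt BETWEEN :start AND :end AND contains(storeId, :frag)",
    "expression_attribute_values": {
        ":start": "2025-01-01",
        ":end": "2025-03-31",
        ":frag": "UT-",
    },
}

# aws = AWSToolKit()
# df = aws.scan_dynamodb_table(**scan_kwargs)
```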

Example:

from crumblpy import AWSToolKit
import pandas as pd

# Initialize with environment variables
aws = AWSToolKit()

# Or initialize with explicit credentials (local development only)
aws = AWSToolKit(
    aws_access_key_id='your_access_key',
    aws_secret_access_key='your_secret_key'
)

# For production, use Prefect blocks
aws = AWSToolKit(prefect=True)

# Write DataFrame to S3
df = pd.DataFrame({'col1': [1, 2, 3], 'col2': ['a', 'b', 'c']})
aws.write_to_s3(df, 'your-bucket-name', 'data/output.json.gz')

# Read DataFrame from S3
loaded_df = aws.read_from_s3('your-bucket-name', 'data/output.json.gz')

# Scan DynamoDB table with filters (basic example)
df_dynamo = aws.scan_dynamodb_table(
    table_name='ProductModifierOptionOverride_prod',
    filter_expression='marketingType = :mt AND overrideType = :ot',
    expression_attribute_values={
        ':mt': 'MYSTERY_PICK', 
        ':ot': 'STORE_CHOICE'
    },
    projection_expression='metadata, storeId, startDate, createdAt'
)

# Scan with reserved keywords (using expression_attribute_names)
df_with_reserved = aws.scan_dynamodb_table(
    table_name='Store_prod',
    filter_expression='#status = :status_val AND #date > :date_val',
    expression_attribute_names={
        '#status': 'status',   # 'status' is a DynamoDB reserved word
        '#date': 'startDate'   # the '#date' placeholder sidesteps the reserved word 'date'
    },
    expression_attribute_values={
        ':status_val': 'ACTIVE',
        ':date_val': '2025-01-01'
    }
)

# For more advanced examples, check out the boto3 docs.

# Scan entire table without filters
all_items = aws.scan_dynamodb_table('your-table-name')

⚠️ Security Warning: The explicit credentials shown above are for local experimentation only. In production environments, use the prefect=True parameter to leverage Prefect blocks, or use Doppler for secure credential management.


Environment Variables

CrumblPy uses the following environment variables when explicit credentials are not provided:

  • SNOWFLAKE_USER: Snowflake username
  • SNOWFLAKE_PASSWORD: Snowflake password
  • SLACK_TOKEN: Slack bot token
  • AWS_ACCESS_KEY_ID: AWS access key ID
  • AWS_SECRET_ACCESS_KEY: AWS secret access key
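
Before relying on this fallback locally, it can help to verify the variables are actually set. A small stdlib sketch (not part of CrumblPy):

```python
import os

# The variables CrumblPy falls back to when no explicit credentials are passed
REQUIRED = [
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SLACK_TOKEN",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
]

missing = [name for name in REQUIRED if not os.environ.get(name)]
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
```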

Authentication Setup

🔒 Production Security Note: The setup instructions below are primarily for local development and experimentation. For production deployments, always use secure credential management solutions like Doppler or Prefect blocks instead of environment variables or local credential files.

Gmail API Setup

  1. Go to Google Cloud Console
  2. Create a new project or select existing one
  3. Enable Gmail API
  4. Create credentials (OAuth 2.0 Client ID)
  5. Download credentials JSON file
  6. Use generate_token() function to create authentication token

Snowflake Setup

Set environment variables or use explicit credentials:

export SNOWFLAKE_USER="your_username"
export SNOWFLAKE_PASSWORD="your_password"

Slack Setup

  1. Create a Slack app at api.slack.com
  2. Add bot token scopes: chat:write, files:write, channels:history
  3. Install app to workspace
  4. Copy Bot User OAuth Token
  5. Set environment variable:
export SLACK_TOKEN="xoxb-your-token-here"

AWS S3 Setup

  1. Create AWS account or use existing one
  2. Go to AWS IAM Console
  3. Create a new user or use existing one
  4. Attach appropriate S3 permissions (e.g., AmazonS3FullAccess or custom policy)
  5. Create access keys for the user
  6. Set environment variables:
export AWS_ACCESS_KEY_ID="your_access_key_id"
export AWS_SECRET_ACCESS_KEY="your_secret_access_key"
