Common utility functions for Crumbl Data Team
# CrumblPy
## Overview
CrumblPy is a Python package designed to simplify complex data operations and streamline Crumbl data workflows. It offers a comprehensive set of tools and utilities that integrate seamlessly with Python projects, allowing you to focus on building and analyzing without unnecessary overhead.
## Installation

You can install CrumblPy using pip:

```shell
pip install crumblpy
```
## Features
CrumblPy provides four main modules:
- Email Module: Send emails with attachments through Gmail API
- Snowflake Module: Connect to and interact with Snowflake databases
- Slack Module: Send messages and files to Slack channels
- AWS Module: Read from and write to Amazon S3 buckets with compressed JSON data, and scan DynamoDB tables
## Quickstart
```python
import crumblpy

# Email functionality
from crumblpy import send_gmail, generate_token

# Snowflake functionality
from crumblpy import SnowflakeToolKit

# Slack functionality
from crumblpy import SlackToolKit

# AWS functionality (S3 and DynamoDB)
from crumblpy import AWSToolKit
```
## Email Module
The email module provides Gmail API integration for sending emails with attachments.
### Functions

#### `send_gmail(sender, recipient, subject, body, token, html_body=False, image_paths=None, attachment_paths=None)`
Sends an email using the Gmail API.
Parameters:
- `sender` (str): The email address of the sender
- `recipient` (str): The email address of the recipient
- `subject` (str): The subject of the email
- `body` (str): The body of the email
- `token` (dict): The token data for authentication
- `html_body` (bool, optional): Whether the body is HTML or plain text. Defaults to `False`
- `image_paths` (List[str], optional): List of paths to images to attach
- `attachment_paths` (List[str], optional): List of paths to files to attach
Example:

```python
import json
from crumblpy import send_gmail

# Load your token (generated using generate_token)
with open('token.json') as f:
    token = json.load(f)

send_gmail(
    sender='your-email@gmail.com',
    recipient='recipient@example.com',
    subject='Test Email',
    body='This is a test email',
    token=token,
    html_body=True,
    attachment_paths=['report.pdf', 'data.csv']
)
```
⚠️ Security Warning: The above example is for local development only. In production environments, use Doppler or Prefect blocks to securely manage credentials instead of storing them in JSON files.
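To make the warning concrete, here is one way a production job might receive the token without a `token.json` on disk: the JSON is injected as an environment variable by Doppler or a Prefect block. This is an illustrative sketch, not CrumblPy code, and `GMAIL_TOKEN_JSON` is a placeholder name, not an official variable.

```python
import json
import os

# Hypothetical pattern: the token JSON arrives via an environment variable
# injected by Doppler or a Prefect block. GMAIL_TOKEN_JSON is a placeholder
# name for illustration; the setdefault line stands in for that injection.
os.environ.setdefault("GMAIL_TOKEN_JSON", json.dumps({"token": "ya29.example"}))

# Parse the injected JSON into the dict that send_gmail expects as `token`
token = json.loads(os.environ["GMAIL_TOKEN_JSON"])
print(token["token"])  # ya29.example
```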
#### `generate_token(credential, scopes=['https://www.googleapis.com/auth/gmail.send'], write_to_file=False)`

Generates an authentication token for Gmail API access.
Parameters:
- `credential` (dict): The credential data from Google Cloud Console
- `scopes` (list, optional): List of OAuth scopes. Defaults to the Gmail send scope
- `write_to_file` (bool, optional): Whether to write the token to a file. Defaults to `False`
Note: This function requires manual browser authorization.
Example:

```python
import json
from crumblpy import generate_token

# Load your credentials from Google Cloud Console
with open('credentials.json') as f:
    credentials = json.load(f)

generate_token(credentials, write_to_file=True)
```
⚠️ Security Warning: This example shows local development usage. In production, manage credentials securely using Doppler or Prefect blocks rather than storing them in JSON files.
## Snowflake Module
The Snowflake module provides a toolkit for connecting to and interacting with Snowflake databases.
### SnowflakeToolKit Class

#### `__init__(prefect=False, user=None, password=None, role=None, schema='DATA_SCIENCE', warehouse='DATA_SCIENCE_TEAM')`
Initialize the Snowflake connection.
Parameters:
- `prefect` (bool, optional): Use Prefect secrets for authentication. Defaults to `False`
- `user` (str, optional): Snowflake username
- `password` (str, optional): Snowflake password
- `role` (str, optional): Snowflake role
- `schema` (str, optional): Default schema. Defaults to `'DATA_SCIENCE'`
- `warehouse` (str, optional): Snowflake warehouse. Defaults to `'DATA_SCIENCE_TEAM'`
### Methods

#### `connect()`

Establishes a connection to Snowflake.
#### `fetch_data(sql_query)`

Fetch data from Snowflake using a SQL query.

Parameters:
- `sql_query` (str): SQL query to execute

Returns:
- `pandas.DataFrame`: Query results as a DataFrame
#### `insert_data(df, table_name, auto_create_table=False)`

Insert a pandas DataFrame into a Snowflake table.

Parameters:
- `df` (pandas.DataFrame): DataFrame to insert
- `table_name` (str): Target table name
- `auto_create_table` (bool, optional): Whether to auto-create the table. Defaults to `False`
#### `execute_query(sql_query)`

Execute a SQL query in Snowflake (useful for DML queries).

Parameters:
- `sql_query` (str): SQL query to execute
Example:

```python
import pandas as pd
from crumblpy import SnowflakeToolKit

# Initialize with environment variables
sf = SnowflakeToolKit()

# Or initialize with explicit credentials (local development only)
sf = SnowflakeToolKit(
    user='your_username',
    password='your_password',
    role='your_role'
)

# For production, use Prefect blocks
sf = SnowflakeToolKit(prefect=True)

# Fetch data
df = sf.fetch_data("SELECT * FROM your_table LIMIT 100")

# Insert data
new_data = pd.DataFrame({'col1': [1, 2, 3], 'col2': ['a', 'b', 'c']})
sf.insert_data(new_data, 'your_target_table', auto_create_table=True)

# Execute query
sf.execute_query("UPDATE your_table SET col1 = 0 WHERE col2 = 'a'")
```
⚠️ Security Warning: Explicit credentials shown above are for local experimentation only. In production environments, use the `prefect=True` parameter to leverage Prefect blocks, or use Doppler for secure credential management.
## Slack Module
The Slack module provides integration with Slack for sending messages and files.
### SlackToolKit Class

#### `__init__(prefect=False, token=None, default_channel='U04RAQM788L')`
Initialize the Slack client.
Parameters:
- `prefect` (bool, optional): Use Prefect secrets for authentication. Defaults to `False`
- `token` (str, optional): Slack bot token
- `default_channel` (str, optional): Default channel ID. Defaults to `'U04RAQM788L'`
### Methods

#### `post_message(message=None, channel=None, thread_id=None, blocks=None)`

Send a message to a Slack channel.

Parameters:
- `message` (str, optional): Message text
- `channel` (str, optional): Channel ID or user ID
- `thread_id` (str, optional): Thread timestamp for threaded messages
- `blocks` (list, optional): Slack Block Kit blocks
#### `post_file(file_path, message, channel=None, thread_id=None)`

Upload a file to a Slack channel.

Parameters:
- `file_path` (str): Path to the file to upload
- `message` (str): Message to accompany the file
- `channel` (str, optional): Channel ID or user ID
- `thread_id` (str, optional): Thread timestamp
Note: This method automatically deletes the file after upload.
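Because `post_file` deletes its input after uploading, a caller that needs to keep the file should hand the method a temporary copy instead. A minimal stand-alone sketch of that pattern, with `os.remove` standing in for `post_file`'s own cleanup:

```python
import os
import shutil
import tempfile

# Sketch: post_file removes its input after upload. To keep the original,
# upload a disposable copy. os.remove below models post_file's cleanup.
with tempfile.TemporaryDirectory() as tmp:
    original = os.path.join(tmp, "report.pdf")
    with open(original, "wb") as f:
        f.write(b"%PDF-1.4 demo")

    # Copy the file, then "upload and delete" only the copy
    upload_copy = shutil.copy(original, os.path.join(tmp, "report_upload.pdf"))
    os.remove(upload_copy)           # what post_file does to its input
    print(os.path.exists(original))  # True: the original survives
```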
#### `get_thread_id(channel)`

Get the timestamp of the most recent message in a channel.

Parameters:
- `channel` (str): Channel ID

Returns:
- `str`: Thread timestamp
#### `push_notification(project=None, channel=None, e=None)`

Send a notification about project status.

Parameters:
- `project` (str, optional): Project name
- `channel` (str, optional): Channel ID
- `e` (Exception, optional): Exception object if there was an error
Example:

```python
from crumblpy import SlackToolKit

# Initialize with environment variable
slack = SlackToolKit()

# Or initialize with explicit token (local development only)
slack = SlackToolKit(token='your-slack-token')

# For production, use Prefect blocks
slack = SlackToolKit(prefect=True)

# Send a message
slack.post_message("Hello from CrumblPy!", channel='your-channel-id')

# Send a file
slack.post_file('report.pdf', 'Here is the daily report', channel='your-channel-id')

# Send a notification
slack.push_notification(project='Data Pipeline', channel='your-channel-id')

# Send an error notification
try:
    pass  # Some operation that might fail
except Exception as e:
    slack.push_notification(project='Data Pipeline', channel='#alerts', e=e)
```
⚠️ Security Warning: Examples showing explicit tokens are for local experimentation only. In production environments, use the `prefect=True` parameter to leverage Prefect blocks, or use Doppler for secure credential management.
## AWS Module
The AWS module provides integration with Amazon S3 for reading and writing compressed JSON data, and DynamoDB for scanning tables.
### AWSToolKit Class

#### `__init__(aws_access_key_id=None, aws_secret_access_key=None, prefect=False)`
Initialize the AWS clients.
Parameters:
- `aws_access_key_id` (str, optional): AWS access key ID
- `aws_secret_access_key` (str, optional): AWS secret access key
- `prefect` (bool, optional): Use Prefect secrets for authentication. Defaults to `False`
### Methods

#### `write_to_s3(df, bucket_name, key)`

Write a pandas DataFrame to S3 as compressed JSON.

Parameters:
- `df` (pandas.DataFrame): DataFrame to write
- `bucket_name` (str): S3 bucket name
- `key` (str): S3 object key/path
Note: Data is automatically compressed using gzip and stored in JSON Lines format.
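The storage format described in the note (gzip-compressed JSON Lines: one JSON object per line, then gzipped) can be reproduced with the standard library alone. A sketch of the round trip, assuming only the format stated above rather than CrumblPy's internals:

```python
import gzip
import io
import json

# Rows as they would appear in a DataFrame's records
rows = [{"col1": 1, "col2": "a"}, {"col1": 2, "col2": "b"}]

# Serialize: one JSON object per line, then gzip-compress the whole stream
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
    for row in rows:
        gz.write((json.dumps(row) + "\n").encode("utf-8"))
payload = buf.getvalue()  # what would be uploaded to S3

# Deserialize: decompress, split into lines, parse each line back to a dict
lines = gzip.decompress(payload).decode("utf-8").splitlines()
decoded = [json.loads(line) for line in lines]
print(decoded == rows)  # True: the round trip is lossless
```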
#### `read_from_s3(bucket_name, key)`

Read compressed JSON data from S3 and return it as a pandas DataFrame.

Parameters:
- `bucket_name` (str): S3 bucket name
- `key` (str): S3 object key/path

Returns:
- `pandas.DataFrame`: Data from S3 as a DataFrame
#### `scan_dynamodb_table(table_name, filter_expression=None, expression_attribute_values=None, projection_expression=None, expression_attribute_names=None)`

Scan a DynamoDB table completely using pagination and return the results as a pandas DataFrame.

Parameters:
- `table_name` (str): DynamoDB table name
- `filter_expression` (str, optional): Filter expression for the scan
- `expression_attribute_values` (dict, optional): Expression attribute values
- `projection_expression` (str, optional): Projection expression to specify attributes to retrieve
- `expression_attribute_names` (dict, optional): Expression attribute names for reserved keywords
Returns:
- `pandas.DataFrame`: All items from the DynamoDB table as a DataFrame
Note: This method automatically handles pagination using LastEvaluatedKey to retrieve all records.
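The pagination the note describes follows DynamoDB's standard contract: each scan page may carry a `LastEvaluatedKey`, which is fed back as `ExclusiveStartKey` until it is absent. A self-contained sketch of that loop, where `fake_scan` is a stand-in for boto3's `Table.scan` rather than CrumblPy's actual code:

```python
# Simulated scan pages keyed by ExclusiveStartKey; the final page has no
# LastEvaluatedKey, which is DynamoDB's signal that the scan is complete.
PAGES = {
    None: {"Items": [{"id": 1}], "LastEvaluatedKey": "k1"},
    "k1": {"Items": [{"id": 2}], "LastEvaluatedKey": "k2"},
    "k2": {"Items": [{"id": 3}]},  # last page
}

def fake_scan(ExclusiveStartKey=None):
    """Stand-in for boto3's Table.scan (illustrative only)."""
    return PAGES[ExclusiveStartKey]

items, start_key = [], None
while True:
    page = fake_scan(ExclusiveStartKey=start_key)
    items.extend(page["Items"])
    start_key = page.get("LastEvaluatedKey")
    if start_key is None:  # no more pages to fetch
        break

print(len(items))  # 3
```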
DynamoDB Parameter Guide:
- `filter_expression`: Use placeholders like `:value` for values and `#attr` for attribute names
- `expression_attribute_values`: Dictionary mapping placeholders (`:key`) to actual values
- `expression_attribute_names`: Dictionary mapping placeholders (`#key`) to actual attribute names (required for reserved keywords)
- `projection_expression`: Comma-separated list of attributes to retrieve (use `#attr` for reserved keywords)
Common Filter Expression Operators:
- Equality: `attribute = :value`
- Comparison: `attribute > :value`, `attribute < :value`, `attribute >= :value`, `attribute <= :value`
- Between: `attribute BETWEEN :low AND :high`
- Contains: `contains(attribute, :value)`
- Multiple conditions: combine with `AND`, `OR`, `NOT`
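The operators above compose into a single expression string whose placeholders must all be supplied in the values dictionary. A sketch combining `BETWEEN` and `contains()` (the attribute names and values are illustrative, not from a real table):

```python
# Illustrative filter mixing BETWEEN and contains(); each :placeholder in
# the expression needs a matching entry in expression_attribute_values.
filter_expression = "price BETWEEN :low AND :high AND contains(tags, :tag)"
expression_attribute_values = {
    ":low": 10,
    ":high": 50,
    ":tag": "seasonal",
}

# Quick self-check: strip punctuation from tokens and verify every
# placeholder used in the expression is defined in the values dict
placeholders = [tok.strip("(),") for tok in filter_expression.split()
                if tok.startswith(":")]
print(all(p in expression_attribute_values for p in placeholders))  # True
```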
Example:

```python
import pandas as pd
from crumblpy import AWSToolKit

# Initialize with environment variables
aws = AWSToolKit()

# Or initialize with explicit credentials (local development only)
aws = AWSToolKit(
    aws_access_key_id='your_access_key',
    aws_secret_access_key='your_secret_key'
)

# For production, use Prefect blocks
aws = AWSToolKit(prefect=True)

# Write DataFrame to S3
df = pd.DataFrame({'col1': [1, 2, 3], 'col2': ['a', 'b', 'c']})
aws.write_to_s3(df, 'your-bucket-name', 'data/output.json.gz')

# Read DataFrame from S3
loaded_df = aws.read_from_s3('your-bucket-name', 'data/output.json.gz')

# Scan DynamoDB table with filters (basic example)
df_dynamo = aws.scan_dynamodb_table(
    table_name='ProductModifierOptionOverride_prod',
    filter_expression='marketingType = :mt AND overrideType = :ot',
    expression_attribute_values={
        ':mt': 'MYSTERY_PICK',
        ':ot': 'STORE_CHOICE'
    },
    projection_expression='metadata, storeId, startDate, createdAt'
)

# Scan with reserved keywords (using expression_attribute_names)
df_with_reserved = aws.scan_dynamodb_table(
    table_name='Store_prod',
    filter_expression='#status = :status_val AND #date > :date_val',
    expression_attribute_names={
        '#status': 'status',   # 'status' is a reserved keyword
        '#date': 'startDate'   # 'date' is a reserved keyword
    },
    expression_attribute_values={
        ':status_val': 'ACTIVE',
        ':date_val': '2025-01-01'
    }
)

# Scan entire table without filters
all_items = aws.scan_dynamodb_table('your-table-name')

# For more advanced examples, check out the boto3 docs.
```
⚠️ Security Warning: Explicit credentials shown above are for local experimentation only. In production environments, use the `prefect=True` parameter to leverage Prefect blocks, or use Doppler for secure credential management.
## Environment Variables
CrumblPy uses the following environment variables when explicit credentials are not provided:
- `SNOWFLAKE_USER`: Snowflake username
- `SNOWFLAKE_PASSWORD`: Snowflake password
- `SLACK_TOKEN`: Slack bot token
- `AWS_ACCESS_KEY_ID`: AWS access key ID
- `AWS_SECRET_ACCESS_KEY`: AWS secret access key
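The fallback behavior described above (explicit argument wins, otherwise the environment variable is read) follows a common pattern, sketched below. This is illustrative, not CrumblPy's actual source, and `resolve_credential` is a hypothetical helper name:

```python
import os

# Hypothetical helper showing the usual precedence: an explicitly passed
# credential wins; otherwise fall back to the named environment variable.
def resolve_credential(explicit, env_var):
    value = explicit if explicit is not None else os.environ.get(env_var)
    if value is None:
        raise ValueError(f"Pass a credential explicitly or set {env_var}")
    return value

os.environ["SLACK_TOKEN"] = "xoxb-example"  # demo value for illustration
print(resolve_credential(None, "SLACK_TOKEN"))              # xoxb-example
print(resolve_credential("explicit-token", "SLACK_TOKEN"))  # explicit-token
```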
## Authentication Setup
🔒 Production Security Note: The setup instructions below are primarily for local development and experimentation. For production deployments, always use secure credential management solutions like Doppler or Prefect blocks instead of environment variables or local credential files.
### Gmail API Setup
- Go to the Google Cloud Console
- Create a new project or select an existing one
- Enable the Gmail API
- Create credentials (OAuth 2.0 Client ID)
- Download the credentials JSON file
- Use the `generate_token()` function to create an authentication token
### Snowflake Setup

Set environment variables or use explicit credentials:

```shell
export SNOWFLAKE_USER="your_username"
export SNOWFLAKE_PASSWORD="your_password"
```
### Slack Setup

- Create a Slack app at api.slack.com
- Add bot token scopes: `chat:write`, `files:write`, `channels:history`
- Install the app to your workspace
- Copy the Bot User OAuth Token
- Set the environment variable:

```shell
export SLACK_TOKEN="xoxb-your-token-here"
```
### AWS S3 Setup

- Create an AWS account or use an existing one
- Go to the AWS IAM Console
- Create a new user or use an existing one
- Attach appropriate S3 permissions (e.g., AmazonS3FullAccess or a custom policy)
- Create access keys for the user
- Set environment variables:

```shell
export AWS_ACCESS_KEY_ID="your_access_key_id"
export AWS_SECRET_ACCESS_KEY="your_secret_access_key"
```
## File details

Details for the file crumblpy-1.1.7.tar.gz.

File metadata:
- Download URL: crumblpy-1.1.7.tar.gz
- Upload date:
- Size: 14.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.9.16

File hashes:

| Algorithm | Hash digest |
|---|---|
| SHA256 | `f437f3770a55916ec98d678714274a1334b591961925ac268c61dc4b08df9ccb` |
| MD5 | `e68dbe394e67e032a47e37bae2f5d5ad` |
| BLAKE2b-256 | `237bef9338f5aa69d0700623aaa2d1a76490e0ee981d9c0390f85419766661b3` |
## File details

Details for the file crumblpy-1.1.7-py3-none-any.whl.

File metadata:
- Download URL: crumblpy-1.1.7-py3-none-any.whl
- Upload date:
- Size: 16.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.9.16

File hashes:

| Algorithm | Hash digest |
|---|---|
| SHA256 | `21d874a0655b1dfd872fb987454c259c3492c0ff11653559134f5b3d9bcffcb9` |
| MD5 | `eff608094ab2976e88104dfa50cdf264` |
| BLAKE2b-256 | `94910af0efb7eacdc6be178138fde5179cee9da592125d3638a0c496bee0ef51` |