Common utility functions for Crumbl Data Team


  .oooooo.                                           .o8       oooo  ooooooooo.               
 d8P'  `Y8b                                         "888       `888  `888   `Y88.             
888          oooo d8b oooo  oooo  ooo. .oo.  .oo.    888oooo.   888   888   .d88' oooo    ooo 
888          `888""8P `888  `888  `888P"Y88bP"Y88b   d88' `88b  888   888ooo88P'   `88.  .8'  
888           888      888   888   888   888   888   888   888  888   888           `88..8'   
`88b    ooo   888      888   888   888   888   888   888   888  888   888            `888'    
 `Y8bood8P'  d888b     `V88V"V8P' o888o o888o o888o  `Y8bod8P' o888o o888o            .8'     
                                                                                  .o..P'      
                                                                                  `Y8P'       

CrumblPy

Powered by CDT

Overview

CrumblPy is a Python package designed to simplify complex data operations and enhance Crumbl data workflows. It offers a comprehensive set of tools and utilities that integrate seamlessly with Python projects, letting you focus on building and analyzing without unnecessary overhead.


Installation

You can install CrumblPy using pip:

pip install crumblpy

ℹ️ AWSToolKit users: Install the AWS CLI before using the AWS tooling in CrumblPy.


Features

CrumblPy provides four main modules:

  • Email Module: Send emails with attachments through Gmail API
  • Snowflake Module: Connect to and interact with Snowflake databases
  • Slack Module: Send messages and files to Slack channels
  • AWS Module: Read from and write to Amazon S3 buckets with compressed JSON data, and scan DynamoDB tables

Quickstart

import crumblpy

# Email functionality
from crumblpy import send_gmail, generate_token

# Snowflake functionality
from crumblpy import SnowflakeToolKit

# Slack functionality
from crumblpy import SlackToolKit

# AWS functionality (S3 and DynamoDB)
from crumblpy import AWSToolKit

Email Module

The email module provides Gmail API integration for sending emails with attachments.

Functions

send_gmail(sender, recipient, subject, body, token, html_body=False, image_paths=None, attachment_paths=None)

Sends an email using the Gmail API.

Parameters:

  • sender (str): The email address of the sender
  • recipient (str): The email address of the recipient
  • subject (str): The subject of the email
  • body (str): The body of the email
  • token (dict): The token data for authentication
  • html_body (bool, optional): Whether the body is HTML or plain text. Defaults to False
  • image_paths (List[str], optional): List of paths to images to attach
  • attachment_paths (List[str], optional): List of paths to files to attach

Example:

import json
from crumblpy import send_gmail

# Load your token (generated using generate_token)
with open('token.json') as f:
    token = json.load(f)

send_gmail(
    sender='your-email@gmail.com',
    recipient='recipient@example.com',
    subject='Test Email',
    body='This is a test email',
    token=token,
    html_body=True,
    attachment_paths=['report.pdf', 'data.csv']
)

⚠️ Security Warning: The above example is for local development only. In production environments, use Doppler or Prefect blocks to securely manage credentials instead of storing them in JSON files.

generate_token(credential, scopes=['https://www.googleapis.com/auth/gmail.send'], write_to_file=False)

Generates an authentication token for Gmail API access.

Parameters:

  • credential (dict): The credential data from Google Cloud Console
  • scopes (list, optional): List of OAuth scopes. Defaults to Gmail send scope
  • write_to_file (bool, optional): Whether to write token to file. Defaults to False

Note: This function requires manual browser authorization.

Example:

import json
from crumblpy import generate_token

# Load your credentials from Google Cloud Console
with open('credentials.json') as f:
    credentials = json.load(f)

generate_token(credentials, write_to_file=True)

⚠️ Security Warning: This example shows local development usage. In production, manage credentials securely using Doppler or Prefect blocks rather than storing them in JSON files.


Snowflake Module

The Snowflake module provides a toolkit for connecting to and interacting with Snowflake databases.

SnowflakeToolKit Class

__init__(prefect=False, user=None, password=None, role=None, schema='DATA_SCIENCE', warehouse='DATA_SCIENCE_TEAM')

Initialize the Snowflake connection.

Parameters:

  • prefect (bool, optional): Use Prefect secrets for authentication. Defaults to False
  • user (str, optional): Snowflake username
  • password (str, optional): Snowflake password
  • role (str, optional): Snowflake role
  • schema (str, optional): Default schema. Defaults to 'DATA_SCIENCE'
  • warehouse (str, optional): Snowflake warehouse. Defaults to 'DATA_SCIENCE_TEAM'

Methods

connect()

Establishes connection to Snowflake.

fetch_data(sql_query)

Fetch data from Snowflake using a SQL query.

Parameters:

  • sql_query (str): SQL query to execute

Returns:

  • pandas.DataFrame: Query results as a DataFrame

insert_data(df, table_name, auto_create_table=False)

Insert pandas DataFrame into Snowflake table.

Parameters:

  • df (pandas.DataFrame): DataFrame to insert
  • table_name (str): Target table name
  • auto_create_table (bool, optional): Whether to auto-create table. Defaults to False

execute_query(sql_query)

Execute a SQL query in Snowflake (useful for DML queries).

Parameters:

  • sql_query (str): SQL query to execute

Example:

from crumblpy import SnowflakeToolKit
import pandas as pd

# Initialize with environment variables.
sf = SnowflakeToolKit()

# Or initialize with explicit credentials (local development only)
sf = SnowflakeToolKit(
    user='your_username',
    password='your_password',
    role='your_role'
)

# For production, use Prefect blocks
sf = SnowflakeToolKit(prefect=True)

# Fetch data
df = sf.fetch_data("SELECT * FROM your_table LIMIT 100")

# Insert data
new_data = pd.DataFrame({'col1': [1, 2, 3], 'col2': ['a', 'b', 'c']})
sf.insert_data(new_data, 'your_target_table', auto_create_table=True)

# Execute query
sf.execute_query("UPDATE your_table SET col1 = 0 WHERE col2 = 'a'")

⚠️ Security Warning: Explicit credentials shown above are for local experimentation only. In production environments, use prefect=True parameter to leverage Prefect blocks or use Doppler for secure credential management.


Slack Module

The Slack module provides integration with Slack for sending messages and files.

SlackToolKit Class

__init__(prefect=False, token=None, default_channel='U04RAQM788L')

Initialize the Slack client.

Parameters:

  • prefect (bool, optional): Use Prefect secrets for authentication. Defaults to False
  • token (str, optional): Slack bot token
  • default_channel (str, optional): Default channel ID. Defaults to 'U04RAQM788L'

Methods

post_message(message=None, channel=None, thread_id=None, blocks=None)

Send a message to a Slack channel.

Parameters:

  • message (str, optional): Message text
  • channel (str, optional): Channel ID or user ID
  • thread_id (str, optional): Thread timestamp for threaded messages
  • blocks (list, optional): Slack Block Kit blocks
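Since the blocks parameter follows Slack's Block Kit format, a payload can be built as plain data before posting. The layout below (a header plus a markdown section) is only an illustration, and the commented call assumes a configured SlackToolKit instance with a placeholder channel ID:

```python
# Minimal Block Kit payload: a header block plus a markdown section block.
# Structure follows Slack's Block Kit JSON format.
blocks = [
    {"type": "header", "text": {"type": "plain_text", "text": "Daily Report"}},
    {"type": "section",
     "text": {"type": "mrkdwn", "text": "*Status:* pipeline finished"}},
]

# With a configured toolkit (placeholder channel ID):
# slack.post_message(blocks=blocks, channel='C0123456789')
print(blocks[0]["text"]["text"])  # Daily Report
```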

post_file(file_path, message, channel=None, thread_id=None)

Upload a file to Slack channel.

Parameters:

  • file_path (str): Path to the file to upload
  • message (str): Message to accompany the file
  • channel (str, optional): Channel ID or user ID
  • thread_id (str, optional): Thread timestamp

Note: This method automatically deletes the file after upload.

get_thread_id(channel)

Get the timestamp of the most recent message in a channel.

Parameters:

  • channel (str): Channel ID

Returns:

  • str: Thread timestamp
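A sketch of the threaded-reply pattern this enables. A stub stands in for SlackToolKit here so the flow can be shown without a real token; the real calls use the signatures documented above, and the channel ID and timestamp are placeholders:

```python
# Stub mimicking the documented SlackToolKit method signatures,
# so the get_thread_id -> post_message flow can be demonstrated offline.
class StubSlack:
    def get_thread_id(self, channel):
        return "1700000000.000100"  # timestamp of the channel's latest message

    def post_message(self, message=None, channel=None, thread_id=None, blocks=None):
        return {"channel": channel, "thread_ts": thread_id, "text": message}

slack = StubSlack()

# Fetch the latest message's timestamp, then reply in its thread
ts = slack.get_thread_id("C0123456789")
reply = slack.post_message("Following up in-thread", channel="C0123456789", thread_id=ts)
print(reply["thread_ts"])  # 1700000000.000100
```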

push_notification(project=None, channel=None, e=None)

Send a notification about project status.

Parameters:

  • project (str, optional): Project name
  • channel (str, optional): Channel ID
  • e (Exception, optional): Exception object if there was an error

Example:

from crumblpy import SlackToolKit

# Initialize with environment variable
slack = SlackToolKit()

# Or initialize with explicit token (local development only)
slack = SlackToolKit(token='your-slack-token')

# For production, use Prefect blocks
slack = SlackToolKit(prefect=True)

# Send a message
slack.post_message("Hello from CrumblPy!", channel='your-channel-id')

# Send a file
slack.post_file('report.pdf', 'Here is the daily report', channel='your-channel-id')

# Send notification
slack.push_notification(project='Data Pipeline', channel='your-channel-id')

# Send error notification
try:
    # Some operation that might fail
    pass
except Exception as e:
    slack.push_notification(project='Data Pipeline', channel='#alerts', e=e)

⚠️ Security Warning: Examples showing explicit tokens are for local experimentation only. In production environments, use prefect=True parameter to leverage Prefect blocks or use Doppler for secure credential management.


AWS Module

The AWS module provides integration with Amazon S3 for reading and writing compressed JSON data, and DynamoDB for scanning tables.

ℹ️ Prerequisite: Install the AWS CLI before using AWSToolKit.

AWSToolKit Class

__init__(aws_access_key_id=None, aws_secret_access_key=None, prefect=False)

Initialize the AWS clients.

Parameters:

  • aws_access_key_id (str, optional): AWS access key ID
  • aws_secret_access_key (str, optional): AWS secret access key
  • prefect (bool, optional): Use Prefect secrets for authentication. Defaults to False

Methods

write_to_s3(df, bucket_name, key)

Write pandas DataFrame to S3 as compressed JSON.

Parameters:

  • df (pandas.DataFrame): DataFrame to write
  • bucket_name (str): S3 bucket name
  • key (str): S3 object key/path

Note: Data is automatically compressed using gzip and stored in JSON Lines format.
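The on-disk layout can be illustrated with the standard library alone. This is a sketch of the gzip-compressed JSON Lines format described above, not the toolkit's internal code:

```python
import gzip
import json

# Rows as a list of dicts, as a DataFrame would serialize them
rows = [{"col1": 1, "col2": "a"}, {"col1": 2, "col2": "b"}]

# JSON Lines: one JSON object per line, then gzip-compressed
payload = "\n".join(json.dumps(r) for r in rows).encode()
compressed = gzip.compress(payload)

# Reading back reverses the steps
decoded = [json.loads(line) for line in gzip.decompress(compressed).splitlines()]
print(decoded == rows)  # True
```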

read_from_s3(bucket_name, key)

Read compressed JSON data from S3 and return as pandas DataFrame.

Parameters:

  • bucket_name (str): S3 bucket name
  • key (str): S3 object key/path

Returns:

  • pandas.DataFrame: Data from S3 as a DataFrame

scan_dynamodb_table(table_name, filter_expression=None, expression_attribute_values=None, projection_expression=None, expression_attribute_names=None)

Scan DynamoDB table completely using pagination and return as pandas DataFrame.

Parameters:

  • table_name (str): DynamoDB table name
  • filter_expression (str, optional): Filter expression for the scan
  • expression_attribute_values (dict, optional): Expression attribute values
  • projection_expression (str, optional): Projection expression to specify attributes to retrieve
  • expression_attribute_names (dict, optional): Expression attribute names for reserved keywords

Returns:

  • pandas.DataFrame: All items from DynamoDB table as a DataFrame

Note: This method automatically handles pagination using LastEvaluatedKey to retrieve all records.

DynamoDB Parameter Guide:

  • filter_expression: Use placeholders like :value for values and #attr for attribute names
  • expression_attribute_values: Dictionary mapping placeholders (:key) to actual values
  • expression_attribute_names: Dictionary mapping placeholders (#key) to actual attribute names (required for reserved keywords)
  • projection_expression: Comma-separated list of attributes to retrieve (use #attr for reserved keywords)

Common Filter Expression Operators:

  • Equality: attribute = :value
  • Comparison: attribute > :value, attribute < :value, attribute >= :value, attribute <= :value
  • Between: attribute BETWEEN :low AND :high
  • Contains: contains(attribute, :value)
  • Multiple conditions: Use AND, OR, NOT

Example:

from crumblpy import AWSToolKit
import pandas as pd

# Initialize with environment variables
aws = AWSToolKit()

# Or initialize with explicit credentials (local development only)
aws = AWSToolKit(
    aws_access_key_id='your_access_key',
    aws_secret_access_key='your_secret_key'
)

# For production, use Prefect blocks
aws = AWSToolKit(prefect=True)

# Write DataFrame to S3
df = pd.DataFrame({'col1': [1, 2, 3], 'col2': ['a', 'b', 'c']})
aws.write_to_s3(df, 'your-bucket-name', 'data/output.json.gz')

# Read DataFrame from S3
loaded_df = aws.read_from_s3('your-bucket-name', 'data/output.json.gz')

# Scan DynamoDB table with filters (basic example)
df_dynamo = aws.scan_dynamodb_table(
    table_name='ProductModifierOptionOverride_prod',
    filter_expression='marketingType = :mt AND overrideType = :ot',
    expression_attribute_values={
        ':mt': 'MYSTERY_PICK', 
        ':ot': 'STORE_CHOICE'
    },
    projection_expression='metadata, storeId, startDate, createdAt'
)

# Scan with reserved keywords (using expression_attribute_names)
df_with_reserved = aws.scan_dynamodb_table(
    table_name='Store_prod',
    filter_expression='#status = :status_val AND #date > :date_val',
    expression_attribute_names={
        '#status': 'status',   # 'status' is a DynamoDB reserved word
        '#date': 'startDate'   # placeholders work for any attribute name
    },
    expression_attribute_values={
        ':status_val': 'ACTIVE',
        ':date_val': '2025-01-01'
    }
)

# For more advanced examples, check out the boto3 docs.

# Scan entire table without filters
all_items = aws.scan_dynamodb_table('your-table-name')

⚠️ Security Warning: Explicit credentials shown above are for local experimentation only. In production environments, use prefect=True parameter to leverage Prefect blocks or use Doppler for secure credential management.


Environment Variables

CrumblPy uses the following environment variables when explicit credentials are not provided:

  • SNOWFLAKE_USER: Snowflake username
  • SNOWFLAKE_PASSWORD: Snowflake password
  • SLACK_TOKEN: Slack bot token
  • AWS_ACCESS_KEY_ID: AWS access key ID
  • AWS_SECRET_ACCESS_KEY: AWS secret access key
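The fallback behavior can be pictured as "explicit argument wins, environment variable otherwise." The helper below is an illustration of that precedence, not CrumblPy's actual implementation, and the values are placeholders:

```python
import os

# Illustration only: an explicit credential takes precedence; otherwise
# fall back to the named environment variable (None if neither is set).
def resolve_credential(explicit, env_var):
    return explicit if explicit is not None else os.environ.get(env_var)

os.environ["SNOWFLAKE_USER"] = "alice"
print(resolve_credential(None, "SNOWFLAKE_USER"))   # alice
print(resolve_credential("bob", "SNOWFLAKE_USER"))  # bob
```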

Authentication Setup

🔒 Production Security Note: The setup instructions below are primarily for local development and experimentation. For production deployments, always use secure credential management solutions like Doppler or Prefect blocks instead of environment variables or local credential files.

Gmail API Setup

  1. Go to Google Cloud Console
  2. Create a new project or select existing one
  3. Enable Gmail API
  4. Create credentials (OAuth 2.0 Client ID)
  5. Download credentials JSON file
  6. Use generate_token() function to create authentication token

Snowflake Setup

Set environment variables or use explicit credentials:

export SNOWFLAKE_USER="your_username"
export SNOWFLAKE_PASSWORD="your_password"

Slack Setup

  1. Create a Slack app at api.slack.com
  2. Add bot token scopes: chat:write, files:write, channels:history
  3. Install app to workspace
  4. Copy Bot User OAuth Token
  5. Set environment variable:
export SLACK_TOKEN="xoxb-your-token-here"

AWS S3 Setup

  1. Create AWS account or use existing one
  2. Go to AWS IAM Console
  3. Create a new user or use existing one
  4. Attach appropriate S3 permissions (e.g., AmazonS3FullAccess or custom policy)
  5. Create access keys for the user
  6. Set environment variables:
export AWS_ACCESS_KEY_ID="your_access_key_id"
export AWS_SECRET_ACCESS_KEY="your_secret_access_key"
