A collection of my frequently used utilities for data science projects
Project description
DS-Utilities
A collection of data science utilities and helper functions for common data workflows. This package provides reusable tools to streamline data science projects, including data pipeline frameworks, data loading strategies, and transformation utilities.
Features
- Data Pipeline Framework: Flexible and extensible pipeline architecture for loading and transforming data
- Multiple Data Sources: Support for Snowflake databases and CSV files
- Strategy Pattern: Extensible design that makes adding new data sources straightforward
- Composable Operations: Chain multiple data operations together in a declarative way
- Type Safety: Full type hints for better IDE support and code reliability
- Unified Interface: Consistent API across different data source types
Installation
# Install required dependencies
pip install pandas snowflake-connector-python
# Or install in editable mode from a local clone
pip install -e .
Requirements
- Python 3.8+
- pandas
- snowflake-connector-python (for Snowflake integration)
Environment variables for Snowflake connection:
- SNOWFLAKE_USER
- SNOWFLAKE_ACCT
- SNOWFLAKE_ROLE
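These variables are read when creating the Snowflake connection (see the examples below). As an optional sketch, you can fail fast if any of them are missing:
import os

# Optional sketch: verify the Snowflake variables are set before connecting
required = ["SNOWFLAKE_USER", "SNOWFLAKE_ACCT", "SNOWFLAKE_ROLE"]
missing = [name for name in required if name not in os.environ]
if missing:
    raise EnvironmentError(f"Missing Snowflake environment variables: {missing}")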
Data Pipeline Utilities
The data pipeline module provides a flexible framework for loading and transforming data from multiple sources through a strategy pattern design with composable data transformation pipelines.
Architecture
The pipeline utilities are built around three main components:
- DataStrategy (Abstract Base Class): Defines the interface for data loading strategies
- DataConnector: Factory class that manages strategy selection and execution
- DataPipeline: Orchestrates multiple data operations in a sequential pipeline
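Before the individual classes are documented, the sketch below shows how the pieces typically fit together: a DataConnector selects the loading strategy, and a DataPipeline chains the load with downstream transforms. The file name and transforms are illustrative placeholders.
from ds_utilities import DataConnector, DataPipeline

# DataConnector selects the CsvStrategy documented below
connector = DataConnector("csv", "./data")

# DataPipeline chains a load stage with a simple transform
pipeline = DataPipeline()
pipeline.add_stage(lambda data=None: connector.load_data("sales.csv"))
pipeline.add_stage(lambda data: data.dropna())
result = pipeline.run()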
Design Pattern
The package uses the Strategy Pattern to separate data loading logic from the client code. This allows:
- Easy addition of new data sources without modifying existing code
- Consistent interface across different data sources
- Better testability through dependency injection
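The testability point above can be illustrated with a stub strategy injected in place of a real data source. The InMemoryStrategy below is a hypothetical sketch, not part of the package:
from typing import Dict, Union

from pandas import DataFrame

from ds_utilities import DataStrategy

class InMemoryStrategy(DataStrategy):
    """Hypothetical stub that returns canned DataFrames instead of hitting a real source."""

    def __init__(self, frames: Dict[str, DataFrame]):
        super().__init__(None)
        self._frames = frames

    def load(self, sources: Union[str, list[str]]):
        if isinstance(sources, str):
            return self._frames[sources]
        return [self._frames[s] for s in sources]

# Code written against the DataStrategy interface works unchanged with the stub
stub = InMemoryStrategy({"sales": DataFrame({"COST": [1, 2, 3]})})
print(stub.load("sales"))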
Classes
DataStrategy (Abstract Base Class)
Base class for all data loading strategies.
from abc import ABC, abstractmethod
from typing import Optional, Union

from pandas import DataFrame

class DataStrategy(ABC):
    @abstractmethod
    def __init__(self, connection):
        self.connection = connection
        self._source_list: Optional[list[str]] = None

    @abstractmethod
    def load(self, sources: Union[str, list[str]]) -> DataFrame:
        ...
CsvStrategy
Loads data from CSV files.
Parameters:
root_dir (str): Root directory containing CSV files
Methods:
load(sources): Load one or more CSV files
Example:
from ds_utilities import CsvStrategy
# Initialize with root directory
strategy = CsvStrategy("/path/to/data")
# Load single file
data = strategy.load("sales.csv")
# Load multiple files
data_list = strategy.load(["sales.csv", "customers.csv"])
SnowflakeStrategy
Executes SQL queries against Snowflake database.
Parameters:
connector: Snowflake connection object
Methods:
load(sources): Execute one or more SQL queries
Example:
import os

import snowflake.connector
from ds_utilities import SnowflakeStrategy

# Create Snowflake connection
conn = snowflake.connector.connect(
    user=os.environ['SNOWFLAKE_USER'],
    account=os.environ['SNOWFLAKE_ACCT'],
    role=os.environ['SNOWFLAKE_ROLE'],
    authenticator='externalbrowser'
)
# Initialize strategy
strategy = SnowflakeStrategy(conn)
# Execute single query
query = "SELECT * FROM sales WHERE date >= '2024-01-01'"
data = strategy.load(query)
# Execute multiple queries
queries = [
    "SELECT * FROM sales",
    "SELECT * FROM customers"
]
data_list = strategy.load(queries)
DataConnector
Factory class for creating and managing data strategies.
Parameters:
type (str): Type of data source - must be "snowflake" or "csv"
connection: Connection object appropriate for the strategy type (a Snowflake connection object, or a root directory path for CSV)
Methods:
load_data(sources): Load data using the configured strategy
Example:
from ds_utilities import DataConnector
# CSV connector
csv_connector = DataConnector("csv", "/path/to/data")
data = csv_connector.load_data("sales.csv")
# Snowflake connector
sf_connector = DataConnector("snowflake", snowflake_conn)
data = sf_connector.load_data("SELECT * FROM sales")
# Check connector type
print(sf_connector) # Output: "snowflake"
DataPipeline
Chains multiple data operations together in a sequential pipeline.
Methods:
add_stage(stage, **kwargs): Add a processing stage to the pipeline
run(): Execute all stages in order
Each stage is a callable that receives the previous stage's output as its data argument (the first stage receives data=None), along with any keyword arguments supplied to add_stage, as the examples below illustrate.
Example:
from ds_utilities import DataPipeline
import pandas as pd
# Create pipeline
pipeline = DataPipeline()
# Define stages
def load_data(file_path, data=None):
    return pd.read_csv(file_path)

def clean_data(data):
    return data.dropna()

def transform_data(column_name, data):
    data[f'{column_name}_squared'] = data[column_name] ** 2
    return data
# Add stages to pipeline
pipeline.add_stage(load_data, file_path="data/sales.csv")
pipeline.add_stage(clean_data)
pipeline.add_stage(transform_data, column_name='revenue')
# Execute pipeline
result = pipeline.run()
Usage Examples
Example 1: Loading CSV Data
import os
from ds_utilities import DataConnector
# Set up connector
connector = DataConnector("csv", os.getcwd())
# Load single file
media_data = connector.load_data("data/media.csv")
print(media_data.head())
# Load multiple files
data_files = ["data/media.csv", "data/nrmps.csv"]
datasets = connector.load_data(data_files)
print(f"Loaded {len(datasets)} datasets")
Example 2: Querying Snowflake
import os
import snowflake.connector
from ds_utilities import DataConnector
# Connect to Snowflake
conn = snowflake.connector.connect(
    user=os.environ['SNOWFLAKE_USER'],
    account=os.environ['SNOWFLAKE_ACCT'],
    role=os.environ['SNOWFLAKE_ROLE'],
    authenticator='externalbrowser'
)
# Create connector
connector = DataConnector("snowflake", conn)
# Execute query
query = """
SELECT
DATE_TRUNC(WEEK, DATE) AS week,
STATE,
SUM(COST) AS total_cost
FROM advertising
GROUP BY week, STATE
"""
data = connector.load_data(query)
print(data.head())
Example 3: Building a Data Pipeline
from ds_utilities import DataPipeline, DataConnector
import pandas as pd
# Create pipeline
pipeline = DataPipeline()
# Stage 1: Load data
def load_data(file_path, data=None):
    return pd.read_csv(file_path)

# Stage 2: Filter data
def filter_data(data):
    return data[data['COST'] > 1000]

# Stage 3: Aggregate by state
def aggregate_data(data):
    return data.groupby('STATE').agg({
        'COST': 'sum',
        'CLICKS': 'sum',
        'IMPRESSIONS': 'sum'
    }).reset_index()

# Stage 4: Convert to dictionary
def to_dict(data):
    return data.to_dict()
# Build pipeline
pipeline.add_stage(load_data, file_path="data/media.csv")
pipeline.add_stage(filter_data)
pipeline.add_stage(aggregate_data)
pipeline.add_stage(to_dict)
# Execute
result = pipeline.run()
print(result)
Example 4: Using Lambda Functions in Pipelines
from ds_utilities import DataPipeline
import pandas as pd
pipeline = DataPipeline()
def load_data(file_path, data=None):
    return pd.read_csv(file_path)
pipeline.add_stage(load_data, file_path="data/media.csv")
pipeline.add_stage(lambda data: data[['WEEK_T', 'STATE', 'COST']])
pipeline.add_stage(lambda data: data.sort_values('COST', ascending=False))
result = pipeline.run()
print(result.head())
Example 5: Integrating Connector with Pipeline
from ds_utilities import DataPipeline, DataConnector
import os
pipeline = DataPipeline()
# Stage 1: Load from CSV
def load_csv(filename, data=None):
    connector = DataConnector("csv", os.getcwd())
    return connector.load_data(filename)

# Stage 2: Transform
def calculate_metrics(data):
    data['CPM'] = (data['COST'] / data['IMPRESSIONS']) * 1000
    data['CPC'] = data['COST'] / data['CLICKS']
    return data

# Stage 3: Export
def save_results(output_file, data):
    data.to_csv(output_file, index=False)
    return data
pipeline.add_stage(load_csv, filename="data/media.csv")
pipeline.add_stage(calculate_metrics)
pipeline.add_stage(save_results, output_file="data/processed_media.csv")
result = pipeline.run()
Extending the Data Pipeline
Adding a New Data Source
To add support for a new data source, create a new strategy class:
from typing import Union

from pandas import DataFrame

from ds_utilities import DataConnector, DataStrategy

class PostgresStrategy(DataStrategy):
    def __init__(self, connection):
        super().__init__(connection)

    def load(self, sources: Union[str, list[str]]) -> Union[DataFrame, list[DataFrame]]:
        super().load(sources)
        cursor = self.connection.cursor()
        results = []
        for source in self._source_list:
            cursor.execute(source)
            df = DataFrame(cursor.fetchall())
            df.columns = [desc[0] for desc in cursor.description]
            results.append(df)
        return results[0] if len(results) == 1 else results

# Register the strategy
DataConnector.STRATEGIES['postgres'] = PostgresStrategy

# Use it
connector = DataConnector('postgres', postgres_connection)
data = connector.load_data("SELECT * FROM table")
Common Pipeline Patterns
Pattern 1: ETL Pipeline
pipeline = DataPipeline()
pipeline.add_stage(extract_from_source) # Extract
pipeline.add_stage(clean_data) # Transform
pipeline.add_stage(validate_schema) # Transform
pipeline.add_stage(load_to_destination) # Load
result = pipeline.run()
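The stage names in this pattern are placeholders. A hypothetical way to fill them in for a CSV-to-CSV flow is sketched below; the file name, columns, and output path are illustrative:
from ds_utilities import DataConnector

def extract_from_source(data=None):
    # Illustrative extract stage using the CSV connector
    connector = DataConnector("csv", "./data")
    return connector.load_data("sales.csv")

def clean_data(data):
    return data.dropna()

def validate_schema(data):
    # Illustrative schema check; adjust the expected columns to your data
    expected = {"STATE", "COST"}
    missing = expected - set(data.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    return data

def load_to_destination(data):
    data.to_csv("output/sales_clean.csv", index=False)
    return data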
Pattern 2: Multi-Source Aggregation
import pandas as pd
from ds_utilities import DataConnector, DataPipeline

def load_and_combine(sources, data=None):
    connector = DataConnector("csv", "./data")
    datasets = connector.load_data(sources)
    return pd.concat(datasets, ignore_index=True)
pipeline = DataPipeline()
pipeline.add_stage(load_and_combine, sources=["sales_q1.csv", "sales_q2.csv"])
pipeline.add_stage(aggregate_by_region)
result = pipeline.run()
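aggregate_by_region is left undefined in this pattern; a hypothetical implementation (the REGION and COST columns are illustrative) could look like:
def aggregate_by_region(data):
    # Illustrative aggregation; adjust column names to your data
    return data.groupby("REGION", as_index=False)["COST"].sum()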
Testing
Run the test suite:
pytest tests/ -v
Run specific test class:
pytest tests/test_data_pipeline.py::TestDataPipeline -v
Run with coverage:
pytest tests/ --cov=ds_utilities --cov-report=html
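As a hypothetical sketch of what such a test might look like (the actual suite lives in tests/), CsvStrategy can be exercised against pytest's tmp_path fixture:
import pandas as pd
from ds_utilities import CsvStrategy

def test_csv_strategy_loads_single_file(tmp_path):
    # Write a small CSV fixture to a temporary directory
    pd.DataFrame({"STATE": ["CA", "NY"], "COST": [100, 200]}).to_csv(
        tmp_path / "sales.csv", index=False
    )

    # Load it back through the strategy and check the result
    strategy = CsvStrategy(str(tmp_path))
    data = strategy.load("sales.csv")
    assert list(data.columns) == ["STATE", "COST"]
    assert len(data) == 2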
Project details
Download files
Download the file for your platform.
Source Distribution
Built Distribution
File details
Details for the file ds_utilities_dkirby-0.1.0.tar.gz.
File metadata
- Download URL: ds_utilities_dkirby-0.1.0.tar.gz
- Upload date:
- Size: 10.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | b58dd3df6523f07b10a73d78cbcdc4a2e4b9bfbcb4dd2d0eda4617407ad69464 |
| MD5 | 72cd13f3790bb81022f8770d8b6576b9 |
| BLAKE2b-256 | 94780d6f774183d49199c866b6107fb1efa9ae50a48b5d580300c0dbd25560ab |
File details
Details for the file ds_utilities_dkirby-0.1.0-py3-none-any.whl.
File metadata
- Download URL: ds_utilities_dkirby-0.1.0-py3-none-any.whl
- Upload date:
- Size: 6.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 849bd6d030cabd0156013786adcfeb6559755077efb99b61cc0f71693e5f26db |
| MD5 | f7b55a7145cb19d54e6250b9cc8a73ed |
| BLAKE2b-256 | 23c3b149576ca262f8b4c566bd439d9e0235c2cc2864f4ac7fc20dfc033ba5ab |