
A collection of my frequently used utilities for data science projects


DS-Utilities

A collection of data science utilities and helper functions for common data workflows. This package provides reusable tools to streamline data science projects, including data pipeline frameworks, data loading strategies, and transformation utilities.

Features

  • Data Pipeline Framework: Flexible and extensible pipeline architecture for loading and transforming data
  • Multiple Data Sources: Support for Snowflake databases and CSV files
  • Strategy Pattern: Extensible design that makes adding new data sources straightforward
  • Composable Operations: Chain multiple data operations together in a declarative way
  • Type Safety: Full type hints for better IDE support and code reliability
  • Unified Interface: Consistent API across different data source types

Installation

# Install required dependencies
pip install pandas snowflake-connector-python

# Install from PyPI
pip install ds_utilities_dkirby

# Or install from a local clone of the repository (editable mode)
pip install -e .

Requirements

  • Python 3.8+
  • pandas
  • snowflake-connector-python (for Snowflake integration)

Environment variables for Snowflake connection:

  • SNOWFLAKE_USER
  • SNOWFLAKE_ACCT
  • SNOWFLAKE_ROLE

Data Pipeline Utilities

The data pipeline module provides a flexible framework for loading and transforming data from multiple sources. It combines a strategy-pattern design for data loading with composable data transformation pipelines.

Architecture

The pipeline utilities are built around three main components:

  1. DataStrategy (Abstract Base Class): Defines the interface for data loading strategies
  2. DataConnector: Factory class that manages strategy selection and execution
  3. DataPipeline: Orchestrates multiple data operations in a sequential pipeline

Design Pattern

The package uses the Strategy Pattern to separate data loading logic from the client code. This allows:

  • Easy addition of new data sources without modifying existing code
  • Consistent interface across different data sources
  • Better testability through dependency injection (see the sketch below)
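
For example, the dependency-injection point can be exercised by registering a stub strategy for tests. This is a minimal sketch, assuming the connector resolves its type through the STRATEGIES registry shown in the Extending section; FakeStrategy and the "fake" key are made up for illustration:

from pandas import DataFrame
from ds_utilities import DataConnector, DataStrategy

class FakeStrategy(DataStrategy):
    """Stub strategy that returns canned data for tests (illustration only)."""
    def __init__(self, connection):
        super().__init__(connection)

    def load(self, sources):
        # Ignore the sources and return a predictable frame
        return DataFrame({"COST": [1, 2, 3]})

# Register the stub, mirroring the 'postgres' registration shown later
DataConnector.STRATEGIES["fake"] = FakeStrategy

connector = DataConnector("fake", None)
assert connector.load_data("anything")["COST"].sum() == 6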

Classes

DataStrategy (Abstract Base Class)

Base class for all data loading strategies.

from abc import ABC, abstractmethod
from typing import Optional, Union

from pandas import DataFrame

class DataStrategy(ABC):
    @abstractmethod
    def __init__(self, connection):
        self.connection = connection
        self._source_list: Optional[list[str]] = None

    @abstractmethod
    def load(self, sources: Union[str, list[str]]) -> Union[DataFrame, list[DataFrame]]:
        ...

CsvStrategy

Loads data from CSV files.

Parameters:

  • root_dir (str): Root directory containing CSV files

Methods:

  • load(sources): Load one or more CSV files

Example:

from ds_utilities import CsvStrategy

# Initialize with root directory
strategy = CsvStrategy("/path/to/data")

# Load single file
data = strategy.load("sales.csv")

# Load multiple files
data_list = strategy.load(["sales.csv", "customers.csv"])

SnowflakeStrategy

Executes SQL queries against a Snowflake database.

Parameters:

  • connector: Snowflake connection object

Methods:

  • load(sources): Execute one or more SQL queries

Example:

import os

import snowflake.connector
from ds_utilities import SnowflakeStrategy

# Create Snowflake connection
conn = snowflake.connector.connect(
    user=os.environ['SNOWFLAKE_USER'],
    account=os.environ['SNOWFLAKE_ACCT'],
    role=os.environ['SNOWFLAKE_ROLE'],
    authenticator='externalbrowser'
)

# Initialize strategy
strategy = SnowflakeStrategy(conn)

# Execute single query
query = "SELECT * FROM sales WHERE date >= '2024-01-01'"
data = strategy.load(query)

# Execute multiple queries
queries = [
    "SELECT * FROM sales",
    "SELECT * FROM customers"
]
data_list = strategy.load(queries)

DataConnector

Factory class for creating and managing data strategies.

Parameters:

  • type (str): Type of data source - must be "snowflake" or "csv"
  • connection: Connection object appropriate for the strategy type

Methods:

  • load_data(sources): Load data using the configured strategy

Example:

from ds_utilities import DataConnector

# CSV connector
csv_connector = DataConnector("csv", "/path/to/data")
data = csv_connector.load_data("sales.csv")

# Snowflake connector
sf_connector = DataConnector("snowflake", snowflake_conn)
data = sf_connector.load_data("SELECT * FROM sales")

# Check connector type
print(sf_connector)  # Output: "snowflake"
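
The exact internals are up to the package, but the factory can be pictured roughly like this. The STRATEGIES registry is the one referenced in the Extending section below; the private attribute names are assumptions for illustration:

# Rough sketch of the factory (not the actual source)
class DataConnector:
    STRATEGIES = {"csv": CsvStrategy, "snowflake": SnowflakeStrategy}

    def __init__(self, type: str, connection):
        self._type = type
        self._strategy = self.STRATEGIES[type](connection)  # select the strategy

    def load_data(self, sources):
        return self._strategy.load(sources)  # delegate loading to the strategy

    def __str__(self):
        return self._type  # so print(connector) shows the source type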

DataPipeline

Chains multiple data operations together in a sequential pipeline.

Methods:

  • add_stage(stage, **kwargs): Add a processing stage to the pipeline
  • run(): Execute all stages in order

Example:

from ds_utilities import DataPipeline
import pandas as pd

# Create pipeline
pipeline = DataPipeline()

# Define stages
def load_data(file_path, data=None):
    return pd.read_csv(file_path)

def clean_data(data):
    return data.dropna()

def transform_data(column_name, data):
    data[f'{column_name}_squared'] = data[column_name] ** 2
    return data

# Add stages to pipeline
pipeline.add_stage(load_data, file_path="data/sales.csv")
pipeline.add_stage(clean_data)
pipeline.add_stage(transform_data, column_name='revenue')

# Execute pipeline
result = pipeline.run()
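
Each stage receives the previous stage's return value as its data keyword argument, together with any keyword arguments bound in add_stage; that is why the first stage accepts data=None. A rough sketch of the chaining behavior, with internal names assumed for illustration:

# Illustrative sketch of how run() threads data through the stages
class DataPipeline:
    def __init__(self):
        self._stages = []

    def add_stage(self, stage, **kwargs):
        self._stages.append((stage, kwargs))

    def run(self):
        data = None
        for stage, kwargs in self._stages:
            data = stage(data=data, **kwargs)  # each output feeds the next stage
        return data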

Usage Examples

Example 1: Loading CSV Data

import os
from ds_utilities import DataConnector

# Set up connector
connector = DataConnector("csv", os.getcwd())

# Load single file
media_data = connector.load_data("data/media.csv")
print(media_data.head())

# Load multiple files
data_files = ["data/media.csv", "data/nrmps.csv"]
datasets = connector.load_data(data_files)
print(f"Loaded {len(datasets)} datasets")

Example 2: Querying Snowflake

import os
import snowflake.connector
from ds_utilities import DataConnector

# Connect to Snowflake
conn = snowflake.connector.connect(
    user=os.environ['SNOWFLAKE_USER'],
    account=os.environ['SNOWFLAKE_ACCT'],
    role=os.environ['SNOWFLAKE_ROLE'],
    authenticator='externalbrowser'
)

# Create connector
connector = DataConnector("snowflake", conn)

# Execute query
query = """
    SELECT 
        DATE_TRUNC(WEEK, DATE) AS week,
        STATE,
        SUM(COST) AS total_cost
    FROM advertising
    GROUP BY week, STATE
"""

data = connector.load_data(query)
print(data.head())

Example 3: Building a Data Pipeline

from ds_utilities import DataPipeline, DataConnector
import pandas as pd

# Create pipeline
pipeline = DataPipeline()

# Stage 1: Load data
def load_data(file_path, data=None):
    return pd.read_csv(file_path)

# Stage 2: Filter data
def filter_data(data):
    return data[data['COST'] > 1000]

# Stage 3: Aggregate by state
def aggregate_data(data):
    return data.groupby('STATE').agg({
        'COST': 'sum',
        'CLICKS': 'sum',
        'IMPRESSIONS': 'sum'
    }).reset_index()

# Stage 4: Convert to dictionary
def to_dict(data):
    return data.to_dict()

# Build pipeline
pipeline.add_stage(load_data, file_path="data/media.csv")
pipeline.add_stage(filter_data)
pipeline.add_stage(aggregate_data)
pipeline.add_stage(to_dict)

# Execute
result = pipeline.run()
print(result)

Example 4: Using Lambda Functions in Pipelines

from ds_utilities import DataPipeline
import pandas as pd

pipeline = DataPipeline()

def load_data(file_path, data=None):
    return pd.read_csv(file_path)

pipeline.add_stage(load_data, file_path="data/media.csv")
pipeline.add_stage(lambda data: data[['WEEK_T', 'STATE', 'COST']])
pipeline.add_stage(lambda data: data.sort_values('COST', ascending=False))

result = pipeline.run()
print(result.head())

Example 5: Integrating Connector with Pipeline

from ds_utilities import DataPipeline, DataConnector
import os

pipeline = DataPipeline()

# Stage 1: Load from CSV
def load_csv(filename, data=None):
    connector = DataConnector("csv", os.getcwd())
    return connector.load_data(filename)

# Stage 2: Transform
def calculate_metrics(data):
    data['CPM'] = (data['COST'] / data['IMPRESSIONS']) * 1000
    data['CPC'] = data['COST'] / data['CLICKS']
    return data

# Stage 3: Export
def save_results(output_file, data):
    data.to_csv(output_file, index=False)
    return data

pipeline.add_stage(load_csv, filename="data/media.csv")
pipeline.add_stage(calculate_metrics)
pipeline.add_stage(save_results, output_file="data/processed_media.csv")

result = pipeline.run()

Extending the Data Pipeline

Adding a New Data Source

To add support for a new data source, create a new strategy class:

from ds_utilities import DataConnector, DataStrategy
from typing import Union
from pandas import DataFrame

class PostgresStrategy(DataStrategy):
    def __init__(self, connection):
        super().__init__(connection)
    
    def load(self, sources: Union[str, list[str]]) -> Union[DataFrame, list[DataFrame]]:
        super().load(sources)
        cursor = self.connection.cursor()
        results = []
        
        for source in self._source_list:
            cursor.execute(source)
            df = DataFrame(cursor.fetchall())
            df.columns = [desc[0] for desc in cursor.description]
            results.append(df)
        
        return results[0] if len(results) == 1 else results

# Register the strategy
DataConnector.STRATEGIES['postgres'] = PostgresStrategy

# Use it
connector = DataConnector('postgres', postgres_connection)
data = connector.load_data("SELECT * FROM table")
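
Because the strategy only needs a DB-API style cursor (execute, fetchall, description), it can be exercised without a live database. A small illustrative test with a hand-rolled stub connection, assuming the base load() normalizes sources into _source_list as the example above relies on:

from pandas import DataFrame

class StubCursor:
    description = [("id",), ("amount",)]

    def execute(self, query):
        self.query = query

    def fetchall(self):
        return [(1, 10.0), (2, 20.0)]

class StubConnection:
    def cursor(self):
        return StubCursor()

def test_postgres_strategy_single_query():
    strategy = PostgresStrategy(StubConnection())
    df = strategy.load("SELECT id, amount FROM some_table")
    assert list(df.columns) == ["id", "amount"]
    assert len(df) == 2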

Common Pipeline Patterns

Pattern 1: ETL Pipeline

pipeline = DataPipeline()
pipeline.add_stage(extract_from_source)  # Extract
pipeline.add_stage(clean_data)           # Transform
pipeline.add_stage(validate_schema)      # Transform
pipeline.add_stage(load_to_destination)  # Load
result = pipeline.run()

Pattern 2: Multi-Source Aggregation

import pandas as pd

from ds_utilities import DataConnector, DataPipeline

def load_and_combine(sources, data=None):
    connector = DataConnector("csv", "./data")
    datasets = connector.load_data(sources)
    return pd.concat(datasets, ignore_index=True)

pipeline = DataPipeline()
pipeline.add_stage(load_and_combine, sources=["sales_q1.csv", "sales_q2.csv"])
pipeline.add_stage(aggregate_by_region)
result = pipeline.run()

Testing

Run the test suite:

pytest tests/ -v

Run specific test class:

pytest tests/test_data_pipeline.py::TestDataPipeline -v

Run with coverage:

pytest tests/ --cov=ds_utilities --cov-report=html
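
Tests for custom stages or strategies can follow the same pattern. A minimal illustrative pytest case against the documented DataPipeline API (the test name is made up here):

import pandas as pd
from ds_utilities import DataPipeline

def test_pipeline_runs_stages_in_order():
    pipeline = DataPipeline()
    pipeline.add_stage(lambda data: pd.DataFrame({"x": [1, 2, 3]}))  # load
    pipeline.add_stage(lambda data: data[data["x"] > 1])             # filter
    result = pipeline.run()
    assert list(result["x"]) == [2, 3]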
