Copy-pasteable data transformation primitives for PySpark. Inspired by shadcn-svelte.

Datacompose

A powerful data transformation framework for building reusable, composable data cleaning pipelines in PySpark.

Overview

Datacompose provides a declarative way to build data transformation pipelines from composable primitives. It generates optimized, standalone PySpark code that can be deployed with no runtime dependencies beyond PySpark itself.

Key Features

  • Composable Primitives: Build complex transformations from simple, reusable functions
  • Smart Partial Application: Configure transformations with parameters for reuse
  • Pipeline Compilation: Convert declarative pipeline definitions into optimized Spark operations
  • Code Generation: Generate standalone PySpark code with embedded dependencies
  • Comprehensive Libraries: Pre-built primitives for emails, addresses, and phone numbers
  • Conditional Logic: Support for if/else branching in pipelines
  • Type-Safe Operations: All transformations maintain Spark column type safety

Installation

pip install datacompose

Quick Start

1. Initialize a Project

datacompose init

This creates a datacompose.json configuration file with default settings.

2. Generate Transformation Code

# Generate email cleaning primitives
datacompose add clean_emails --target pyspark

# Generate address standardization primitives
datacompose add clean_addresses --target pyspark

# Generate phone number validation primitives
datacompose add clean_phone_numbers --target pyspark

3. Use the Generated Code

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Import the generated primitives
from build.pyspark.clean_emails.email_primitives import emails

# Create Spark session
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

# Load your data
df = spark.read.csv("data.csv", header=True)

# Apply email transformations
cleaned_df = df.withColumn(
    "email_clean",
    emails.standardize_email(F.col("email"))
).withColumn(
    "email_domain",
    emails.extract_domain(F.col("email_clean"))
).withColumn(
    "is_valid",
    emails.is_valid_email(F.col("email_clean"))
)

# Filter to valid emails only
valid_emails = cleaned_df.filter(F.col("is_valid"))

Core Concepts

PrimitiveRegistry

A container for organizing related transformation functions:

from pyspark.sql import functions as F
from datacompose.operators.primitives import PrimitiveRegistry

# Create a registry for text operations
text = PrimitiveRegistry("text")

# Register transformation functions
@text.register()
def lowercase(col):
    return F.lower(col)

@text.register()
def remove_spaces(col):
    return F.regexp_replace(col, r'\s+', '')

# Use the transformations
df = df.withColumn("clean_text", text.lowercase(F.col("input")))
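
To make the registration mechanism concrete, here is a minimal sketch of how a decorator-based registry could work. It is written in plain Python on strings so it runs without Spark. RegistrySketch and its internals are hypothetical and are not Datacompose's actual implementation; only the register() decorator shape and the attribute-style access mirror the usage above.

```python
class RegistrySketch:
    """Hypothetical stand-in for PrimitiveRegistry (illustration only)."""

    def __init__(self, name):
        self.name = name
        self._primitives = {}

    def register(self):
        # Returns a decorator that stores the function under its own name.
        def decorator(func):
            self._primitives[func.__name__] = func
            return func
        return decorator

    def __getattr__(self, item):
        # Called only when normal attribute lookup fails, so registered
        # functions become reachable as registry.<function_name>.
        primitives = self.__dict__.get("_primitives", {})
        if item in primitives:
            return primitives[item]
        raise AttributeError(f"{self.name!r} has no primitive {item!r}")


text_sketch = RegistrySketch("text")


@text_sketch.register()
def lowercase(value):
    return value.lower()


@text_sketch.register()
def remove_spaces(value):
    return "".join(value.split())


print(text_sketch.lowercase("Hello World"))   # → hello world
print(text_sketch.remove_spaces("a b  c"))    # → abc
```

Keeping the functions in a dictionary keyed by name is one simple way to group related transformations under a single namespace.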

SmartPrimitive

Enables partial application of transformations:

@text.register()
def trim(col, chars=' '):
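    # Note: only newer PySpark releases accept a trim-characters argument
    # to F.trim; older releases take just the column.
    return F.trim(col, chars)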

# Direct usage
df = df.withColumn("trimmed", text.trim(F.col("input")))

# Pre-configured usage
trim_tabs = text.trim(chars='\t')
df = df.withColumn("no_tabs", trim_tabs(F.col("input")))
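
The two calling styles above can be illustrated with a small wrapper. This is a sketch under assumptions, not the SmartPrimitive source: smart_primitive and strip_chars are hypothetical names, and the example works on plain strings rather than Spark columns so it runs anywhere.

```python
import functools


def smart_primitive(func):
    """Hypothetical wrapper: call with a value to apply the function, or
    with keyword arguments only to get back a pre-configured function."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if args:
            # A positional argument is present: apply immediately.
            return func(*args, **kwargs)
        # No positional argument: remember the keywords for later.
        return functools.partial(func, **kwargs)

    return wrapper


@smart_primitive
def strip_chars(value, chars=" "):
    return value.strip(chars)


# Direct usage
direct = strip_chars("  padded  ")

# Pre-configured usage
strip_tabs = strip_chars(chars="\t")
configured = strip_tabs("\tindented\t")
```

Deciding on the presence of a positional argument keeps a single name usable both as a transformation and as a factory for configured transformations.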

Pipeline Composition

Build complex pipelines from simple primitives:

@text.compose(text=text)
def clean_pipeline():
    text.trim()
    text.lowercase()
    text.remove_spaces()

# Apply the entire pipeline
df = df.withColumn("cleaned", clean_pipeline(F.col("input")))
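
Conceptually, a composed pipeline feeds the output of each primitive into the next one. The sketch below shows that idea with ordinary Python functions on strings. compose_steps is a hypothetical helper; Datacompose builds its pipeline from the decorated function body, which this sketch does not attempt to reproduce.

```python
from functools import reduce


def compose_steps(*steps):
    """Chain single-argument functions left to right (illustration only)."""

    def pipeline(value):
        # Start from the input value and apply each step to the running result.
        return reduce(lambda acc, step: step(acc), steps, value)

    return pipeline


clean_pipeline_sketch = compose_steps(
    str.strip,                      # plays the role of text.trim
    str.lower,                      # plays the role of text.lowercase
    lambda s: "".join(s.split()),   # plays the role of text.remove_spaces
)

result = clean_pipeline_sketch("  Hello  World ")
print(result)  # → helloworld
```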

Conditional Pipelines

Add conditional logic to your transformations:

@text.register(is_conditional=True)
def is_valid_length(col):
    return F.length(col) > 5

@text.register()
def truncate(col):
    return F.substring(col, 1, 5)

@text.compose(text=text)
def smart_truncate():
    if text.is_valid_length():
        text.truncate()

Available Primitives

Email Primitives

from build.pyspark.clean_emails.email_primitives import emails

# Validation
emails.is_valid_email(col)
emails.is_business_email(col)
emails.is_disposable_email(col)

# Extraction
emails.extract_domain(col)
emails.extract_username(col)
emails.extract_tld(col)

# Standardization
emails.standardize_email(col)
emails.normalize_gmail(col)
emails.fix_common_typos(col)

# Filtering
emails.filter_valid_emails(col)
emails.filter_business_emails(col)

Address Primitives

from build.pyspark.clean_addresses.address_primitives import addresses

# Extraction
addresses.extract_street_number(col)
addresses.extract_street_name(col)
addresses.extract_city(col)
addresses.extract_state(col)
addresses.extract_zip_code(col)

# Standardization
addresses.standardize_state(col)
addresses.standardize_street_suffix(col)
addresses.standardize_direction(col)

# Validation
addresses.is_valid_zip_code(col)
addresses.is_valid_state(col)
addresses.is_po_box(col)

Phone Number Primitives

from build.pyspark.clean_phone_numbers.phone_primitives import phones

# Validation
phones.is_valid_nanp(col)
phones.is_valid_international(col)
phones.is_toll_free(col)

# Extraction
phones.extract_country_code(col)
phones.extract_area_code(col)
phones.extract_exchange(col)
phones.extract_subscriber(col)

# Formatting
phones.format_nanp(col)
phones.format_e164(col)
phones.format_international(col)

# Standardization
phones.standardize_phone(col)
phones.clean_phone(col)

Advanced Usage

Creating Custom Primitives

from pyspark.sql import functions as F
from datacompose.operators.primitives import PrimitiveRegistry

# Create your own registry
custom = PrimitiveRegistry("custom")

@custom.register()
def remove_special_chars(col):
    return F.regexp_replace(col, r'[^a-zA-Z0-9\s]', '')

@custom.register()
def capitalize_words(col):
    return F.initcap(col)

@custom.register(is_conditional=True)
def contains_numbers(col):
    return col.rlike(r'\d+')

# Create a pipeline with your custom primitives
@custom.compose(custom=custom)
def clean_text():
    custom.remove_special_chars()
    if custom.contains_numbers():
        custom.capitalize_words()

Working with Parameters

@custom.register()
def pad_string(col, length=10, fill_char='0'):
    return F.lpad(col, length, fill_char)

# Use with different parameters
df = df.withColumn("padded_10", custom.pad_string(F.col("id")))
df = df.withColumn("padded_5", custom.pad_string(length=5)(F.col("id")))
df = df.withColumn("padded_x", custom.pad_string(length=8, fill_char='X')(F.col("id")))

Combining Multiple Registries

from datacompose.operators.primitives import PrimitiveRegistry
from build.pyspark.clean_emails.email_primitives import emails
from build.pyspark.clean_phone_numbers.phone_primitives import phones

# Create a combined validation pipeline
validation = PrimitiveRegistry("validation")

@validation.compose(emails=emails, phones=phones)
def validate_contact_info():
    # Check email
    if emails.is_valid_email():
        emails.standardize_email()

    # Check phone
    if phones.is_valid_nanp():
        phones.standardize_phone()

CLI Commands

Initialize a Project

datacompose init [--yes]

Add Transformers

datacompose add <transformer> [--target TARGET] [--output OUTPUT] [--verbose]

# Examples
datacompose add clean_emails --target pyspark
datacompose add clean_addresses --target pyspark --output ./custom/path
datacompose add clean_phone_numbers --target pyspark --verbose

List Available Transformers

datacompose list transformers
datacompose list generators

Project Structure

After running datacompose add, your project will have the following structure:

project/
├── datacompose.json                    # Configuration file
├── build/
│   └── pyspark/
│       ├── clean_emails/
│       │   ├── email_primitives.py     # Generated email primitives
│       │   └── utils/
│       │       └── primitives.py       # Core framework (embedded)
│       ├── clean_addresses/
│       │   ├── address_primitives.py
│       │   └── utils/
│       │       └── primitives.py
│       └── clean_phone_numbers/
│           ├── phone_primitives.py
│           └── utils/
│               └── primitives.py
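
Given this layout, a short script can check which transformers have been generated. The helper below is a sketch: find_primitive_modules is a hypothetical name, and it relies only on the directory and file naming shown in the tree above.

```python
from pathlib import Path


def find_primitive_modules(build_root):
    """Map each transformer directory name to its generated *_primitives.py file."""
    root = Path(build_root)
    return {
        path.parent.name: path
        # One level down only, so the embedded utils/primitives.py is skipped.
        for path in sorted(root.glob("*/*_primitives.py"))
    }


if __name__ == "__main__":
    for transformer, module_path in find_primitive_modules("build/pyspark").items():
        print(f"{transformer}: {module_path}")
```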

Configuration

The datacompose.json file configures default settings:

{
  "version": "1.0.0",
  "targets": {
    "pyspark": {
      "output": "./build/pyspark",
      "generator": "SparkPandasUDFGenerator"
    }
  },
  "templates": {
    "directory": "src/transformers/templates"
  }
}
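
Because the file is plain JSON, your own scripts can read it too. The sketch below looks up the output directory for a target using only the keys shown above. output_dir_for is a hypothetical helper; how the datacompose CLI itself reads the file is not covered here.

```python
import json
from pathlib import Path

# Same content as the datacompose.json example above.
CONFIG_TEXT = """
{
  "version": "1.0.0",
  "targets": {
    "pyspark": {
      "output": "./build/pyspark",
      "generator": "SparkPandasUDFGenerator"
    }
  },
  "templates": {
    "directory": "src/transformers/templates"
  }
}
"""


def output_dir_for(config, target):
    """Look up the output directory configured for a target."""
    try:
        return Path(config["targets"][target]["output"])
    except KeyError:
        raise ValueError(f"target {target!r} is not configured") from None


config = json.loads(CONFIG_TEXT)
pyspark_output = output_dir_for(config, "pyspark")
```

In a real project you would load the text with Path("datacompose.json").read_text() instead of embedding it.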

Performance Considerations

  • Primitives are designed to be efficient Spark operations
  • Pipelines are compiled to minimize intermediate columns
  • Conditional logic uses Spark's when/otherwise for vectorized operations
  • Generated code has no runtime dependencies beyond PySpark

Philosophy & Inspiration

Datacompose is inspired by shadcn-svelte and huntabyte's approach to component libraries. Just as shadcn-svelte provides "copy and paste" components rather than npm packages, Datacompose generates data transformation code that becomes part of YOUR codebase.

Why we believe in this approach:

  • You Own Your Code: No external dependencies to manage and no breaking changes to worry about
  • Full Transparency: Every transformation is readable, debuggable PySpark code you can understand
  • Customization First: Need to adjust a transformation? Just edit the code
  • Learn by Reading: The generated code serves as documentation and learning material

This is NOT a traditional library - it's a code generator that gives you production-ready data transformation primitives that you can modify to fit your exact needs.

License

MIT License - see LICENSE file for details
