Skip to main content

Modular toolkit for Data Engineering with PySpark and Delta Lake — schema management, auditing, profiling, normalization, JSON handling, window functions, and Delta Lake operations.

Project description

pyspark-data-toolkit — Utilities Library for PySpark & Delta Lake

PyPI version Python License Build

A production-ready utility library to accelerate Data Engineering workflows with PySpark and Delta Lake.

Stop rewriting boilerplate code. pyspark-data-toolkit delivers robust, modular functions for schema management, audit trails, string normalization, profiling, union operations, windowing, and Delta Lake management — all with logging support and data governance best practices built-in.


📦 Installation

pip install pyspark-data-toolkit

For local development:

git clone https://github.com/<your-org>/pyspark-data-toolkit.git
cd pyspark-data-toolkit
pip install -e ".[dev]"

✨ Key Features

  • 📦 Modular design — Import only what you need (audit, schema, delta, json, profiling, etc.)
  • Production-grade — Exception handling and structured logging
  • 🧪 Profiling & validation — Null analysis, stats, outliers, schema diffs
  • 🧼 Normalization utilities — Clean strings and standardize column names
  • 🔄 Schema enforcement — Apply, validate, and cast schemas safely
  • 🧱 Delta Lake utilities — Merge, replace, optimize, vacuum, z-order
  • 🪟 Window-based deduplication — Keep the latest records per key
  • 📊 Audit metadata — Control columns, row hashes, batch IDs
  • 🔗 JSON manipulation — Nested structure extraction, flattening
  • 🧠 Diff utilities — Compare DataFrames and tag changes

📋 Modules Overview

Module Description
audit_utils Add control/audit columns and validate ingestion metadata.
dataframe_utils Common transformations for DataFrames.
datetime_utils Date/time parsing, formatting, and timezone conversion.
delta_table_utils Delta Lake management: merges, optimizations, partitions.
diff_utils Compare DataFrames and schemas, summarize differences.
json_utils Extract and flatten JSON/nested structures in columns.
normalization_utils Normalize strings and column names, safe numeric conversion.
profiling_utils Null analysis, stats, outliers, cardinality, skew, correlations.
schema_utils Apply, validate, and cast schemas from specs.
union_utils Schema-aligned DataFrame unions or JSON merges.
window_utils Latest-record selection and deduplication by window.

🚀 Quick Start Example

from pyspark.sql import SparkSession, Row
from pyspark_data_toolkit.audit_utils import add_control_columns
from pyspark_data_toolkit.profiling_utils import profile_nulls

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    Row(id=1, value="A"),
    Row(id=2, value=None)
])

# Add control columns (e.g., ingestion timestamp, batch ID)
df = add_control_columns(df)

# Profile null values
null_report = profile_nulls(df)
print(null_report)

📖 Module Usage Examples

💡 Below are short, illustrative snippets. Full pipelines and advanced use cases are available in EXAMPLES.md.cd

Audit

from pyspark_data_toolkit.audit_utils import *
df = add_control_columns(df, add_hash=True, version='v2')
df = check_row_duplicates(df)
df = add_audit_trail_columns(df)

Delta Lake

from pyspark_data_toolkit.delta_table_utils import write_delta_table, merge_delta_table
write_delta_table(spark, df, "db.table", "/path", arq_format="delta", mode="overwrite", partition_cols=("part",))
merge_delta_table(spark, df_updates, "db.table", "/path", merge_cols=("id",))

Profiling

from pyspark_data_toolkit.profiling_utils import profile_nulls, profile_numeric_stats
nulls = profile_nulls(df)
stats = profile_numeric_stats(df)

Normalization

from pyspark_data_toolkit.normalization_utils import normalize_strings, normalize_column_names
df = normalize_strings(df, columns=["name"])
df = normalize_column_names(df)

Union

from pyspark_data_toolkit.union_utils import union_all_with_schema
df_union = union_all_with_schema([df1, df2])

Window

from pyspark_data_toolkit.window_utils import drop_duplicates_keep_latest
df_latest = drop_duplicates_keep_latest(df, keys=["id"], order_col="timestamp")

🏆 Best Practices

  • Be modular — Import only the needed functions for clarity and performance.
  • Profile early — Run profiling after ingestion to catch anomalies quickly.
  • Validate schemas — Apply and validate schemas before transformations.
  • Governance first — Use audit utilities to ensure traceability.
  • Delta Lake safety — When overwriting, always set replace_where to avoid unwanted partition drops.

🔧 Dependencies

  • Python >= 3.8
  • PySpark >= 3.0
  • Delta Lake (optional, required only for Delta modules)

🧪 Testing

# Install dev dependencies
pip install -e ".[dev]"

# Run all tests
make test

📝 Changelog

v0.1.0

  • Initial release
  • Schema management utilities
  • Profiling (nulls, stats, outliers, diffs)
  • Normalization functions
  • Delta Lake operations (merge, optimize, vacuum)
  • JSON extraction and flattening

🤝 Contributing

Contributions are welcome!

  1. Fork the project
  2. Create your feature branch (git checkout -b feature/pyspark-data-toolkit)
  3. Commit your changes (git commit -m 'Add new feature')
  4. Push to your branch (git push origin feature/pyspark-data-toolkit)
  5. Open a Pull Request

📜 License

MIT License — see LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyspark_data_toolkit-0.3.7.1.tar.gz (42.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyspark_data_toolkit-0.3.7.1-py3-none-any.whl (44.1 kB view details)

Uploaded Python 3

File details

Details for the file pyspark_data_toolkit-0.3.7.1.tar.gz.

File metadata

  • Download URL: pyspark_data_toolkit-0.3.7.1.tar.gz
  • Upload date:
  • Size: 42.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for pyspark_data_toolkit-0.3.7.1.tar.gz
Algorithm Hash digest
SHA256 dab60d266bfdbf412cf8d8a10470ba278089b33799373e7b02513c6670b08a29
MD5 fe8489e149b51a98bf369eccef643410
BLAKE2b-256 e90f2f194228fe3b38d6c1462097d4c8852508dbec7227db5270fe8c0ae0f2df

See more details on using hashes here.

File details

Details for the file pyspark_data_toolkit-0.3.7.1-py3-none-any.whl.

File metadata

File hashes

Hashes for pyspark_data_toolkit-0.3.7.1-py3-none-any.whl
Algorithm Hash digest
SHA256 e5b94d49ed3e067627b09d54d08058381a1c4f7f78d6a1a9be66c60339ade9bf
MD5 651c56294dbb0cd63806399b82763bc2
BLAKE2b-256 3e3794e1aaf1c50846350987a9a2e7b4c9465b50708f1f0d748babe14b4fb01f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page