Skip to main content

Modular toolkit for Data Engineering with PySpark and Delta Lake — schema management, auditing, profiling, normalization, JSON handling, window functions, and Delta Lake operations.

Project description

pyspark-data-toolkit — Utilities Library for PySpark & Delta Lake

PyPI version Python License Build

A production-ready utility library to accelerate Data Engineering workflows with PySpark and Delta Lake.

Stop rewriting boilerplate code. pyspark-data-toolkit delivers robust, modular functions for schema management, audit trails, string normalization, profiling, union operations, windowing, and Delta Lake management — all with logging support and data governance best practices built-in.


📦 Installation

pip install pyspark-data-toolkit

For local development:

git clone https://github.com/<your-org>/pyspark-data-toolkit.git
cd pyspark-data-toolkit
pip install -e ".[dev]"

✨ Key Features

  • 📦 Modular design — Import only what you need (audit, schema, delta, json, profiling, etc.)
  • Production-grade — Exception handling and structured logging
  • 🧪 Profiling & validation — Null analysis, stats, outliers, schema diffs
  • 🧼 Normalization utilities — Clean strings and standardize column names
  • 🔄 Schema enforcement — Apply, validate, and cast schemas safely
  • 🧱 Delta Lake utilities — Merge, replace, optimize, vacuum, z-order
  • 🪟 Window-based deduplication — Keep the latest records per key
  • 📊 Audit metadata — Control columns, row hashes, batch IDs
  • 🔗 JSON manipulation — Nested structure extraction, flattening
  • 🧠 Diff utilities — Compare DataFrames and tag changes

📋 Modules Overview

Module Description
audit_utils Add control/audit columns and validate ingestion metadata.
dataframe_utils Common transformations for DataFrames.
datetime_utils Date/time parsing, formatting, and timezone conversion.
delta_table_utils Delta Lake management: merges, optimizations, partitions.
diff_utils Compare DataFrames and schemas, summarize differences.
json_utils Extract and flatten JSON/nested structures in columns.
normalization_utils Normalize strings and column names, safe numeric conversion.
profiling_utils Null analysis, stats, outliers, cardinality, skew, correlations.
schema_utils Apply, validate, and cast schemas from specs.
union_utils Schema-aligned DataFrame unions or JSON merges.
window_utils Latest-record selection and deduplication by window.

🚀 Quick Start Example

from pyspark.sql import SparkSession, Row
from pyspark_data_toolkit.audit_utils import add_control_columns
from pyspark_data_toolkit.profiling_utils import profile_nulls

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    Row(id=1, value="A"),
    Row(id=2, value=None)
])

# Add control columns (e.g., ingestion timestamp, batch ID)
df = add_control_columns(df)

# Profile null values
null_report = profile_nulls(df)
print(null_report)

📖 Module Usage Examples

💡 Below are short, illustrative snippets. Full pipelines and advanced use cases are available in EXAMPLES.md.cd

Audit

from pyspark_data_toolkit.audit_utils import *
df = add_control_columns(df, add_hash=True, version='v2')
df = check_row_duplicates(df)
df = add_audit_trail_columns(df)

Delta Lake

from pyspark_data_toolkit.delta_table_utils import write_delta_table, merge_delta_table
write_delta_table(spark, df, "db.table", "/path", arq_format="delta", mode="overwrite", partition_cols=("part",))
merge_delta_table(spark, df_updates, "db.table", "/path", merge_cols=("id",))

Profiling

from pyspark_data_toolkit.profiling_utils import profile_nulls, profile_numeric_stats
nulls = profile_nulls(df)
stats = profile_numeric_stats(df)

Normalization

from pyspark_data_toolkit.normalization_utils import normalize_strings, normalize_column_names
df = normalize_strings(df, columns=["name"])
df = normalize_column_names(df)

Union

from pyspark_data_toolkit.union_utils import union_all_with_schema
df_union = union_all_with_schema([df1, df2])

Window

from pyspark_data_toolkit.window_utils import drop_duplicates_keep_latest
df_latest = drop_duplicates_keep_latest(df, keys=["id"], order_col="timestamp")

🏆 Best Practices

  • Be modular — Import only the needed functions for clarity and performance.
  • Profile early — Run profiling after ingestion to catch anomalies quickly.
  • Validate schemas — Apply and validate schemas before transformations.
  • Governance first — Use audit utilities to ensure traceability.
  • Delta Lake safety — When overwriting, always set replace_where to avoid unwanted partition drops.

🔧 Dependencies

  • Python >= 3.8
  • PySpark >= 3.0
  • Delta Lake (optional, required only for Delta modules)

🧪 Testing

# Install dev dependencies
pip install -e ".[dev]"

# Run all tests
make test

📝 Changelog

v0.1.0

  • Initial release
  • Schema management utilities
  • Profiling (nulls, stats, outliers, diffs)
  • Normalization functions
  • Delta Lake operations (merge, optimize, vacuum)
  • JSON extraction and flattening

🤝 Contributing

Contributions are welcome!

  1. Fork the project
  2. Create your feature branch (git checkout -b feature/pyspark-data-toolkit)
  3. Commit your changes (git commit -m 'Add new feature')
  4. Push to your branch (git push origin feature/pyspark-data-toolkit)
  5. Open a Pull Request

📜 License

MIT License — see LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyspark_data_toolkit-0.3.1.tar.gz (40.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyspark_data_toolkit-0.3.1-py3-none-any.whl (42.4 kB view details)

Uploaded Python 3

File details

Details for the file pyspark_data_toolkit-0.3.1.tar.gz.

File metadata

  • Download URL: pyspark_data_toolkit-0.3.1.tar.gz
  • Upload date:
  • Size: 40.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for pyspark_data_toolkit-0.3.1.tar.gz
Algorithm Hash digest
SHA256 6620ecd5f30c38f07682a4f606f11ecea7ca688ea515e2a99be974395928ea06
MD5 7fce0bdf4027c36ea96580638e1501c1
BLAKE2b-256 de56de6fd263acfaf945c285761e4e7d90a8db359b03d07417fcb7729f4d703a

See more details on using hashes here.

File details

Details for the file pyspark_data_toolkit-0.3.1-py3-none-any.whl.

File metadata

File hashes

Hashes for pyspark_data_toolkit-0.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 3d8155c8a79c39b3db9801532068eb64bf74ee2f441a2ffceba4c62c718dfb84
MD5 15e195a511bf7ba89e6328d483d931b6
BLAKE2b-256 73553a80636e522e3adf633216e8a8f48154af1d6945a96c21d20fd0d1396b76

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page