
Modular toolkit for Data Engineering with PySpark and Delta Lake — schema management, auditing, profiling, normalization, JSON handling, window functions, and Delta Lake operations.


pyspark-data-toolkit — Utilities Library for PySpark & Delta Lake


A production-ready utility library to accelerate Data Engineering workflows with PySpark and Delta Lake.

Stop rewriting boilerplate code. pyspark-data-toolkit provides modular functions for schema management, audit trails, string normalization, profiling, union operations, windowing, and Delta Lake management, all with logging support and data governance best practices built in.


📦 Installation

pip install pyspark-data-toolkit

For local development:

git clone https://github.com/<your-org>/pyspark-data-toolkit.git
cd pyspark-data-toolkit
pip install -e ".[dev]"

✨ Key Features

  • 📦 Modular design — Import only what you need (audit, schema, delta, json, profiling, etc.)
  • Production-grade — Exception handling and structured logging
  • 🧪 Profiling & validation — Null analysis, stats, outliers, schema diffs
  • 🧼 Normalization utilities — Clean strings and standardize column names
  • 🔄 Schema enforcement — Apply, validate, and cast schemas safely
  • 🧱 Delta Lake utilities — Merge, replace, optimize, vacuum, z-order
  • 🪟 Window-based deduplication — Keep the latest records per key
  • 📊 Audit metadata — Control columns, row hashes, batch IDs
  • 🔗 JSON manipulation — Nested structure extraction, flattening
  • 🧠 Diff utilities — Compare DataFrames and tag changes

📋 Modules Overview

Module               Description
audit_utils          Add control/audit columns and validate ingestion metadata.
dataframe_utils      Common transformations for DataFrames.
datetime_utils       Date/time parsing, formatting, and timezone conversion.
delta_table_utils    Delta Lake management: merges, optimizations, partitions.
diff_utils           Compare DataFrames and schemas, summarize differences.
json_utils           Extract and flatten JSON/nested structures in columns.
normalization_utils  Normalize strings and column names, safe numeric conversion.
profiling_utils      Null analysis, stats, outliers, cardinality, skew, correlations.
schema_utils         Apply, validate, and cast schemas from specs.
union_utils          Schema-aligned DataFrame unions or JSON merges.
window_utils         Latest-record selection and deduplication by window.

🚀 Quick Start Example

from pyspark.sql import SparkSession, Row
from pyspark_data_toolkit.audit_utils import add_control_columns
from pyspark_data_toolkit.profiling_utils import profile_nulls

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    Row(id=1, value="A"),
    Row(id=2, value=None)
])

# Add control columns (e.g., ingestion timestamp, batch ID)
df = add_control_columns(df)

# Profile null values
null_report = profile_nulls(df)
print(null_report)

📖 Module Usage Examples

💡 Below are short, illustrative snippets. Full pipelines and advanced use cases are available in EXAMPLES.md.

Audit

from pyspark_data_toolkit.audit_utils import add_control_columns, check_row_duplicates, add_audit_trail_columns
df = add_control_columns(df, add_hash=True, version='v2')
df = check_row_duplicates(df)
df = add_audit_trail_columns(df)
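
To make the idea behind a row hash (the `add_hash=True` flag above) concrete, here is a minimal plain-Python sketch. It is not the library's implementation: `row_hash`, the separator, and the treatment of nulls are all assumptions made for illustration. In PySpark the same idea is commonly expressed with `sha2` and `concat_ws` from `pyspark.sql.functions`.

```python
import hashlib

def row_hash(row, columns):
    """Return a SHA-256 hex digest over the chosen columns, in a fixed order.

    Hypothetical helper: the column order, not the dict order, drives the hash.
    """
    # Assumption: a null is rendered as an empty string, so None and "" hash alike
    parts = ["" if row.get(col) is None else str(row[col]) for col in columns]
    # Join with the ASCII unit separator, which rarely appears in real data
    return hashlib.sha256("\x1f".join(parts).encode("utf-8")).hexdigest()

same_a = row_hash({"id": 1, "value": "A"}, ["id", "value"])
same_b = row_hash({"value": "A", "id": 1}, ["id", "value"])
changed = row_hash({"id": 1, "value": "B"}, ["id", "value"])
```

Two rows with equal values in the hashed columns get the same digest, which is what makes the hash usable for change detection and duplicate checks.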

Delta Lake

from pyspark_data_toolkit.delta_table_utils import write_delta_table, merge_delta_table
write_delta_table(spark, df, "db.table", "/path", arq_format="delta", mode="overwrite", partition_cols=("part",))
merge_delta_table(spark, df_updates, "db.table", "/path", merge_cols=("id",))

Profiling

from pyspark_data_toolkit.profiling_utils import profile_nulls, profile_numeric_stats
nulls = profile_nulls(df)
stats = profile_numeric_stats(df)
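
As a rough picture of what a null profile contains, the sketch below counts nulls per column over a list of dict rows. The function name `null_profile` and the report layout are assumptions for illustration; the shape of the report returned by `profile_nulls` may differ.

```python
def null_profile(rows, columns):
    """Count nulls per column and express them as a percentage of all rows."""
    total = len(rows)
    report = {}
    for col in columns:
        # A missing key is treated the same as an explicit None
        nulls = sum(1 for row in rows if row.get(col) is None)
        pct = (nulls / total * 100.0) if total else 0.0
        report[col] = {"null_count": nulls, "null_pct": pct}
    return report

rows = [{"id": 1, "value": "A"}, {"id": 2, "value": None}]
report = null_profile(rows, ["id", "value"])
```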

Normalization

from pyspark_data_toolkit.normalization_utils import normalize_strings, normalize_column_names
df = normalize_strings(df, columns=["name"])
df = normalize_column_names(df)
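
Column-name normalization usually means something like the following: strip accents, replace punctuation and spaces with underscores, and lowercase the result. This is a sketch under that assumption, using a hypothetical `normalize_name` helper; the exact rules applied by `normalize_column_names` may differ.

```python
import re
import unicodedata

def normalize_name(name):
    """Turn an arbitrary column label into a lowercase snake_case identifier."""
    # Decompose accented characters, then drop everything outside ASCII
    ascii_name = unicodedata.normalize("NFKD", name).encode("ascii", "ignore").decode("ascii")
    # Collapse each run of non-alphanumeric characters into a single underscore
    snake = re.sub(r"[^0-9a-zA-Z]+", "_", ascii_name).strip("_")
    return snake.lower()

names = [normalize_name(n) for n in ["Customer Name", " Preço (R$) ", "order-ID"]]
```

With a Spark DataFrame, such a function could be applied to every column via `df.toDF(*[normalize_name(c) for c in df.columns])`.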

Union

from pyspark_data_toolkit.union_utils import union_all_with_schema
df_union = union_all_with_schema([df1, df2])
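
A schema-aligned union means that inputs with different columns are padded with nulls before being stacked. The plain-Python sketch below models that behavior on lists of dicts; `union_aligned` is a hypothetical name and this is not how the library is implemented. In PySpark 3.1 and later, `DataFrame.unionByName(other, allowMissingColumns=True)` provides comparable built-in behavior.

```python
def union_aligned(*tables):
    """Stack tables row-wise, filling columns a table lacks with None."""
    # Collect every column name once, in first-seen order
    columns = []
    for table in tables:
        for row in table:
            for col in row:
                if col not in columns:
                    columns.append(col)
    # Rebuild every row against the full column list
    return [{col: row.get(col) for col in columns} for table in tables for row in table]

t1 = [{"id": 1, "name": "a"}]
t2 = [{"id": 2, "city": "x"}]
merged = union_aligned(t1, t2)
```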

Window

from pyspark_data_toolkit.window_utils import drop_duplicates_keep_latest
df_latest = drop_duplicates_keep_latest(df, keys=["id"], order_col="timestamp")
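
The semantics of keep-latest deduplication can be modeled in a few lines of plain Python: for each key, retain the row with the greatest value in the ordering column. The helper name `keep_latest` and the tie-breaking rule (first row seen wins) are assumptions for illustration, not the library's documented behavior. In PySpark this is commonly done with `row_number()` over `Window.partitionBy(...)` ordered descending, keeping rows numbered 1.

```python
def keep_latest(rows, keys, order_col):
    """Keep, per key, the row with the largest value in order_col."""
    latest = {}
    for row in rows:
        key = tuple(row[col] for col in keys)
        # Strict comparison: on a tie, the row seen first is kept
        if key not in latest or row[order_col] > latest[key][order_col]:
            latest[key] = row
    return list(latest.values())

rows = [
    {"id": 1, "timestamp": 1, "v": "old"},
    {"id": 2, "timestamp": 5, "v": "only"},
    {"id": 1, "timestamp": 3, "v": "new"},
]
deduped = keep_latest(rows, keys=["id"], order_col="timestamp")
```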

🏆 Best Practices

  • Be modular — Import only the needed functions for clarity and performance.
  • Profile early — Run profiling after ingestion to catch anomalies quickly.
  • Validate schemas — Apply and validate schemas before transformations.
  • Governance first — Use audit utilities to ensure traceability.
  • Delta Lake safety — When overwriting, always set replace_where to avoid unwanted partition drops.
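
The Delta Lake safety point can be illustrated with the underlying writer option. Delta Lake's DataFrame writer accepts a `replaceWhere` option that limits an overwrite to rows matching a predicate. The sketch below builds such a predicate and passes it to the plain Delta writer; `partition_predicate` and `overwrite_partitions` are hypothetical helpers, and how the toolkit's `replace_where` argument maps onto this option is an assumption.

```python
def partition_predicate(column, values):
    """Build a SQL predicate such as: part IN ('2024-01-01', '2024-01-02')."""
    if not values:
        # An empty predicate would defeat the purpose of a scoped overwrite
        raise ValueError("refusing to build a predicate without values")
    # Escape single quotes by doubling them, as in standard SQL string literals
    quoted = ", ".join("'" + str(v).replace("'", "''") + "'" for v in values)
    return f"{column} IN ({quoted})"

def overwrite_partitions(df, path, predicate):
    """Overwrite only the rows of a Delta table that match the predicate."""
    # Standard Delta Lake writer call; df is a Spark DataFrame
    (df.write.format("delta")
        .mode("overwrite")
        .option("replaceWhere", predicate)
        .save(path))

predicate = partition_predicate("part", ["2024-01-01", "2024-01-02"])
```

A call such as `overwrite_partitions(df, "/path", predicate)` would then replace only the two listed partitions and leave the rest of the table untouched.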

🔧 Dependencies

  • Python >= 3.8
  • PySpark >= 3.0
  • Delta Lake (optional, required only for Delta modules)

🧪 Testing

# Install dev dependencies
pip install -e ".[dev]"

# Run all tests
make test

📝 Changelog

v0.1.0

  • Initial release
  • Schema management utilities
  • Profiling (nulls, stats, outliers, diffs)
  • Normalization functions
  • Delta Lake operations (merge, optimize, vacuum)
  • JSON extraction and flattening

🤝 Contributing

Contributions are welcome!

  1. Fork the project
  2. Create your feature branch (git checkout -b feature/pyspark-data-toolkit)
  3. Commit your changes (git commit -m 'Add new feature')
  4. Push to your branch (git push origin feature/pyspark-data-toolkit)
  5. Open a Pull Request

📜 License

MIT License — see LICENSE for details.
