Skip to main content

Modular toolkit for Data Engineering with PySpark and Delta Lake — schema management, auditing, profiling, normalization, JSON handling, window functions, and Delta Lake operations.

Project description

pyspark-data-toolkit — Utilities Library for PySpark & Delta Lake

PyPI version Python License Build

A production-ready utility library to accelerate Data Engineering workflows with PySpark and Delta Lake.

Stop rewriting boilerplate code. pyspark-data-toolkit delivers robust, modular functions for schema management, audit trails, string normalization, profiling, union operations, windowing, and Delta Lake management — all with logging support and data governance best practices built-in.


📦 Installation

pip install pyspark-data-toolkit

For local development:

git clone https://github.com/<your-org>/pyspark-data-toolkit.git
cd pyspark-data-toolkit
pip install -e ".[dev]"

✨ Key Features

  • 📦 Modular design — Import only what you need (audit, schema, delta, json, profiling, etc.)
  • Production-grade — Exception handling and structured logging
  • 🧪 Profiling & validation — Null analysis, stats, outliers, schema diffs
  • 🧼 Normalization utilities — Clean strings and standardize column names
  • 🔄 Schema enforcement — Apply, validate, and cast schemas safely
  • 🧱 Delta Lake utilities — Merge, replace, optimize, vacuum, z-order
  • 🪟 Window-based deduplication — Keep the latest records per key
  • 📊 Audit metadata — Control columns, row hashes, batch IDs
  • 🔗 JSON manipulation — Nested structure extraction, flattening
  • 🧠 Diff utilities — Compare DataFrames and tag changes

📋 Modules Overview

Module Description
audit_utils Add control/audit columns and validate ingestion metadata.
dataframe_utils Common transformations for DataFrames.
datetime_utils Date/time parsing, formatting, and timezone conversion.
delta_table_utils Delta Lake management: merges, optimizations, partitions.
diff_utils Compare DataFrames and schemas, summarize differences.
json_utils Extract and flatten JSON/nested structures in columns.
normalization_utils Normalize strings and column names, safe numeric conversion.
profiling_utils Null analysis, stats, outliers, cardinality, skew, correlations.
schema_utils Apply, validate, and cast schemas from specs.
union_utils Schema-aligned DataFrame unions or JSON merges.
window_utils Latest-record selection and deduplication by window.

🚀 Quick Start Example

from pyspark.sql import SparkSession, Row
from pyspark_data_toolkit.audit_utils import add_control_columns
from pyspark_data_toolkit.profiling_utils import profile_nulls

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    Row(id=1, value="A"),
    Row(id=2, value=None)
])

# Add control columns (e.g., ingestion timestamp, batch ID)
df = add_control_columns(df)

# Profile null values
null_report = profile_nulls(df)
print(null_report)

📖 Module Usage Examples

💡 Below are short, illustrative snippets. Full pipelines and advanced use cases are available in EXAMPLES.md.cd

Audit

from pyspark_data_toolkit.audit_utils import *
df = add_control_columns(df, add_hash=True, version='v2')
df = check_row_duplicates(df)
df = add_audit_trail_columns(df)

Delta Lake

from pyspark_data_toolkit.delta_table_utils import write_delta_table, merge_delta_table
write_delta_table(spark, df, "db.table", "/path", arq_format="delta", mode="overwrite", partition_cols=("part",))
merge_delta_table(spark, df_updates, "db.table", "/path", merge_cols=("id",))

Profiling

from pyspark_data_toolkit.profiling_utils import profile_nulls, profile_numeric_stats
nulls = profile_nulls(df)
stats = profile_numeric_stats(df)

Normalization

from pyspark_data_toolkit.normalization_utils import normalize_strings, normalize_column_names
df = normalize_strings(df, columns=["name"])
df = normalize_column_names(df)

Union

from pyspark_data_toolkit.union_utils import union_all_with_schema
df_union = union_all_with_schema([df1, df2])

Window

from pyspark_data_toolkit.window_utils import drop_duplicates_keep_latest
df_latest = drop_duplicates_keep_latest(df, keys=["id"], order_col="timestamp")

🏆 Best Practices

  • Be modular — Import only the needed functions for clarity and performance.
  • Profile early — Run profiling after ingestion to catch anomalies quickly.
  • Validate schemas — Apply and validate schemas before transformations.
  • Governance first — Use audit utilities to ensure traceability.
  • Delta Lake safety — When overwriting, always set replace_where to avoid unwanted partition drops.

🔧 Dependencies

  • Python >= 3.8
  • PySpark >= 3.0
  • Delta Lake (optional, required only for Delta modules)

🧪 Testing

# Install dev dependencies
pip install -e ".[dev]"

# Run all tests
make test

📝 Changelog

v0.1.0

  • Initial release
  • Schema management utilities
  • Profiling (nulls, stats, outliers, diffs)
  • Normalization functions
  • Delta Lake operations (merge, optimize, vacuum)
  • JSON extraction and flattening

🤝 Contributing

Contributions are welcome!

  1. Fork the project
  2. Create your feature branch (git checkout -b feature/pyspark-data-toolkit)
  3. Commit your changes (git commit -m 'Add new feature')
  4. Push to your branch (git push origin feature/pyspark-data-toolkit)
  5. Open a Pull Request

📜 License

MIT License — see LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyspark_data_toolkit-0.3.0.tar.gz (40.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyspark_data_toolkit-0.3.0-py3-none-any.whl (42.4 kB view details)

Uploaded Python 3

File details

Details for the file pyspark_data_toolkit-0.3.0.tar.gz.

File metadata

  • Download URL: pyspark_data_toolkit-0.3.0.tar.gz
  • Upload date:
  • Size: 40.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for pyspark_data_toolkit-0.3.0.tar.gz
Algorithm Hash digest
SHA256 a4e0ca945bb0b99ecd60b4b4a1e19c22346271aab8f726e0a41329b2ce6f9aca
MD5 3e2f93eeaebf8ef1edab251c33385b27
BLAKE2b-256 3f8a18d10403afad84e52a5d1a194b322edcf880f97fcd4c2b65eda43b94a4f8

See more details on using hashes here.

File details

Details for the file pyspark_data_toolkit-0.3.0-py3-none-any.whl.

File metadata

File hashes

Hashes for pyspark_data_toolkit-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 b908b05733a840d600eb9be795e9b0b853e3d4e09a1ba3966bbe60666f014f32
MD5 209f349b0f42b50ec70d63a9a8d28be9
BLAKE2b-256 189789ca0247cd948c25bf9a62fa49aad938ba60291cf460cdaac4e4ff472708

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page