Reusable PySpark transformation functions for enterprise-grade ETL pipelines.
sparkxform
sparkxform is a lightweight and reusable PySpark transformation utility library designed for scalable and modular data engineering pipelines. It contains a growing collection of transformation functions for Spark DataFrames, including ID generation, string cleaning, date parsing, and more.
📦 Features
- 🔢 Dynamic ID generation from existing tables
- ✂️ String cleaning (trimming, lowercasing, etc.)
- 📅 Date column parsing and standardization
- 🔄 Easily extendable with your own transformations
- 💼 Production-friendly Python packaging
📚 Installation
Clone the repository and install locally:
git clone https://github.com/RanjitM007/sparkxform.git
cd sparkxform
pip install -e .
Or install the released package from PyPI:
pip install sparkxform
🔢 assign_next_id
from sparkxform import assign_next_id
df = assign_next_id(spark, df, table_name='existing_table', column_name='storage_id')
📝 Description
assign_next_id() assigns the same incremented ID to every row in the given DataFrame by:
- Querying the max ID from an external table (table_name) via Spark SQL.
- Incrementing it by one.
- Creating (or overwriting) a column with that next ID value.
✅ Advantages
- Simple and fast for batch inserts with uniform IDs.
- Useful for tracking ingestion batches.
- Easily integrates with Hive/Spark SQL tables.
- Safe in single-writer batch ETL flows, where no other job increments the ID at the same time.
⚠️ Disadvantages
- All rows get the same ID, so it is not suitable for row-level uniqueness.
- Requires the external table (table_name) to exist and be readable.
- Not atomic or safe for concurrent writers: two jobs that read the same max ID will assign the same next ID, so use it with care in concurrent workloads.
- For unique row IDs (e.g., primary keys), use monotonically_increasing_id() or UUIDs instead.
🧠 Best For
- Appending batch metadata.
- Managing incremental inserts.
- Logging ingestion batches with shared identifiers.
🔁 Related Functions (coming soon)
- assign_partitioned_ids()
- assign_uuid_column()
- generate_sequential_ids_by_group()
📆 convert_to_date
from sparkxform import convert_to_date
df = convert_to_date(df, col_name='order_date', fmt='MM/dd/yyyy')
📝 Description
convert_to_date() transforms a string-based date column into a Spark SQL DateType using the format you specify.
- Uses to_date() internally to parse and cast values.
- Overwrites the existing column with the parsed date values.
✅ Advantages
- Simplifies date parsing using consistent Spark syntax.
- Supports custom date formats (e.g., 'MM/dd/yyyy', 'dd-MM-yyyy').
- Avoids manual casting and parsing logic.
⚠️ Disadvantages
- If fmt does not match the input strings, the unparseable values become null (when spark.sql.ansi.enabled is true, to_date() raises an error instead).
- It overwrites the existing column; preserving the original requires extending the function with a new_col parameter.
- Works only with string columns containing valid date strings.
🧠 Best For
- Standardizing date columns from CSV, JSON, or unstructured sources.
- Preparing data for time-based operations (e.g., filtering, grouping by date).
- Quick date conversion in ETL pipelines.
🔁 Related Functions (coming soon)
- convert_to_timestamp()
- add_date_part_columns() (e.g., year, month, day)
- format_date_column()
✂️ clean_string_column
from sparkxform import clean_string_column
df = clean_string_column(df, col_name='Name', new_col='cleaned_name')
📝 Description
clean_string_column() cleans a string column by:
- Removing leading and trailing spaces using trim().
- Converting all characters to lowercase using lower().
- Writing the cleaned result to new_col when it is given, or replacing the original column otherwise.
✅ Advantages
- Simplifies standard string cleaning operations.
- Helps maintain consistency across categorical or textual data.
- Non-destructive when new_col is specified: the original column is preserved.
⚠️ Disadvantages
- Assumes the input column is of string type; other types may raise errors.
- Only performs basic cleaning (trim + lower), not full normalization or regex-based cleaning.
🧠 Best For
- Preprocessing name, city, email, or category columns.
- Normalizing textual data for comparison, deduplication, or joins.
- Cleaning incoming data from CSV/Excel/JSON sources.
🔁 Related Functions (coming soon)
- remove_special_chars()
- normalize_whitespace()
- standardize_column_case()
Download files
Source Distribution
File details
Details for the file sparkxform-0.1.0.tar.gz.
File metadata
- Download URL: sparkxform-0.1.0.tar.gz
- Upload date:
- Size: 5.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.11.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e1120c2a95f7f431e441c1cdcde9d1c9741e178936aff73ceb88f091aad9c20f |
| MD5 | 244b45885674910a5cbe38b4675e8cad |
| BLAKE2b-256 | a1186f23c43578d1e072a191e8d11f9fb2b1d11a3f70b1d892f6f69e01d4cf5d |