PySpark Transform Registry
A simplified library for registering and loading PySpark transform functions using MLflow's model registry.
Installation
pip install pyspark-transform-registry
Quick Start
Register a Function
from pyspark_transform_registry import register_function
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit
def clean_data(df: DataFrame) -> DataFrame:
    """Remove invalid records and standardize data."""
    return df.filter(col("amount") > 0).withColumn("status", lit("clean"))

# Register the function
logged_model = register_function(
    func=clean_data,
    name="analytics.etl.clean_data",
    description="Data cleaning transformation"
)
Load and Use a Function
from pyspark_transform_registry import load_function
# Load the registered function
clean_data_func = load_function("analytics.etl.clean_data", version=1)
# Use it on your data
result = clean_data_func(your_dataframe)
Features
- Simple API: Just two main functions - register_function() and load_function()
- Direct Registration: Register functions directly from Python code
- File-based Registration: Load and register functions from Python files
- Automatic Versioning: Integer-based versioning with automatic incrementing
- MLflow Integration: Built on MLflow's model registry
- Multi-Parameter Support: Functions with additional parameters beyond the DataFrame
Usage Examples
Direct Function Registration
from pyspark_transform_registry import register_function
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, when
def risk_scorer(df: DataFrame, threshold: float = 100.0) -> DataFrame:
    """Calculate risk scores based on amount."""
    return df.withColumn(
        "risk_score",
        when(col("amount") > threshold, "high").otherwise("low")
    )

# Register with metadata
register_function(
    func=risk_scorer,
    name="finance.scoring.risk_scorer",
    description="Risk scoring transformation",
    extra_pip_requirements=["numpy>=1.20.0"],
    tags={"team": "finance", "category": "scoring"}
)
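The registered scorer can then be loaded and called with its extra parameter. A minimal sketch, assuming the scorer was registered as version 1 and sample_df is a placeholder for your own DataFrame:

from pyspark_transform_registry import load_function

# Load version 1 of the registered scorer
risk_scorer_func = load_function("finance.scoring.risk_scorer", version=1)

# Keyword arguments beyond the DataFrame are forwarded to the original function
scored_df = risk_scorer_func(sample_df, threshold=250.0)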
File-based Registration
# transforms/data_processors.py
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
def feature_engineer(df: DataFrame) -> DataFrame:
    """Create engineered features."""
    return df.withColumn("feature_1", col("amount") * 2)

# Register from file
register_function(
    file_path="transforms/data_processors.py",
    function_name="feature_engineer",
    name="ml.features.feature_engineer",
    description="Feature engineering pipeline"
)
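Loading a file-registered function is no different from loading one registered directly. A minimal sketch, assuming version 1 and a placeholder orders_df DataFrame:

from pyspark_transform_registry import load_function

# Load the file-registered transform and apply it
feature_engineer_func = load_function("ml.features.feature_engineer", version=1)
features_df = feature_engineer_func(orders_df)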
Multi-Parameter Functions
def filter_by_category(df: DataFrame, category: str, min_amount: float = 0.0) -> DataFrame:
    """Filter data by category and minimum amount."""
    return df.filter(
        (col("category") == category) &
        (col("amount") >= min_amount)
    )

# Register the function
register_function(
    func=filter_by_category,
    name="retail.filtering.filter_by_category"
)
# Load and use with parameters
filter_func = load_function("retail.filtering.filter_by_category", version=1)
electronics = filter_func(sample_df, category="electronics", min_amount=100.0)
Source Code Inspection
# Load a function
transform = load_function("retail.processing.process_orders", version=1)
# Get the original source code
source_code = transform.get_source()
print(source_code) # Shows the original function definition
# Get the original function for inspection
original_func = transform.get_original_function()
print(f"Function name: {original_func.__name__}")
print(f"Docstring: {original_func.__doc__}")
API Reference
register_function()
Register a PySpark transform function in MLflow's model registry.
Parameters:
- func (Callable, optional): The function to register (for direct registration)
- name (str): Model name for registry (supports 3-part naming)
- file_path (str, optional): Path to Python file containing the function
- function_name (str, optional): Name of function to extract from file
- description (str, optional): Model description
- extra_pip_requirements (list, optional): Additional pip requirements
- tags (dict, optional): Tags to attach to the registered model
- infer_schema (bool, optional): Whether to automatically infer schema constraints (default: False)
- infer_requirements (bool, optional): Whether to automatically infer pip requirements (default: False)
Returns:
ModelInfo: The logged model information
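Both inference flags default to False. A hedged sketch of enabling them when registering the clean_data function from the Quick Start (the return value is MLflow's ModelInfo object):

from pyspark_transform_registry import register_function

model_info = register_function(
    func=clean_data,
    name="analytics.etl.clean_data",
    description="Data cleaning transformation",
    infer_schema=True,          # automatically infer schema constraints
    infer_requirements=True,    # automatically infer pip requirements
)
print(model_info.model_uri)  # ModelInfo exposes the logged model's URI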
load_function()
Load a previously registered PySpark transform function.
Parameters:
- name (str): Model name in registry
- version (int or str): Model version to load (required)
Returns:
Callable: The loaded transform function, which supports:
- Single param: transform(df)
- Multi param: transform(df, param1=value1, param2=value2)
- Source inspection: transform.get_source() returns the original function source code
- Function access: transform.get_original_function() returns the unwrapped original function
get_latest_function_version()
Get the latest version number of a function in the registry.
Parameters:
- name (str): Model name in registry
Returns:
str: Latest version number of the function
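A sketch of pairing this with load_function to always pick up the newest registration; it assumes get_latest_function_version is importable from the package root like the other two functions, and reuses clean_data from the Quick Start:

from pyspark_transform_registry import (
    get_latest_function_version,
    load_function,
    register_function,
)

# Re-registering under the same name auto-increments the integer version
register_function(func=clean_data, name="analytics.etl.clean_data")

# Fetch the latest version (returned as a string) and load it
latest = get_latest_function_version("analytics.etl.clean_data")
clean_data_func = load_function("analytics.etl.clean_data", version=latest)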
Model Discovery
To discover registered models, use MLflow's native model registry APIs:
import mlflow
client = mlflow.tracking.MlflowClient()

# Search the registry for all registered models
models = client.search_registered_models()
for model in models:
    print(f"Model: {model.name}")
    for version in model.latest_versions:
        print(f"  Version: {version.version}")
Requirements
- Python 3.9+
- PySpark 3.0+
- MLflow 3.0+
Development
# Install development dependencies
make install
# Run tests
make test
# Run linting and formatting
make check
License
MIT License
Download files
Source Distribution
Built Distribution
File details
Details for the file pyspark_transform_registry-0.6.0.tar.gz.
File metadata
- Download URL: pyspark_transform_registry-0.6.0.tar.gz
- Upload date:
- Size: 11.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0980030a80b8f88a59744c450659069a8993ff22ef4e00572e1660a8bd7878d0 |
| MD5 | 1ca0ab1194ac85850cf0f3a48f393088 |
| BLAKE2b-256 | 10090edf77bcbe4dc70207244e1ef0991183f5f272262dcf8da3d84fcd64e6e1 |
File details
Details for the file pyspark_transform_registry-0.6.0-py3-none-any.whl.
File metadata
- Download URL: pyspark_transform_registry-0.6.0-py3-none-any.whl
- Upload date:
- Size: 8.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 31a8346a11be620f3632de97f8ea9f7cc7bc815be6465ab3328369ae0b326a9a |
| MD5 | 245906193f235814ece5b2394531d59d |
| BLAKE2b-256 | 6d422e26209cd06838cea2baf1da0f93b07d54c041fb4210c2ff4e41413094f4 |