spark-bestfit
Modern Spark distribution fitting library with efficient parallel processing
Efficiently fit ~100 scipy.stats distributions to your data using Spark's parallel processing with optimized Pandas UDFs and broadcast variables.
Features
- Parallel Processing: Fits distributions in parallel using Spark
- Multi-Column Fitting: Fit multiple columns efficiently in a single operation
- ~100 Continuous Distributions: Access to nearly all scipy.stats continuous distributions
- 16 Discrete Distributions: Fit count data with Poisson, negative binomial, geometric, and more
- Histogram-Based Fitting: Efficient fitting using histogram representation
- Multiple Metrics: Compare fits using K-S statistic, A-D statistic, SSE, AIC, and BIC
- Statistical Validation: Kolmogorov-Smirnov and Anderson-Darling tests for goodness-of-fit
- Confidence Intervals: Bootstrap confidence intervals for fitted parameters
- Progress Tracking: Monitor long-running fits with customizable callbacks
- Results API: Filter, sort, and export results easily
- Visualization: Built-in plotting for distribution comparison, Q-Q plots and P-P plots
- Flexible Configuration: Customize bins, sampling, and distribution selection
Scope & Limitations
spark-bestfit is designed for batch processing of statistical distribution fitting on Spark DataFrames.
What it does well:
- Fit ~100 continuous and 16 discrete scipy.stats distributions in parallel
- Provide robust goodness-of-fit metrics (KS, A-D, AIC, BIC, SSE)
- Generate publication-ready visualizations (histograms, Q-Q plots, P-P plots)
- Compute bootstrap confidence intervals for parameters
Known limitations:
- No real-time/streaming support (batch processing only)
- Custom distribution support planned for 1.3.0
Installation
```bash
pip install spark-bestfit
```
This installs spark-bestfit without PySpark. You are responsible for providing a compatible Spark environment (see Compatibility Matrix below).
With PySpark included (for users without a managed Spark environment):
```bash
pip install spark-bestfit[spark]
```
Quick Start
```python
from spark_bestfit import DistributionFitter
import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Generate sample data
data = np.random.normal(loc=50, scale=10, size=10_000)

# Create fitter
fitter = DistributionFitter(spark)
df = spark.createDataFrame([(float(x),) for x in data], ["value"])

# Fit distributions
results = fitter.fit(df, column="value")

# Get best fit (by K-S statistic, the default)
best = results.best(n=1)[0]
print(f"Best: {best.distribution} (KS={best.ks_statistic:.4f}, p={best.pvalue:.4f})")

# Plot
fitter.plot(best, df, "value", title="Best Fit Distribution")
```
Compatibility Matrix
| Spark Version | Python Versions | NumPy | Pandas | PyArrow |
|---|---|---|---|---|
| 3.5.x | 3.11, 3.12 | 1.24+ (< 2.0) | 1.5+ | 12.0 - 16.x |
| 4.x | 3.12, 3.13 | 2.0+ | 2.2+ | 17.0+ |
Note: Spark 3.5.x does not support NumPy 2.0. If using Spark 3.5 with Python 3.12, ensure `setuptools` is installed (it provides `distutils`, which was removed from the standard library in Python 3.12).
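If you want to fail fast on a mismatched environment, a check along these lines can help. This is a hypothetical snippet, not part of spark-bestfit; it only encodes the NumPy constraints from the matrix above:

```python
# Hypothetical environment sanity check (not part of spark-bestfit):
# Spark 3.5.x requires NumPy < 2.0, while Spark 4.x requires NumPy 2.0+.
import numpy as np
import pyspark

spark_major = int(pyspark.__version__.split(".")[0])
numpy_major = int(np.__version__.split(".")[0])

if spark_major == 3 and numpy_major >= 2:
    raise RuntimeError("Spark 3.5.x does not support NumPy 2.0; pin numpy<2.0")
if spark_major >= 4 and numpy_major < 2:
    raise RuntimeError("Spark 4.x expects NumPy 2.0+")
```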
API Overview
Fitting Distributions
```python
from spark_bestfit import DistributionFitter

fitter = DistributionFitter(spark, random_seed=123)
results = fitter.fit(
    df,
    column="value",
    bins=100,              # Number of histogram bins
    support_at_zero=True,  # Only fit non-negative distributions
    enable_sampling=True,  # Enable adaptive sampling
    sample_fraction=0.3,   # Sample 30% of the data
    max_distributions=50,  # Limit the number of distributions to fit
)
```
Multi-Column Fitting
Fit multiple columns efficiently in a single operation:
```python
from spark_bestfit import DistributionFitter

# Create a DataFrame with multiple columns
df = spark.createDataFrame([
    (1.0, 10.0, 100.0),
    (2.0, 20.0, 200.0),
    # ...
], ["col_a", "col_b", "col_c"])

fitter = DistributionFitter(spark)

# Fit all columns in one call - shares Spark overhead
results = fitter.fit(df, columns=["col_a", "col_b", "col_c"])

# Get results for a specific column
col_a_results = results.for_column("col_a")
best_a = col_a_results.best(n=1)[0]

# Get the best distribution per column
best_per_col = results.best_per_column(n=1)
for col_name, fits in best_per_col.items():
    print(f"{col_name}: {fits[0].distribution} (KS={fits[0].ks_statistic:.4f})")

# List all columns in the results
print(results.column_names)  # ['col_a', 'col_b', 'col_c']
```
Multi-column fitting is more efficient than fitting columns separately because it:
- Performs a single `df.count()` call for all columns
- Shares the data sample across all fitting operations
- Minimizes Spark job overhead
Benchmark: Fitting 3 columns together is ~1.3× faster than 3 separate fits (4.8s vs 6.5s). See Performance & Scaling for details.
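A rough way to reproduce this comparison on your own cluster is sketched below; timings will vary with data size, cluster parallelism, and caching:

```python
# Sketch: time one multi-column fit against three single-column fits.
import time

start = time.perf_counter()
fitter.fit(df, columns=["col_a", "col_b", "col_c"])
multi_s = time.perf_counter() - start

start = time.perf_counter()
for col in ["col_a", "col_b", "col_c"]:
    fitter.fit(df, column=col)
separate_s = time.perf_counter() - start

print(f"multi-column: {multi_s:.1f}s, separate: {separate_s:.1f}s")
```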
Working with Results
```python
# Get the top 5 distributions (by K-S statistic, the default)
top_5 = results.best(n=5)

# Get the best fit by other metrics
best_sse = results.best(n=1, metric="sse")[0]
best_aic = results.best(n=1, metric="aic")[0]
best_ad = results.best(n=1, metric="ad_statistic")[0]

# Filter by goodness-of-fit
good_fits = results.filter(ks_threshold=0.05)        # K-S statistic < 0.05
significant = results.filter(pvalue_threshold=0.05)  # p-value > 0.05
good_ad = results.filter(ad_threshold=1.0)           # A-D statistic < 1.0

# Convert to pandas for analysis
df_pandas = results.df.toPandas()

# Use the fitted distribution
samples = best.sample(size=10_000)  # Generate samples
pdf_values = best.pdf(x_array)      # Evaluate the PDF
cdf_values = best.cdf(x_array)      # Evaluate the CDF

# Access all goodness-of-fit metrics
print(f"K-S: {best.ks_statistic}, p-value: {best.pvalue}")
print(f"A-D: {best.ad_statistic}, A-D p-value: {best.ad_pvalue}")
```
Note: Anderson-Darling p-values are only available for five distributions (norm, expon, logistic, gumbel_r, gumbel_l) where scipy has critical value tables. For other distributions, `ad_pvalue` will be `None`, but `ad_statistic` is still valid for ranking fits.
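When reporting A-D results across many distributions, guard against the missing p-values. A small sketch using the attributes shown above:

```python
# Sketch: report A-D results, tolerating distributions without p-values.
for fit in results.best(n=5, metric="ad_statistic"):
    p = f"{fit.ad_pvalue:.4f}" if fit.ad_pvalue is not None else "n/a"
    print(f"{fit.distribution}: A-D={fit.ad_statistic:.4f}, p={p}")
```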
Progress Tracking
Monitor long-running fits with the built-in `console_progress()` utility:

```python
from spark_bestfit.progress import console_progress

results = fitter.fit(df, column="value", progress_callback=console_progress())
print()  # Newline after completion
# Output: Progress: 45/100 tasks (45.0%)
```
For custom callbacks or tqdm integration:

```python
# Custom callback
def on_progress(completed: int, total: int, percent: float) -> None:
    print(f"\rFitting: {completed}/{total} ({percent:.1f}%)", end="", flush=True)

results = fitter.fit(df, column="value", progress_callback=on_progress)

# tqdm integration
from tqdm import tqdm

pbar = None

def tqdm_callback(completed: int, total: int, percent: float) -> None:
    global pbar
    if pbar is None:
        pbar = tqdm(total=total, desc="Fitting")
    pbar.n = completed
    pbar.refresh()

results = fitter.fit(df, column="value", progress_callback=tqdm_callback)
if pbar:
    pbar.close()
```
Note: Progress percentages may fluctuate during fitting as new Spark stages add tasks. See Progress Tracking for details.
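If fluctuating totals make raw percentages confusing, one workaround is a callback that only reports the high-water mark. This is a sketch using the same callback signature as above, not a library feature:

```python
# Sketch: report only the highest percentage seen so far, so console
# output never appears to move backwards when Spark adds new tasks.
best_percent = 0.0

def monotonic_progress(completed: int, total: int, percent: float) -> None:
    global best_percent
    best_percent = max(best_percent, percent)
    print(f"\rFitting: {best_percent:5.1f}%", end="", flush=True)

results = fitter.fit(df, column="value", progress_callback=monotonic_progress)
```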
Parameter Confidence Intervals
```python
# Compute 95% bootstrap confidence intervals
ci = best.confidence_intervals(
    df, column="value", alpha=0.05, n_bootstrap=1000, random_seed=42
)

# Display with parameter names
print(f"Distribution: {best.distribution}")
for param, (lower, upper) in ci.items():
    print(f"  {param}: [{lower:.4f}, {upper:.4f}]")
```
Custom Plotting
```python
fitter.plot(
    best,
    df,
    "value",
    figsize=(16, 10),
    dpi=300,
    histogram_alpha=0.6,
    pdf_linewidth=3,
    title="Distribution Fit",
    xlabel="Value",
    ylabel="Density",
    save_path="output/distribution.png",
)
```
Q-Q Plots
```python
# Create a Q-Q plot for goodness-of-fit assessment
fitter.plot_qq(
    best,
    df,
    "value",
    max_points=1000,  # Sample size for plotting
    title="Q-Q Plot",
    save_path="output/qq_plot.png",
)
```
P-P Plots
```python
# Create a P-P plot for goodness-of-fit assessment
fitter.plot_pp(
    best,
    df,
    "value",
    max_points=1000,  # Sample size for plotting
    title="P-P Plot",
    save_path="output/pp_plot.png",
)
```
Discrete Distributions
For count data (integers), use `DiscreteDistributionFitter`:

```python
from spark_bestfit import DiscreteDistributionFitter
import numpy as np

# Generate count data
data = np.random.poisson(lam=7, size=10_000)
df = spark.createDataFrame([(int(x),) for x in data], ["counts"])

# Fit discrete distributions
fitter = DiscreteDistributionFitter(spark)
results = fitter.fit(df, column="counts")

# Get best fit - use AIC for model selection (recommended for discrete data)
best = results.best(n=1, metric="aic")[0]
print(f"Best: {best.distribution} (AIC={best.aic:.2f})")

# Plot the fitted PMF
fitter.plot(best, df, "counts", title="Best Discrete Fit")
```
Metric Selection for Discrete Distributions:
| Metric | Use Case |
|---|---|
| `aic` | Recommended - proper model selection criterion with a complexity penalty |
| `bic` | Similar to AIC but with a stronger penalty for complex models |
| `ks_statistic` | Valid for ranking fits, but p-values are not reliable for discrete data |
| `ad_statistic` | Not computed for discrete distributions |
| `sse` | Simple comparison metric |
Note: The K-S and A-D tests assume continuous distributions. For discrete data, the K-S statistic can still rank fits, but p-values are conservative and should not be used for hypothesis testing. A-D statistics are not computed for discrete distributions. Use AIC/BIC for proper model selection.
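In practice, comparing the AIC and BIC rankings side by side is a quick sanity check; if both criteria agree on the top model, the choice is usually safe. A sketch, assuming the `results` object from the discrete example above and a `bic` attribute mirroring the documented `aic`:

```python
# Sketch: compare AIC and BIC rankings for the discrete fits above.
# Assumes fit results expose a `bic` attribute alongside `aic`.
top_aic = results.best(n=3, metric="aic")
top_bic = results.best(n=3, metric="bic")

for rank, (a, b) in enumerate(zip(top_aic, top_bic), start=1):
    print(f"#{rank}: AIC -> {a.distribution} ({a.aic:.1f}) | "
          f"BIC -> {b.distribution} ({b.bic:.1f})")
```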
Excluding Distributions
```python
from spark_bestfit import DistributionFitter, DEFAULT_EXCLUDED_DISTRIBUTIONS

# View the default exclusions
print(DEFAULT_EXCLUDED_DISTRIBUTIONS)

# Include a specific distribution by removing it from the exclusions
exclusions = tuple(d for d in DEFAULT_EXCLUDED_DISTRIBUTIONS if d != "wald")
fitter = DistributionFitter(spark, excluded_distributions=exclusions)

# Or exclude nothing (fit all distributions - may be slow)
fitter = DistributionFitter(spark, excluded_distributions=())
```
Documentation
Full documentation is available at spark-bestfit.readthedocs.io.
Contributing
Contributions are welcome! Please read our Contributing Guide and Code of Conduct before submitting a Pull Request.
- Fork the repository
- Create your feature branch (`git checkout -b feat/amazing-feature`)
- Commit your changes (`git commit -m 'feat: add amazing feature'`)
- Push to the branch (`git push origin feat/amazing-feature`)
- Open a Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.