Skip to main content

Modern Spark distribution fitting library with efficient parallel processing

Project description

spark-bestfit

CI PyPI version Documentation Status Production Ready License: MIT Code style: black Ruff

Modern Spark distribution fitting library with efficient parallel processing

Efficiently fit ~100 scipy.stats distributions to your data using Spark's parallel processing with optimized Pandas UDFs and broadcast variables.

Features

  • Parallel Processing: Fits distributions in parallel using Spark
  • Multi-Column Fitting: Fit multiple columns efficiently in a single operation
  • ~100 Continuous Distributions: Access to nearly all scipy.stats continuous distributions
  • 16 Discrete Distributions: Fit count data with Poisson, negative binomial, geometric, and more
  • Histogram-Based Fitting: Efficient fitting using histogram representation
  • Multiple Metrics: Compare fits using K-S statistic, A-D statistic, SSE, AIC, and BIC
  • Statistical Validation: Kolmogorov-Smirnov and Anderson-Darling tests for goodness-of-fit
  • Confidence Intervals: Bootstrap confidence intervals for fitted parameters
  • Progress Tracking: Monitor long-running fits with customizable callbacks
  • Results API: Filter, sort, and export results easily
  • Visualization: Built-in plotting for distribution comparison, Q-Q plots and P-P plots
  • Flexible Configuration: Customize bins, sampling, and distribution selection

Scope & Limitations

spark-bestfit is designed for batch processing of statistical distribution fitting on Spark DataFrames.

What it does well:

  • Fit ~100 continuous and 16 discrete scipy.stats distributions in parallel
  • Provide robust goodness-of-fit metrics (KS, A-D, AIC, BIC, SSE)
  • Generate publication-ready visualizations (histograms, Q-Q plots, P-P plots)
  • Compute bootstrap confidence intervals for parameters

Known limitations:

  • No real-time/streaming support (batch processing only)
  • Custom distribution support planned for 1.3.0

Installation

pip install spark-bestfit

This installs spark-bestfit without PySpark. You are responsible for providing a compatible Spark environment (see Compatibility Matrix below).

With PySpark included (for users without a managed Spark environment):

pip install spark-bestfit[spark]

Quick Start

from spark_bestfit import DistributionFitter
import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Generate sample data
data = np.random.normal(loc=50, scale=10, size=10_000)

# Create fitter
fitter = DistributionFitter(spark)
df = spark.createDataFrame([(float(x),) for x in data], ["value"])

# Fit distributions
results = fitter.fit(df, column="value")

# Get best fit (by K-S statistic, the default)
best = results.best(n=1)[0]
print(f"Best: {best.distribution} (KS={best.ks_statistic:.4f}, p={best.pvalue:.4f})")

# Plot
fitter.plot(best, df, "value", title="Best Fit Distribution")

Compatibility Matrix

Spark Version Python Versions NumPy Pandas PyArrow
3.5.x 3.11, 3.12 1.24+ (< 2.0) 1.5+ 12.0 - 16.x
4.x 3.12, 3.13 2.0+ 2.2+ 17.0+

Note: Spark 3.5.x does not support NumPy 2.0. If using Spark 3.5 with Python 3.12, ensure setuptools is installed (provides distutils).

API Overview

Fitting Distributions

from spark_bestfit import DistributionFitter

fitter = DistributionFitter(spark, random_seed=123)
results = fitter.fit(
    df,
    column="value",
    bins=100,                    # Number of histogram bins
    support_at_zero=True,        # Only fit non-negative distributions
    enable_sampling=True,        # Enable adaptive sampling
    sample_fraction=0.3,         # Sample 30% of data
    max_distributions=50,        # Limit distributions to fit
)

Multi-Column Fitting

Fit multiple columns efficiently in a single operation:

from spark_bestfit import DistributionFitter

# Create DataFrame with multiple columns
df = spark.createDataFrame([
    (1.0, 10.0, 100.0),
    (2.0, 20.0, 200.0),
    # ...
], ["col_a", "col_b", "col_c"])

fitter = DistributionFitter(spark)

# Fit all columns in one call - shares Spark overhead
results = fitter.fit(df, columns=["col_a", "col_b", "col_c"])

# Get results for a specific column
col_a_results = results.for_column("col_a")
best_a = col_a_results.best(n=1)[0]

# Get best distribution per column
best_per_col = results.best_per_column(n=1)
for col_name, fits in best_per_col.items():
    print(f"{col_name}: {fits[0].distribution} (KS={fits[0].ks_statistic:.4f})")

# List all columns in results
print(results.column_names)  # ['col_a', 'col_b', 'col_c']

Multi-column fitting is more efficient than fitting columns separately because it:

  • Performs a single df.count() call for all columns
  • Shares the data sample across all fitting operations
  • Minimizes Spark job overhead

Benchmark: Fitting 3 columns together is ~1.3× faster than 3 separate fits (4.8s vs 6.5s). See Performance & Scaling for details.

Working with Results

# Get top 5 distributions (by K-S statistic, the default)
top_5 = results.best(n=5)

# Get best by other metrics
best_sse = results.best(n=1, metric="sse")[0]
best_aic = results.best(n=1, metric="aic")[0]
best_ad = results.best(n=1, metric="ad_statistic")[0]

# Filter by goodness-of-fit
good_fits = results.filter(ks_threshold=0.05)        # K-S statistic < 0.05
significant = results.filter(pvalue_threshold=0.05)  # p-value > 0.05
good_ad = results.filter(ad_threshold=1.0)           # A-D statistic < 1.0

# Convert to pandas for analysis
df_pandas = results.df.toPandas()

# Use fitted distribution
samples = best.sample(size=10000)  # Generate samples
pdf_values = best.pdf(x_array)     # Evaluate PDF
cdf_values = best.cdf(x_array)     # Evaluate CDF

# Access all goodness-of-fit metrics
print(f"K-S: {best.ks_statistic}, p-value: {best.pvalue}")
print(f"A-D: {best.ad_statistic}, A-D p-value: {best.ad_pvalue}")

Note: Anderson-Darling p-values are only available for 5 distributions (norm, expon, logistic, gumbel_r, gumbel_l) where scipy has critical value tables. For other distributions, ad_pvalue will be None but ad_statistic is still valid for ranking fits.

Progress Tracking

Monitor long-running fits with the built-in console_progress() utility:

from spark_bestfit.progress import console_progress

results = fitter.fit(df, column="value", progress_callback=console_progress())
print()  # Newline after completion
# Output: Progress: 45/100 tasks (45.0%)

For custom callbacks or tqdm integration:

# Custom callback
def on_progress(completed: int, total: int, percent: float) -> None:
    print(f"\rFitting: {completed}/{total} ({percent:.1f}%)", end="", flush=True)

results = fitter.fit(df, column="value", progress_callback=on_progress)

# tqdm integration
from tqdm import tqdm

pbar = None

def tqdm_callback(completed: int, total: int, percent: float) -> None:
    global pbar
    if pbar is None:
        pbar = tqdm(total=total, desc="Fitting")
    pbar.n = completed
    pbar.refresh()

results = fitter.fit(df, column="value", progress_callback=tqdm_callback)
if pbar:
    pbar.close()

Note: Progress percentages may fluctuate during fitting as new Spark stages add tasks. See Progress Tracking for details.

Parameter Confidence Intervals

# Compute 95% bootstrap confidence intervals
ci = best.confidence_intervals(df, column="value", alpha=0.05, n_bootstrap=1000, random_seed=42)

# Display with parameter names
print(f"Distribution: {best.distribution}")
for param, (lower, upper) in ci.items():
    print(f"  {param}: [{lower:.4f}, {upper:.4f}]")

Custom Plotting

fitter.plot(
    best,
    df,
    "value",
    figsize=(16, 10),
    dpi=300,
    histogram_alpha=0.6,
    pdf_linewidth=3,
    title="Distribution Fit",
    xlabel="Value",
    ylabel="Density",
    save_path="output/distribution.png",
)

Q-Q Plots

# Create Q-Q plot for goodness-of-fit assessment
fitter.plot_qq(
    best,
    df,
    "value",
    max_points=1000,           # Sample size for plotting
    title="Q-Q Plot",
    save_path="output/qq_plot.png",
)

P-P Plots

# Create P-P plot for goodness-of-fit assessment
fitter.plot_pp(
    best,
    df,
    "value",
    max_points=1000,           # Sample size for plotting
    title="P-P Plot",
    save_path="output/pp_plot.png",
)

Discrete Distributions

For count data (integers), use DiscreteDistributionFitter:

from spark_bestfit import DiscreteDistributionFitter
import numpy as np

# Generate count data
data = np.random.poisson(lam=7, size=10_000)
df = spark.createDataFrame([(int(x),) for x in data], ["counts"])

# Fit discrete distributions
fitter = DiscreteDistributionFitter(spark)
results = fitter.fit(df, column="counts")

# Get best fit - use AIC for model selection (recommended for discrete)
best = results.best(n=1, metric="aic")[0]
print(f"Best: {best.distribution} (AIC={best.aic:.2f})")

# Plot fitted PMF
fitter.plot(best, df, "counts", title="Best Discrete Fit")

Metric Selection for Discrete Distributions:

Metric Use Case
aic Recommended - Proper model selection criterion with complexity penalty
bic Similar to AIC but stronger penalty for complex models
ks_statistic Valid for ranking fits, but p-values are not reliable for discrete data
ad_statistic Valid for ranking fits (not computed for discrete distributions)
sse Simple comparison metric

Note: The K-S and A-D tests assume continuous distributions. For discrete data, the K-S statistic can still rank fits, but p-values are conservative and should not be used for hypothesis testing. A-D statistics are not computed for discrete distributions. Use AIC/BIC for proper model selection.

Excluding Distributions

from spark_bestfit import DistributionFitter, DEFAULT_EXCLUDED_DISTRIBUTIONS

# View default exclusions
print(DEFAULT_EXCLUDED_DISTRIBUTIONS)

# Include a specific distribution by removing it from exclusions
exclusions = tuple(d for d in DEFAULT_EXCLUDED_DISTRIBUTIONS if d != "wald")
fitter = DistributionFitter(spark, excluded_distributions=exclusions)

# Or exclude nothing (fit all distributions - may be slow)
fitter = DistributionFitter(spark, excluded_distributions=())

Documentation

Full documentation is available at spark-bestfit.readthedocs.io.

Contributing

Contributions are welcome! Please read our Contributing Guide and Code of Conduct before submitting a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feat/amazing-feature)
  3. Commit your changes (git commit -m 'feat: add amazing feature')
  4. Push to the branch (git push origin feat/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

spark_bestfit-1.2.0.tar.gz (1.2 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

spark_bestfit-1.2.0-py3-none-any.whl (45.1 kB view details)

Uploaded Python 3

File details

Details for the file spark_bestfit-1.2.0.tar.gz.

File metadata

  • Download URL: spark_bestfit-1.2.0.tar.gz
  • Upload date:
  • Size: 1.2 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for spark_bestfit-1.2.0.tar.gz
Algorithm Hash digest
SHA256 9c65e0fffa24a14a6177604f085844fbe656cc2d3f8e3a271d563750e5690707
MD5 8f7a6d718ca127db298425ab76bc34b3
BLAKE2b-256 a0dd23eb61f4da68357f21da3456cca64f9dd18aeaad2553953c5fdb823703ea

See more details on using hashes here.

Provenance

The following attestation bundles were made for spark_bestfit-1.2.0.tar.gz:

Publisher: release.yml on dwsmith1983/spark-bestfit

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file spark_bestfit-1.2.0-py3-none-any.whl.

File metadata

  • Download URL: spark_bestfit-1.2.0-py3-none-any.whl
  • Upload date:
  • Size: 45.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for spark_bestfit-1.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e71ddaba2469f6207d2cb7bb5dbd9b073f9f64a96bc57c415798c483a48a3712
MD5 3ef6fe3ab54c94da347b8e54349c5494
BLAKE2b-256 80e7d0016a7a54a72461506e072b61894d2e8660824dac3a0b796ddfa7e4a579

See more details on using hashes here.

Provenance

The following attestation bundles were made for spark_bestfit-1.2.0-py3-none-any.whl:

Publisher: release.yml on dwsmith1983/spark-bestfit

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page