Plug-and-play Data Quality + Unit Testing for PySpark (batch & streaming) with YAML config, profiling, and optional OpenTelemetry hooks.

Project description

open-spark-dlh-dq

Open source Plug-and-play Data Quality for Apache Spark (Batch + Streaming) with YAML checks, profiling, and OpenTelemetry.


📌 Project Overview

open-spark-dlh-dq is an open-source Python library providing a Data Quality (DQ) framework for Apache Spark.

It supports:

  • ✅ Batch & Streaming DQ with declarative YAML suites
  • ✅ Custom checks via Python (dq_check, unit_test)
  • ✅ CLI execution for datasets in directories or Spark DataFrames
  • ✅ Inline checks in PySpark scripts
  • ✅ Format support: Parquet, CSV, Iceberg, Delta, JSON, ORC
  • ✅ Profiler & OpenTelemetry for observability

Built on PySpark, PyDeequ, and Chispa, this library enables robust data validation pipelines.


✅ Features

  • Batch DQ: Validate static datasets using YAML or inline rules.
  • Streaming DQ: Apply checks on micro-batches via foreachBatch.
  • Custom Checks: Extend with Python functions in user_checks/.
  • CLI Tool: Run suites via sparkdq run --yaml <suite.yml>.
  • Profiler: Generate summary stats and quantiles.
  • OpenTelemetry: Capture spans and traces for test cases.
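A YAML suite ties these features together. The sketch below is illustrative: only the `test_cases`, `name`, and `type` keys are taken from the docs further down, and the suite name `orders_dq` from the CLI example; any other keys are assumptions about the schema.

```yaml
# Illustrative DQ suite (key names beyond test_cases/name/type are assumed)
suite_name: orders_dq
test_cases:
  - name: amount_positive
    type: dq_check
```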

📂 Repository Structure

open-spark-dlh-dq/
├─ pyproject.toml
├─ README.md
├─ LICENSE
│
├─ sparkdq/
│  ├─ cli/main.py                          # CLI entry point
│  ├─ config/                              # YAML loader, env vars, schema binding
│  ├─ core/                                # Models, registry, Spark session, runner
│  │  └─ validators/                       # Built-in + custom validator classes
│  ├─ profiling/profiler.py                # Profiling utilities
│  ├─ resources/open_spark_dlh_dq.yml      # Default YAML suite
│  ├─ observability/otel.py                # OpenTelemetry integration
│  └─ integrations/streaming.py            # foreachBatch wrapper
│
├─ user_checks/                            # User-defined checks
│  └─ example_checks.py
│
├─ examples/                               # Usage examples
│  ├─ suites/orders_dq.yml
│  ├─ batch_example.py
│  └─ streaming_example.py
│
└─ tests/                                  # Unit tests
   ├─ test_yaml_loader.py
   ├─ test_chispa_integration.py
   ├─ test_pydeequ_integration.py
   ├─ test_runner.py
   ├─ test_validators.py
   ├─ test_validator_contracts.py
   └─ test_cli.py


🛠 Usage

Run CLI with YAML suite

sparkdq run --yaml ./sparkdq/resources/open_spark_dlh_dq.yml --suite-name orders_dq --format text

Inline checks in PySpark

from pyspark.sql import SparkSession
from sparkdq.core.runner import run_suite
from sparkdq.config.loader import load_yaml_suite

spark = SparkSession.builder.appName("sparkdq-inline").getOrCreate()
suite = load_yaml_suite("./sparkdq/resources/open_spark_dlh_dq.yml")
df = spark.read.parquet("./data/orders")
run_suite(df, suite)

Streaming example

python examples/streaming_example.py
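Under the hood, streaming DQ runs each micro-batch through the same suite via foreachBatch (see sparkdq/integrations/streaming.py). A minimal sketch of that wrapper pattern, with `run_suite` passed in as a stand-in for sparkdq's runner:

```python
def make_dq_foreach_batch(suite, run_suite):
    """Wrap a DQ suite as a Structured Streaming foreachBatch callback.

    `run_suite` is injected here; in real use it would be
    sparkdq.core.runner.run_suite.
    """
    def dq_foreach_batch(batch_df, batch_id):
        # each micro-batch DataFrame is validated like a static one
        return run_suite(batch_df, suite)
    return dq_foreach_batch
```

In a real pipeline you would attach the callback with `stream_df.writeStream.foreachBatch(make_dq_foreach_batch(suite, run_suite)).start()`.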

🧩 Custom Checks

Add Python methods in user_checks/example_checks.py:

from sparkdq.core.registry import dq_check, unit_test

@dq_check("amount_positive")
def amount_positive(df):
    # passes when no row has a non-positive amount (one scan instead of two counts)
    return df.filter(df.amount <= 0).count() == 0

Reference them in YAML:

test_cases:
  - name: amount_positive
    type: dq_check

📊 Profiler & OpenTelemetry

Enable profiling and observability in your pipeline:

from sparkdq.profiling.profiler import profile_df
profile_df(df)
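The profiler's exact output format isn't shown here, but the "summary stats and quantiles" it produces (see Features) can be sketched for a single numeric column in plain Python:

```python
import statistics

def profile_column(values):
    """Summary stats and quartiles for one numeric column (illustrative stand-in
    for what profile_df computes per column over a Spark DataFrame)."""
    q1, median, q3 = statistics.quantiles(values, n=4)  # quartiles
    return {
        "count": len(values),
        "min": min(values),
        "max": max(values),
        "mean": statistics.mean(values),
        "q1": q1,
        "median": median,
        "q3": q3,
    }
```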

OpenTelemetry spans can be enabled via sparkdq/observability/otel.py.


🔨 Build & Publish

Build for PyPI (Windows)

./build.ps1

Build for PyPI (Linux)

./build.sh

Example repository demonstrating usage:

https://github.com/aashish72it/spark-test

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

open_sparkdq-0.1.10.tar.gz (1.7 MB)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

open_sparkdq-0.1.10-py3-none-any.whl (1.7 MB)

Uploaded Python 3

File details

Details for the file open_sparkdq-0.1.10.tar.gz.

File metadata

  • Download URL: open_sparkdq-0.1.10.tar.gz
  • Upload date:
  • Size: 1.7 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.9

File hashes

Hashes for open_sparkdq-0.1.10.tar.gz

| Algorithm   | Hash digest |
|-------------|-------------|
| SHA256      | b3f265a2a4152086fa2ec1c9cb009a5d447058643d2bc76cc8c91281100f4365 |
| MD5         | cd5668c457498fb9b01d7ebb9c5c2160 |
| BLAKE2b-256 | 8bf05355ba6fbffbbfb1cfaf111cb03f8b5715c7f04142a67114a8248a2dfeb8 |

See more details on using hashes here.

File details

Details for the file open_sparkdq-0.1.10-py3-none-any.whl.

File metadata

  • Download URL: open_sparkdq-0.1.10-py3-none-any.whl
  • Upload date:
  • Size: 1.7 MB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.9

File hashes

Hashes for open_sparkdq-0.1.10-py3-none-any.whl

| Algorithm   | Hash digest |
|-------------|-------------|
| SHA256      | 3fde613ddcc32a25c77dbff79be7db4760f30cf3fcf18d8841a6a284a89e4533 |
| MD5         | 3b53805a8902cc93e69156bfb7a2516b |
| BLAKE2b-256 | 293a331e333e210090499147f6c225a6acc3bdb3c05bb9de9ca452d3c76e7065 |

See more details on using hashes here.
