
PyDeequ2 - aws clone

Project description

PyDeequ

PyDeequ is a Python API for Deequ, a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets. PyDeequ is written to support usage of Deequ in Python.


There are four main components of Deequ:

  • Metrics Computation:
    • Profiles leverage Analyzers to analyze each column of a dataset.
    • Analyzers serve as the foundational module that computes metrics for data profiling and validation at scale.
  • Constraint Suggestion:
    • Specify rules for various groups of Analyzers to be run over a dataset, returning a collection of suggested constraints to run in a Verification Suite.
  • Constraint Verification:
    • Perform data validation on a dataset with respect to various constraints set by you.
  • Metrics Repository:
    • Allows for persistence and tracking of Deequ runs over time.


Quickstart

The following will get you started with some basic usage. For more in-depth examples, take a look in the tutorials/ directory, which contains executable Jupyter notebooks for each module. For the supported interfaces, see the documentation.

Installation

You can install PyDeequ2 via pip:

pip install pydeequ2

Set up a PySpark session

from pyspark.sql import SparkSession, Row
import pydeequ2

spark = (SparkSession
    .builder
    # Load the Deequ JAR from Maven and exclude its conflicting f2j dependency
    .config("spark.jars.packages", pydeequ2.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ2.f2j_maven_coord)
    .getOrCreate())

# A small example DataFrame; note the null in column c
df = spark.sparkContext.parallelize([
            Row(a="foo", b=1, c=5),
            Row(a="bar", b=2, c=6),
            Row(a="baz", b=3, c=None)]).toDF()

Analyzers

from pydeequ2.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("b")) \
                    .run()

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
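
For this three-row DataFrame, Size should come out as 3 and the Completeness of column b as 1.0 (it has no nulls). The printed metrics table should look roughly like the sketch below (columns per AnalyzerContext.successMetricsAsDataFrame; row order may vary):

# Expected output (a sketch):
# +-------+--------+------------+-----+
# | entity|instance|        name|value|
# +-------+--------+------------+-----+
# |Dataset|       *|        Size|  3.0|
# | Column|       b|Completeness|  1.0|
# +-------+--------+------------+-----+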

Profile

from pydeequ2.profiles import *

result = ColumnProfilerRunner(spark) \
    .onData(df) \
    .run()

for col, profile in result.profiles.items():
    print(profile)
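
Each profile also exposes its statistics as attributes, so you can pick out individual values instead of printing the whole object. A minimal sketch, assuming the attribute names from PyDeequ's profiles module:

# Pick individual statistics from one column's profile
# (attribute names assumed from PyDeequ's StandardColumnProfile)
b_profile = result.profiles['b']
print(b_profile.completeness)                  # fraction of non-null values
print(b_profile.approximateNumDistinctValues)  # approximate distinct count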

Constraint Suggestions

from pydeequ2.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Constraint Suggestions in JSON format
print(suggestionResult)
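
Since the result comes back as a plain dict, you can also iterate over the individual suggestions. A minimal sketch, assuming the usual 'constraint_suggestions' key and per-suggestion fields of the returned JSON:

# Walk the individual suggestions (key names assumed from the JSON layout)
for suggestion in suggestionResult['constraint_suggestions']:
    print(f"{suggestion['column_name']}: {suggestion['code_for_constraint']}")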

Constraint Verification

from pydeequ2.checks import *
from pydeequ2.verification import *

check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3)             # holds: df has 3 rows
        .hasMin("b", lambda x: x == 0)              # fails: min(b) is 1
        .isComplete("c")                            # fails: c contains a null
        .isUnique("a")                              # holds: values of a are distinct
        .isContainedIn("a", ["foo", "bar", "baz"])  # holds
        .isNonNegative("b")) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()
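
With this toy data, two of the constraints above (the minimum of b and the completeness of c) will not hold, so it is often useful to narrow the results down to the failures. A small sketch using the constraint_status column of the results DataFrame:

# Show only the constraints that did not pass
checkResult_df.filter(checkResult_df.constraint_status != "Success").show()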

Repository

Save to a Metrics Repository by adding the useRepository() and saveOrAppendResult() calls to your Analysis Runner.

from pydeequ2.repository import *
from pydeequ2.analyzers import *

# Persist metrics to a JSON file via the filesystem-backed repository
metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, 'metrics.json')
repository = FileSystemMetricsRepository(spark, metrics_file)
# Tag this run so it can be looked up later
key_tags = {'tag': 'pydeequ hello world'}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(ApproxCountDistinct('b')) \
    .useRepository(repository) \
    .saveOrAppendResult(resultKey) \
    .run()

To load previous runs, use the repository object to load previous results back in.

result_metrep_df = repository.load() \
    .before(ResultKey.current_milli_time()) \
    .forAnalyzers([ApproxCountDistinct('b')]) \
    .getSuccessMetricsAsDataFrame()
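
As before, the loaded metrics come back as a Spark DataFrame:

result_metrep_df.show()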

Wrapping up

After you've run your jobs with PyDeequ, be sure to shut down your Spark session to prevent any hanging processes.

# Shut down the Py4J callback server that PyDeequ uses for Scala-to-Python calls
spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()

Contributing

Please refer to the contributing doc for how to contribute to PyDeequ.

License

This library is licensed under the Apache 2.0 License.


Contributing Developer Setup

  1. Setup SDKMAN
  2. Setup Java
  3. Setup Apache Spark
  4. Install Poetry
  5. Run tests locally

Setup SDKMAN

SDKMAN is a tool for managing parallel versions of multiple Software Development Kits on any Unix-based system. It provides a convenient command-line interface for installing, switching, removing and listing candidates. SDKMAN! installs smoothly on macOS, Linux, WSL, Cygwin, and more, and supports both the Bash and Zsh shells. See the documentation on the SDKMAN! website.

Open your favourite terminal and enter the following:

$ curl -s https://get.sdkman.io | bash

If the environment needs tweaking for SDKMAN to be installed, the installer will prompt you accordingly and ask you to restart.

Next, open a new terminal or enter:

$ source "$HOME/.sdkman/bin/sdkman-init.sh"

Lastly, run the following code snippet to ensure that installation succeeded:

$ sdk version

Setup Java

Now open your favourite terminal and enter the following:

# List the available AdoptOpenJDK versions
$ sdk list java

# To install Java 11
$ sdk install java 11.0.10.hs-adpt

# To install Java 8
$ sdk install java 8.0.292.hs-adpt

Setup Apache Spark

Now open your favourite terminal and enter the following:

# List the available Apache Spark versions
$ sdk list spark

# To install Spark 3
$ sdk install spark 3.0.2

Poetry
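
If you don't already have Poetry installed, its official installer is one option (a sketch; see the Poetry documentation for alternatives such as pipx):

$ curl -sSL https://install.python-poetry.org | python3 -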

Poetry Commands

# Install the project's dependencies
poetry install

# Update dependencies to their latest compatible versions
poetry update

# --tree: List the dependencies as a tree.
# --latest (-l): Show the latest version.
# --outdated (-o): Show the latest version but only for packages that are outdated.
poetry show -o

Running Tests Locally

Take a look at the tests in tests/dataquality and tests/jobs, then run:

$ poetry run pytest
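
pytest also accepts a path if you only want to run one of those suites, e.g.:

$ poetry run pytest tests/dataquality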



Download files

Download the file for your platform. If you're not sure which to choose, see the guide on installing packages.

Source Distribution

pydeequ2-1.0.2.tar.gz (36.1 kB)

Built Distribution

pydeequ2-1.0.2-py3-none-any.whl (36.8 kB)

File details

Details for the file pydeequ2-1.0.2.tar.gz.

File metadata

  • Download URL: pydeequ2-1.0.2.tar.gz
  • Upload date:
  • Size: 36.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.3.2 CPython/3.7.9 Darwin/22.1.0

File hashes

Hashes for pydeequ2-1.0.2.tar.gz

Algorithm    Hash digest
SHA256       bbeeb74522a0f7dbca7e4826e1be5e54826fce7938eb4015df7ce5d1195abd36
MD5          73fd6881cb133dffe9a885cb9415caf2
BLAKE2b-256  ee019c92a70fcb4507128d2df96b7a8825d1a8a21d58eed05f329a860e7a953c


File details

Details for the file pydeequ2-1.0.2-py3-none-any.whl.

File metadata

  • Download URL: pydeequ2-1.0.2-py3-none-any.whl
  • Upload date:
  • Size: 36.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.3.2 CPython/3.7.9 Darwin/22.1.0

File hashes

Hashes for pydeequ2-1.0.2-py3-none-any.whl

Algorithm    Hash digest
SHA256       bd05bf6c9bd036a54ec0a16782cd180fd5bb1cf859dbbe6c37ff67d7dc722f77
MD5          06511acc7b582a342d2967c382ce778d
BLAKE2b-256  12a0e6b75268af6021b6f6c78640be05161d16549f6738cb34b50a8f42cc3539

