PyDeequ2 - aws clone
Project description
PyDeequ
PyDeequ is a Python API for Deequ, a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets. PyDeequ is written to support usage of Deequ in Python.
There are 4 main components of Deequ, and they are:
- Metrics Computation:
Profiles
leverages Analyzers to analyze each column of a dataset.Analyzers
serve here as a foundational module that computes metrics for data profiling and validation at scale.
- Constraint Suggestion:
- Specify rules for various groups of Analyzers to be run over a dataset to return back a collection of constraints suggested to run in a Verification Suite.
- Constraint Verification:
- Perform data validation on a dataset with respect to various constraints set by you.
- Metrics Repository
- Allows for persistence and tracking of Deequ runs over time.
🎉 Announcements 🎉
- With PyDeequ v0.1.8+, we now officially support Spark3 ! Just make sure you have an environment variable
SPARK_VERSION
to specify your Spark version! - We've release a blogpost on integrating PyDeequ onto AWS leveraging services such as AWS Glue, Athena, and SageMaker! Check it out: Monitor data quality in your data lake using PyDeequ and AWS Glue.
- Check out the PyDeequ Release Announcement Blogpost with a tutorial walkthrough the Amazon Reviews dataset!
- Join the PyDeequ community on PyDeequ Slack to chat with the devs!
Quickstart
The following will quickstart you with some basic usage. For more in-depth examples, take a look in the tutorials/
directory for executable Jupyter notebooks of each module. For documentation on supported interfaces, view the documentation
.
Installation
You can install PyDeequ via pip.
pip install pydeequ
Set up a PySpark session
from pyspark.sql import SparkSession, Row
import pydeequ
spark = (SparkSession
.builder
.config("spark.jars.packages", pydeequ.deequ_maven_coord)
.config("spark.jars.excludes", pydeequ.f2j_maven_coord)
.getOrCreate())
df = spark.sparkContext.parallelize([
Row(a="foo", b=1, c=5),
Row(a="bar", b=2, c=6),
Row(a="baz", b=3, c=None)]).toDF()
Analyzers
from pydeequ2.analyzers import *
analysisResult = AnalysisRunner(spark) \
.onData(df) \
.addAnalyzer(Size()) \
.addAnalyzer(Completeness("b")) \
.run()
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
Profile
from pydeequ2.profiles import *
result = ColumnProfilerRunner(spark) \
.onData(df) \
.run()
for col, profile in result.profiles.items():
print(profile)
Constraint Suggestions
from pydeequ2.suggestions import *
suggestionResult = ConstraintSuggestionRunner(spark) \
.onData(df) \
.addConstraintRule(DEFAULT()) \
.run()
# Constraint Suggestions in JSON format
print(suggestionResult)
Constraint Verification
from pydeequ2.checks import *
from pydeequ2.verification import *
check = Check(spark, CheckLevel.Warning, "Review Check")
checkResult = VerificationSuite(spark) \
.onData(df) \
.addCheck(
check.hasSize(lambda x: x >= 3) \
.hasMin("b", lambda x: x == 0) \
.isComplete("c") \
.isUnique("a") \
.isContainedIn("a", ["foo", "bar", "baz"]) \
.isNonNegative("b")) \
.run()
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()
Repository
Save to a Metrics Repository by adding the useRepository()
and saveOrAppendResult()
calls to your Analysis Runner.
from pydeequ2.repository import *
from pydeequ2.analyzers import *
metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, 'metrics.json')
repository = FileSystemMetricsRepository(spark, metrics_file)
key_tags = {'tag': 'pydeequ hello world'}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)
analysisResult = AnalysisRunner(spark) \
.onData(df) \
.addAnalyzer(ApproxCountDistinct('b')) \
.useRepository(repository) \
.saveOrAppendResult(resultKey) \
.run()
To load previous runs, use the repository
object to load previous results back in.
result_metrep_df = repository.load() \
.before(ResultKey.current_milli_time()) \
.forAnalyzers([ApproxCountDistinct('b')]) \
.getSuccessMetricsAsDataFrame()
Wrapping up
After you've ran your jobs with PyDeequ, be sure to shut down your Spark session to prevent any hanging processes.
spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()
Contributing
Please refer to the contributing doc for how to contribute to PyDeequ.
License
This library is licensed under the Apache 2.0 License.
Contributing Developer Setup
- Setup SDKMAN
- Setup Java
- Setup Apache Spark
- Install Poetry
- Run tests locally
Setup SDKMAN
SDKMAN is a tool for managing parallel Versions of multiple Software Development Kits on any Unix based system. It provides a convenient command line interface for installing, switching, removing and listing Candidates. SDKMAN! installs smoothly on Mac OSX, Linux, WSL, Cygwin, etc... Support Bash and ZSH shells. See documentation on the SDKMAN! website.
Open your favourite terminal and enter the following:
$ curl -s https://get.sdkman.io | bash
If the environment needs tweaking for SDKMAN to be installed,
the installer will prompt you accordingly and ask you to restart.
Next, open a new terminal or enter:
$ source "$HOME/.sdkman/bin/sdkman-init.sh"
Lastly, run the following code snippet to ensure that installation succeeded:
$ sdk version
Setup Java
Install Java Now open favourite terminal and enter the following:
List the AdoptOpenJDK OpenJDK versions
$ sdk list java
To install For Java 11
$ sdk install java 11.0.10.hs-adpt
To install For Java 11
$ sdk install java 8.0.292.hs-adpt
Setup Apache Spark
Install Java Now open favourite terminal and enter the following:
List the Apache Spark versions:
$ sdk list spark
To install For Spark 3
$ sdk install spark 3.0.2
Poetry
Poetry Commands
poetry install
poetry update
# --tree: List the dependencies as a tree.
# --latest (-l): Show the latest version.
# --outdated (-o): Show the latest version but only for packages that are outdated.
poetry show -o
Running Tests Locally
Take a look at tests in tests/dataquality
and tests/jobs
$ poetry run pytest
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file pydeequ2-1.0.2.tar.gz
.
File metadata
- Download URL: pydeequ2-1.0.2.tar.gz
- Upload date:
- Size: 36.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.3.2 CPython/3.7.9 Darwin/22.1.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | bbeeb74522a0f7dbca7e4826e1be5e54826fce7938eb4015df7ce5d1195abd36 |
|
MD5 | 73fd6881cb133dffe9a885cb9415caf2 |
|
BLAKE2b-256 | ee019c92a70fcb4507128d2df96b7a8825d1a8a21d58eed05f329a860e7a953c |
File details
Details for the file pydeequ2-1.0.2-py3-none-any.whl
.
File metadata
- Download URL: pydeequ2-1.0.2-py3-none-any.whl
- Upload date:
- Size: 36.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.3.2 CPython/3.7.9 Darwin/22.1.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | bd05bf6c9bd036a54ec0a16782cd180fd5bb1cf859dbbe6c37ff67d7dc722f77 |
|
MD5 | 06511acc7b582a342d2967c382ce778d |
|
BLAKE2b-256 | 12a0e6b75268af6021b6f6c78640be05161d16549f6738cb34b50a8f42cc3539 |