PySpark DataFrame-based workflow validator
Project description
Valido 🌩
PySpark ⚡ dataframe workflow ⚒ validator
Description
In projects using PySpark, it's very common to have functions that take Spark DataFrames as input or produce them as output. It's hard to quickly figure out what these DataFrames contain. This library offers simple decorators to annotate your functions so that they document themselves, and that documentation is kept up to date by validating the input and output at runtime.
For example,
@df_in(columns=["Brand", "Price"])   # the function expects a DataFrame as input parameter with columns Brand and Price
@df_out(columns=["Brand", "Price"])  # the function will return a DataFrame with columns Brand and Price
def filter_cars(car_df):
    # before this code is executed, the input DataFrame is validated according to the above decorator
    # filter some cars..
    return filtered_cars_df
Installation
Install with your favorite Python dependency manager, for example pip:
pip install valido
Usage
Start by importing the needed decorators:
from valido import df_in, df_out
To check a DataFrame input to a function, annotate the function with @df_in. For example, the following function expects to get a DataFrame with columns Brand and Price:
@df_in(columns=["Brand", "Price"])
def process_cars(car_df):
    # do stuff with cars
If your function takes multiple arguments, specify the parameter to be checked with its name:
@df_in(name="car_df", columns=["Brand", "Price"])
def process_cars(year, style, car_df):
    # do stuff with cars
Note: since the check is evaluated at runtime, please use named arguments in the function call, like this:
process_cars(year=2021, style="Mazda", car_df=mydf)
To check that a function returns a DataFrame with specific columns, use the @df_out decorator:
@df_out(columns=["Brand", "Price"])
def get_all_cars():
    # get those cars
    return all_cars_df
If one of the listed columns is missing from the DataFrame, a helpful assertion error is raised:
AssertionError("Column Price missing from DataFrame. Got columns: ['Brand']")
To check both input and output, just use both annotations on the same function:
@df_in(columns=["Brand", "Price"])
@df_out(columns=["Brand", "Price"])
def filter_cars(car_df):
    # filter some cars
    return filtered_cars_df
If you also want to check the data types of each column, you can replace the column list:
columns = ["Brand", "Price"]
with a dict:
columns = {"Brand": "string", "Price": "int"}
This will not only check that the specified columns are found in the DataFrame but also that their dtype is the expected one. In case of a wrong dtype, an error message similar to the following will explain the mismatch:
AssertionError("Column Price has wrong dtype. Was int, expected double")
You can enable strict mode for both @df_in and @df_out. This will raise an error if the DataFrame contains columns not defined in the annotation:
@df_in(columns=["Brand"], strict=True)
def process_cars(car_df):
# do stuff with cars
will raise an error when car_df contains columns ["Brand", "Price"]:
AssertionError: DataFrame contained unexpected column(s): Price
To quickly check what the incoming and outgoing DataFrames contain, you can add a @df_log annotation to the function. For example, adding @df_log to the above filter_cars function will produce log lines:
Function filter_cars parameters contained a DataFrame: columns: ['Brand', 'Price']
Function filter_cars returned a DataFrame: columns: ['Brand', 'Price']
or with @df_log(include_dtypes=True) you get:
Function filter_cars parameters contained a DataFrame: columns: ['Brand', 'Price'] with dtypes ['object', 'int64']
Function filter_cars returned a DataFrame: columns: ['Brand', 'Price'] with dtypes ['object', 'int64']
Note: @df_log(include_dtypes=True) also takes the name parameter, like df_in, for validating multi-parameter functions.
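Putting it together, a logged and validated version of filter_cars could look like this (a sketch; the price threshold is only illustrative, and whether the log lines appear depends on your logging configuration):

from valido import df_in, df_out, df_log

@df_log(include_dtypes=True)
@df_in(columns=["Brand", "Price"])
@df_out(columns=["Brand", "Price"])
def filter_cars(car_df):
    # keep only the more expensive cars
    return car_df.filter(car_df.Price > 10000)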
Contributing
Contributions are welcome. Please include tests in your PRs.
Development
To run the tests, clone the repository, install dependencies with pip and run tests with PyTest:
python -m pytest --import-mode=append tests/
License
BSD 3-Clause License
Project details
Download files
File details
Details for the file valido-0.1.0.tar.gz.
File metadata
- Download URL: valido-0.1.0.tar.gz
- Upload date:
- Size: 8.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.7.1 importlib_metadata/4.9.0 pkginfo/1.8.2 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.10.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 1eba63815920e3a2058872ab80d69b98b26566feed1d4cca7ff9cc1642ef37e3
MD5 | d736f2c7b3d1736f0b596d1a7aef6628
BLAKE2b-256 | 30c999b08eec74e4eb6e75dcb78e7106e04e5ca6b8146fc51aaeae13483f5422
File details
Details for the file valido-0.1.0-py3-none-any.whl.
File metadata
- Download URL: valido-0.1.0-py3-none-any.whl
- Upload date:
- Size: 7.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.7.1 importlib_metadata/4.9.0 pkginfo/1.8.2 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.10.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | ee42e569f12b5602e550af566d5bd300d6616e2dde5395907ac75f7d2ba8e4b3
MD5 | d95a06297622736d066a4d687c0fd588
BLAKE2b-256 | f6ff9249408a0ba912242c9d7dd3553afb573211e85bc6c1ffab513876b59648