
Silex

Add more 🔥 to Apache Spark!

Tooling: Poetry · BDD & doctest tests · Black · isort · Flake8 · tryceratops · MyPy · Bandit · pre-commit · Conventional Commits · Semantic Versioning


TLDR

Silex is a data engineering library that extends PySpark.

You don't need another class: just use PySpark as usual, and your DataFrames gain new functions!

import silex
from pyspark.sql import DataFrame

# extend your DataFrames with silex functions!
# if for some reason you don't want to do that, see the 'Without extending DataFrames' section below
silex.extend_dataframes()

df: DataFrame = ...  # your regular Spark DataFrame
df = df.drop_col_if_na(max=0)  # new function, and still a regular Spark DataFrame!
# scroll for more information!

Available functions

# assertions (raise an Exception if the expectation is not met /!\)
def expect_column(self, col: str) -> DataFrame: ...
def expect_columns(self, cols: Union[str, List[str]]) -> DataFrame: ...

def expect_distinct_values_equal_set(self, cols: Union[str, List[str]], values: Collection[Any]) -> DataFrame: ...
def expect_distinct_values_in_set(self, cols: Union[str, List[str]], values: Collection[Any]) -> DataFrame: ...

def expect_min_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> DataFrame: ...
def expect_avg_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> DataFrame: ...
def expect_max_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> DataFrame: ...

def expect_unique_id(self, cols: Union[str, List[str]]) -> DataFrame: ...

# boolean checks
def has_column(self, col: str) -> bool: ...
def has_columns(self, cols: Union[str, List[str]]) -> bool: ...

def has_distinct_values_equal_set(self, cols: Union[str, List[str]], values: Collection[Any]) -> bool: ...
def has_distinct_values_in_set(self, cols: Union[str, List[str]], values: Collection[Any]) -> bool: ...

def has_min_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> bool: ...
def has_avg_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> bool: ...
def has_max_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> bool: ...

def has_unique_id(self, cols: Union[str, List[str]]) -> bool: ...

# dates
def with_date_column(self, col: str, fmt: str, new_col: Optional[str] = None) -> DataFrame: ...

# drop
def drop_col_if_na(self, max: int) -> DataFrame: ...
def drop_col_if_not_distinct(self, min: int) -> DataFrame: ...

# filters
def filter_on_range(self, col: str, from_: Any, to: Any, ...) -> DataFrame: ...

# joins
def join_closest_date(self, other: DataFrame, ...) -> DataFrame: ...
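
For example, with silex.extend_dataframes() in effect, the boolean checks can guard a pipeline while their expect_* twins fail fast. A small sketch using only the signatures listed above (the exact exception type raised by expect_* is not shown here):

import silex
from pyspark.sql import SparkSession

silex.extend_dataframes()

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(0, "a"), (1, "b")], schema=["id", "text"])

# boolean checks return a plain bool...
assert df.has_columns(["id", "text"])
assert df.has_unique_id("id")
assert df.has_distinct_values_in_set("text", {"a", "b", "c"})

# ...while the expect_* variants return the DataFrame, so they chain
# and raise if an expectation does not hold
df = df.expect_columns(["id", "text"]).expect_unique_id("id")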

Getting started

Pre-requisites

  • Python 3.8 or above
  • Spark 3 or above

Installation

pip install spark-silex
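
Once installed, importing the package and extending DataFrames (as shown in the Usage section below) makes for a quick smoke test:

import silex

silex.extend_dataframes()  # should run without error on a fresh install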

Usage

By extending DataFrames! ⚡

import silex
from pyspark.sql import DataFrame, SparkSession

# extend your DataFrames with silex functions!
# if for some reason you don't want to do that, see the next example
silex.extend_dataframes()

spark = SparkSession.builder.getOrCreate()

data = [
    (0, "2022-01-01", "a", 1.0),
    (1, "2022-02-01", "b", 2.0),
    (2, "2022-03-01", "c", 3.0),
]
df: DataFrame = spark.createDataFrame(data, schema=["id", "date", "text", "value"])

df.show()
# +---+----------+----+-----+
# | id|      date|text|value|
# +---+----------+----+-----+
# |  0|2022-01-01|   a|  1.0|
# |  1|2022-02-01|   b|  2.0|
# |  2|2022-03-01|   c|  3.0|
# +---+----------+----+-----+

df.printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- date: string (nullable = true)
#  |-- text: string (nullable = true)
#  |-- value: double (nullable = true)

df = df.with_date_column(col="date", fmt="yyyy-MM-dd")

df.show()
# +---+----------+----+-----+
# | id|      date|text|value|
# +---+----------+----+-----+
# |  0|2022-01-01|   a|  1.0|
# |  1|2022-02-01|   b|  2.0|
# |  2|2022-03-01|   c|  3.0|
# +---+----------+----+-----+

df.printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- date: date (nullable = true)
#  |-- text: string (nullable = true)
#  |-- value: double (nullable = true)
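
The extended DataFrame also exposes the other functions from the listing above, e.g. the range filter. A sketch using only the documented parameters (whether the bounds are inclusive, and what the elided "..." parameters are, is left to the API reference):

df = df.filter_on_range(col="value", from_=1.5, to=3.0)
# keeps the rows whose "value" falls in the given range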

Without extending DataFrames 🌧️

from silex.fn.date import with_date_column
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    (0, "2022-01-01", "a", 1.0),
    (1, "2022-02-01", "b", 2.0),
    (2, "2022-03-01", "c", 3.0),
]
df: DataFrame = spark.createDataFrame(data, schema=["id", "date", "text", "value"])

df.show()
# +---+----------+----+-----+
# | id|      date|text|value|
# +---+----------+----+-----+
# |  0|2022-01-01|   a|  1.0|
# |  1|2022-02-01|   b|  2.0|
# |  2|2022-03-01|   c|  3.0|
# +---+----------+----+-----+

df.printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- date: string (nullable = true)
#  |-- text: string (nullable = true)
#  |-- value: double (nullable = true)

df = with_date_column(df=df, col="date", fmt="yyyy-MM-dd")

df.show()
# +---+----------+----+-----+
# | id|      date|text|value|
# +---+----------+----+-----+
# |  0|2022-01-01|   a|  1.0|
# |  1|2022-02-01|   b|  2.0|
# |  2|2022-03-01|   c|  3.0|
# +---+----------+----+-----+

df.printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- date: date (nullable = true)
#  |-- text: string (nullable = true)
#  |-- value: double (nullable = true)

Contributing

# install Poetry and Python 3.8 first, e.g. using pyenv

cd silex
poetry env use path/to/python3.8  # e.g. ~/.pyenv/versions/3.8.12/bin/python
poetry shell
poetry install
pre-commit install

make help
# or open the Makefile to see the available development commands
