Spalah is a set of PySpark dataframe helpers
Project description
spalah
Spalah is a set of python helpers to deal with PySpark dataframes, transformations, schemas etc.
The word "spalah" means "spark" in Ukrainian 🇺🇦
Installation
Use the package manager pip to install spalah.
pip install spalah
Examples of use
spalah.datalake
get_delta_properties
from spalah.datalake import get_delta_properties
table_properties = get_delta_properties(table_path="/path/dataset")
print(table_properties)
# output:
# {'delta.deletedFileRetentionDuration': 'interval 15 days'}
set_delta_properties
from spalah.datalake import set_delta_properties
set_delta_properties(
table_path='/path/dataset',
properties={
"delta.logRetentionDuration": "interval 10 days",
"delta.deletedFileRetentionDuration": "interval 15 days"
}
)
and the standard output is:
Applying table properties on 'delta.`/path/dataset`':
- Checking if 'delta.logRetentionDuration = interval 10 days' is set on delta.`/path/dataset`
Result: The property has been set
- Checking if 'delta.deletedFileRetentionDuration = interval 15 days' is set on delta.`/path/dataset`
Result: The property has been set
spalah.dataframe
SchemaComparer
from spalah.dataframe import SchemaComparer
schema_comparer = SchemaComparer(
source_schema = df_source.schema,
target_schema = df_target.schema
)
schema_comparer.compare()
# The comparison results are stored in the class instance properties `matched` and `not_matched`
# Contains a list of matched columns:
schema_comparer.matched
""" output:
[MatchedColumn(name='Address.Line1', data_type='StringType')]
"""
# Contains a list of all not matched columns with a reason as description of non-match:
schema_comparer.not_matched
""" output:
[
NotMatchedColumn(
name='name',
data_type='StringType',
reason="The column exists in source and target schemas but it's name is case-mismatched"
),
NotMatchedColumn(
name='ID',
data_type='IntegerType <=> StringType',
reason='The column exists in source and target schemas but it is not matched by a data type'
),
NotMatchedColumn(
name='Address.Line2',
data_type='StringType',
reason='The column exists only in the source schema'
)
]
"""
flatten_schema
from spalah.dataframe import flatten_schema
# Pass the sample dataframe to get the list of all attributes as single dimension list
flatten_schema(df_complex_schema.schema)
""" output:
['ID', 'Name', 'Address.Line1', 'Address.Line2']
"""
# Alternatively, the function can return data types of the attributes
flatten_schema(
schema=df_complex_schema.schema,
include_datatype=True
)
""" output:
[
('ID', 'IntegerType'),
('Name', 'StringType'),
('Address.Line1', 'StringType'),
('Address.Line2', 'StringType')
]
"""
script_dataframe
from spalah.dataframe import script_dataframe
script = script_dataframe(df)
print(script)
""" output:
from pyspark.sql import Row
import datetime
from decimal import Decimal
from pyspark.sql.types import *
# Scripted data and schema:
__data = [Row(ID=1, Name='John', Address=Row(Line1='line1', Line2='line2'))]
__schema = {'type': 'struct', 'fields': [{'name': 'ID', 'type': 'integer', 'nullable': False, 'metadata': {}}, {'name': 'Name', 'type': 'string', 'nullable': False, 'metadata': {}}, {'name': 'Address', 'type': {'type': 'struct', 'fields': [{'name': 'Line1', 'type': 'string', 'nullable': False, 'metadata': {}}, {'name': 'Line2', 'type': 'string', 'nullable': False, 'metadata': {}}]}, 'nullable': False, 'metadata': {}}]}
outcome_dataframe = spark.createDataFrame(__data, StructType.fromJson(__schema))
"""
slice_dataframe
from spalah.dataframe import slice_dataframe
df = spark.sql(
'SELECT 1 as ID, "John" AS Name, struct("line1" AS Line1, "line2" AS Line2) AS Address'
)
df.printSchema()
""" output:
root
|-- ID: integer (nullable = false)
|-- Name: string (nullable = false)
|-- Address: struct (nullable = false)
| |-- Line1: string (nullable = false)
| |-- Line2: string (nullable = false)
"""
# Create a new dataframe by cutting of root and nested attributes
df_result = slice_dataframe(
input_dataframe=df,
columns_to_include=["Name", "Address"],
columns_to_exclude=["Address.Line2"]
)
df_result.printSchema()
""" output:
root
|-- Name: string (nullable = false)
|-- Address: struct (nullable = false)
| |-- Line1: string (nullable = false)
"""
Check for more information in examples: dataframe, examples: datalake pages and related notebook
Contributing
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
Please make sure to update tests as appropriate.
License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
spalah-0.4.1.tar.gz
(25.2 kB
view hashes)
Built Distribution
Close
Hashes for spalah-0.4.1-py2.py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 74093cb4f5bd6aad7f7ee5daeabfa2d428683dd50d06724091a4ec39ecdd711d |
|
MD5 | 8f5fdaf0b0425aaef471e7458481ecd9 |
|
BLAKE2b-256 | 6f1bdeb03a4f25af0da5d6716d8c2f973eab397aa4cc7b077e2fa65503365416 |