
spalah

Spalah is a set of Python helpers for working with PySpark dataframes, transformations, schemas, and Delta tables.

The word "spalah" means "spark" in Ukrainian 🇺🇦

Installation

Use the package manager pip to install spalah.

pip install spalah

Examples of use

Spalah currently has two groups of helpers: dataframe and dataset.

spalah.dataframe

slice_dataframe

from spalah.dataframe import slice_dataframe

df = spark.sql(
    'SELECT 1 as ID, "John" AS Name, struct("line1" AS Line1, "line2" AS Line2) AS Address'
)
df.printSchema()

""" output:
root
 |-- ID: integer (nullable = false)
 |-- Name: string (nullable = false)
 |-- Address: struct (nullable = false)
 |    |-- Line1: string (nullable = false)
 |    |-- Line2: string (nullable = false)
"""

# Create a new dataframe by including and excluding root and nested attributes
df_result = slice_dataframe(
    input_dataframe=df,
    columns_to_include=["Name", "Address"],
    columns_to_exclude=["Address.Line2"]
)
df_result.printSchema()

""" output:
root
 |-- Name: string (nullable = false)
 |-- Address: struct (nullable = false)
 |    |-- Line1: string (nullable = false)
"""

Besides regular nested structs, it also supports slicing structs inside arrays, including multiple levels of nesting, as the sketch below shows.
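A minimal sketch of that case, assuming the same calling convention as above (the dataframe and attribute names are illustrative, not from the original example):

from spalah.dataframe import slice_dataframe

# A dataframe with an array of structs
df_items = spark.sql(
    'SELECT 1 AS ID, array(struct("a" AS Code, "b" AS Comment)) AS Items'
)

# Exclude a nested attribute of the structs inside the array
df_sliced = slice_dataframe(
    input_dataframe=df_items,
    columns_to_include=["Items"],
    columns_to_exclude=["Items.Comment"]
)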

flatten_schema

from spalah.dataframe import flatten_schema

# Pass a dataframe schema to get all attributes as a flat, one-dimensional list
# (df_complex_schema is a dataframe with the nested schema from the example above)
flatten_schema(df_complex_schema.schema)

""" output:
['ID', 'Name', 'Address.Line1', 'Address.Line2']
"""


# Alternatively, the function can return data types of the attributes
flatten_schema(
    schema=df_complex_schema.schema,
    include_datatype=True
)

""" output:
[
    ('ID', 'IntegerType'),
    ('Name', 'StringType'),
    ('Address.Line1', 'StringType'),
    ('Address.Line2', 'StringType')
]
"""

script_dataframe

from spalah.dataframe import script_dataframe

script = script_dataframe(df)

print(script)

""" output:
from pyspark.sql import Row
import datetime
from decimal import Decimal
from pyspark.sql.types import *

# Scripted data and schema:
__data = [Row(ID=1, Name='John', Address=Row(Line1='line1', Line2='line2'))]

__schema = {'type': 'struct', 'fields': [{'name': 'ID', 'type': 'integer', 'nullable': False, 'metadata': {}}, {'name': 'Name', 'type': 'string', 'nullable': False, 'metadata': {}}, {'name': 'Address', 'type': {'type': 'struct', 'fields': [{'name': 'Line1', 'type': 'string', 'nullable': False, 'metadata': {}}, {'name': 'Line2', 'type': 'string', 'nullable': False, 'metadata': {}}]}, 'nullable': False, 'metadata': {}}]}

outcome_dataframe = spark.createDataFrame(__data, StructType.fromJson(__schema))
"""

SchemaComparer
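For context, a pair of dataframes that would produce the comparison results shown below could be defined like this (a sketch; these inputs are not part of the original example):

df_source = spark.sql(
    'SELECT 1 AS ID, "John" AS Name, struct("line1" AS Line1, "line2" AS Line2) AS Address'
)
df_target = spark.sql(
    'SELECT "1" AS ID, "John" AS name, struct("line1" AS Line1) AS Address'
)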

from spalah.dataframe import SchemaComparer

schema_comparer = SchemaComparer(
    source_schema=df_source.schema,
    target_schema=df_target.schema
)

schema_comparer.compare()

# The comparison results are stored in the class instance properties `matched` and `not_matched`

# Contains a list of matched columns:
schema_comparer.matched

""" output:
[MatchedColumn(name='Address.Line1',  data_type='StringType')]
"""

# Contains a list of all non-matched columns, each with the reason for the mismatch:
schema_comparer.not_matched

""" output:
[
    NotMatchedColumn(
        name='name', 
        data_type='StringType', 
        reason="The column exists in source and target schemas but its name is case-mismatched"
    ),
    NotMatchedColumn(
        name='ID', 
        data_type='IntegerType <=> StringType', 
        reason='The column exists in source and target schemas but it is not matched by a data type'
    ),
    NotMatchedColumn(
        name='Address.Line2', 
        data_type='StringType', 
        reason='The column exists only in the source schema'
    )
]
"""

spalah.dataset

Get Delta table properties

from spalah.dataset import DeltaTableConfig

dp = DeltaTableConfig(table_path="/path/dataset")

print(dp.properties) 

# output: 
# {'delta.deletedFileRetentionDuration': 'interval 15 days'}
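Since `properties` is a regular dict, individual values can be read directly:

# Read a single property value (None if the property is not set)
retention = dp.properties.get("delta.deletedFileRetentionDuration")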

Set Delta table properties

from spalah.dataset import DeltaTableConfig

dp = DeltaTableConfig(table_path="/path/dataset")

dp.properties = {
    "delta.logRetentionDuration": "interval 10 days",
    "delta.deletedFileRetentionDuration": "interval 15 days"
}

Check constraints can be retrieved and set in a similar way, via the `.check_constraints` property.
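A sketch of reading and setting a constraint, assuming the property-style setter mirrors `.properties` (the constraint name and expression are taken from the log output below):

from spalah.dataset import DeltaTableConfig

dtc = DeltaTableConfig(table_path="/tmp/nested_schema_dataset")

# List the check constraints currently set on the table
print(dtc.check_constraints)

# Add a check constraint
dtc.check_constraints = {"id_is_not_null": "id is not null"}

Setting the constraint produces standard output like: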

2023-05-20 18:27:42,070 INFO      Applying check constraints on 'delta.`/tmp/nested_schema_dataset`':
2023-05-20 18:27:42,071 INFO      Checking if constraint 'id_is_not_null' was already set on delta.`/tmp/nested_schema_dataset`
2023-05-20 18:27:42,433 INFO      The constraint id_is_not_null has been successfully added to 'delta.`/tmp/nested_schema_dataset`'


For more information, see the dataframe and dataset examples pages and the related notebook.

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

License

MIT
