Utility functions to manipulate nested structures using pyspark

Nested fields transformation for pyspark

Motivation

Applying transformations to nested structures in Spark is tricky. Assume we have JSON data with a highly nested structure:

[
  {
    "data": {
      "city": {
        "addresses": [
          {
            "id": "my-id"
          },
          {
            "id": "my-id2"
          }
        ]
      }
    }
  }
]

To hash the nested "id" field, you need to write the following Spark code:

import pyspark.sql.functions as F

hashed = df.withColumn("data",
                       (F.col("data")
                        .withField("city", F.col("data.city")
                                   .withField("addresses", F.transform("data.city.addresses",
                                                                       lambda c: c.withField("id",
                                                                                             F.sha2(c.getField("id"),
                                                                                                    256)))))))

With the library, the code above can be simplified to:

import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from nestedfunctions.functions.terminal_operations import apply_terminal_operation

hashed = apply_terminal_operation(df, "data.city.addresses.id", lambda c, t: F.sha2(c.cast(StringType()), 256))

Instead of writing nested transformation functions by hand, you specify the terminal operation as a lambda and the field hierarchy as a flat, dot-separated path, and the library generates the Spark code for you.
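To make the idea of a flat path plus a terminal operation more concrete, here is a minimal pure-Python sketch that walks plain dicts and lists instead of Spark columns. It is an analogy only, not the library's implementation, and the helper names apply_at_path and sha256_hex are hypothetical.

```python
import hashlib


def apply_at_path(value, path, func):
    """Apply func to the value at the given path, mapping over lists on the way.

    Hypothetical helper for illustration: it works on plain dicts and lists,
    not on Spark DataFrames.
    """
    if isinstance(value, list):
        # Arrays are transparent in the path: descend into every element.
        return [apply_at_path(item, path, func) for item in value]
    if not path:
        # End of the path reached: run the terminal operation.
        return func(value)
    head, *rest = path
    # Copy the dict so the input record is left unchanged.
    updated = dict(value)
    updated[head] = apply_at_path(value[head], rest, func)
    return updated


def sha256_hex(text):
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


record = {"data": {"city": {"addresses": [{"id": "my-id"}, {"id": "my-id2"}]}}}
hashed_record = apply_at_path(record, "data.city.addresses.id".split("."), sha256_hex)
```

The caller only supplies the dotted path and the function for the leaf; the recursion takes care of structs and arrays, which is the same division of labour the library offers for Spark columns.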

Install

To install the current release:

$ pip install pyspark-nested-functions

Available functions

Add nested field

Add a nested field called new_column_name, computed by a lambda function applied to the nested field column_to_process. The fields column_to_process and new_column_name must have the same parent or both be at the root.

from nestedfunctions.functions.add_nested_field import add_nested_field
from pyspark.sql.functions import when
processed = add_nested_field(
      df,
      column_to_process="payload.array.booleanField",
      new_column_name="payload.array.booleanFieldAsString",
      f=lambda column: when(column, "Y").when(~column, "N").otherwise(""),
  )
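The chained when calls map true to "Y", false to "N", and null to "". The following pure-Python sketch shows the same mapping on plain dicts, with the new value added next to its source field. The helper names bool_to_flag and add_sibling are hypothetical, and the code does not use the library or Spark.

```python
def bool_to_flag(value):
    """Mirror when(c, "Y").when(~c, "N").otherwise("") for a single value."""
    if value is True:
        return "Y"
    if value is False:
        return "N"
    # None (null) matches neither condition, so the fallback applies.
    return ""


def add_sibling(items, source, target, func):
    """Return copies of the items with target added next to source."""
    return [{**item, target: func(item.get(source))} for item in items]


rows = [{"booleanField": True}, {"booleanField": False}, {"booleanField": None}]
with_flags = add_sibling(rows, "booleanField", "booleanFieldAsString", bool_to_flag)
```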

Date Format

Format a nested date field from current_date_format to target_date_format.

from nestedfunctions.functions.date_format import format_date
date_formatted_df = format_date(
      df,
      field="customDimensions.value",
      current_date_format="y-d-M",
      target_date_format="y-MM"
  )
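As a rough illustration of what the reformatting does to a single value, here is a standard-library sketch. It assumes the pattern "y-d-M" means year-day-month and approximates it with the strptime directives "%Y-%d-%m". Spark and Python use different pattern languages, so treat this as an analogy; reformat_date is a hypothetical helper.

```python
from datetime import datetime


def reformat_date(value, current_format, target_format):
    """Parse value with current_format and render it with target_format."""
    return datetime.strptime(value, current_format).strftime(target_format)


# "2021-25-12" is read as year 2021, day 25, month 12.
converted = reformat_date("2021-25-12", "%Y-%d-%m", "%Y-%m")
```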

Drop

Recursively drop a field at any nesting level, including all of its children.

from nestedfunctions.functions.drop import drop

dropped_df = drop(df, field="root_level.children1.children2")
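The effect of dropping a dotted path can be sketched on plain Python data. This is an illustrative analogy with a hypothetical helper, drop_path, and it says nothing about how the library rewrites the Spark schema internally.

```python
def drop_path(value, path):
    """Return a copy of value without the field addressed by path."""
    if isinstance(value, list):
        # Apply the drop to every element of an array.
        return [drop_path(item, path) for item in value]
    if not isinstance(value, dict) or path[0] not in value:
        # Nothing to drop at this level.
        return value
    head, *rest = path
    if not rest:
        # Last path segment: remove the field together with its children.
        return {key: child for key, child in value.items() if key != head}
    updated = dict(value)
    updated[head] = drop_path(value[head], rest)
    return updated


record = {"root_level": {"children1": {"children2": {"x": 1}, "keep": 2}}, "other": 3}
dropped = drop_path(record, "root_level.children1.children2".split("."))
```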

Duplicate

Duplicate the nested field column_to_duplicate as duplicated_column_name. The fields column_to_duplicate and duplicated_column_name must have the same parent or both be at the root.

from nestedfunctions.functions.duplicate import duplicate
duplicated_df = duplicate(
      df,
      column_to_duplicate="payload.lineItems.comments",
      duplicated_column_name="payload.lineItems.commentsDuplicate"
  )

Expr

Add or overwrite a nested field based on an expression.

from nestedfunctions.functions.expr import expr
field = "emails.unverified"
processed = expr(df, field=field, expr=f"transform({field}, x -> (upper(x)))")

Field Rename

Rename all fields using a custom rename function.

from nestedfunctions.functions.field_rename import rename
def capitalize_field_name(field_name: str) -> str:
  return field_name.upper()
# Pass the function itself rather than calling it
renamed_df = rename(df, rename_func=capitalize_field_name)

Fillna

This function mimics the vanilla pyspark fillna functionality with added support for filling nested fields. The use of the input parameters value and subset is exactly the same as for the vanilla pyspark implementation.

from nestedfunctions.functions.fillna import fillna
# Fill all null boolean fields with False
filled_df = fillna(df, value=False)
# Fill nested field with value
filled_df = fillna(df, subset="payload.lineItems.availability.stores.availableQuantity", value=0)
# To fill an array that is null, specify a list of values
filled_df = fillna(df, value={"payload.comments" : ["Automatically triggered stock check"]})
# To fill the null elements of an array, specify a single value
filled_df = fillna(df, value={"payload.comments" : "Empty comment"})
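The last two calls differ only in the type of the fill value: a list replaces an array that is null as a whole, while a single value replaces null elements inside an existing array. The pure-Python sketch below illustrates that distinction. The helper fill_array_field is hypothetical, and its behaviour for a scalar fill on a null array is an assumption, since the text above does not describe that case.

```python
def fill_array_field(values, fill):
    """Fill a nullable array either as a whole or element by element."""
    if isinstance(fill, list):
        # A list fill replaces the array only when the array itself is null.
        return list(fill) if values is None else values
    if values is None:
        # Assumption: a scalar fill leaves a null array untouched.
        return None
    # A scalar fill replaces the null elements of an existing array.
    return [fill if item is None else item for item in values]


whole_array_filled = fill_array_field(None, ["Automatically triggered stock check"])
elements_filled = fill_array_field(["ok", None], "Empty comment")
```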

Flattener

Return a flattened representation of the data frame's schema.

from nestedfunctions.spark_schema.utility import SparkSchemaUtility

flatten_schema = SparkSchemaUtility().flatten_schema(df.schema)
# flatten_schema = ["root-element",
#                   "root-element-array-primitive",
#                   "root-element-array-of-structs.d1.d2",
#                   "nested-structure.n1",
#                   "nested-structure.d1.d2"]
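The shape of that output can be reproduced with a small pure-Python sketch. It assumes a simplified schema notation that is not the library's: a dict is a struct, a one-element list is an array, and a string is a primitive type. The function flatten_field_names is hypothetical.

```python
def flatten_field_names(schema, prefix=""):
    """Collect dotted paths of all leaf fields in a dict-based schema."""
    names = []
    for name, field_type in schema.items():
        path = f"{prefix}.{name}" if prefix else name
        # Arrays do not add a path segment, so unwrap them.
        while isinstance(field_type, list):
            field_type = field_type[0]
        if isinstance(field_type, dict):
            names.extend(flatten_field_names(field_type, path))
        else:
            names.append(path)
    return names


schema = {
    "root-element": "string",
    "root-element-array-primitive": ["string"],
    "root-element-array-of-structs": [{"d1": {"d2": "string"}}],
    "nested-structure": {"n1": "string", "d1": {"d2": "string"}},
}
flat_names = flatten_field_names(schema)
```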

Hash

Replace a nested field with its SHA-2 hash value. By default the output hash has 256 bits, but a different value can be set.

from nestedfunctions.functions.hash import hash_field
hashed_df = hash_field(df, "data.city.addresses.id", num_bits=256)
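Spark's sha2 function accepts bit lengths of 224, 256, 384 and 512 and returns a hex string. Assuming num_bits is interpreted the same way here, the standard-library sketch below shows how the bit length determines the length of the resulting hex digest. The helper sha2_hex is hypothetical and is not part of the library.

```python
import hashlib

# SHA-2 variants keyed by output size in bits.
SHA2_BY_BITS = {
    224: hashlib.sha224,
    256: hashlib.sha256,
    384: hashlib.sha384,
    512: hashlib.sha512,
}


def sha2_hex(text, num_bits=256):
    """Return the SHA-2 hex digest of text for the given bit length."""
    if num_bits not in SHA2_BY_BITS:
        raise ValueError(f"unsupported SHA-2 bit length: {num_bits}")
    return SHA2_BY_BITS[num_bits](text.encode("utf-8")).hexdigest()


# Each hex character encodes 4 bits, so the digest has num_bits / 4 characters.
digest_lengths = {bits: len(sha2_hex("my-id", bits)) for bits in SHA2_BY_BITS}
```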

Nullify

Set a field to null at any nesting level.

from nestedfunctions.functions.nullify import nullify

nullified_df = nullify(df, field="creditCard.id")

Redact

Replace a field by the default value of its data type. The default value of a data type is typically its min or max value.

from nestedfunctions.functions.redact import redact
redacted_df = redact(df, field="customDimensions.metabolicsConditions")

Whitelist

Keep only the fields listed in the parameters; all other fields are dropped.

from nestedfunctions.functions.whitelist import whitelist

whitelisted_df = whitelist(df, ["addresses.postalCode", "creditCard"]) 
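The following pure-Python sketch shows the intended effect on a single record: a whitelisted struct is kept whole, and a whitelisted leaf keeps only the branch that leads to it. The helper whitelist_paths is hypothetical and works on dicts and lists rather than on a DataFrame.

```python
def whitelist_paths(value, paths):
    """Keep only the fields addressed by paths (each path is a list of names)."""
    if isinstance(value, list):
        # Apply the same filter to every element of an array.
        return [whitelist_paths(item, paths) for item in value]
    if not isinstance(value, dict):
        return value
    kept = {}
    for name, child in value.items():
        if [name] in paths:
            # The whole field is whitelisted, so keep it with all its children.
            kept[name] = child
            continue
        # Otherwise keep only the deeper paths that start with this field.
        sub_paths = [p[1:] for p in paths if len(p) > 1 and p[0] == name]
        if sub_paths:
            kept[name] = whitelist_paths(child, sub_paths)
    return kept


record = {
    "addresses": [{"postalCode": "1234", "street": "Main"}],
    "creditCard": {"id": "c1", "number": "42"},
    "name": "x",
}
fields = ["addresses.postalCode", "creditCard"]
whitelisted = whitelist_paths(record, [field.split(".") for field in fields])
```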

License

Apache License 2.0
