Skip to main content

AutoMapper for Spark

Project description

Build and Test

Upload Python Package

Known Vulnerabilities

SparkAutoMapper

Fluent API to map data from one view to another in Spark.

Uses native Spark functions underneath so it is just as fast as hand writing the transformations.

Since this is just Python, you can use any Python editor. Since everything is typed using Python typings, most editors will auto-complete and warn you when you do something wrong

Usage

pip install sparkautomapper

Documentation

https://icanbwell.github.io/SparkAutoMapper/

SparkAutoMapper input and output

You can pass either a dataframe to SparkAutoMapper or specify the name of a Spark view to read from.

You can receive the result as a dataframe or (optionally) pass in the name of a view where you want the result.

Dynamic Typing Examples

Set a column in destination to a text value (read from pass in data frame and return the result in a new dataframe)

Set a column in destination to a text value

from spark_auto_mapper.automappers.automapper import AutoMapper

mapper = AutoMapper(
    keys=["member_id"]
).columns(
    dst1="hello"
)

Set a column in destination to a text value (read from a Spark view and put result in another Spark view)

Set a column in destination to a text value

from spark_auto_mapper.automappers.automapper import AutoMapper

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    dst1="hello"
)

Set a column in destination to an int value

Set a column in destination to a text value

from spark_auto_mapper.automappers.automapper import AutoMapper

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    dst1=1050
)

Copy a column (src1) from source_view to destination view column (dst1)

from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    dst1=A.column("src1")
)

Or you can use the shortcut for specifying a column (wrap column name in [])

from spark_auto_mapper.automappers.automapper import AutoMapper

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    dst1="[src1]"
)

Convert data type for a column (or string literal)

from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    birthDate=A.date(A.column("date_of_birth"))
)

Use a Spark SQL Expression (Any valid Spark SQL expression can be used)

from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    gender=A.expression(
    """
    CASE
        WHEN `Member Sex` = 'F' THEN 'female'
        WHEN `Member Sex` = 'M' THEN 'male'
        ELSE 'other'
    END
    """
    )
)

Specify multiple transformations

from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    dst1="[src1]",
    birthDate=A.date("[date_of_birth]"),
    gender=A.expression(
                """
    CASE
        WHEN `Member Sex` = 'F' THEN 'female'
        WHEN `Member Sex` = 'M' THEN 'male'
        ELSE 'other'
    END
    """
    )
)

Use variables or parameters

from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

def mapping(parameters: dict):
    mapper = AutoMapper(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).columns(
        dst1=A.column(parameters["my_column_name"])
    )

Use conditional logic

from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

def mapping(parameters: dict):
    mapper = AutoMapper(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).columns(
        dst1=A.column(parameters["my_column_name"])
    )
    
    if parameters["customer"] == "Microsoft":
        mapper = mapper.columns(
            important_customer=1,
            customer_name=parameters["customer"]
        )
    return mapper

Using nested array columns

from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A
mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).withColumn(
    dst2=A.list(
        [
            "address1",
            "address2"
        ]
    )
)

Using nested struct columns

from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A
mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    dst2=A.complex(
        use="usual",
        family="imran"
    )
)

Using lists of structs

from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A
mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    dst2=A.list(
        [
            A.complex(
                use="usual",
                family="imran"
            ),
            A.complex(
                use="usual",
                family="[last_name]"
            )
        ]
    )
)

Executing the AutoMapper

spark.createDataFrame(
    [
        (1, 'Qureshi', 'Imran'),
        (2, 'Vidal', 'Michael'),
    ],
    ['member_id', 'last_name', 'first_name']
).createOrReplaceTempView("patients")

source_df: DataFrame = spark.table("patients")

df = source_df.select("member_id")
df.createOrReplaceTempView("members")

result_df: DataFrame = mapper.transform(df=df)

Statically Typed Examples

To improve the auto-complete and syntax checking even more, you can define Complex types:

Define a custom data type:

from spark_auto_mapper.type_definitions.automapper_defined_types import AutoMapperTextInputType
from spark_auto_mapper.helpers.automapper_value_parser import AutoMapperValueParser
from spark_auto_mapper.data_types.date import AutoMapperDateDataType
from spark_auto_mapper.data_types.list import AutoMapperList
from spark_auto_mapper_fhir.fhir_types.automapper_fhir_data_type_complex_base import AutoMapperFhirDataTypeComplexBase


class AutoMapperFhirDataTypePatient(AutoMapperFhirDataTypeComplexBase):
    # noinspection PyPep8Naming
    def __init__(self,
                 id_: AutoMapperTextInputType,
                 birthDate: AutoMapperDateDataType,
                 name: AutoMapperList,
                 gender: AutoMapperTextInputType
                 ) -> None:
        super().__init__()
        self.value = dict(
            id=AutoMapperValueParser.parse_value(id_),
            birthDate=AutoMapperValueParser.parse_value(birthDate),
            name=AutoMapperValueParser.parse_value(name),
            gender=AutoMapperValueParser.parse_value(gender)
        )

Now you get auto-complete and syntax checking:

from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A
mapper = AutoMapperFhir(
    view="members",
    source_view="patients",
    keys=["member_id"]
).withResource(
    resource=F.patient(
        id_=A.column("a.member_id"),
        birthDate=A.date(
            A.column("date_of_birth")
        ),
        name=A.list(
            F.human_name(
                use="usual",
                family=A.column("last_name")
            )
        ),
        gender="female"
    )
)

Publishing a new package

  1. Edit VERSION to increment the version
  2. Create a new release
  3. The GitHub Action should automatically kick in and publish the package
  4. You can see the status in the Actions tab

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sparkautomapper-1.0.7.tar.gz (65.5 kB view details)

Uploaded Source

Built Distribution

sparkautomapper-1.0.7-py3-none-any.whl (189.2 kB view details)

Uploaded Python 3

File details

Details for the file sparkautomapper-1.0.7.tar.gz.

File metadata

  • Download URL: sparkautomapper-1.0.7.tar.gz
  • Upload date:
  • Size: 65.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/33.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.8 tqdm/4.63.0 importlib-metadata/4.2.0 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.7.12

File hashes

Hashes for sparkautomapper-1.0.7.tar.gz
Algorithm Hash digest
SHA256 2fa41a63f47f8e06f7e9b0c592f07050802e706b202c3971191be9efd550e7dc
MD5 44d118a741877231b4ef529f144a4b7f
BLAKE2b-256 f5f27648e229c4502bdde147c695c57d5fa8691408a02cb2d6b22dcea1078980

See more details on using hashes here.

File details

Details for the file sparkautomapper-1.0.7-py3-none-any.whl.

File metadata

  • Download URL: sparkautomapper-1.0.7-py3-none-any.whl
  • Upload date:
  • Size: 189.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/33.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.8 tqdm/4.63.0 importlib-metadata/4.2.0 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.7.12

File hashes

Hashes for sparkautomapper-1.0.7-py3-none-any.whl
Algorithm Hash digest
SHA256 5eebb4519b1d53546f37684342592dcd8b9257d71acbd79540859941b941b9c3
MD5 8dc10f5a1ac2fb8deea7eec67954d62d
BLAKE2b-256 a1a49efa0140c737959e16f8757a0847cc876f193a687a85fadd290913bb8012

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page