AutoMapper for Spark
SparkAutoMapper
Fluent API to map data from one view to another in Spark.
It uses native Spark functions underneath, so it is just as fast as hand-written transformations.
Since this is just Python, you can use any Python editor. Everything is annotated with Python type hints, so most editors will auto-complete and warn you when you do something wrong.
Usage
    pip install sparkautomapper
SparkAutoMapper input and output
You can pass either a dataframe to SparkAutoMapper or specify the name of a Spark view to read from.
You can receive the result as a dataframe or (optionally) pass in the name of a view where you want the result.
Dynamic Typing Examples
Set a column in destination to a text value (read from a passed-in dataframe and return the result in a new dataframe)
    from spark_auto_mapper.automappers.automapper import AutoMapper

    mapper = AutoMapper(
        keys=["member_id"]
    ).columns(
        dst1="hello"
    )
Set a column in destination to a text value (read from a Spark view and put the result in another Spark view)
    from spark_auto_mapper.automappers.automapper import AutoMapper

    mapper = AutoMapper(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).columns(
        dst1="hello"
    )
Set a column in destination to an int value
    from spark_auto_mapper.automappers.automapper import AutoMapper

    mapper = AutoMapper(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).columns(
        dst1=1050
    )
Copy a column (src1) from source_view to destination view column (dst1)
    from spark_auto_mapper.automappers.automapper import AutoMapper
    from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

    mapper = AutoMapper(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).columns(
        dst1=A.column("src1")
    )
Or you can use the shortcut for specifying a column (wrap column name in [])
    from spark_auto_mapper.automappers.automapper import AutoMapper

    mapper = AutoMapper(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).columns(
        dst1="[src1]"
    )
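To make the bracket shortcut concrete, here is a minimal pure-Python sketch of how a value such as "[src1]" could be told apart from a plain literal. The function name `classify_value` and the tuples it returns are hypothetical and exist only for illustration; this is not SparkAutoMapper's actual parser.

```python
from typing import Tuple, Union


def classify_value(value: Union[str, int]) -> Tuple[str, Union[str, int]]:
    """Hypothetical helper: decide whether a mapping value is a column or a literal."""
    # A string wrapped in [] is treated as a reference to a source column
    if (
        isinstance(value, str)
        and len(value) > 2
        and value.startswith("[")
        and value.endswith("]")
    ):
        return ("column", value[1:-1])
    # Anything else (plain text, numbers) is treated as a literal value
    return ("literal", value)


print(classify_value("[src1]"))  # ('column', 'src1')
print(classify_value("hello"))   # ('literal', 'hello')
print(classify_value(1050))      # ('literal', 1050)
```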
Convert data type for a column (or string literal)
    from spark_auto_mapper.automappers.automapper import AutoMapper
    from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

    mapper = AutoMapper(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).columns(
        birthDate=A.date(A.column("date_of_birth"))
    )
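As a rough mental model of what a date conversion does to each value, the sketch below parses text into a date in plain Python. The list of accepted formats and the choice to return `None` for unparseable text are assumptions made for this illustration; the formats that `A.date` actually accepts may differ.

```python
from datetime import date, datetime
from typing import Optional

# Assumed formats for illustration only
CANDIDATE_FORMATS = ("%Y-%m-%d", "%Y%m%d", "%m/%d/%Y")


def to_date_or_none(text: Optional[str]) -> Optional[date]:
    """Hypothetical helper: try each format in turn, return None if none match."""
    if text is None:
        return None
    for fmt in CANDIDATE_FORMATS:
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            # This format did not match; try the next one
            continue
    return None


print(to_date_or_none("1970-01-31"))  # 1970-01-31
print(to_date_or_none("not a date"))  # None
```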
Use a Spark SQL Expression (Any valid Spark SQL expression can be used)
    from spark_auto_mapper.automappers.automapper import AutoMapper
    from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

    mapper = AutoMapper(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).columns(
        gender=A.expression(
            """
            CASE
                WHEN `Member Sex` = 'F' THEN 'female'
                WHEN `Member Sex` = 'M' THEN 'male'
                ELSE 'other'
            END
            """
        )
    )
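The CASE expression above is evaluated by Spark for every row. To show its branching logic in isolation, here is the same decision written as a plain Python function; `map_gender` is a hypothetical name used only for this sketch. A missing value falls through to 'other', which matches how a SQL CASE treats a NULL comparison as not true.

```python
from typing import Optional


def map_gender(member_sex: Optional[str]) -> str:
    """Plain-Python equivalent of the CASE expression, for illustration."""
    if member_sex == "F":
        return "female"
    if member_sex == "M":
        return "male"
    # Covers every other value, including None and lowercase letters
    return "other"


print(map_gender("F"))   # female
print(map_gender("M"))   # male
print(map_gender(None))  # other
```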
Specify multiple transformations
    from spark_auto_mapper.automappers.automapper import AutoMapper
    from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

    mapper = AutoMapper(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).columns(
        dst1="[src1]",
        birthDate=A.date("[date_of_birth]"),
        gender=A.expression(
            """
            CASE
                WHEN `Member Sex` = 'F' THEN 'female'
                WHEN `Member Sex` = 'M' THEN 'male'
                ELSE 'other'
            END
            """
        )
    )
Use variables or parameters
    from spark_auto_mapper.automappers.automapper import AutoMapper
    from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

    def mapping(parameters: dict):
        # The source column name is supplied by the caller at run time
        mapper = AutoMapper(
            view="members",
            source_view="patients",
            keys=["member_id"]
        ).columns(
            dst1=A.column(parameters["my_column_name"])
        )
        return mapper
Use conditional logic
    from spark_auto_mapper.automappers.automapper import AutoMapper
    from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

    def mapping(parameters: dict):
        mapper = AutoMapper(
            view="members",
            source_view="patients",
            keys=["member_id"]
        ).columns(
            dst1=A.column(parameters["my_column_name"])
        )
        # Add extra columns only for a specific customer
        if parameters["customer"] == "Microsoft":
            mapper = mapper.columns(
                important_customer=1,
                customer_name=parameters["customer"]
            )
        return mapper
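Because the mapping is ordinary Python, the conditional part can also be worked out separately from the mapper. The sketch below builds the column definitions as a plain dictionary first; `build_columns` is a hypothetical helper written for this illustration and uses the "[column]" shortcut instead of `A.column` so that it runs without Spark.

```python
from typing import Any, Dict


def build_columns(parameters: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: collect column definitions based on parameters."""
    # Column that is always mapped; "[...]" is the column shortcut
    columns: Dict[str, Any] = {"dst1": f"[{parameters['my_column_name']}]"}
    # Extra columns only for a specific customer
    if parameters["customer"] == "Microsoft":
        columns["important_customer"] = 1
        columns["customer_name"] = parameters["customer"]
    return columns


print(build_columns({"my_column_name": "src1", "customer": "Microsoft"}))
# {'dst1': '[src1]', 'important_customer': 1, 'customer_name': 'Microsoft'}
```

Since the examples pass column definitions as keyword arguments, a dictionary like this could presumably be unpacked into the call with `.columns(**build_columns(parameters))`.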
Using nested array columns
    from spark_auto_mapper.automappers.automapper import AutoMapper
    from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

    mapper = AutoMapper(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).withColumn(
        dst2=A.list(
            [
                "address1",
                "address2"
            ]
        )
    )
Using nested struct columns
    from spark_auto_mapper.automappers.automapper import AutoMapper
    from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

    mapper = AutoMapper(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).columns(
        dst2=A.complex(
            use="usual",
            family="imran"
        )
    )
Using lists of structs
    from spark_auto_mapper.automappers.automapper import AutoMapper
    from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

    mapper = AutoMapper(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).columns(
        dst2=A.list(
            [
                A.complex(
                    use="usual",
                    family="imran"
                ),
                A.complex(
                    use="usual",
                    family="[last_name]"
                )
            ]
        )
    )
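To picture the value this mapping should produce for a single row, the following plain-Python sketch builds the same list of structs from a dictionary that stands in for one source row. The helpers `resolve` and `build_dst2` are hypothetical, and the output shape is an assumption based on the mapping definition rather than captured Spark output.

```python
from typing import Any, Dict, List


def resolve(value: Any, row: Dict[str, Any]) -> Any:
    """Replace "[name]" with row["name"]; leave other values unchanged."""
    if isinstance(value, str) and value.startswith("[") and value.endswith("]"):
        return row[value[1:-1]]
    return value


def build_dst2(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Build the list of structs for one row, mirroring the mapping above."""
    templates = [
        {"use": "usual", "family": "imran"},
        {"use": "usual", "family": "[last_name]"},
    ]
    return [
        {key: resolve(value, row) for key, value in template.items()}
        for template in templates
    ]


print(build_dst2({"member_id": 1, "last_name": "Qureshi"}))
# [{'use': 'usual', 'family': 'imran'}, {'use': 'usual', 'family': 'Qureshi'}]
```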
Executing the AutoMapper
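    from pyspark.sql import DataFrame, SparkSession

    # Assumes a Spark session is available; reuse your existing `spark` if you have one
    spark = SparkSession.builder.getOrCreate()

    # Create the source view that the mapper reads from
    spark.createDataFrame(
        [
            (1, 'Qureshi', 'Imran'),
            (2, 'Vidal', 'Michael'),
        ],
        ['member_id', 'last_name', 'first_name']
    ).createOrReplaceTempView("patients")

    source_df: DataFrame = spark.table("patients")

    # Seed the destination view with the key column
    df = source_df.select("member_id")
    df.createOrReplaceTempView("members")

    # Run a mapper defined in one of the examples above
    result_df: DataFrame = mapper.transform(df=df)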
Statically Typed Examples
To improve the auto-complete and syntax checking even more, you can define Complex types:
Define a custom data type:
    from spark_auto_mapper.type_definitions.automapper_defined_types import AutoMapperTextInputType
    from spark_auto_mapper.helpers.automapper_value_parser import AutoMapperValueParser
    from spark_auto_mapper.data_types.date import AutoMapperDateDataType
    from spark_auto_mapper.data_types.list import AutoMapperList
    from spark_auto_mapper_fhir.fhir_types.automapper_fhir_data_type_complex_base import AutoMapperFhirDataTypeComplexBase

    class AutoMapperFhirDataTypePatient(AutoMapperFhirDataTypeComplexBase):
        # noinspection PyPep8Naming
        def __init__(self,
                     id_: AutoMapperTextInputType,
                     birthDate: AutoMapperDateDataType,
                     name: AutoMapperList,
                     gender: AutoMapperTextInputType
                     ) -> None:
            super().__init__()
            self.value = dict(
                id=AutoMapperValueParser.parse_value(id_),
                birthDate=AutoMapperValueParser.parse_value(birthDate),
                name=AutoMapperValueParser.parse_value(name),
                gender=AutoMapperValueParser.parse_value(gender)
            )
Now you get auto-complete and syntax checking:
    from spark_auto_mapper.automappers.automapper import AutoMapper
    from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

    # AutoMapperFhir and F are assumed to come from the companion
    # spark_auto_mapper_fhir package; their import lines are not shown here.
    mapper = AutoMapperFhir(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).withResource(
        resource=F.patient(
            id_=A.column("a.member_id"),
            birthDate=A.date(
                A.column("date_of_birth")
            ),
            name=A.list(
                F.human_name(
                    use="usual",
                    family=A.column("last_name")
                )
            ),
            gender="female"
        )
    )
Publishing a new package
- Edit VERSION to increment the version
- Create a new release
- The GitHub Action should automatically kick in and publish the package
- You can see the status in the Actions tab
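The version increment in the first step can be done by hand or scripted. The sketch below assumes the VERSION file holds a three-part dotted version such as 0.1.78; `bump_patch` is a hypothetical helper and not part of the project's tooling.

```python
def bump_patch(version: str) -> str:
    """Hypothetical helper: increment the last part of a MAJOR.MINOR.PATCH version."""
    major, minor, patch = version.strip().split(".")
    return f"{major}.{minor}.{int(patch) + 1}"


print(bump_patch("0.1.78"))  # 0.1.79
```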