A pydantic -> spark schema library

Project description

SparkDantic

1️⃣ version: 2.7.0

✍️ author: Mitchell Lisle

PySpark Model Conversion Tool

This Python module provides a utility for converting Pydantic models to PySpark schemas. It's implemented as a class named SparkModel that extends Pydantic's BaseModel.

Features

  • Conversion from Pydantic model to PySpark schema (StructType or JSON)
  • Type coercion
  • PySpark as an optional dependency

Installation

Without PySpark:

pip install sparkdantic

Note: only JSON schema generation features are available without PySpark installed. If you attempt to use PySpark-dependent schema generation features, SparkDantic will check that a supported version of PySpark is installed.

With PySpark:

pip install "sparkdantic[pyspark]"

Supported PySpark versions

PySpark version 3.3.0 or higher, up to but not including 4.1.0

Usage

Creating a new SparkModel

A SparkModel is a Pydantic model, and you can define one by simply inheriting from SparkModel and defining some fields:

from sparkdantic import SparkModel
from typing import List

class MyModel(SparkModel):
    name: str
    age: int
    hobbies: List[str]

ℹ️ Enums are supported, but they must be mixed with either the int or str built-in types (e.g. via enum.IntEnum, or enum.StrEnum in Python ≥ 3.11):

from enum import Enum

class Switch(int, Enum):
    OFF = 0
    ON = 1

class MyEnumModel(SparkModel):
    switch: Switch
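
For illustration, generating the schema for this model should map the int-mixed enum to an integer column. A sketch of the expected result, assuming int-mixed enums follow the same mapping as plain int fields:

schema = MyEnumModel.model_spark_schema()
# Expected (assumption): StructType([StructField('switch', IntegerType(), False)])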

ℹ️ A field can be excluded from the Spark schema using Pydantic's exclude Field attribute. This is useful when, for example, the Pydantic model has Spark-incompatible fields. Note that exclude is a Pydantic field attribute, not a SparkDantic feature: setting it will also exclude the field from any Pydantic serialisation/deserialisation.

from pydantic import Field
from sparkdantic import SparkModel
from typing import Any

class MyModel(SparkModel):
    name: str
    age: int
    arbitrary_data: Any = Field(exclude=True)

Running MyModel.model_spark_schema(exclude_fields=True) should return the following schema:

StructType([
    StructField('name', StringType(), False),
    StructField('age', IntegerType(), False)
])

Calling model_spark_schema without this option raises an exception because of the Spark-incompatible Any field.
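
For example, a minimal sketch of guarding against that failure (the exact exception type isn't documented here, so this sketch catches a generic Exception):

try:
    MyModel.model_spark_schema()  # arbitrary_data: Any has no Spark equivalent
except Exception as exc:
    print(f"Schema generation failed: {exc}")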

Generating a PySpark Schema

Using SparkModel

Pydantic models can already generate JSON schemas (via model_json_schema). With a SparkModel, you can generate a PySpark schema from the model fields using the model_spark_schema() method:

spark_schema = MyModel.model_spark_schema()

Provides this schema:

StructType([
    StructField('name', StringType(), False),
    StructField('age', IntegerType(), False),
    StructField('hobbies', ArrayType(StringType(), False), False)
])
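
The generated schema can be used anywhere PySpark expects a StructType, for example when creating a DataFrame. A minimal sketch; the SparkSession setup and sample data are illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample rows matching MyModel's fields (name, age, hobbies)
data = [("Alice", 30, ["reading", "cycling"]), ("Bob", 25, ["chess"])]
df = spark.createDataFrame(data, schema=spark_schema)
df.printSchema()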

You can also generate a PySpark-compatible JSON schema from the model fields using the model_json_spark_schema method:

json_spark_schema = MyModel.model_json_spark_schema()

Provides this schema:

{
    "type": "struct",
    "fields": [
        {
            "name": "name",
            "type": "string",
            "nullable": False,
            "metadata": {}
        },
        {
            "name": "age",
            "type": "integer",
            "nullable": False,
            "metadata": {}
        },
        {
            "name": "hobbies",
            "type": {
                "type": "array",
                "elementType": "string",
                "containsNull": False
            },
            "nullable": False,
            "metadata": {}
        }
    ]
}
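
Since this matches PySpark's own JSON schema representation, it can be turned back into a StructType with PySpark's StructType.fromJson (a sketch, assuming model_json_spark_schema returns the dict shown above):

from pyspark.sql.types import StructType

# Rebuild a StructType from the PySpark-compatible JSON schema
struct_schema = StructType.fromJson(json_spark_schema)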

Using Pydantic BaseModel

You can also generate a PySpark schema for existing Pydantic models using the create_spark_schema function:

from pydantic import BaseModel
from sparkdantic import create_spark_schema, create_json_spark_schema

class EmployeeModel(BaseModel):
    id: int
    first_name: str
    last_name: str
    department_code: str

spark_schema = create_spark_schema(EmployeeModel)
json_spark_schema = create_json_spark_schema(EmployeeModel)
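
Based on the mappings shown above (int to IntegerType, str to StringType, non-Optional fields as non-nullable), the resulting schema should look roughly like this (a sketch, not verified output):

StructType([
    StructField('id', IntegerType(), False),
    StructField('first_name', StringType(), False),
    StructField('last_name', StringType(), False),
    StructField('department_code', StringType(), False)
])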

ℹ️ In addition to the automatic type conversion, you can explicitly coerce data types to Spark native types by setting the spark_type attribute in the SparkField function (which extends Pydantic's Field function), like so: SparkField(spark_type=<datatype>). datatype accepts a str (e.g. "bigint") or a pyspark.sql.types.DataType (e.g. LongType). This is useful when you want a specific data type other than the one SparkDantic infers by default, as in the sketch below.
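
A minimal sketch of this coercion, forcing fields to Spark's bigint/LongType rather than the inferred IntegerType (the model and field names are illustrative):

from pyspark.sql.types import LongType
from sparkdantic import SparkField, SparkModel

class CoercedModel(SparkModel):
    id: int = SparkField(spark_type=LongType)     # DataType class
    count: int = SparkField(spark_type="bigint")  # or a Spark type name as a string

# Expected (assumption): both fields become LongType in the generated schema
schema = CoercedModel.model_spark_schema()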

Contributing

Contributions are welcome! If you would like to add a new feature or fix a bug, feel free to raise a PR and tag me (mitchelllisle) as a reviewer. Please set up your environment locally so that styling and development flow stay as close as possible to the standards set in this project. To do this, the main thing you'll need is poetry. You should also run make install-dev-local, which installs the pre-commit hooks as well as the project locally. PRs won't be accepted without sufficient tests, and we are strict about maintaining 100% test coverage.

ℹ️ Note that after you have run make install-dev-local, the test suite runs as part of the pre-commit hook checks whenever you make a commit. This ensures you don't commit code that breaks the tests. The hook will also try to commit changes to the COVERAGE.txt file so that coverage can be compared in each PR. Please ensure this file is committed with your changes.

ℹ️ Versioning: We use bumpversion to maintain the version across various files. If you submit a PR, please run bumpversion according to the following rules:

  • bumpversion major: If you are making breaking changes (that is, anyone who already uses this library can no longer rely on existing methods / functionality)
  • bumpversion minor: If you are adding functionality or features that maintain existing methods and features
  • bumpversion patch: If you are fixing a bug or making some other small change

Note: ⚠️ You can skip bumping the version if you like. I periodically do releases for dependency updates anyway, so if you can wait a couple of days for your code to be pushed to PyPI, just submit the change and I'll make sure it's included in the next release. I'll do my best to release it ASAP after your PR is merged.

