Bulbasaur

Bidirectional Unified Library Bridge And Schema Adaptation Utility Runtime

Convert between PySpark schemas and SQLAlchemy/SQLModel classes with ease.

Bulbasaur provides simple, bidirectional conversion functions that transform schemas between PySpark and SQLAlchemy, and optionally SQLModel, covering the common primitive types listed under Supported Types. It is aimed at data engineering workflows that need to bridge distributed data processing with ORM capabilities.

Features

  • Bidirectional Conversion: Convert schemas in both directions seamlessly
  • Comprehensive Type Support: Supports all common primitive types with precision preservation
  • SQLModel Support: Optional SQLModel integration for modern Python type hints
  • Type Safety: Clear error messages for unsupported types and invalid schemas
  • Simple API: Functional, stateless functions with minimal dependencies
  • Schema Validation: Automatic validation of schemas before conversion
  • Primary Key Detection: Smart primary key detection and auto-generation

Installation

Basic Installation

pip install bulbasaur

With SQLModel Support

For SQLModel integration (optional):

pip install bulbasaur[sqlmodel]

Development Installation

git clone https://github.com/eddiethedean/bulbasaur.git
cd bulbasaur
pip install -e ".[dev]"

Requirements

  • Python >= 3.8
  • pyspark >= 3.0.0
  • sqlalchemy >= 1.4.0
  • sqlmodel >= 0.0.8 (optional, for SQLModel support)
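
Because SQLModel is optional, it can be useful to check which of these packages are importable before calling the SQLModel-specific functions. The following is a minimal sketch using only the standard library; the helper name `has_module` is hypothetical and not part of Bulbasaur.

```python
import importlib.util


def has_module(name: str) -> bool:
    """Return True if a top-level module can be located without importing it."""
    return importlib.util.find_spec(name) is not None


# sqlmodel is only expected when the optional extra was installed;
# pyspark and sqlalchemy are listed as core requirements.
for module in ("pyspark", "sqlalchemy", "sqlmodel"):
    status = "available" if has_module(module) else "missing"
    print(f"{module}: {status}")
```

If `sqlmodel` is reported as missing, `pip install bulbasaur[sqlmodel]` from the Installation section adds it.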

Quick Start

Get started with Bulbasaur in just a few lines of code. Here are the most common conversion patterns:

Converting PySpark Schema to SQLAlchemy Model

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from bulbasaur import to_sqlalchemy_model
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

# Define a PySpark schema
pyspark_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("score", DoubleType(), True),
])

# Convert to SQLAlchemy model
Person = to_sqlalchemy_model(pyspark_schema, class_name="Person", base=Base)
print(Person.__tablename__)  # "person"
print(Person.name)  # Column definition

Converting SQLAlchemy Model to PySpark Schema

from sqlalchemy import Column, Integer, String, Float
from sqlalchemy.orm import DeclarativeBase
from bulbasaur import to_pyspark_schema

class Base(DeclarativeBase):
    pass

class Person(Base):
    __tablename__ = "person"
    
    name = Column(String, primary_key=True)
    age = Column(Integer)
    score = Column(Float)

# Convert to PySpark schema
pyspark_schema = to_pyspark_schema(Person)
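print(pyspark_schema)
# Expected output; 'name' is non-nullable because SQLAlchemy
# primary-key columns default to nullable=False:
# StructType([StructField('name', StringType(), False),
#             StructField('age', IntegerType(), True),
#             StructField('score', DoubleType(), True)])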

Converting PySpark Schema to SQLModel Class

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from bulbasaur import to_sqlmodel_class

# Define a PySpark schema
pyspark_schema = StructType([
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), False),
    StructField("score", DoubleType(), True),
])

# Convert to SQLModel class
Person = to_sqlmodel_class(pyspark_schema, class_name="Person")
print(Person.__name__)  # "Person"
print(Person.__annotations__)  # Type annotations

Converting SQLModel Class to PySpark Schema

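from typing import Optional

from sqlmodel import SQLModel
from bulbasaur import to_pyspark_schema

class Person(SQLModel):
    name: str
    age: int
    # Optional[float] works on Python 3.8+; `float | None` needs Python 3.10+
    score: Optional[float] = None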

# Convert to PySpark schema
pyspark_schema = to_pyspark_schema(Person)
print(pyspark_schema)
# StructType([StructField('name', StringType(), False),
#             StructField('age', IntegerType(), False),
#             StructField('score', DoubleType(), True)])

Use Cases

Bulbasaur is perfect for:

  • Data Pipeline Integration: Convert PySpark schemas to SQLAlchemy models for database operations
  • Schema Synchronization: Keep schemas consistent between Spark jobs and database models
  • API Development: Generate SQLAlchemy models from PySpark DataFrames for REST APIs
  • Data Validation: Use SQLModel classes for validation while working with PySpark DataFrames
  • Migration Tools: Convert existing PySpark schemas to ORM models for legacy system migrations

Supported Types

Bulbasaur supports comprehensive type mappings between PySpark and SQLAlchemy/SQLModel. Precision and scale are preserved for decimal types, and nullability is maintained across conversions.

PySpark → SQLAlchemy

| PySpark Type | SQLAlchemy Type | Notes |
|---|---|---|
| ByteType | SmallInteger | 8-bit integer |
| ShortType | SmallInteger | 16-bit integer |
| IntegerType | Integer | 32-bit integer |
| LongType | BigInteger | 64-bit integer |
| FloatType | Float | 32-bit floating point |
| DoubleType | Float | 64-bit floating point |
| BooleanType | Boolean | Boolean value |
| StringType | String | Variable-length string |
| DateType | Date | Date only |
| TimestampType | DateTime | Date and time with timezone |
| TimestampNTZType | DateTime | Date and time without timezone |
| DecimalType(p,s) | Numeric(p,s) | Precision and scale preserved |
| BinaryType | LargeBinary | Binary data |
| NullType | String | Fallback to String |
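
As a reading aid, the table above can be expressed as a plain lookup keyed by type name. This is only an illustrative sketch that works on strings; it is not Bulbasaur's internal mapping, and the names `PYSPARK_TO_SQLALCHEMY` and `lookup` are hypothetical.

```python
import re

# Illustrative copy of the PySpark -> SQLAlchemy table, keyed by type name.
PYSPARK_TO_SQLALCHEMY = {
    "ByteType": "SmallInteger",
    "ShortType": "SmallInteger",
    "IntegerType": "Integer",
    "LongType": "BigInteger",
    "FloatType": "Float",
    "DoubleType": "Float",
    "BooleanType": "Boolean",
    "StringType": "String",
    "DateType": "Date",
    "TimestampType": "DateTime",
    "TimestampNTZType": "DateTime",
    "BinaryType": "LargeBinary",
    "NullType": "String",
}

# DecimalType carries parameters, so it is matched separately.
_DECIMAL = re.compile(r"DecimalType\((\d+),\s*(\d+)\)")


def lookup(pyspark_type: str) -> str:
    """Return the SQLAlchemy type name listed for a PySpark type name."""
    match = _DECIMAL.fullmatch(pyspark_type)
    if match:
        precision, scale = match.groups()
        return f"Numeric({precision}, {scale})"
    try:
        return PYSPARK_TO_SQLALCHEMY[pyspark_type]
    except KeyError:
        raise ValueError(f"no mapping listed for {pyspark_type}") from None


print(lookup("LongType"))            # BigInteger
print(lookup("DecimalType(10, 2)"))  # Numeric(10, 2)
```

Note that ByteType and ShortType share one target, as do FloatType and DoubleType, so the forward mapping is not one-to-one.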

SQLAlchemy → PySpark

| SQLAlchemy Type | PySpark Type | Notes |
|---|---|---|
| SmallInteger | ShortType | 16-bit integer |
| Integer | IntegerType | 32-bit integer |
| BigInteger | LongType | 64-bit integer |
| Float | DoubleType | 64-bit floating point |
| Boolean | BooleanType | Boolean value |
| String | StringType | Variable-length string |
| Text | StringType | Long text as string |
| Date | DateType | Date only |
| DateTime | TimestampType | Date and time |
| Time | TimestampType | Time as timestamp |
| Numeric(p,s) | DecimalType(p,s) | Precision and scale preserved |
| LargeBinary | BinaryType | Binary data |
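
Reading the two tables together shows which PySpark types would not come back unchanged after a PySpark → SQLAlchemy → PySpark round trip. The sketch below composes the two tables as plain dictionaries of type names; it is derived only from the tables as documented here, not from Bulbasaur's code, and parameterized DecimalType is left out because its precision and scale are stated to be preserved.

```python
# Type names copied from the two tables above; illustrative only.
TO_SQLALCHEMY = {
    "ByteType": "SmallInteger", "ShortType": "SmallInteger",
    "IntegerType": "Integer", "LongType": "BigInteger",
    "FloatType": "Float", "DoubleType": "Float",
    "BooleanType": "Boolean", "StringType": "String",
    "DateType": "Date", "TimestampType": "DateTime",
    "TimestampNTZType": "DateTime", "BinaryType": "LargeBinary",
    "NullType": "String",
}
TO_PYSPARK = {
    "SmallInteger": "ShortType", "Integer": "IntegerType",
    "BigInteger": "LongType", "Float": "DoubleType",
    "Boolean": "BooleanType", "String": "StringType",
    "Text": "StringType", "Date": "DateType",
    "DateTime": "TimestampType", "Time": "TimestampType",
    "LargeBinary": "BinaryType",
}


def round_trip(pyspark_type: str) -> str:
    """Map a PySpark type name to SQLAlchemy and back again."""
    return TO_PYSPARK[TO_SQLALCHEMY[pyspark_type]]


# Collect every type whose name changes across the round trip.
changed = {t: round_trip(t) for t in TO_SQLALCHEMY if round_trip(t) != t}
for original, result in changed.items():
    print(f"{original} -> {result}")
```

According to the tables, the types that change are ByteType, FloatType, TimestampNTZType, and NullType, so a round-trip check should compare these fields with that widening in mind.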

Limitations

Unsupported Types

The following PySpark types are not directly supported in SQLAlchemy and will raise UnsupportedTypeError:

| Type | Reason | Workaround |
|---|---|---|
| ArrayType | SQLAlchemy doesn't have native array support | Use JSON or String type |
| MapType | SQLAlchemy doesn't have native map support | Use JSON or String type |
| Nested StructType | SQLAlchemy doesn't support nested structures | Use JSON or String type |
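
One way to apply the "JSON or String" workaround is to serialize nested values to JSON text, so that the corresponding field can be declared as a plain string. The following is a pure-Python sketch on a single row represented as a dict; `flatten_nested` is a hypothetical helper, not a Bulbasaur function.

```python
import json


def flatten_nested(row: dict) -> dict:
    """Replace list and dict values with JSON strings; leave scalars untouched."""
    return {
        key: json.dumps(value, sort_keys=True)
        if isinstance(value, (list, dict))
        else value
        for key, value in row.items()
    }


row = {"name": "Alice", "tags": ["a", "b"], "attrs": {"team": "data"}}
flat = flatten_nested(row)
print(flat["tags"])   # ["a", "b"]
print(flat["attrs"])  # {"team": "data"}
```

In a Spark job, the same idea would usually be applied to DataFrame columns (for example with `pyspark.sql.functions.to_json`) so that the schema passed to the converter contains StringType instead of the nested type.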

Type Conversions

Precision Preservation:

  • DecimalType(p, s) ↔ Numeric(p, s): Precision and scale are fully preserved
  • Float ↔ DoubleType: Both represent 64-bit floating point numbers

Nullability Handling:

  • PySpark → SQLAlchemy: Nullability is preserved from StructField.nullable
  • SQLAlchemy → PySpark: Nullability is preserved from Column.nullable
  • SQLModel: Optional types (| None or Optional[T]) are converted to nullable fields
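
The SQLModel rule above depends on recognizing whether an annotation admits None. A minimal sketch of that check with the standard `typing` module follows; it uses a plain class in place of a SQLModel class, and `is_nullable` is a hypothetical helper that may differ from what Bulbasaur does internally.

```python
import types
import typing
from typing import Optional, Union

# types.UnionType (the `X | None` form) only exists on Python 3.10+.
_UNION_ORIGINS = {Union, getattr(types, "UnionType", Union)}


def is_nullable(annotation) -> bool:
    """Return True when the annotation is a union that includes None."""
    if typing.get_origin(annotation) in _UNION_ORIGINS:
        return type(None) in typing.get_args(annotation)
    return False


class Person:
    name: str
    age: int
    score: Optional[float] = None


hints = typing.get_type_hints(Person)
nullability = {field: is_nullable(tp) for field, tp in hints.items()}
print(nullability)  # {'name': False, 'age': False, 'score': True}
```

This matches the Quick Start output, where `name` and `age` become non-nullable fields and `score` becomes nullable.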

Input Validation

Bulbasaur performs comprehensive schema validation before conversion:

| Validation Rule | Error Type | Description |
|---|---|---|
| Duplicate field names | SchemaError | Each field must have a unique name |
| Empty field names | SchemaError | Field names must be non-empty strings |
| Invalid field types (None) | SchemaError | All fields must have a valid data type |
| Invalid field name types | SchemaError | Field names must be strings |
| Empty schema | SchemaError | Schema must contain at least one field |
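
To make the rules concrete, here is a minimal sketch of the same checks applied to a list of `(name, data_type)` pairs. It assumes nothing about Bulbasaur's implementation: `SchemaError` is redefined locally as a stand-in so the example runs on its own, and `validate_fields` is a hypothetical name.

```python
class SchemaError(ValueError):
    """Local stand-in for bulbasaur.SchemaError so this sketch runs alone."""


def validate_fields(fields):
    """Check (name, data_type) pairs against the rules in the table above."""
    if not fields:
        raise SchemaError("Schema must contain at least one field")
    seen = set()
    for name, data_type in fields:
        if not isinstance(name, str):
            raise SchemaError(
                f"Field name must be a string, got {type(name).__name__}"
            )
        if not name:
            raise SchemaError("Field names must be non-empty strings")
        if data_type is None:
            raise SchemaError(f"Field '{name}' has no data type")
        if name in seen:
            raise SchemaError(f"Duplicate field name: '{name}'")
        seen.add(name)


validate_fields([("name", "string"), ("age", "int")])  # passes silently

try:
    validate_fields([("name", "string"), ("name", "int")])
except SchemaError as exc:
    print(exc)  # Duplicate field name: 'name'
```

Running checks like these before any type conversion means that a malformed schema fails early with a single, specific message.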

Advanced Examples

Custom Base Class

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from sqlalchemy.orm import DeclarativeBase
from bulbasaur import to_sqlalchemy_model

class CustomBase(DeclarativeBase):
    pass

schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
])

Model = to_sqlalchemy_model(schema, class_name="MyModel", base=CustomBase)

Round-Trip Conversion

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from bulbasaur import to_pyspark_schema, to_sqlalchemy_model
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

# Start with PySpark schema
original = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# Convert to SQLAlchemy and back
model = to_sqlalchemy_model(original, base=Base)
converted_back = to_pyspark_schema(model)

# Verify the field count and the first field's name survive the round trip
assert len(converted_back.fields) == len(original.fields)
assert converted_back.fields[0].name == original.fields[0].name

Error Handling

Bulbasaur provides clear error messages through custom exceptions:

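from pyspark.sql.types import StructType, StructField, StringType
from bulbasaur import to_sqlalchemy_model
from bulbasaur import ConversionError, UnsupportedTypeError, SchemaError

# Duplicate field names violate the validation rules listed under Input Validation
invalid_schema = StructType([
    StructField("name", StringType(), True),
    StructField("name", StringType(), True),
])

try:
    model = to_sqlalchemy_model(invalid_schema)
except SchemaError as e:
    print(f"Invalid schema: {e}")
except UnsupportedTypeError as e:
    print(f"Unsupported type: {e}")
except ConversionError as e:
    print(f"Conversion error: {e}")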

API Reference

to_sqlalchemy_model(pyspark_schema, class_name="GeneratedModel", base=None)

Convert a PySpark StructType to a SQLAlchemy model class.

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| pyspark_schema | pyspark.sql.types.StructType | required | PySpark schema to convert |
| class_name | str | "GeneratedModel" | Name for the generated model class |
| base | Type[DeclarativeBase] | DeclarativeBase | Base class for the model (optional) |

Returns:

  • Type[DeclarativeBase]: SQLAlchemy model class with __tablename__ attribute

Raises:

  • SchemaError: If the schema structure is invalid (duplicate fields, empty names, etc.)
  • UnsupportedTypeError: If a type cannot be converted (ArrayType, MapType, nested StructType)

Example:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from bulbasaur import to_sqlalchemy_model
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

Person = to_sqlalchemy_model(schema, class_name="Person", base=Base)
# Person is now a SQLAlchemy model class

to_pyspark_schema(model)

Convert a SQLAlchemy model class, instance, or SQLModel class to a PySpark StructType.

Parameters:

| Parameter | Type | Description |
|---|---|---|
| model | Type or instance | SQLAlchemy model class/instance or SQLModel class |

Returns:

  • pyspark.sql.types.StructType: PySpark schema with all fields converted

Raises:

  • SchemaError: If the model structure is invalid (no __table__ attribute, etc.)
  • UnsupportedTypeError: If a type cannot be converted

Example:

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase
from bulbasaur import to_pyspark_schema

class Base(DeclarativeBase):
    pass

class Person(Base):
    __tablename__ = "person"
    name = Column(String, primary_key=True)
    age = Column(Integer)

schema = to_pyspark_schema(Person)
# Returns StructType with name and age fields

to_sqlmodel_class(pyspark_schema, class_name="GeneratedModel")

Convert a PySpark StructType to a SQLModel class with type annotations.

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| pyspark_schema | pyspark.sql.types.StructType | required | PySpark schema to convert |
| class_name | str | "GeneratedModel" | Name for the generated model class |

Returns:

  • Type[SQLModel]: SQLModel class with type annotations and default values

Raises:

  • SchemaError: If the schema structure is invalid
  • UnsupportedTypeError: If a type cannot be converted
  • ImportError: If SQLModel is not installed

Example:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from bulbasaur import to_sqlmodel_class

schema = StructType([
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), True),
])

Person = to_sqlmodel_class(schema, class_name="Person")
# Person is now a SQLModel class with type annotations
person = Person(name="Alice", age=30)

Development

Setup

Clone the repository and install in development mode:

git clone https://github.com/eddiethedean/bulbasaur.git
cd bulbasaur
pip install -e ".[dev]"

Running Tests

Run the full test suite:

pytest

Run tests with coverage:

pytest --cov=bulbasaur --cov-report=html

Code Quality

Format code with Black:

black bulbasaur tests

Lint code with Ruff:

ruff check bulbasaur tests

Project Structure

bulbasaur/
├── bulbasaur/                # Main package
│   ├── __init__.py          # Public API exports
│   ├── converters.py         # Core conversion functions
│   ├── type_mappings.py      # Type mapping dictionaries
│   └── errors.py             # Custom exceptions
├── tests/                    # Test suite
│   ├── test_converters.py    # Conversion function tests
│   ├── test_type_mappings.py # Type mapping tests
│   ├── test_errors.py        # Error handling tests
│   └── test_comprehensive.py # Comprehensive integration tests
├── pyproject.toml            # Package configuration
├── README.md                 # This file
└── LICENSE                   # MIT License

License

MIT License - see LICENSE file for details.

Contributing

Contributions are welcome! We appreciate your help in making Bulbasaur better.

How to Contribute

  1. Fork the repository and create a new branch for your feature or bugfix
  2. Make your changes following the existing code style
  3. Add tests for new functionality or bug fixes
  4. Run the test suite to ensure everything passes
  5. Submit a Pull Request with a clear description of your changes

Development Guidelines

  • Follow the existing code style (Black formatting, 100 character line length)
  • Write tests for all new features and bug fixes
  • Update documentation as needed
  • Ensure all tests pass before submitting

Reporting Issues

If you find a bug or have a feature request, please open an issue on GitHub with:

  • A clear description of the problem or feature
  • Steps to reproduce (for bugs)
  • Expected vs actual behavior
  • Python version and dependency versions

Inspiration

This project is part of a family of schema conversion libraries:

  • 🦎 charmander - Convert between Polars and PySpark schemas
  • 🐢 poldantic - Convert between Pydantic models and Polars schemas
  • 🌱 bulbasaur - Convert between PySpark and SQLAlchemy/SQLModel schemas

About

Bulbasaur provides a bridge between PySpark's distributed data processing and SQLAlchemy's ORM capabilities, enabling seamless schema conversion for data engineering workflows. Whether you're building data pipelines, APIs, or migration tools, Bulbasaur makes it easy to work with schemas across different ecosystems.


Made with ❤️ by Odos Matthews
