# Bulbasaur

*Bidirectional Unified Library Bridge And Schema Adaptation Utility Runtime*
Convert between PySpark schemas and SQLAlchemy/SQLModel classes with ease.
Bulbasaur provides simple, bidirectional conversion functions to transform schemas between PySpark and SQLAlchemy, as well as SQLModel (optional dependency), supporting all common types. Perfect for data engineering workflows that need to bridge distributed data processing with ORM capabilities.
## Table of Contents

- Features
- Installation
- Quick Start
- Use Cases
- Supported Types
- Limitations
- Advanced Examples
- Error Handling
- API Reference
- Development
- License
- Contributing
- Inspiration
- About
## Features
- ✅ Bidirectional Conversion: Convert schemas in both directions seamlessly
- ✅ Comprehensive Type Support: Supports all common primitive types with precision preservation
- ✅ SQLModel Support: Optional SQLModel integration for modern Python type hints
- ✅ Type Safety: Clear error messages for unsupported types and invalid schemas
- ✅ Simple API: Functional, stateless functions with minimal dependencies
- ✅ Schema Validation: Automatic validation of schemas before conversion
- ✅ Primary Key Detection: Smart primary key detection and auto-generation
## Installation

### Basic Installation

```bash
pip install bulbasaur
```

### With SQLModel Support

For SQLModel integration (optional):

```bash
pip install bulbasaur[sqlmodel]
```

### Development Installation

```bash
git clone https://github.com/eddiethedean/bulbasaur.git
cd bulbasaur
pip install -e ".[dev]"
```

### Requirements

- Python >= 3.8
- pyspark >= 3.0.0
- sqlalchemy >= 1.4.0 (the examples below use `DeclarativeBase`, which requires sqlalchemy >= 2.0)
- sqlmodel >= 0.0.8 (optional, for SQLModel support)
## Quick Start
Get started with Bulbasaur in just a few lines of code. Here are the most common conversion patterns:
### Converting PySpark Schema to SQLAlchemy Model

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from sqlalchemy.orm import DeclarativeBase

from bulbasaur import to_sqlalchemy_model


class Base(DeclarativeBase):
    pass


# Define a PySpark schema
pyspark_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("score", DoubleType(), True),
])

# Convert to a SQLAlchemy model
Person = to_sqlalchemy_model(pyspark_schema, class_name="Person", base=Base)

print(Person.__tablename__)  # "person"
print(Person.name)           # Column definition
```
### Converting SQLAlchemy Model to PySpark Schema

```python
from sqlalchemy import Column, Float, Integer, String
from sqlalchemy.orm import DeclarativeBase

from bulbasaur import to_pyspark_schema


class Base(DeclarativeBase):
    pass


class Person(Base):
    __tablename__ = "person"

    name = Column(String, primary_key=True)
    age = Column(Integer)
    score = Column(Float)


# Convert to a PySpark schema
pyspark_schema = to_pyspark_schema(Person)
print(pyspark_schema)
# StructType([StructField('name', StringType(), True),
#             StructField('age', IntegerType(), True),
#             StructField('score', DoubleType(), True)])
```
### Converting PySpark Schema to SQLModel Class

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

from bulbasaur import to_sqlmodel_class

# Define a PySpark schema
pyspark_schema = StructType([
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), False),
    StructField("score", DoubleType(), True),
])

# Convert to a SQLModel class
Person = to_sqlmodel_class(pyspark_schema, class_name="Person")

print(Person.__name__)         # "Person"
print(Person.__annotations__)  # Type annotations
```
### Converting SQLModel Class to PySpark Schema

```python
from typing import Optional

from sqlmodel import SQLModel

from bulbasaur import to_pyspark_schema


class Person(SQLModel):
    name: str
    age: int
    score: Optional[float] = None  # `float | None` also works on Python 3.10+


# Convert to a PySpark schema
pyspark_schema = to_pyspark_schema(Person)
print(pyspark_schema)
# StructType([StructField('name', StringType(), False),
#             StructField('age', IntegerType(), False),
#             StructField('score', DoubleType(), True)])
```
## Use Cases
Bulbasaur is perfect for:
- Data Pipeline Integration: Convert PySpark schemas to SQLAlchemy models for database operations
- Schema Synchronization: Keep schemas consistent between Spark jobs and database models
- API Development: Generate SQLAlchemy models from PySpark DataFrames for REST APIs
- Data Validation: Use SQLModel classes for validation while working with PySpark DataFrames
- Migration Tools: Convert existing PySpark schemas to ORM models for legacy system migrations
## Supported Types
Bulbasaur supports comprehensive type mappings between PySpark and SQLAlchemy/SQLModel. Precision and scale are preserved for decimal types, and nullability is maintained across conversions.
### PySpark → SQLAlchemy

| PySpark Type | SQLAlchemy Type | Notes |
|---|---|---|
| `ByteType` | `SmallInteger` | 8-bit integer |
| `ShortType` | `SmallInteger` | 16-bit integer |
| `IntegerType` | `Integer` | 32-bit integer |
| `LongType` | `BigInteger` | 64-bit integer |
| `FloatType` | `Float` | 32-bit floating point |
| `DoubleType` | `Float` | 64-bit floating point |
| `BooleanType` | `Boolean` | Boolean value |
| `StringType` | `String` | Variable-length string |
| `DateType` | `Date` | Date only |
| `TimestampType` | `DateTime` | Date and time with timezone |
| `TimestampNTZType` | `DateTime` | Date and time without timezone |
| `DecimalType(p, s)` | `Numeric(p, s)` | Precision and scale preserved |
| `BinaryType` | `LargeBinary` | Binary data |
| `NullType` | `String` | Fallback to String |
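The forward mapping above amounts to a name lookup plus a special case for decimals. The sketch below restates it as plain Python with type names as strings; the dictionary and function names are hypothetical illustrations, not bulbasaur's actual internals:

```python
# Hypothetical lookup table mirroring the mapping above; the names and
# string values are illustrative, not bulbasaur's real type_mappings.
PYSPARK_TO_SQLALCHEMY = {
    "ByteType": "SmallInteger",
    "ShortType": "SmallInteger",
    "IntegerType": "Integer",
    "LongType": "BigInteger",
    "FloatType": "Float",
    "DoubleType": "Float",
    "BooleanType": "Boolean",
    "StringType": "String",
    "DateType": "Date",
    "TimestampType": "DateTime",
    "TimestampNTZType": "DateTime",
    "BinaryType": "LargeBinary",
    "NullType": "String",  # fallback to String
}


def map_type(pyspark_type_name, precision=None, scale=None):
    """Map a PySpark type name to a SQLAlchemy type name; DecimalType is
    special-cased so precision and scale carry over to Numeric(p, s)."""
    if pyspark_type_name == "DecimalType":
        return f"Numeric({precision}, {scale})"
    return PYSPARK_TO_SQLALCHEMY[pyspark_type_name]
```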
### SQLAlchemy → PySpark

| SQLAlchemy Type | PySpark Type | Notes |
|---|---|---|
| `SmallInteger` | `ShortType` | 16-bit integer |
| `Integer` | `IntegerType` | 32-bit integer |
| `BigInteger` | `LongType` | 64-bit integer |
| `Float` | `DoubleType` | 64-bit floating point |
| `Boolean` | `BooleanType` | Boolean value |
| `String` | `StringType` | Variable-length string |
| `Text` | `StringType` | Long text as string |
| `Date` | `DateType` | Date only |
| `DateTime` | `TimestampType` | Date and time |
| `Time` | `TimestampType` | Time as timestamp |
| `Numeric(p, s)` | `DecimalType(p, s)` | Precision and scale preserved |
| `LargeBinary` | `BinaryType` | Binary data |
## Limitations

### Unsupported Types

The following PySpark types are not directly supported in SQLAlchemy and will raise `UnsupportedTypeError`:

| Type | Reason | Workaround |
|---|---|---|
| `ArrayType` | SQLAlchemy doesn't have native array support | Use a JSON or String type |
| `MapType` | SQLAlchemy doesn't have native map support | Use a JSON or String type |
| Nested `StructType` | SQLAlchemy doesn't support nested structures | Use a JSON or String type |
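One way to apply the JSON workaround is to rewrite complex fields to strings before conversion and serialize the row values yourself. The helper below is a dependency-free sketch of that idea, with field types represented as plain strings; none of these names come from bulbasaur's API:

```python
import json

# Types the table above marks as unsupported (hypothetical helper, not
# part of bulbasaur).
COMPLEX_TYPES = {"ArrayType", "MapType", "StructType"}


def stringify_complex_fields(fields):
    """Rewrite complex field types to StringType so the schema converts;
    the corresponding row values would then be stored as JSON text."""
    return {
        name: "StringType" if type_name in COMPLEX_TYPES else type_name
        for name, type_name in fields.items()
    }


# An ArrayType value serialized for storage in the resulting String column:
row_value = json.dumps([1, 2, 3])
```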
### Type Conversions

**Precision preservation:**

- ✅ `DecimalType(p, s)` ↔ `Numeric(p, s)`: precision and scale are fully preserved
- ✅ `Float` ↔ `DoubleType`: both represent 64-bit floating point numbers

**Nullability handling:**

- ✅ PySpark → SQLAlchemy: nullability is preserved from `StructField.nullable`
- ✅ SQLAlchemy → PySpark: nullability is preserved from `Column.nullable`
- ✅ SQLModel: optional types (`T | None` or `Optional[T]`) are converted to nullable fields
### Input Validation

Bulbasaur performs comprehensive schema validation before conversion:

| Validation Rule | Error Type | Description |
|---|---|---|
| Duplicate field names | `SchemaError` | Each field must have a unique name |
| Empty field names | `SchemaError` | Field names must be non-empty strings |
| Invalid field types (`None`) | `SchemaError` | All fields must have a valid data type |
| Invalid field name types | `SchemaError` | Field names must be strings |
| Empty schema | `SchemaError` | Schema must contain at least one field |
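The rules above amount to a single pass over the field list. The sketch below restates them in plain Python, raising `ValueError` where bulbasaur would raise `SchemaError`; it is illustrative, not the library's actual validator:

```python
def validate_schema_fields(fields):
    """Apply the validation rules above to a list of (name, data_type)
    pairs. Hypothetical stand-in for bulbasaur's internal validation."""
    if not fields:
        raise ValueError("schema must contain at least one field")
    seen = set()
    for name, data_type in fields:
        if not isinstance(name, str) or not name:
            raise ValueError(f"field names must be non-empty strings, got {name!r}")
        if data_type is None:
            raise ValueError(f"field {name!r} must have a valid data type")
        if name in seen:
            raise ValueError(f"duplicate field name: {name!r}")
        seen.add(name)
```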
## Advanced Examples

### Custom Base Class

```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from sqlalchemy.orm import DeclarativeBase

from bulbasaur import to_sqlalchemy_model


class CustomBase(DeclarativeBase):
    pass


schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
])

Model = to_sqlalchemy_model(schema, class_name="MyModel", base=CustomBase)
```
### Round-Trip Conversion

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from sqlalchemy.orm import DeclarativeBase

from bulbasaur import to_pyspark_schema, to_sqlalchemy_model


class Base(DeclarativeBase):
    pass


# Start with a PySpark schema
original = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# Convert to SQLAlchemy and back
model = to_sqlalchemy_model(original, base=Base)
converted_back = to_pyspark_schema(model)

# Verify that the fields match
assert len(converted_back.fields) == len(original.fields)
assert converted_back.fields[0].name == original.fields[0].name
```
## Error Handling

Bulbasaur provides clear error messages through custom exceptions:

```python
from bulbasaur import ConversionError, UnsupportedTypeError, SchemaError, to_sqlalchemy_model

try:
    model = to_sqlalchemy_model(invalid_schema)
except SchemaError as e:
    print(f"Invalid schema: {e}")
except UnsupportedTypeError as e:
    print(f"Unsupported type: {e}")
except ConversionError as e:
    print(f"Conversion error: {e}")
```
## API Reference

### `to_sqlalchemy_model(pyspark_schema, class_name="GeneratedModel", base=None)`

Convert a PySpark `StructType` to a SQLAlchemy model class.

**Parameters:**

| Parameter | Type | Default | Description |
|---|---|---|---|
| `pyspark_schema` | `pyspark.sql.types.StructType` | required | PySpark schema to convert |
| `class_name` | `str` | `"GeneratedModel"` | Name for the generated model class |
| `base` | `Type[DeclarativeBase]` | `DeclarativeBase` | Base class for the model (optional) |

**Returns:**

`Type[DeclarativeBase]`: SQLAlchemy model class with a `__tablename__` attribute

**Raises:**

- `SchemaError`: if the schema structure is invalid (duplicate fields, empty names, etc.)
- `UnsupportedTypeError`: if a type cannot be converted (`ArrayType`, `MapType`, nested `StructType`)
**Example:**

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from sqlalchemy.orm import DeclarativeBase

from bulbasaur import to_sqlalchemy_model


class Base(DeclarativeBase):
    pass


schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

Person = to_sqlalchemy_model(schema, class_name="Person", base=Base)
# Person is now a SQLAlchemy model class
```
### `to_pyspark_schema(model)`

Convert a SQLAlchemy model class, instance, or SQLModel class to a PySpark `StructType`.

**Parameters:**

| Parameter | Type | Description |
|---|---|---|
| `model` | Type or instance | SQLAlchemy model class/instance or SQLModel class |

**Returns:**

`pyspark.sql.types.StructType`: PySpark schema with all fields converted

**Raises:**

- `SchemaError`: if the model structure is invalid (no `__table__` attribute, etc.)
- `UnsupportedTypeError`: if a type cannot be converted
**Example:**

```python
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase

from bulbasaur import to_pyspark_schema


class Base(DeclarativeBase):
    pass


class Person(Base):
    __tablename__ = "person"

    name = Column(String, primary_key=True)
    age = Column(Integer)


schema = to_pyspark_schema(Person)
# Returns a StructType with name and age fields
```
### `to_sqlmodel_class(pyspark_schema, class_name="GeneratedModel")`

Convert a PySpark `StructType` to a SQLModel class with type annotations.

**Parameters:**

| Parameter | Type | Default | Description |
|---|---|---|---|
| `pyspark_schema` | `pyspark.sql.types.StructType` | required | PySpark schema to convert |
| `class_name` | `str` | `"GeneratedModel"` | Name for the generated model class |

**Returns:**

`Type[SQLModel]`: SQLModel class with type annotations and default values

**Raises:**

- `SchemaError`: if the schema structure is invalid
- `UnsupportedTypeError`: if a type cannot be converted
- `ImportError`: if SQLModel is not installed
**Example:**

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

from bulbasaur import to_sqlmodel_class

schema = StructType([
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), True),
])

Person = to_sqlmodel_class(schema, class_name="Person")
# Person is now a SQLModel class with type annotations
person = Person(name="Alice", age=30)
```
## Development

### Setup

Clone the repository and install in development mode:

```bash
git clone https://github.com/eddiethedean/bulbasaur.git
cd bulbasaur
pip install -e ".[dev]"
```

### Running Tests

Run the full test suite:

```bash
pytest
```

Run tests with coverage:

```bash
pytest --cov=bulbasaur --cov-report=html
```

### Code Quality

Format code with Black:

```bash
black bulbasaur tests
```

Lint code with Ruff:

```bash
ruff check bulbasaur tests
```

### Project Structure

```
bulbasaur/
├── bulbasaur/                 # Main package
│   ├── __init__.py            # Public API exports
│   ├── converters.py          # Core conversion functions
│   ├── type_mappings.py       # Type mapping dictionaries
│   └── errors.py              # Custom exceptions
├── tests/                     # Test suite
│   ├── test_converters.py     # Conversion function tests
│   ├── test_type_mappings.py  # Type mapping tests
│   ├── test_errors.py         # Error handling tests
│   └── test_comprehensive.py  # Comprehensive integration tests
├── pyproject.toml             # Package configuration
├── README.md                  # This file
└── LICENSE                    # MIT License
```
## License
MIT License - see LICENSE file for details.
## Contributing
Contributions are welcome! We appreciate your help in making Bulbasaur better.
### How to Contribute
- Fork the repository and create a new branch for your feature or bugfix
- Make your changes following the existing code style
- Add tests for new functionality or bug fixes
- Run the test suite to ensure everything passes
- Submit a Pull Request with a clear description of your changes
### Development Guidelines
- Follow the existing code style (Black formatting, 100 character line length)
- Write tests for all new features and bug fixes
- Update documentation as needed
- Ensure all tests pass before submitting
### Reporting Issues
If you find a bug or have a feature request, please open an issue on GitHub with:
- A clear description of the problem or feature
- Steps to reproduce (for bugs)
- Expected vs actual behavior
- Python version and dependency versions
## Inspiration
This project is part of a family of schema conversion libraries:
- 🦎 charmander - Convert between Polars and PySpark schemas
- 🐢 poldantic - Convert between Pydantic models and Polars schemas
- 🌱 bulbasaur - Convert between PySpark and SQLAlchemy/SQLModel schemas
## About
Bulbasaur provides a bridge between PySpark's distributed data processing and SQLAlchemy's ORM capabilities, enabling seamless schema conversion for data engineering workflows. Whether you're building data pipelines, APIs, or migration tools, Bulbasaur makes it easy to work with schemas across different ecosystems.
Made with ❤️ by Odos Matthews