Skip to main content

Mythical test data for SQLAlchemy models - no boilerplate required.

Project description

rowsmyth

Mythical test data for SQLAlchemy models - no boilerplate required.

A blacksmith forges metal. A rowsmyth forges rows - the mythical kind, purpose-built for tests. Pronounced row smith, rowsmyth is a thin orchestration layer on top of SQLAlchemy and factory-boy that eliminates the ceremony of wiring up test factories. Define generators once, co-located with your models. Generate hierarchical or flat datasets with a single call.

from rowsmyth import declarative_base, variant
import factory

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String)
    tier: Mapped[str] = mapped_column(String)
    orders: Mapped[list["Order"]] = relationship(back_populates="user")

    @classmethod
    def generators(cls):
        return {
            cls.name: factory.Faker("name"),
            cls.tier: factory.fuzzy.FuzzyChoice(["standard", "premium"]),
        }

    @variant
    def admin(cls):
        return {cls.name: "admin", cls.tier: "premium"}


# Order and OrderItem are defined in the Complete Example section below

# 20 users, each with 1–5 orders, each order with 1–4 items
users = (
    User
    .factory(20)
    .has(Order.factory(1, 5).has(OrderItem.factory(1, 4)))
    .mix(admin=0.1)
    .random_seed(42)
    .create()
)

The Problem with factory-boy + SQLAlchemy

factory-boy is excellent at generating realistic values, but wiring it up to SQLAlchemy for multi-table test datasets requires a lot of scaffolding:

# The old way - three separate files just to generate related rows


class UserFactory(SQLAlchemyModelFactory):
    class Meta:
        model = User
        sqlalchemy_session = Session
        sqlalchemy_session_persistence = "commit"

    name = factory.Faker("name")


class OrderFactory(SQLAlchemyModelFactory):
    class Meta:
        model = Order
        sqlalchemy_session = Session
        sqlalchemy_session_persistence = "commit"

    total = factory.Faker("pyfloat", positive=True, max_value=500)
    user = factory.SubFactory(UserFactory)  # creates a NEW user every time


class OrderItemFactory(SQLAlchemyModelFactory):
    class Meta:
        model = OrderItem
        sqlalchemy_session = Session
        sqlalchemy_session_persistence = "commit"

    sku = factory.Faker("ean13")
    order = factory.SubFactory(OrderFactory)  # creates a NEW order every time


# Manual loops to avoid the SubFactory explosion
for _ in range(10):
    user = UserFactory()
    for _ in range(random.randint(1, 5)):
        order = OrderFactory(user=user)
        OrderItemFactory.create_batch(random.randint(1, 4), order=order)

Every project ends up maintaining a parallel hierarchy of factory classes, manual session management and hand-rolled loops to control which rows share foreign keys.

rowsmyth handles all of this automatically.


Installation

uv add rowsmyth

With PySpark schema support:

uv add "rowsmyth[spark]"

Requirements: Python ≥ 3.12, SQLAlchemy ≥ 2.0, factory-boy ≥ 3.3


Core Concepts

declarative_base()

A drop-in replacement for SQLAlchemy's declarative_base(). Returns a base class with rowsmyth capabilities mixed in - no changes to your model definitions required.

from rowsmyth import declarative_base

Base = declarative_base()

generators() - co-located factory declarations

Instead of a separate factory class, define value generators directly on the model:

class Product(Base):
    __tablename__ = "products"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String)
    price: Mapped[float]
    category: Mapped[str] = mapped_column(String)

    @classmethod
    def generators(cls):
        return {
            cls.name: factory.Faker("ecommerce_name"),
            cls.price: factory.Faker("pyfloat", positive=True, max_value=999),
            cls.category: factory.fuzzy.FuzzyChoice([
                "electronics",
                "clothing",
                "food",
            ]),
        }

Keys are column attributes (cls.name, cls.price) - not strings. Values are any factory-boy declaration.

@variant - named model variants

Variants override specific generators for a named sub-type of the model:

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String)
    role: Mapped[str] = mapped_column(String)
    is_active: Mapped[bool]

    @classmethod
    def generators(cls):
        return {
            cls.name: factory.Faker("name"),
            cls.role: "user",
            cls.is_active: True,
        }

    @variant
    def admin(cls):
        return {cls.role: "admin"}

    @variant
    def suspended(cls):
        return {cls.is_active: False}

Variants are applied on top of generators() - you only override what changes.


Generating Data

FactoryBuilder - hierarchical datasets

Use Model.factory(n) to build a FactoryBuilder. Call .create() to execute and get back a list of the root model instances.

# Exact count
users = User.factory(50).create()

# Random count in range
users = User.factory(40, 60).create()

.has() - parent → child relationships

Chain .has() to attach child builders. Foreign keys are resolved automatically from SQLAlchemy's relationship metadata:

users = User.factory(20).has(Order.factory(1, 5)).create()
# 20 users, each with 1–5 orders. Each order's user_id is set automatically.

Chains can be arbitrarily deep:

users = User.factory(20).has(Order.factory(1, 5).has(OrderItem.factory(1, 4))).create()

Multiple child types at the same level:

users = (
    User
    .factory(20)
    .has(
        Order.factory(1, 5),
        Address.factory(1, 3),
    )
    .create()
)

When a model has multiple relationships to the same parent, pass via= to disambiguate:

User.factory(10).has(Message.factory(5, via="sent_messages")).create()

.mix() - probabilistic variants

Distribute rows across variants using proportions. Proportions must sum to ≤ 1.0; the remainder uses generators() defaults:

# 10% admin, 5% suspended, 85% default
users = User.factory(100).mix(admin=0.1, suspended=0.05).create()

.where() - field overrides

Force specific values on every generated row. Overrides always win over variants and generators:

# All users are in the "enterprise" tier
users = User.factory(50).where({User.tier: "enterprise"}).create()

.random_seed() - reproducible output

Seed the RNG and Faker for deterministic datasets:

users = User.factory(100).mix(admin=0.2).random_seed(42).create()

Running the same code twice with the same seed produces identical rows.


Dataset - flat multi-table datasets

When tables reference each other laterally (not hierarchically), use Base.dataset(). It handles topological ordering and automatically samples foreign keys from already-created rows:

data = (
    Base
    .dataset(
        Customer.factory(20).mix(premium=0.3),
        Product.factory(50),
        Order.factory(200),
    )
    .random_seed(42)
    .create()
)

# Returns a dict keyed by tablename:
# {
#   "customers": [...],
#   "products":  [...],
#   "orders":    [...],   # each order's customer_id points to a real customer
# }

Dataset creates rows in dependency order, then randomly samples from the created pool when injecting foreign keys. All rows share one in-memory SQLite session.


Model Metadata

rowsmyth exposes standard SQLAlchemy comment and info fields as convenient classproperties. This is useful for data catalogues, data quality tooling and documentation generation.

class User(Base):
    __tablename__ = "users"
    __table_args__ = (
        CheckConstraint("tier IN ('standard', 'premium')", name="ck_users_tier"),
        {
            "comment": "Stores application users",
            "info": {"domain": "auth", "owner": "auth-team"},
        },
    )

    id: Mapped[int] = mapped_column(primary_key=True, info={"pii": False})
    name: Mapped[str] = mapped_column(
        String,
        comment="Display name",
        info={"pii": True, "owner": "auth-team"},
    )
    tier: Mapped[str] = mapped_column(String)
User.__comment__
# "Stores application users"

User.__table_info__
# {"domain": "auth", "owner": "auth-team"}

User.__column_info__
# {
#   "id":   {"pii": False},
#   "name": {"pii": True, "owner": "auth-team"},
#   "tier": {},
# }

User.__expectations__
# {"ck_users_tier": "tier IN ('standard', 'premium')"}

These are read directly from SQLAlchemy's Table.comment, Table.info, Column.comment, Column.info and CheckConstraint objects - no duplication required.


PySpark Schema Integration

When working with Spark pipelines, __spark_schema__ converts your SQLAlchemy model to a PySpark StructType automatically:

# uv add "rowsmyth[spark]"

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([], schema=User.__spark_schema__)

Supported type mappings: SmallIntegerShortType, IntegerIntegerType, BigIntegerLongType, String/Text/Unicode/UnicodeTextStringType, DoubleDoubleType, FloatFloatType, Numeric(p,s)DecimalType(p,s), BooleanBooleanType, DateTimeTimestampType, DateDateType, UuidStringType, LargeBinaryBinaryType.

Nullable columns are mapped to nullable Spark fields. Column-level info dict is merged into the Spark field metadata.


Complete Example

import uuid
import factory
import factory.fuzzy
from sqlalchemy import CheckConstraint, ForeignKey, String, Uuid
from sqlalchemy.orm import Mapped, mapped_column, relationship
from rowsmyth import declarative_base, variant

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    __table_args__ = (
        CheckConstraint("tier IN ('standard', 'premium')", name="ck_users_tier"),
        {"comment": "Stores application users", "info": {"domain": "auth"}},
    )

    id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)
    name: Mapped[str] = mapped_column(
        String, comment="Display name", info={"pii": True}
    )
    tier: Mapped[str] = mapped_column(String)

    orders: Mapped[list["Order"]] = relationship(back_populates="user")

    @classmethod
    def generators(cls):
        return {
            cls.name: factory.Faker("name"),
            cls.tier: factory.fuzzy.FuzzyChoice(["standard", "premium"]),
        }

    @variant
    def admin(cls):
        return {cls.name: "admin", cls.tier: "premium"}


class Order(Base):
    __tablename__ = "orders"

    id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)
    total: Mapped[float]
    user_id: Mapped[uuid.UUID] = mapped_column(ForeignKey(User.id))

    user: Mapped["User"] = relationship(back_populates="orders")
    items: Mapped[list["OrderItem"]] = relationship(back_populates="order")

    @classmethod
    def generators(cls):
        return {
            cls.total: factory.Faker("pyfloat", positive=True, max_value=500),
        }


class OrderItem(Base):
    __tablename__ = "order_items"

    id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)
    sku: Mapped[str] = mapped_column(String)
    order_id: Mapped[uuid.UUID] = mapped_column(ForeignKey(Order.id))

    order: Mapped["Order"] = relationship(back_populates="items")

    @classmethod
    def generators(cls):
        return {cls.sku: factory.Faker("ean13")}


# Hierarchical: 20 users → 1–5 orders each → 1–4 items each
users = (
    User
    .factory(20)
    .has(Order.factory(1, 5).has(OrderItem.factory(1, 4)))
    .mix(admin=0.1)
    .random_seed(42)
    .create()
)

# Flat: independent tables, FKs sampled from pool
data = (
    Base
    .dataset(
        User.factory(20).mix(admin=0.1),
        Order.factory(50, 100),
        OrderItem.factory(200, 400),
    )
    .random_seed(42)
    .create()
)

API Reference

declarative_base(metadata=None, type_annotation_map=None, registry=None)

Returns a SQLAlchemy declarative base with rowsmyth capabilities mixed in.

Argument Type Description
metadata MetaData | None Shared MetaData instance for all models.
type_annotation_map dict | None Extra Python type → SQLAlchemy type mappings for Mapped[] annotations.
registry registry | None Pre-existing mapper registry to share across multiple bases.

@variant

Decorator for methods in a model class body. The method receives cls and returns a dict in the same shape as generators(). Collected into cls.__variants__.

Model.factory(n) / Model.factory(min, max)

Returns a FactoryBuilder for the model. n generates exactly n rows; (min, max) generates a random count in that range per parent.

FactoryBuilder

Method Description
.has(*builders, via=None) Attach child builders. Use via="rel_name" to disambiguate multiple relationships to the same parent.
.mix(**proportions) Variant distribution. Values are proportions (0.0–1.0), must sum to ≤ 1.0.
.where(overrides) Force field values. {Model.column: value}.
.random_seed(value) Seed random and Faker for reproducibility.
.create() Execute. Returns list[RootModel]. Creates an in-memory SQLite DB automatically.

Base.dataset(*builders)

Returns a Dataset for flat multi-table generation.

Method Description
.random_seed(value) Seed random and Faker.
.create() Execute. Returns dict[str, list[Model]] keyed by __tablename__.

Model classproperties

Property Type Description
__comment__ str | None Table-level comment from __table_args__.
__table_info__ dict Table-level info dict from __table_args__.
__column_info__ dict[str, dict] Per-column info dicts, keyed by column name.
__expectations__ dict[str, str] CheckConstraint expressions, keyed by constraint name.
__spark_schema__ StructType PySpark schema. Requires rowsmyth[spark].

Real-World Use Case: Databricks Lakeflow Declarative Pipelines

When building data pipelines on Databricks with Lakeflow Declarative Pipelines (formerly DLT), your SQLAlchemy model becomes the single source of truth for the entire table lifecycle: schema, documentation, data quality rules, Unity Catalog tags and test fixtures.

Model definition

Define your model once with comments, info tags, check constraints and generators:

import factory
import factory.fuzzy
from sqlalchemy import CheckConstraint, ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from rowsmyth import declarative_base, variant

Base = declarative_base()


class Customer(Base):
    __tablename__ = "customers"
    __table_args__ = (
        CheckConstraint("tier IN ('standard', 'premium')", name="ck_customers_tier"),
        CheckConstraint("email LIKE '%@%'", name="ck_customers_email"),
        {
            "comment": "Registered customers with purchase history",
            "info": {"domain": "commerce", "pii": "true", "team": "data-platform"},
        },
    )

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(
        String, comment="Primary contact email", info={"pii": "true"}
    )
    name: Mapped[str] = mapped_column(
        String, comment="Full legal name", info={"pii": "true"}
    )
    tier: Mapped[str] = mapped_column(
        String, comment="Subscription tier", info={"pii": "false"}
    )

    @classmethod
    def generators(cls):
        return {
            cls.email: factory.Faker("email"),
            cls.name: factory.Faker("name"),
            cls.tier: factory.fuzzy.FuzzyChoice(["standard", "premium"]),
        }

    @variant
    def premium(cls):
        return {cls.tier: "premium"}

Lakeflow pipeline definition

Reference model metadata directly in your pipeline decorators - no duplication:

from pyspark import pipelines as dp
from tables.customer import Customer


@dp.table(
    name=Customer.__tablename__,
    comment=Customer.__comment__,
    schema=Customer.__spark_schema__,
)
@dp.expect_all_or_fail(Customer.__expectations__)
def customers():
    return spark.read.table("bronze.raw_customers")

__spark_schema__ gives Lakeflow the authoritative column types and nullability. __expectations__ maps directly to @dp.expect_all_or_fail - your CheckConstraint definitions become pipeline data quality rules with no extra work.

Unity Catalog tags

Set table-level and column-level tags in Unity Catalog from the same info dicts:

catalog = "main"
schema = "commerce"
table = Customer.__tablename__

# Table tags
table_tag_pairs = ", ".join(
    f"'{k}' = '{v}'" for k, v in Customer.__table_info__.items()
)
spark.sql(f"""
    ALTER TABLE {catalog}.{schema}.{table}
    SET TAGS ({table_tag_pairs})
""")

# Column tags
for column, info in Customer.__column_info__.items():
    if not info:
        continue
    col_tag_pairs = ", ".join(f"'{k}' = '{v}'" for k, v in info.items())
    spark.sql(f"""
        ALTER TABLE {catalog}.{schema}.{table}
        ALTER COLUMN {column}
        SET TAGS ({col_tag_pairs})
    """)

Test fixtures for pipeline unit tests

Use rowsmyth to generate realistic fixture data and write it to the bronze source your pipeline reads from. The same __spark_schema__ ensures the fixture DataFrame matches the pipeline's expected schema exactly:

from pyspark.sql import SparkSession
from tables.customer import Customer

spark = SparkSession.builder.getOrCreate()


def make_customer_fixture(spark, n=100, seed=42):
    rows = Customer.factory(n).mix(premium=0.3).random_seed(seed).create()
    records = [
        {col: getattr(row, col) for col in Customer.__table__.columns.keys()}
        for row in rows
    ]
    return spark.createDataFrame(records, schema=Customer.__spark_schema__)


# Write fixture to the bronze source the pipeline reads
make_customer_fixture(spark).write.mode("overwrite").saveAsTable("bronze.raw_customers")

# Run the pipeline and assert on the output
# ...

Because Customer.__spark_schema__ is derived from the same model as the pipeline decorator, fixture data and pipeline schema can never drift apart.


License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rowsmyth-0.1.0.tar.gz (23.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rowsmyth-0.1.0-py3-none-any.whl (17.9 kB view details)

Uploaded Python 3

File details

Details for the file rowsmyth-0.1.0.tar.gz.

File metadata

  • Download URL: rowsmyth-0.1.0.tar.gz
  • Upload date:
  • Size: 23.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for rowsmyth-0.1.0.tar.gz
Algorithm Hash digest
SHA256 0a5853a88c34cdd101e183fa27ce9c707cfd192b4308c2873918071c6aa09648
MD5 46d965403c6aed5285f976bf13f8cf94
BLAKE2b-256 0f2864a426f055fb63f9f6c0bdb864f90de64f1ef10087969ef3babf54a4461a

See more details on using hashes here.

Provenance

The following attestation bundles were made for rowsmyth-0.1.0.tar.gz:

Publisher: cd.yml on LaurenceRawlings/rowsmyth

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file rowsmyth-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: rowsmyth-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 17.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for rowsmyth-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 9700194c366c48dec9186ebb78c3ada10ca8ab3cbc2f6aaeb00a32848959b62b
MD5 b428980aa24b5401e09d26c1c90270b8
BLAKE2b-256 ad0db8980dbd1064840bee55afaf5791ed14983039ae65226e6f713ab8101179

See more details on using hashes here.

Provenance

The following attestation bundles were made for rowsmyth-0.1.0-py3-none-any.whl:

Publisher: cd.yml on LaurenceRawlings/rowsmyth

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page