Declarative relational test data as Spark DataFrames with referential integrity

rowsmyth

A blacksmith forges metal. A rowsmyth forges rows - mythical ones that exist only in your tests. rowsmyth generates declarative relational test and seed data for Apache Spark: it builds rows one at a time with real foreign-key integrity, then materialises them as ordinary DataFrames and temp views.

Install

uv add rowsmyth
# or
pip install rowsmyth

Requires Python 3.12+, PySpark 4.0+ and Java 17+. pyspark is installed as a runtime dependency; Java must be available on your PATH or via JAVA_HOME when running Spark locally.
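
The requirements above can be checked before Spark starts. This is a minimal sketch using only the standard library; check_prerequisites is a hypothetical helper, not part of rowsmyth, and it only confirms that some Java is discoverable, not that it is version 17 or newer.

```python
import os
import shutil
import sys


def check_prerequisites():
    """Return a list of problems found; an empty list means the checks passed.

    Hypothetical helper for illustration. It does not verify the Java version.
    """
    problems = []
    if sys.version_info < (3, 12):
        found = sys.version.split()[0]
        problems.append(f"Python 3.12+ required, found {found}")
    # Spark needs Java reachable through JAVA_HOME or PATH.
    java_home = os.environ.get("JAVA_HOME")
    java_on_path = shutil.which("java")
    if not java_home and java_on_path is None:
        problems.append("Java not found: set JAVA_HOME or add java to PATH")
    return problems


for problem in check_prerequisites():
    print(problem)
```

Running it prints nothing when both checks pass.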

Quick start

from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType

from rowsmyth import declarative_base, variant

spark = SparkSession.builder.master("local[*]").getOrCreate()
Base = declarative_base()


class Role(Base):
    __table_name__ = "roles"
    __primary_key__ = ("id",)
    __definition__ = StructType([
        StructField("id", LongType(), False),
        StructField("name", StringType(), False),
    ])

    def generator(self, ctx):
        return {
            "id": ctx.sequence(),
            "name": ctx.random.choice(["admin", "user", "guest"]),
        }


class User(Base):
    __table_name__ = "users"
    __primary_key__ = ("id",)
    __definition__ = StructType([
        StructField("id", LongType(), False),
        StructField("role_id", LongType(), False),
        StructField("email", StringType(), False),
    ])

    def generator(self, ctx):
        return {
            "id": ctx.sequence(),
            "role_id": ctx.pool("roles", "id").choice(),
            "email": ctx.faker.unique.ascii_email(),
        }

    @variant
    def inactive(self, ctx):
        return {"email": "inactive@example.com"}


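with Base.dataset(spark, seed=42) as dataset:
    # Create the parent rows first; keyword arguments override generated values.
    admin = Role.create(name="admin")
    user = Role.create(name="user")
    # Create 10 users with the "inactive" variant applied.
    users = User.factory().count(10).variant("inactive").create()

    # Every generated role_id refers to one of the roles created above.
    role_ids = {admin.id, user.id}
    assert all(created_user.role_id in role_ids for created_user in users)
    users_df = dataset.dataframe("users")
    # users_df is a DataFrame; temp view "users" is registered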
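
To make the foreign-key guarantee concrete, here is a plain-Python sketch of the underlying idea with no Spark and no rowsmyth involved. The helpers make_roles and make_users are hypothetical and are not how the library is implemented; they only show child rows drawing their keys from parent rows that already exist, with a seeded generator for repeatability.

```python
import random


def make_roles(names):
    """Create parent rows with sequential ids (hypothetical helper)."""
    return [{"id": i, "name": name} for i, name in enumerate(names, start=1)]


def make_users(count, roles, seed):
    """Create child rows whose role_id is always drawn from existing roles."""
    rng = random.Random(seed)  # seeded, so the same seed repeats the same rows
    role_ids = [role["id"] for role in roles]
    return [
        {"id": i, "role_id": rng.choice(role_ids), "email": f"user{i}@example.com"}
        for i in range(1, count + 1)
    ]


roles = make_roles(["admin", "user"])
users = make_users(10, roles, seed=42)

# Every foreign key points at a real parent row.
valid_ids = {role["id"] for role in roles}
assert all(u["role_id"] in valid_ids for u in users)
# The same seed produces the same data.
assert users == make_users(10, roles, seed=42)
```

Because children only ever pick from keys that were already created, a join between the two lists can never produce an orphaned row.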

Databricks Lakeflow

A Model subclass carries all the metadata your Lakeflow pipeline and Unity Catalog need - schema, comment, tags and data quality expectations - in one place.

Define a table

from pyspark.sql.types import LongType, StringType, StructField, StructType

from rowsmyth import declarative_base, variant

Base = declarative_base()


class Customer(Base):
    __table_name__ = "customers"
    __catalog__ = "main"
    __schema__ = "commerce"
    __comment__ = "One row per customer account"
    __primary_key__ = ("id",)
    __table_tags__ = {"layer": "silver", "pii": "true"}
    __expectations__ = {
        "id_not_null": "id IS NOT NULL",
        "email_not_null": "email IS NOT NULL",
        "valid_tier": "tier IN ('standard', 'premium')",
    }
    __definition__ = StructType([
        StructField("id", LongType(), False),
        StructField(
            "email",
            StringType(),
            False,
            metadata={
                "comment": "Customer email, PII",
                "tags": {"pii": "true", "classification": "restricted"},
            },
        ),
        StructField("tier", StringType(), False),
    ])

    def generator(self, ctx):
        return {
            "id": ctx.sequence(),
            "email": ctx.faker.unique.ascii_email(),
            "tier": ctx.random.choices(["standard", "premium"], weights=[7, 3])[0],
        }

    @variant
    def premium(self, ctx):
        return {"tier": "premium"}
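
The premium variant returns only the fields it wants to change. A reasonable reading, and an assumption here rather than documented behaviour, is that a variant's dict is layered over the generator's output. The standalone sketch below shows that merge with plain dicts; generate_customer and premium_variant are hypothetical stand-ins for the methods above.

```python
import random


def generate_customer(rng, customer_id):
    """Stand-in for Customer.generator: about 70% standard, 30% premium."""
    tier = rng.choices(["standard", "premium"], weights=[7, 3])[0]
    return {"id": customer_id, "tier": tier}


def premium_variant():
    """Stand-in for the premium variant: a partial row of overrides."""
    return {"tier": "premium"}


rng = random.Random(42)
base_row = generate_customer(rng, customer_id=1)
# Assumed merge order: variant values win over generated values.
row = {**base_row, **premium_variant()}

assert row["id"] == base_row["id"]
assert row["tier"] == "premium"
```

Fields the variant does not mention, such as id, keep their generated values.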

Lakeflow pipeline

Use the class attributes directly in your pipeline declaration:

from pyspark import pipelines as dp

from tables.customer import Customer


@dp.table(
    name=Customer.__table_name__,
    comment=Customer.__comment__,
    schema=Customer.__definition__,
)
@dp.expect_all_or_fail(Customer.__expectations__)
def customers():
    # `spark` is the session Databricks provides to pipeline source files.
    return spark.read.table("main.bronze.raw_customers")

Apply Unity Catalog metadata

After the pipeline materialises the table, apply comments and tags from the same class:

for statement in Customer.uc_tag_sql():
    spark.sql(statement)

uc_tag_sql() emits table comments, table tags, column comments and column tags.
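
For a sense of what those four kinds of statement look like, the sketch below builds them by hand from the Customer metadata shown earlier. It is an illustration only: sketch_tag_sql is a hypothetical function, the three-part name main.commerce.customers is assumed from __catalog__, __schema__ and __table_name__, and the exact text produced by uc_tag_sql() may differ.

```python
def sql_literal(value):
    """Quote a string as a SQL literal using backslash escapes."""
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def tag_list(tags):
    """Render a tag dict as comma-separated 'key' = 'value' pairs."""
    return ", ".join(f"{sql_literal(k)} = {sql_literal(v)}" for k, v in tags.items())


def sketch_tag_sql(table, comment, table_tags, columns):
    """Build comment and tag statements for one table (illustrative only)."""
    statements = [
        f"COMMENT ON TABLE {table} IS {sql_literal(comment)}",
        f"ALTER TABLE {table} SET TAGS ({tag_list(table_tags)})",
    ]
    for name, meta in columns.items():
        prefix = f"ALTER TABLE {table} ALTER COLUMN {name}"
        if "comment" in meta:
            statements.append(f"{prefix} COMMENT {sql_literal(meta['comment'])}")
        if meta.get("tags"):
            statements.append(f"{prefix} SET TAGS ({tag_list(meta['tags'])})")
    return statements


statements = sketch_tag_sql(
    table="main.commerce.customers",  # assumed fully qualified name
    comment="One row per customer account",
    table_tags={"layer": "silver", "pii": "true"},
    columns={
        "email": {
            "comment": "Customer email, PII",
            "tags": {"pii": "true", "classification": "restricted"},
        }
    },
)
for statement in statements:
    print(statement)
```

Each statement is independent, which is why the loop above can pass them to spark.sql() one at a time.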

Generate test fixtures

Write fixtures to the source your pipeline reads - either a Unity Catalog volume or a persistent bronze table:

from pyspark.sql import SparkSession

from tables.base import Base
from tables.customer import Customer

spark = SparkSession.builder.getOrCreate()

with Base.dataset(spark, seed=42) as dataset:
    customers = Customer.factory().count(100).create()
    customers_df = dataset.dataframe("customers")

# Option A - ingest volume (pipeline reads Parquet files from this path)
customers_df.write.mode("overwrite").parquet(
    "/Volumes/main/bronze/ingest/raw_customers/"
)

# Option B - persistent bronze table
customers_df.write.mode("overwrite").saveAsTable("main.bronze.raw_customers")

See docs/usage.md for the complete API reference.

Development

make install
make test        # requires JAVA_HOME / java on PATH
make lint
make typecheck
make security
make pre-commit
make ci          # local equivalent of CI checks

See CONTRIBUTING.md for details.
