Declarative relational test data as Spark DataFrames with referential integrity
rowsmyth
A blacksmith forges metal. A rowsmyth forges rows - mythical ones that exist only in your tests. rowsmyth is declarative relational test and seed data for Apache Spark: generate rows one at a time with real foreign-key integrity, then materialise ordinary DataFrames and temp views.
Install
uv add "rowsmyth[spark]"
# or
pip install "rowsmyth[spark]"
Requires Python 3.12+ and PySpark 4.0+. Running Spark locally also requires Java 17+.
The [spark] extra installs pyspark. Omit it on Databricks, or anywhere a
compatible PySpark is already installed, to avoid version clashes:
uv add rowsmyth
# or
pip install rowsmyth
When running Spark locally, Java must be on your PATH or discoverable via JAVA_HOME.
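A quick pre-flight check can catch a missing Java before Spark fails at startup. The sketch below uses only the standard library. The helper name `find_java` is hypothetical and not part of rowsmyth, and it only locates an executable without checking its version.

```python
import os
import shutil
from pathlib import Path


def find_java(env=None):
    """Return a path to a java executable, or None if none is found.

    Hypothetical helper: looks in JAVA_HOME/bin first, then on PATH.
    """
    env = os.environ if env is None else env
    java_home = env.get("JAVA_HOME")
    if java_home:
        for name in ("java", "java.exe"):
            candidate = Path(java_home) / "bin" / name
            if candidate.is_file():
                return str(candidate)
    # Fall back to a PATH lookup, as a shell would do.
    return shutil.which("java", path=env.get("PATH"))


if __name__ == "__main__":
    java = find_java()
    print(java or "Java not found: set JAVA_HOME or add java to PATH")
```

Passing `env` explicitly keeps the function easy to exercise in tests without touching the real environment.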
Quick start
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType

from rowsmyth import declarative_base, variant

spark = SparkSession.builder.master("local[*]").getOrCreate()
Base = declarative_base()


class Role(Base):
    __table_name__ = "roles"
    __primary_key__ = ("id",)
    __definition__ = StructType([
        StructField("id", LongType(), False),
        StructField("name", StringType(), False),
    ])

    def generator(self, ctx):
        return {
            "id": ctx.sequence(),
            "name": ctx.random.choice(["admin", "user", "guest"]),
        }


class User(Base):
    __table_name__ = "users"
    __primary_key__ = ("id",)
    __definition__ = StructType([
        StructField("id", LongType(), False),
        StructField("role_id", LongType(), False),
        StructField("email", StringType(), False),
    ])

    def generator(self, ctx):
        return {
            "id": ctx.sequence(),
            "role_id": ctx.pool("roles", "id").choice(),
            "email": ctx.faker.unique.ascii_email(),
        }

    @variant
    def inactive(self, ctx):
        return {"email": "inactive@example.com"}


with Base.dataset(spark, seed=42) as dataset:
    admin = Role.create(name="admin")
    user = Role.create(name="user")
    users = User.factory().count(10).variant("inactive").create()

    role_ids = {admin.id, user.id}
    assert all(created_user.role_id in role_ids for created_user in users)

    users_df = dataset.dataframe("users")
    # users_df is a DataFrame; temp view "users" is registered
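The foreign-key guarantee in the quick start comes from drawing child keys only from parent rows that already exist. The following is a minimal pure-Python sketch of that idea, not rowsmyth's implementation: `make_roles` and `make_users` are hypothetical names, and a seeded `random.Random` stands in for the dataset seed.

```python
import random


def make_roles(names):
    """Create parent rows with sequential ids, one row at a time."""
    return [{"id": i, "name": name} for i, name in enumerate(names, start=1)]


def make_users(count, roles, rng):
    """Create child rows whose role_id is drawn from existing role ids."""
    role_ids = [role["id"] for role in roles]  # the "pool" of valid keys
    return [
        {"id": i, "role_id": rng.choice(role_ids), "email": f"user{i}@example.com"}
        for i in range(1, count + 1)
    ]


roles = make_roles(["admin", "user"])
users = make_users(10, roles, random.Random(42))

# Referential integrity: every child key points at a real parent row.
valid_ids = {role["id"] for role in roles}
assert all(user["role_id"] in valid_ids for user in users)

# Same seed, same data: a fixed seed keeps fixtures reproducible.
assert users == make_users(10, roles, random.Random(42))
```

Because parents are created before children sample from them, a dangling `role_id` cannot be produced in this model.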
Databricks Lakeflow
A Model subclass carries all the metadata your Lakeflow pipeline and Unity Catalog need - schema, comment, tags and data quality expectations - in one place.
Define a table
from pyspark.sql.types import LongType, StringType, StructField, StructType

from rowsmyth import declarative_base, variant

Base = declarative_base()


class Customer(Base):
    __table_name__ = "customers"
    __catalog__ = "main"
    __schema__ = "commerce"
    __comment__ = "One row per customer account"
    __primary_key__ = ("id",)
    __table_tags__ = {"layer": "silver", "pii": "true"}
    __expectations__ = {
        "id_not_null": "id IS NOT NULL",
        "email_not_null": "email IS NOT NULL",
        "valid_tier": "tier IN ('standard', 'premium')",
    }
    __definition__ = StructType([
        StructField("id", LongType(), False),
        StructField(
            "email",
            StringType(),
            False,
            metadata={
                "comment": "Customer email, PII",
                "tags": {"pii": "true", "classification": "restricted"},
            },
        ),
        StructField("tier", StringType(), False),
    ])

    def generator(self, ctx):
        return {
            "id": ctx.sequence(),
            "email": ctx.faker.unique.ascii_email(),
            "tier": ctx.random.choices(["standard", "premium"], weights=[7, 3])[0],
        }

    @variant
    def premium(self, ctx):
        return {"tier": "premium"}
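The `premium` variant returns only the field it changes. One plausible reading, which this README does not spell out, is that a variant's dictionary is overlaid on the generator's output. The sketch below illustrates that assumption in plain Python; `generate`, `premium` and `build_row` are hypothetical stand-ins rather than rowsmyth functions.

```python
import random


def generate(rng, row_id):
    """Base generator: every column gets a value; tier is weighted 7:3."""
    tier = rng.choices(["standard", "premium"], weights=[7, 3])[0]
    return {"id": row_id, "email": f"customer{row_id}@example.com", "tier": tier}


def premium():
    """Variant: only the columns that differ from the base row."""
    return {"tier": "premium"}


def build_row(rng, row_id, variants=()):
    """Overlay each variant on the base row; later variants win (assumed)."""
    row = generate(rng, row_id)
    for variant_fn in variants:
        row |= variant_fn()
    return row


rng = random.Random(42)
plain = build_row(rng, 1)
forced = build_row(rng, 2, variants=[premium])
print(plain["tier"], forced["tier"])
```

Keeping variants as small overrides means the base generator remains the single place that knows how to fill every column.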
Lakeflow pipeline
Use the class attributes directly in your pipeline declaration:
from pyspark import pipelines as dp

from tables.customer import Customer


@dp.table(
    name=Customer.__table_name__,
    comment=Customer.__comment__,
    schema=Customer.__definition__,
)
@dp.expect_all_or_fail(Customer.__expectations__)
def customers():
    return spark.read.table("main.bronze.raw_customers")
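`expect_all_or_fail` stops the update when a row violates any of the listed constraints. To preview how fixture rows fare against simple expectations without starting Spark, the expressions can be evaluated in an in-memory SQLite database. This is a rough sketch: `violations` is a hypothetical helper, SQLite's dialect matches Spark SQL only for basic predicates such as the three shown here, and treating a NULL result as a violation is an assumption of the sketch.

```python
import sqlite3

EXPECTATIONS = {
    "id_not_null": "id IS NOT NULL",
    "email_not_null": "email IS NOT NULL",
    "valid_tier": "tier IN ('standard', 'premium')",
}


def violations(rows, expectations):
    """Count rows failing each expectation; only failing names are returned."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE customers (id INTEGER, email TEXT, tier TEXT)")
        conn.executemany("INSERT INTO customers VALUES (:id, :email, :tier)", rows)
        failed = {}
        for name, predicate in expectations.items():
            # Assumption: a predicate that is false or NULL counts as a violation.
            query = (
                "SELECT COUNT(*) FROM customers "
                f"WHERE NOT COALESCE(({predicate}), 0)"
            )
            (count,) = conn.execute(query).fetchone()
            if count:
                failed[name] = count
        return failed
    finally:
        conn.close()


rows = [
    {"id": 1, "email": "a@example.com", "tier": "standard"},
    {"id": 2, "email": None, "tier": "gold"},
]
print(violations(rows, EXPECTATIONS))
# prints {'email_not_null': 1, 'valid_tier': 1}
```

An empty result means every row satisfied every predicate under this sketch's rules; the pipeline itself remains the authority on what Spark accepts.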
Apply Unity Catalog metadata
After the pipeline materialises the table, apply comments and tags from the same class:
for statement in Customer.uc_tag_sql():
    spark.sql(statement)
uc_tag_sql() emits table comments, table tags, column comments and column tags.
Generate test fixtures
Write fixtures to the source your pipeline reads - either a Unity Catalog volume or a persistent bronze table:
from pyspark.sql import SparkSession

from tables.base import Base
from tables.customer import Customer

spark = SparkSession.builder.getOrCreate()

with Base.dataset(spark, seed=42) as dataset:
    customers = Customer.factory().count(100).create()
    customers_df = dataset.dataframe("customers")

    # Option A - ingest volume (pipeline reads parquet from path)
    customers_df.write.mode("overwrite").parquet(
        "/Volumes/main/bronze/ingest/raw_customers/"
    )

    # Option B - persistent bronze table
    customers_df.write.mode("overwrite").saveAsTable("main.bronze.raw_customers")
See docs/usage.md for the complete API reference.
Development
make install
make test # requires JAVA_HOME / java on PATH
make lint
make typecheck
make security
make pre-commit
make ci # local equivalent of CI checks
See CONTRIBUTING.md for details.