# dfguard

Runtime schema enforcement for Python DataFrames.

Catch DataFrame schema mismatches at the function call, not deep in your pipeline.
Data pipelines fail late. A DataFrame with the wrong schema enters a function without complaint, the job runs, and the crash surfaces somewhere downstream with an error that tells you nothing about where the mismatch started.
dfguard moves that failure to the function call. The wrong DataFrame is rejected immediately with a precise error: which function, which argument, what schema was expected, what arrived.
Currently supports PySpark. pandas and polars support coming soon.
```python
import dfguard.pyspark as dfg
from pyspark.sql import SparkSession, functions as F
from pyspark.sql import types as T

spark = SparkSession.builder.getOrCreate()

raw_df = spark.createDataFrame(
    [(1, 10.0, 3), (2, 5.0, 7)],
    "order_id LONG, amount DOUBLE, quantity INT",
)

# Declare the input contract upfront -- no live DataFrame needed
class RawSchema(dfg.SparkSchema):
    order_id: T.LongType()
    amount: T.DoubleType()
    quantity: T.IntegerType()

@dfg.enforce
def enrich(df: RawSchema):
    return df.withColumn("revenue", F.col("amount") * F.col("quantity"))

# Capture the output schema from the live result
EnrichedSchema = dfg.schema_of(enrich(raw_df))

@dfg.enforce
def flag_high_value(df: EnrichedSchema):
    return df.withColumn("is_vip", F.col("revenue") > 1000)

flag_high_value(raw_df)
# TypeError: Schema mismatch in flag_high_value() argument 'df':
#   expected: order_id:bigint, amount:double, quantity:int, revenue:double
#   received: order_id:bigint, amount:double, quantity:int
```
No validation logic inside functions. The wrong DataFrame simply cannot enter the wrong function.
For package-wide enforcement without decorating each function, call dfg.arm() once from your package entry point.
## Install

```shell
pip install dfguard[pyspark]
```

Requires Python >= 3.10 and PySpark >= 3.3. No other dependencies.
## Two ways to define a schema

### Capture from a live DataFrame

```python
RawSchema = dfg.schema_of(raw_df)            # exact snapshot of this stage
EnrichedSchema = dfg.schema_of(enriched_df)  # new type after adding columns
```

Exact matching: a DataFrame with extra columns does not satisfy RawSchema. Capture a new type at each stage boundary.
### Declare upfront

```python
from dfguard.pyspark import Optional
from pyspark.sql import types as T

class OrderSchema(dfg.SparkSchema):
    order_id: T.LongType()
    amount: T.DoubleType()
    tags: T.ArrayType(T.StringType())
    address: Optional[T.StringType()]  # nullable field

class EnrichedSchema(OrderSchema):  # inherits all parent fields
    revenue: T.DoubleType()
```

No live DataFrame needed. Subclasses inherit parent fields. Works with nested structs, arrays, and maps.

Use the schema when creating a DataFrame:

```python
df = spark.createDataFrame(rows, OrderSchema.to_struct())
```
## Enforcement

### Arm once, protect everything

```python
# my_pipeline/__init__.py
import dfguard.pyspark as dfg

dfg.arm()  # walks the package, wraps every annotated function
```

Functions with schema-annotated arguments are enforced automatically -- no decorator needed on each one:

```python
# my_pipeline/transforms.py
def enrich(df: OrderSchema):        # enforced automatically
    return df.withColumn(...)

def aggregate(df: EnrichedSchema):  # also enforced
    return df.groupBy(...)
```
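Mechanically, arm-style auto-enforcement can be modeled as scanning a module for functions whose parameter annotations are schema classes and replacing them with guarded wrappers. The following is a simplified pure-Python sketch of that pattern, not dfguard's actual implementation: `Schema`, `check`, and `arm_module` are hypothetical names, and a "DataFrame" here is just a dict mapping column names to type strings.

```python
import functools
import inspect

class Schema:
    """Toy stand-in for a schema base class (dfguard uses SparkSchema)."""
    columns: dict = {}

def check(schema_cls, df_columns: dict) -> None:
    """Raise if any declared column is missing or has the wrong type."""
    bad = {k: v for k, v in schema_cls.columns.items()
           if df_columns.get(k) != v}
    if bad:
        raise TypeError(f"Schema mismatch: expected {schema_cls.columns}, "
                        f"received {df_columns}")

def arm_module(module) -> None:
    """Wrap every module function whose parameters are annotated with a Schema subclass."""
    for name, fn in inspect.getmembers(module, inspect.isfunction):
        sig = inspect.signature(fn)
        schema_params = {p: a.annotation for p, a in sig.parameters.items()
                         if isinstance(a.annotation, type)
                         and issubclass(a.annotation, Schema)}
        if not schema_params:
            continue  # no schema-annotated arguments: leave untouched

        def wrap(fn, schema_params):
            @functools.wraps(fn)
            def guarded(*args, **kwargs):
                bound = inspect.signature(fn).bind(*args, **kwargs)
                for param, schema_cls in schema_params.items():
                    check(schema_cls, bound.arguments[param])
                return fn(*args, **kwargs)
            return guarded

        setattr(module, name, wrap(fn, schema_params))
```

The key design point this models: the check lives in the wrapper, not in the function body, so transformation code stays free of validation logic.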
### Per-function decoration

Use @dfg.enforce in scripts, notebooks, or when you want a function-level subset override:

```python
@dfg.enforce                # subset=True: extra columns fine (default)
def process(df: OrderSchema): ...

@dfg.enforce(subset=False)  # exact match: no extra columns allowed
def write_final(df: OrderSchema): ...
```
### The subset flag

- subset=True (default): all declared columns must be present with the right types; extra columns are fine.
- subset=False: declared columns must be present and no extras are allowed.

Set it globally via dfg.arm(subset=False) and override per function via @dfg.enforce(subset=True); the function level always wins. Types captured with schema_of always use exact matching regardless of subset.
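The two matching modes reduce to a comparison over (column, type) pairs. A minimal sketch of the semantics described above, using plain dicts of column name to type string (illustrative only; `satisfies` is a hypothetical helper, not dfguard's code):

```python
def satisfies(declared: dict, actual: dict, subset: bool = True) -> bool:
    """declared/actual map column name -> type string.

    subset=True : every declared column must exist in actual with the
                  right type; extra actual columns are ignored.
    subset=False: additionally, actual may contain no extra columns.
    """
    declared_ok = all(actual.get(col) == typ for col, typ in declared.items())
    if subset:
        return declared_ok
    return declared_ok and set(actual) == set(declared)

order = {"order_id": "bigint", "amount": "double"}
with_extra = {**order, "revenue": "double"}

print(satisfies(order, with_extra))                                  # True: subset mode tolerates extras
print(satisfies(order, with_extra, subset=False))                    # False: exact mode rejects extras
print(satisfies(order, {"order_id": "string", "amount": "double"}))  # False: wrong type
```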
### Disabling enforcement

dfg.disarm() turns off all enforcement globally -- every guarded function passes through without checking, whether wrapped by dfg.arm() or decorated with @dfg.enforce. This is useful in tests where you want to exercise transformation logic without schema-valid fixtures.

```python
dfg.arm()
enrich(wrong_df)   # raises
dfg.disarm()
enrich(wrong_df)   # passes: enforcement is off
```
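A global kill switch like this is typically a module-level flag consulted by every wrapper, so disarming never needs to unwrap anything. A toy sketch of the pattern (the names mirror the dfguard API but this is not its code; the check here is a hypothetical one-column rule):

```python
import functools

_ARMED = True  # module-level switch, toggled by arm()/disarm()

def arm():
    global _ARMED
    _ARMED = True

def disarm():
    global _ARMED
    _ARMED = False

def enforce(check):
    """Decorator factory: run `check` on the argument only while armed."""
    def decorate(fn):
        @functools.wraps(fn)
        def guarded(df):
            if _ARMED:
                check(df)
            return fn(df)
        return guarded
    return decorate

def require_revenue(df):
    if "revenue" not in df:
        raise TypeError("Schema mismatch: missing 'revenue'")

@enforce(require_revenue)
def aggregate(df):
    return df

aggregate({"order_id": 1, "revenue": 2})  # ok while armed
disarm()
aggregate({"order_id": 1})                # passes: enforcement is off
```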
### Validate at load time

Use assert_valid right after reading from storage to catch upstream schema drift before processing starts:

```python
raw = spark.read.parquet("/data/orders/raw.parquet")
OrderSchema.assert_valid(raw)  # raises SchemaValidationError if the schema changed
enriched = enrich(raw)         # @dfg.enforce then guards the function call
```

It reports all problems at once, not just the first:

```
SchemaValidationError: Schema validation failed:
  ✗ Column 'revenue': type mismatch: expected double, got string
  ✗ Missing column 'is_high_value' (expected boolean, nullable=False)
```
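Collecting every problem before raising is a simple accumulate-then-raise pattern. A sketch of how such a validator can gather all mismatches at once, with schemas again modeled as dicts of column name to type string (`validate_all` is a hypothetical helper, not the library's internals):

```python
def validate_all(expected: dict, actual: dict) -> list:
    """Return every schema problem instead of stopping at the first."""
    problems = []
    for col, typ in expected.items():
        if col not in actual:
            problems.append(f"Missing column '{col}' (expected {typ})")
        elif actual[col] != typ:
            problems.append(f"Column '{col}': type mismatch: "
                            f"expected {typ}, got {actual[col]}")
    return problems

expected = {"revenue": "double", "is_high_value": "boolean"}
errors = validate_all(expected, {"revenue": "string"})
if errors:
    # Raise one exception carrying the whole report rather than
    # failing on the first bad column.
    report = "Schema validation failed:\n  " + "\n  ".join(errors)
    print(report)
```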
## Schema history

dfg.dataset(df) records every schema-changing operation. Call .schema_history.print() to see the full evolution:

```python
ds = dfg.dataset(raw_df)
ds = ds.withColumn("revenue", F.col("amount") * 1.1)
ds = ds.withColumn("discount", F.when(F.col("revenue") > 500, 50.0).otherwise(0.0))
ds = ds.drop("tags")
ds = ds.withColumnRenamed("customer", "customer_name")

ds.schema_history.print()
# ────────────────────────────────────────────────────────────
# Schema Evolution
# ────────────────────────────────────────────────────────────
# [ 0] input
#      struct<order_id:bigint,customer:string,amount:double,...> (no schema change)
# [ 1] withColumn('revenue')
#      added: revenue:double
# [ 2] withColumn('discount')
#      added: discount:double
# [ 3] drop(['tags'])
#      dropped: tags
# [ 4] withColumnRenamed('customer'→'customer_name')
#      added: customer_name:string | dropped: customer
# ────────────────────────────────────────────────────────────
```
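Each history entry boils down to a diff between consecutive schemas. A small sketch of that diffing step, with schemas as dicts of column name to type string (`schema_diff` is an illustrative helper, not dfguard's code):

```python
def schema_diff(before: dict, after: dict) -> str:
    """Describe what changed between two schemas, like one history entry."""
    added = [f"{c}:{t}" for c, t in after.items() if c not in before]
    dropped = [c for c in before if c not in after]
    parts = []
    if added:
        parts.append("added: " + ", ".join(added))
    if dropped:
        parts.append("dropped: " + ", ".join(dropped))
    return " | ".join(parts) or "(no schema change)"

stage0 = {"order_id": "bigint", "customer": "string", "amount": "double"}
stage1 = {**stage0, "revenue": "double"}                       # withColumn
stage2 = {k: v for k, v in stage1.items() if k != "customer"}  # rename =
stage2["customer_name"] = "string"                             # drop + add

print(schema_diff(stage0, stage1))  # added: revenue:double
print(schema_diff(stage1, stage2))  # added: customer_name:string | dropped: customer
```

Note that, as in the history output above, a rename appears as one column dropped and another added.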
## Pipeline integrations

dfguard fits naturally into pipeline frameworks. See the full docs for working examples with runnable code:

- Airflow: dfg.arm() globally, assert_valid after loading from storage, @dfg.enforce(subset=False) on functions that write to fixed-schema sinks
- Kedro: dfg.arm() in settings.py; node functions need no decorators
## Documentation

- Quickstart: nested structs, multi-stage pipelines, subset flag, schema history
- API reference: arm, disarm, enforce, schema_of, SparkSchema, dataset
- Airflow integration
- Kedro integration