Skip to main content

Runtime schema enforcement for Python DataFrames

Project description

dfguard

Catch DataFrame schema mismatches at the function call, not deep in your pipeline.

PyPI Python License CI Coverage Docs

Documentation | Quickstart | API Reference


Data pipelines fail late. A DataFrame with the wrong schema enters a function without complaint, the job runs, and the crash surfaces somewhere downstream with an error that tells you nothing about where the mismatch started.

dfguard moves that failure to the function call. The wrong DataFrame is rejected immediately with a precise error: which function, which argument, what schema was expected, what arrived. Lightweight: enforcement is pure metadata inspection — dfguard reads the schema struct from your DataFrame, no data is scanned, no Spark jobs triggered. Unlike pandera, which introduces its own type system, dfguard uses the types your library already ships with: T.LongType() for PySpark, pl.Int64 for Polars, np.dtype("int64") for pandas.

Compatibility

Backend Version Python
PySpark >= 3.3 >= 3.10
pandas >= 1.5 >= 3.10
Polars >= 0.20 >= 3.10

Install

pip install 'dfguard[pyspark]'            # PySpark
pip install 'dfguard[pandas]' pyarrow    # pandas (pyarrow recommended for nested types)
pip install 'dfguard[polars]'            # Polars
pip install 'dfguard[all]'               # all backends

Requires Python >= 3.10. No other mandatory dependencies.


PySpark

import dfguard.pyspark as dfg
from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.getOrCreate()
raw_df = spark.createDataFrame(
    [(1, 10.0, 3), (2, 5.0, 7)],
    "order_id LONG, amount DOUBLE, quantity INT",
)

class RawSchema(dfg.SparkSchema):
    order_id = T.LongType()
    amount   = T.DoubleType()
    quantity = T.IntegerType()

@dfg.enforce
def enrich(df: RawSchema):
    return df.withColumn("revenue", F.col("amount") * F.col("quantity"))

EnrichedSchema = dfg.schema_of(enrich(raw_df))

@dfg.enforce
def flag_high_value(df: EnrichedSchema):
    return df.withColumn("is_vip", F.col("revenue") > 1000)

flag_high_value(raw_df)
# TypeError: Schema mismatch in flag_high_value() argument 'df':
#   expected: order_id:bigint, amount:double, quantity:int, revenue:double
#   received: order_id:bigint, amount:double, quantity:int

pandas

import numpy as np
import pandas as pd
import dfguard.pandas as dfg

raw_df = pd.DataFrame({
    "order_id": pd.array([1, 2, 3], dtype="int64"),
    "amount":   pd.array([10.0, 5.0, 8.5], dtype="float64"),
    "quantity": pd.array([3, 1, 2], dtype="int64"),
})

class RawSchema(dfg.PandasSchema):
    order_id = np.dtype("int64")
    amount   = np.dtype("float64")
    quantity = np.dtype("int64")

@dfg.enforce
def enrich(df: RawSchema):
    return df.assign(revenue=df["amount"] * df["quantity"])

EnrichedSchema = dfg.schema_of(enrich(raw_df))

@dfg.enforce
def flag_high_value(df: EnrichedSchema):
    return df.assign(is_vip=df["revenue"] > 1000)

flag_high_value(raw_df)
# TypeError: Schema mismatch in flag_high_value() argument 'df':
#   expected: order_id:int64, amount:float64, quantity:int64, revenue:float64
#   received: order_id:int64, amount:float64, quantity:int64

Polars

import polars as pl
import dfguard.polars as dfg

raw_df = pl.DataFrame({
    "order_id": pl.Series([1, 2, 3], dtype=pl.Int64),
    "amount":   pl.Series([10.0, 5.0, 8.5], dtype=pl.Float64),
    "quantity": pl.Series([3, 1, 2], dtype=pl.Int32),
})

class RawSchema(dfg.PolarsSchema):
    order_id = pl.Int64
    amount   = pl.Float64
    quantity = pl.Int32

@dfg.enforce
def enrich(df: RawSchema) -> pl.DataFrame:
    return df.with_columns(revenue=pl.col("amount") * pl.col("quantity"))

EnrichedSchema = dfg.schema_of(enrich(raw_df))

@dfg.enforce
def flag_high_value(df: EnrichedSchema) -> pl.DataFrame:
    return df.with_columns(is_vip=pl.col("revenue") > 1000)

flag_high_value(raw_df)
# TypeError: Schema mismatch in flag_high_value() argument 'df':
#   expected: order_id:Int64, amount:Float64, quantity:Int32, revenue:Float64
#   received: order_id:Int64, amount:Float64, quantity:Int32

No validation logic inside functions. The wrong DataFrame simply cannot enter the wrong function.

For package-wide enforcement without decorating each function, call dfg.arm() once from your package entry point.


Two ways to define a schema

Capture from a live DataFrame

RawSchema      = dfg.schema_of(raw_df)       # exact snapshot of this stage
EnrichedSchema = dfg.schema_of(enriched_df)  # new type after adding columns

Exact matching: a DataFrame with extra columns does not satisfy RawSchema. Capture a new type at each stage boundary.

Declare upfront

No live DataFrame needed. Subclasses inherit parent fields. All three backends support nested types fully.

PySpark — arrays, structs, maps via T.ArrayType / T.StructType / T.MapType:

from dfguard.pyspark import Optional
from pyspark.sql import types as T

class OrderSchema(dfg.SparkSchema):
    order_id   = T.LongType()
    amount     = T.DoubleType()
    line_items = T.ArrayType(T.StructType([          # array of structs
        T.StructField("sku",      T.StringType()),
        T.StructField("quantity", T.IntegerType()),
        T.StructField("price",    T.DoubleType()),
    ]))
    zip_code   = Optional[T.StringType()]            # nullable field

class EnrichedSchema(OrderSchema):                   # inherits all parent fields
    revenue = T.DoubleType()

df = spark.createDataFrame(rows, OrderSchema.to_struct())

Polarspl.List, pl.Struct, pl.Array are native first-class types:

from dfguard.polars import Optional

class OrderSchema(dfg.PolarsSchema):
    order_id   = pl.Int64
    amount     = pl.Float64
    line_items = pl.List(pl.Struct({                 # list of structs
        "sku":      pl.String,
        "quantity": pl.Int32,
        "price":    pl.Float64,
    }))
    zip_code   = Optional[pl.String]                 # nullable field

pandas — use pd.ArrowDtype (requires pyarrow) for nested types:

import pyarrow as pa
from dfguard.pandas import Optional

class OrderSchema(dfg.PandasSchema):
    order_id   = np.dtype("int64")
    amount     = np.dtype("float64")
    line_items = pd.ArrowDtype(pa.list_(pa.struct([  # nested via PyArrow
        pa.field("sku",      pa.string()),
        pa.field("quantity", pa.int32()),
        pa.field("price",    pa.float64()),
    ])))
    zip_code   = Optional[pd.StringDtype()]          # nullable field

pandas + PyArrow: pd.ArrowDtype gives pandas the same nested-type enforcement as PySpark and Polars — arrays, structs, and maps at arbitrary depth. Without PyArrow, pandas dtype enforcement is limited to flat scalar types (np.dtype, pd.StringDtype, etc.). Install with pip install 'dfguard[pandas]' pyarrow.


Enforcement

Arm once, protect everything

# my_pipeline/__init__.py
import dfguard.pyspark as dfg

dfg.arm()   # walks the package, wraps every annotated function

Functions with schema-annotated arguments are enforced automatically, no decorator needed on each one:

# my_pipeline/transforms.py
def enrich(df: OrderSchema):       # enforced automatically
    return df.withColumn(...)

def aggregate(df: EnrichedSchema): # also enforced
    return df.groupBy(...)

Per-function decoration

Use @dfg.enforce in scripts and notebooks, or when you want a function-level subset override:

@dfg.enforce                   # subset=True: extra columns fine (default)
def process(df: OrderSchema): ...

@dfg.enforce(subset=False)     # exact match: no extra columns allowed
def write_final(df: OrderSchema): ...

The subset flag

subset=True (default): all declared columns must be present with the right types; extra columns are fine. subset=False: declared columns must be present and no extras are allowed.

Set it globally via dfg.arm(subset=False). Override per function via @dfg.enforce(subset=True). Function level always wins. schema_of types always use exact matching regardless of subset.

Disabling enforcement

dfg.disarm() turns off all enforcement globally, whether wrapped by dfg.arm() or decorated with @dfg.enforce. Useful in tests.

dfg.arm()
enrich(wrong_df)   # raises

dfg.disarm()
enrich(wrong_df)   # passes: enforcement is off

Validate at load time

Use assert_valid right after reading from storage to catch upstream schema drift before processing starts:

raw = spark.read.parquet("/data/orders/raw.parquet")
OrderSchema.assert_valid(raw)   # raises SchemaValidationError if schema changed

enriched = enrich(raw)          # @dfg.enforce then guards the function call

Reports all problems at once, not just the first:

SchemaValidationError: Schema validation failed:
  ✗ Column 'revenue': type mismatch: expected double, got string
  ✗ Missing column 'is_high_value' (expected boolean, nullable=False)

Schema history

dfg.dataset(df) records every schema-changing operation. Call .schema_history.print() to see the full evolution:

ds = dfg.dataset(raw_df)
ds = ds.withColumn("revenue",  F.col("amount") * 1.1)
ds = ds.withColumn("discount", F.when(F.col("revenue") > 500, 50.0).otherwise(0.0))
ds = ds.drop("tags")
ds = ds.withColumnRenamed("customer", "customer_name")

ds.schema_history.print()
# ────────────────────────────────────────────────────────────
# Schema Evolution
# ────────────────────────────────────────────────────────────
#   [ 0] input
#        struct<order_id:bigint,customer:string,amount:double,...>  (no schema change)
#   [ 1] withColumn('revenue')
#        added: revenue:double
#   [ 2] withColumn('discount')
#        added: discount:double
#   [ 3] drop(['tags'])
#        dropped: tags
#   [ 4] withColumnRenamed('customer'→'customer_name')
#        added: customer_name:string | dropped: customer
# ────────────────────────────────────────────────────────────

Pipeline integrations

dfguard fits naturally into pipeline frameworks. See the full docs for working examples with runnable code:

  • Airflow: dfg.arm() globally, assert_valid after loading from storage, @dfg.enforce(subset=False) on functions that write to fixed-schema sinks
  • Kedro: dfg.arm() in settings.py, node functions need no decorators

Documentation

nitrajen.github.io/dfguard


License

Apache 2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dfguard-0.2.6.tar.gz (110.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dfguard-0.2.6-py3-none-any.whl (71.1 kB view details)

Uploaded Python 3

File details

Details for the file dfguard-0.2.6.tar.gz.

File metadata

  • Download URL: dfguard-0.2.6.tar.gz
  • Upload date:
  • Size: 110.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for dfguard-0.2.6.tar.gz
Algorithm Hash digest
SHA256 d56c596df72b687001ee67406552982c99f431cfca44aa786fd90e0db3e92f76
MD5 cb5c832a2f8176229e329eba3ea7e95f
BLAKE2b-256 1036797cefbe451b8a286c38f06bd625de1608265e9f42422732f688856d6b77

See more details on using hashes here.

Provenance

The following attestation bundles were made for dfguard-0.2.6.tar.gz:

Publisher: release.yml on nitrajen/dfguard

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dfguard-0.2.6-py3-none-any.whl.

File metadata

  • Download URL: dfguard-0.2.6-py3-none-any.whl
  • Upload date:
  • Size: 71.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for dfguard-0.2.6-py3-none-any.whl
Algorithm Hash digest
SHA256 2ca03fedb21d246632e5012bf44257f36e5155650c5320a4c421257c66b31327
MD5 1c66eb8867a7e992f06108ac89195458
BLAKE2b-256 e03d7802bce51e6062f24adcfe04fe073a0bd03a8adc6ed0bc6d7eb03ff91c9a

See more details on using hashes here.

Provenance

The following attestation bundles were made for dfguard-0.2.6-py3-none-any.whl:

Publisher: release.yml on nitrajen/dfguard

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page