
Runtime schema enforcement for Python DataFrames


dfguard

Catch DataFrame schema mismatches at the function call, not deep in your pipeline.




Data pipelines fail late. A DataFrame with the wrong schema enters a function without complaint, the job runs, and the crash surfaces somewhere downstream with an error that tells you nothing about where the mismatch started.

dfguard moves that failure to the function call. The wrong DataFrame is rejected immediately with a precise error: which function, which argument, what schema was expected, and what arrived.

Enforcement is lightweight because it is pure metadata inspection: dfguard reads the schema struct from your DataFrame, so no data is scanned and no Spark jobs are triggered.

Unlike pandera, which introduces its own type system, dfguard uses the types your library already ships with: T.LongType() for PySpark, pl.Int64 for Polars, np.dtype("int64") for pandas.
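
To make the idea of rejecting a DataFrame at the call boundary concrete, here is a minimal, dependency-free sketch. It is not dfguard's implementation: `Schema`, `Frame`, `describe`, and this `enforce` are hypothetical stand-ins, and dtypes are plain strings. It only illustrates how a decorator can compare schema metadata against an annotation before the function body runs.

```python
import functools
import inspect


class Schema:
    """Hypothetical declared schema: maps column name to a dtype name."""
    fields = {}


class Frame:
    """Hypothetical stand-in for a DataFrame; only its schema metadata is used."""
    def __init__(self, schema):
        self.schema = dict(schema)


def describe(schema):
    """Render a schema as 'col:dtype, col:dtype'."""
    return ", ".join(f"{col}:{dtype}" for col, dtype in schema.items())


def enforce(func):
    """Check every Schema-annotated argument before calling func."""
    signature = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            annotation = signature.parameters[name].annotation
            if not (inspect.isclass(annotation) and issubclass(annotation, Schema)):
                continue  # argument is not schema-annotated
            expected = annotation.fields
            # Metadata-only comparison: no rows are read.
            if any(value.schema.get(col) != dtype for col, dtype in expected.items()):
                raise TypeError(
                    f"Schema mismatch in {func.__name__}() argument '{name}':\n"
                    f"  expected: {describe(expected)}\n"
                    f"  received: {describe(value.schema)}"
                )
        return func(*args, **kwargs)

    return wrapper


class EnrichedSchema(Schema):
    fields = {"order_id": "int64", "amount": "float64", "revenue": "float64"}


calls = []


@enforce
def flag_high_value(df: EnrichedSchema):
    calls.append("ran")
    return "ok"


message = ""
try:
    flag_high_value(Frame({"order_id": "int64", "amount": "float64"}))
except TypeError as exc:
    message = str(exc)

body_ran_on_bad_input = bool(calls)
```

In this sketch the function body never executes for the mismatched input, and the error names the function and the argument.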

Compatibility

Backend   Version   Python
PySpark   >= 3.3    >= 3.10
pandas    >= 1.5    >= 3.10
Polars    >= 0.20   >= 3.10

PySpark

import dfguard.pyspark as dfg
from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.getOrCreate()
raw_df = spark.createDataFrame(
    [(1, 10.0, 3), (2, 5.0, 7)],
    "order_id LONG, amount DOUBLE, quantity INT",
)

class RawSchema(dfg.SparkSchema):
    order_id = T.LongType()
    amount   = T.DoubleType()
    quantity = T.IntegerType()

@dfg.enforce
def enrich(df: RawSchema):
    return df.withColumn("revenue", F.col("amount") * F.col("quantity"))

EnrichedSchema = dfg.schema_of(enrich(raw_df))

@dfg.enforce
def flag_high_value(df: EnrichedSchema):
    return df.withColumn("is_vip", F.col("revenue") > 1000)

flag_high_value(raw_df)
# TypeError: Schema mismatch in flag_high_value() argument 'df':
#   expected: order_id:bigint, amount:double, quantity:int, revenue:double
#   received: order_id:bigint, amount:double, quantity:int

pandas

import numpy as np
import pandas as pd
import dfguard.pandas as dfg

raw_df = pd.DataFrame({
    "order_id": pd.array([1, 2, 3], dtype="int64"),
    "amount":   pd.array([10.0, 5.0, 8.5], dtype="float64"),
    "quantity": pd.array([3, 1, 2], dtype="int64"),
})

class RawSchema(dfg.PandasSchema):
    order_id = np.dtype("int64")
    amount   = np.dtype("float64")
    quantity = np.dtype("int64")

@dfg.enforce
def enrich(df: RawSchema):
    return df.assign(revenue=df["amount"] * df["quantity"])

EnrichedSchema = dfg.schema_of(enrich(raw_df))

@dfg.enforce
def flag_high_value(df: EnrichedSchema):
    return df.assign(is_vip=df["revenue"] > 1000)

flag_high_value(raw_df)
# TypeError: Schema mismatch in flag_high_value() argument 'df':
#   expected: order_id:int64, amount:float64, quantity:int64, revenue:float64
#   received: order_id:int64, amount:float64, quantity:int64

Polars

import polars as pl
import dfguard.polars as dfg

raw_df = pl.DataFrame({
    "order_id": pl.Series([1, 2, 3], dtype=pl.Int64),
    "amount":   pl.Series([10.0, 5.0, 8.5], dtype=pl.Float64),
    "quantity": pl.Series([3, 1, 2], dtype=pl.Int32),
})

class RawSchema(dfg.PolarsSchema):
    order_id = pl.Int64
    amount   = pl.Float64
    quantity = pl.Int32

@dfg.enforce
def enrich(df: RawSchema) -> pl.DataFrame:
    return df.with_columns(revenue=pl.col("amount") * pl.col("quantity"))

EnrichedSchema = dfg.schema_of(enrich(raw_df))

@dfg.enforce
def flag_high_value(df: EnrichedSchema) -> pl.DataFrame:
    return df.with_columns(is_vip=pl.col("revenue") > 1000)

flag_high_value(raw_df)
# TypeError: Schema mismatch in flag_high_value() argument 'df':
#   expected: order_id:Int64, amount:Float64, quantity:Int32, revenue:Float64
#   received: order_id:Int64, amount:Float64, quantity:Int32

No validation logic inside functions. The wrong DataFrame simply cannot enter the wrong function.

For package-wide enforcement without decorating each function, call dfg.arm() once from your package entry point.


Install

pip install "dfguard[pyspark]"   # PySpark
pip install "dfguard[pandas]"    # pandas
pip install "dfguard[polars]"    # Polars
pip install "dfguard[all]"       # all backends (quotes keep shells such as zsh from expanding the brackets)

Requires Python >= 3.10. No other mandatory dependencies.


Two ways to define a schema

Capture from a live DataFrame

RawSchema      = dfg.schema_of(raw_df)       # exact snapshot of this stage
EnrichedSchema = dfg.schema_of(enriched_df)  # new type after adding columns

Exact matching: a DataFrame with extra columns does not satisfy RawSchema. Capture a new type at each stage boundary.

Declare upfront

No live DataFrame needed. Subclasses inherit parent fields. All three backends support nested types; pandas does so through pd.ArrowDtype, which requires PyArrow.

PySpark — arrays, structs, maps via T.ArrayType / T.StructType / T.MapType:

import dfguard.pyspark as dfg
from dfguard.pyspark import Optional
from pyspark.sql import types as T

class OrderSchema(dfg.SparkSchema):
    order_id   = T.LongType()
    amount     = T.DoubleType()
    line_items = T.ArrayType(T.StructType([          # array of structs
        T.StructField("sku",      T.StringType()),
        T.StructField("quantity", T.IntegerType()),
        T.StructField("price",    T.DoubleType()),
    ]))
    zip_code   = Optional[T.StringType()]            # nullable field

class EnrichedSchema(OrderSchema):                   # inherits all parent fields
    revenue = T.DoubleType()

# `spark` is an existing SparkSession; `rows` is your own list of row tuples
df = spark.createDataFrame(rows, OrderSchema.to_struct())

Polars: pl.List, pl.Struct, pl.Array are native first-class types:

import polars as pl
import dfguard.polars as dfg
from dfguard.polars import Optional

class OrderSchema(dfg.PolarsSchema):
    order_id   = pl.Int64
    amount     = pl.Float64
    line_items = pl.List(pl.Struct({                 # list of structs
        "sku":      pl.String,
        "quantity": pl.Int32,
        "price":    pl.Float64,
    }))
    zip_code   = Optional[pl.String]                 # nullable field

pandas — use pd.ArrowDtype (requires pyarrow) for nested types:

import numpy as np
import pandas as pd
import pyarrow as pa
import dfguard.pandas as dfg
from dfguard.pandas import Optional

class OrderSchema(dfg.PandasSchema):
    order_id   = np.dtype("int64")
    amount     = np.dtype("float64")
    line_items = pd.ArrowDtype(pa.list_(pa.struct([  # nested via PyArrow
        pa.field("sku",      pa.string()),
        pa.field("quantity", pa.int32()),
        pa.field("price",    pa.float64()),
    ])))
    zip_code   = Optional[pd.StringDtype()]          # nullable field

pandas + PyArrow: pd.ArrowDtype gives pandas the same nested-type enforcement as PySpark and Polars, covering arrays, structs, and maps at arbitrary depth. Without PyArrow, pandas dtype enforcement is limited to flat scalar types (np.dtype, pd.StringDtype, etc.). Install both with pip install "dfguard[pandas]" pyarrow.
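
The statement that subclasses inherit parent fields can be pictured with plain Python classes. The sketch below is an assumption about how such collection could work, not dfguard's code: `SchemaBase` and its `fields()` method are hypothetical, and dtypes are plain strings to keep it dependency-free.

```python
class SchemaBase:
    """Hypothetical base class that gathers declared fields from class attributes."""

    @classmethod
    def fields(cls):
        collected = {}
        # Walk from the most distant ancestor down to cls, so parent fields
        # come first and a subclass can override a parent's declaration.
        for klass in reversed(cls.__mro__):
            for name, value in vars(klass).items():
                if name.startswith("_") or callable(value) or isinstance(value, classmethod):
                    continue  # skip dunders and methods; keep field declarations
                collected[name] = value
        return collected


class OrderSchema(SchemaBase):
    order_id = "bigint"
    amount = "double"


class EnrichedSchema(OrderSchema):  # inherits order_id and amount
    revenue = "double"


enriched_fields = EnrichedSchema.fields()
order_fields = OrderSchema.fields()
```

Walking the method resolution order in reverse keeps parent columns ahead of child columns, which matches the order a reader would expect from the class definitions.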


Enforcement

Arm once, protect everything

# my_pipeline/__init__.py
import dfguard.pyspark as dfg

dfg.arm()   # walks the package, wraps every annotated function

Functions with schema-annotated arguments are enforced automatically; no per-function decorator is needed:

# my_pipeline/transforms.py
def enrich(df: OrderSchema):       # enforced automatically
    return df.withColumn(...)

def aggregate(df: EnrichedSchema): # also enforced
    return df.groupBy(...)
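
As a rough illustration of what "walks the package, wraps every annotated function" could involve, the sketch below arms a single synthetic module. Everything here is hypothetical (`Schema`, `guard`, `has_schema_argument`, `arm_module`); the real dfg.arm() covers a whole package and its internals are not shown in this README.

```python
import functools
import inspect
import types


class Schema:
    """Hypothetical marker base class for schema annotations."""


def guard(func):
    """Hypothetical wrapper; a real one would validate arguments before the call."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    wrapper.__guarded__ = True
    return wrapper


def has_schema_argument(func):
    """True if any parameter is annotated with a Schema subclass."""
    for param in inspect.signature(func).parameters.values():
        if inspect.isclass(param.annotation) and issubclass(param.annotation, Schema):
            return True
    return False


def arm_module(module):
    """Replace schema-annotated functions defined in `module` with guarded versions."""
    wrapped = []
    for name, obj in list(vars(module).items()):
        if not inspect.isfunction(obj) or obj.__module__ != module.__name__:
            continue  # skip non-functions and functions imported from elsewhere
        if getattr(obj, "__guarded__", False):
            continue  # already wrapped; arming twice should be harmless
        if has_schema_argument(obj):
            setattr(module, name, guard(obj))
            wrapped.append(name)
    return wrapped


# Build a throwaway module to stand in for my_pipeline.transforms.
demo = types.ModuleType("demo_pipeline")


class OrderSchema(Schema):
    pass


def enrich(df: OrderSchema):
    return df


def helper(x: int):
    return x


for f in (enrich, helper):
    f.__module__ = demo.__name__
    setattr(demo, f.__name__, f)

armed = arm_module(demo)
armed_again = arm_module(demo)
```

Only `enrich` is wrapped because it is the only function with a schema-annotated parameter; `helper` is left untouched.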

Per-function decoration

Use @dfg.enforce in scripts and notebooks, or when you want a function-level subset override:

@dfg.enforce                   # subset=True: extra columns fine (default)
def process(df: OrderSchema): ...

@dfg.enforce(subset=False)     # exact match: no extra columns allowed
def write_final(df: OrderSchema): ...

The subset flag

subset=True (default): all declared columns must be present with the right types; extra columns are fine. subset=False: declared columns must be present and no extras are allowed.

Set it globally via dfg.arm(subset=False). Override per function via @dfg.enforce(subset=True). Function level always wins. schema_of types always use exact matching regardless of subset.
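
The two matching modes and the precedence rule can be summarised in a few lines. This is a sketch under stated assumptions, not dfguard's code: `satisfies` and `effective_subset` are hypothetical helpers, schemas are plain dicts of column name to dtype name, and column order is ignored.

```python
from typing import Optional


def satisfies(declared, actual, subset=True):
    """Return True if `actual` satisfies `declared` under the given mode."""
    declared_ok = all(actual.get(col) == dtype for col, dtype in declared.items())
    if subset:
        return declared_ok                      # extra columns are fine
    return declared_ok and set(actual) == set(declared)  # no extras allowed


def effective_subset(global_subset: bool, function_subset: Optional[bool]) -> bool:
    """The function-level setting wins when given; otherwise use the global one."""
    return global_subset if function_subset is None else function_subset


declared = {"order_id": "int64", "amount": "float64"}
with_extra = {"order_id": "int64", "amount": "float64", "revenue": "float64"}
wrong_type = {"order_id": "int64", "amount": "object"}

subset_result = satisfies(declared, with_extra, subset=True)
exact_result = satisfies(declared, with_extra, subset=False)
wrong_type_result = satisfies(declared, wrong_type, subset=True)
```

With a global setting of subset=False and a function-level subset=True, `effective_subset(False, True)` yields True, mirroring the rule that the function level always wins.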

Disabling enforcement

dfg.disarm() turns off all enforcement globally, whether wrapped by dfg.arm() or decorated with @dfg.enforce. Useful in tests.

dfg.arm()
enrich(wrong_df)   # raises

dfg.disarm()
enrich(wrong_df)   # passes: enforcement is off
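
One way a global off switch can affect already-decorated functions is to check a shared flag at call time rather than at decoration time. The sketch below assumes that design; `_state`, `guarded`, and these `arm`/`disarm` functions are hypothetical and far simpler than the library's (here `arm` only flips the flag).

```python
import functools

_state = {"enabled": True}  # hypothetical shared switch


def arm():
    _state["enabled"] = True


def disarm():
    _state["enabled"] = False


def guarded(validate):
    """Build a decorator that runs validate(first_arg) while enforcement is on."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(first, *args, **kwargs):
            if _state["enabled"]:  # consulted on every call
                validate(first)
            return func(first, *args, **kwargs)
        return wrapper
    return decorator


def require_revenue(columns):
    if "revenue" not in columns:
        raise TypeError("missing column 'revenue'")


@guarded(require_revenue)
def total_columns(columns):
    return len(columns)


disarm()
result_when_off = total_columns(["order_id"])  # no check is performed

arm()
try:
    total_columns(["order_id"])
    raised_when_on = False
except TypeError:
    raised_when_on = True
```

Because the flag is read inside the wrapper, toggling it in a test fixture takes effect without re-importing or re-decorating anything.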

Validate at load time

Use assert_valid right after reading from storage to catch upstream schema drift before processing starts:

raw = spark.read.parquet("/data/orders/raw.parquet")
OrderSchema.assert_valid(raw)   # raises SchemaValidationError if schema changed

enriched = enrich(raw)          # @dfg.enforce then guards the function call

assert_valid reports all problems at once, not just the first:

SchemaValidationError: Schema validation failed:
  ✗ Column 'revenue': type mismatch: expected double, got string
  ✗ Missing column 'is_high_value' (expected boolean, nullable=False)
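
Reporting every problem in one error, rather than stopping at the first, amounts to collecting findings in a list and raising once at the end. The sketch below shows that pattern with hypothetical names (`DemoSchemaError`, `validate_all`) and plain dict schemas; it is not the library's assert_valid.

```python
class DemoSchemaError(Exception):
    """Hypothetical error type that carries every problem found."""

    def __init__(self, problems):
        self.problems = list(problems)
        lines = "\n".join(f"  ✗ {problem}" for problem in self.problems)
        super().__init__(f"Schema validation failed:\n{lines}")


def validate_all(expected, actual):
    """Compare two {column: dtype} dicts and raise once with all mismatches."""
    problems = []
    for column, dtype in expected.items():
        if column not in actual:
            problems.append(f"Missing column '{column}' (expected {dtype})")
        elif actual[column] != dtype:
            problems.append(
                f"Column '{column}': type mismatch: expected {dtype}, got {actual[column]}"
            )
    if problems:
        raise DemoSchemaError(problems)


expected = {"order_id": "bigint", "revenue": "double", "is_high_value": "boolean"}
actual = {"order_id": "bigint", "revenue": "string"}

found = []
try:
    validate_all(expected, actual)
except DemoSchemaError as exc:
    found = exc.problems
```

A single run surfaces both the type mismatch and the missing column, so upstream drift can be fixed in one pass.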

Schema history

dfg.dataset(df) records every schema-changing operation. Call .schema_history.print() to see the full evolution:

ds = dfg.dataset(raw_df)
ds = ds.withColumn("revenue",  F.col("amount") * 1.1)
ds = ds.withColumn("discount", F.when(F.col("revenue") > 500, 50.0).otherwise(0.0))
ds = ds.drop("tags")
ds = ds.withColumnRenamed("customer", "customer_name")

ds.schema_history.print()
# ────────────────────────────────────────────────────────────
# Schema Evolution
# ────────────────────────────────────────────────────────────
#   [ 0] input
#        struct<order_id:bigint,customer:string,amount:double,...>  (no schema change)
#   [ 1] withColumn('revenue')
#        added: revenue:double
#   [ 2] withColumn('discount')
#        added: discount:double
#   [ 3] drop(['tags'])
#        dropped: tags
#   [ 4] withColumnRenamed('customer'→'customer_name')
#        added: customer_name:string | dropped: customer
# ────────────────────────────────────────────────────────────
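
The added/dropped lines in that output are what a diff between consecutive schemas would produce. The following sketch reproduces the idea with plain dicts; `diff_schemas` and `History` are hypothetical names, not part of dfguard, and the starting schema is invented for illustration.

```python
def diff_schemas(before, after):
    """Describe what changed between two {column: dtype} dicts."""
    added = [f"{col}:{dtype}" for col, dtype in after.items() if col not in before]
    dropped = [col for col in before if col not in after]
    parts = []
    if added:
        parts.append("added: " + ", ".join(added))
    if dropped:
        parts.append("dropped: " + ", ".join(dropped))
    return " | ".join(parts) or "(no schema change)"


class History:
    """Hypothetical recorder of (operation label, schema diff) entries."""

    def __init__(self, initial):
        self.current = dict(initial)
        self.entries = [("input", "(no schema change)")]

    def record(self, label, new_schema):
        self.entries.append((label, diff_schemas(self.current, new_schema)))
        self.current = dict(new_schema)


history = History({
    "order_id": "bigint",
    "customer": "string",
    "amount": "double",
    "tags": "array<string>",
})

history.record("withColumn('revenue')", {**history.current, "revenue": "double"})
history.record(
    "drop(['tags'])",
    {col: dtype for col, dtype in history.current.items() if col != "tags"},
)
history.record(
    "withColumnRenamed('customer'→'customer_name')",
    {("customer_name" if col == "customer" else col): dtype
     for col, dtype in history.current.items()},
)
```

A rename shows up as one added and one dropped column, which is the same shape as entry [ 4] in the output above.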

Pipeline integrations

dfguard fits naturally into pipeline frameworks. See the full docs for working examples with runnable code:

  • Airflow: dfg.arm() globally, assert_valid after loading from storage, @dfg.enforce(subset=False) on functions that write to fixed-schema sinks
  • Kedro: dfg.arm() in settings.py, node functions need no decorators

Documentation

nitrajen.github.io/dfguard


License

Apache 2.0
