
Data contracts For Free using ODCS (Bitol)

dc43 — Data Contracts For Free!

Overview

  • Purpose: Generate, store, evolve and apply data contracts in Databricks pipelines, using standard Spark IO and Delta Live Tables (DLT) where possible.
  • Contract style: Bitol/ODCS v3.0.2 JSON. Maps to Spark schemas and DLT expectations.
  • Core features:
    • SemVer helpers for version checks
    • Validation and auto-casting for Spark DataFrames
    • IO helpers for read/write with contract enforcement
    • Storage backends: filesystem (DBFS/UC volumes) and Delta table
    • DLT helpers: build expectations from contracts
    • DQ orchestration: pluggable client interface; stub implementation provided
  • Bitol/ODCS support: relies on the official open-data-contract-standard models (v3.0.2). No internal stubs.
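
To make the "SemVer helpers for version checks" feature concrete, here is a minimal sketch of matching a contract version against a constraint string such as "==0.1.0" (the form used in the Quickstart). The names parse_semver and satisfies are hypothetical and are not dc43's API; pre-release and build tags are deliberately not handled.

```python
import re
from typing import Tuple


def parse_semver(version: str) -> Tuple[int, int, int]:
    """Parse 'MAJOR.MINOR.PATCH' into a tuple of ints (no pre-release support)."""
    match = re.fullmatch(r"(\d+)\.(\d+)\.(\d+)", version.strip())
    if match is None:
        raise ValueError(f"not a MAJOR.MINOR.PATCH version: {version!r}")
    major, minor, patch = (int(part) for part in match.groups())
    return major, minor, patch


def satisfies(version: str, constraint: str) -> bool:
    """Check a version against a constraint such as '==0.1.0' or '>=1.2.0'."""
    match = re.fullmatch(r"\s*(==|>=|<=|>|<)\s*(.+)", constraint)
    if match is None:
        raise ValueError(f"unsupported constraint: {constraint!r}")
    operator, target = match.groups()
    actual, expected = parse_semver(version), parse_semver(target)
    # Tuples compare element by element, so 1.10.0 correctly sorts after 1.2.0.
    return {
        "==": actual == expected,
        ">=": actual >= expected,
        "<=": actual <= expected,
        ">": actual > expected,
        "<": actual < expected,
    }[operator]
```

Comparing integer tuples rather than raw strings avoids the classic mistake of ranking "1.10.0" below "1.2.0".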

Install

  • Use dc43 as a source library (Databricks Repos, workspace files) or install it as a package. There are no hard dependencies by default; bring your own pyspark on Databricks clusters.

Quickstart

  1. Define a contract (ODCS typed model)
from open_data_contract_standard.model import (
    OpenDataContractStandard, SchemaObject, SchemaProperty, Description
)

contract = OpenDataContractStandard(
    version="0.1.0",
    kind="DataContract",
    apiVersion="3.0.2",
    id="sales.orders",
    name="Orders",
    description=Description(usage="Orders facts"),
    schema_=[
        SchemaObject(
            name="orders",
            properties=[
                SchemaProperty(name="order_id", physicalType="bigint", required=True, unique=True),
                SchemaProperty(name="customer_id", physicalType="bigint", required=True),
                SchemaProperty(name="order_ts", physicalType="timestamp", required=True),
                SchemaProperty(name="amount", physicalType="double", required=True),
                SchemaProperty(
                    name="currency", physicalType="string", required=True,
                    logicalTypeOptions={"enum": ["EUR", "USD"]}
                ),
            ],
        )
    ],
)
  2. Validate and write with Spark
from dc43.integration.spark_io import write_with_contract

write_with_contract(
    df=orders_df,
    contract=contract,
    path="/mnt/gold/sales/orders",
    format="delta",  # or contract.servers[0].format when the contract defines servers
    mode="append",
    enforce=True,
    auto_cast=True,
)
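
The presence and type checks behind enforce and auto_cast can be pictured with a small pure-Python sketch. It is an illustration under assumptions, not dc43's validation code: schema_errors and cast_plan are hypothetical names, and the contract is reduced to a plain name-to-type mapping.

```python
from typing import Dict, List

# Plain-dict stand-in for the fields of the Orders contract (name -> physical type).
CONTRACT_FIELDS: Dict[str, str] = {
    "order_id": "bigint",
    "customer_id": "bigint",
    "order_ts": "timestamp",
    "amount": "double",
    "currency": "string",
}


def schema_errors(observed: Dict[str, str], expected: Dict[str, str]) -> List[str]:
    """Return readable mismatches between observed columns and contract fields."""
    errors = []
    for name, expected_type in expected.items():
        if name not in observed:
            errors.append(f"missing column: {name}")
        elif observed[name] != expected_type:
            errors.append(
                f"type mismatch for {name}: expected {expected_type}, got {observed[name]}"
            )
    return errors


def cast_plan(observed: Dict[str, str], expected: Dict[str, str]) -> Dict[str, str]:
    """Columns that exist with the wrong type and would need a cast to the contract type."""
    return {
        name: expected_type
        for name, expected_type in expected.items()
        if name in observed and observed[name] != expected_type
    }
```

With PySpark, dict(df.dtypes) yields the same name-to-type mapping for a DataFrame, so the observed argument can come straight from orders_df.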
  3. DLT usage (inside a pipeline notebook)
import dlt
from dc43.integration.dlt_helpers import expectations_from_contract

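# Build the expectations once; expectations_from_contract is assumed to return
# a mapping of expectation name -> SQL predicate, as dlt.expect_all requires.
exps = expectations_from_contract(contract)

@dlt.table(name="orders")
@dlt.expect_all(exps)  # expect_all is a decorator, not a call made inside the function
def orders():
    df = spark.readStream.table("bronze.sales_orders_raw")
    return df.select("order_id", "customer_id", "order_ts", "amount", "currency")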
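
For intuition about what such expectations might look like, the sketch below turns two simple constraints (required and enum) into a name-to-SQL-predicate mapping, the shape dlt.expect_all accepts. The function expectations_sketch, the expectation names and the plain-dict field format are all assumptions for illustration; the real expectations_from_contract may name and build its rules differently.

```python
from typing import Any, Dict, List


def expectations_sketch(fields: List[Dict[str, Any]]) -> Dict[str, str]:
    """Build {expectation name: SQL predicate} from simple field constraints."""
    expectations: Dict[str, str] = {}
    for field in fields:
        name = field["name"]
        if field.get("required"):
            expectations[f"{name}_not_null"] = f"{name} IS NOT NULL"
        enum = field.get("enum")
        if enum:
            # Double any single quote so the values stay valid SQL string literals.
            values = ", ".join("'" + str(value).replace("'", "''") + "'" for value in enum)
            expectations[f"{name}_enum"] = f"{name} IN ({values})"
    return expectations
```

Constraints with no straightforward SQL form are simply skipped here, in the spirit of translating rules only "when feasible".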
  4. Store and resolve contracts
from dc43.storage.fs import FSContractStore

store = FSContractStore(base_path="/mnt/contracts")
store.put(contract)
latest = store.latest("sales.orders")
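
A filesystem store with "latest" resolution could work roughly as sketched below. The class TinyContractStore and its on-disk layout (one JSON file per version under a directory per contract id) are assumptions made for this example and do not describe how FSContractStore actually lays out files.

```python
import json
from pathlib import Path
from typing import Optional


class TinyContractStore:
    """Hypothetical layout: <base_path>/<contract_id>/<version>.json."""

    def __init__(self, base_path: str) -> None:
        self.base = Path(base_path)

    def put(self, contract: dict) -> Path:
        """Write the contract under its id and version, creating folders as needed."""
        target = self.base / contract["id"] / f"{contract['version']}.json"
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(json.dumps(contract, indent=2))
        return target

    def latest(self, contract_id: str) -> Optional[dict]:
        """Return the stored contract with the highest MAJOR.MINOR.PATCH version."""
        directory = self.base / contract_id
        if not directory.is_dir():
            return None
        files = list(directory.glob("*.json"))
        if not files:
            return None
        # Sort numerically so that 0.10.0 ranks above 0.9.0.
        newest = max(files, key=lambda path: tuple(int(x) for x in path.stem.split(".")))
        return json.loads(newest.read_text())
```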
  5. DQ/DO orchestration on read
from dc43.integration.spark_io import read_with_contract
from dc43.dq.stub import StubDQClient

dq = StubDQClient(base_path="/mnt/dq_state")
df, status = read_with_contract(
    spark,
    format="delta",
    path="/mnt/gold/sales/orders",
    contract=contract,
    expected_contract_version="==0.1.0",
    dq_client=dq,
    return_status=True,
)
print(status.status, status.reason)
  6. Draft contract proposal on mismatch (write)
from dc43.integration.spark_io import write_with_contract
from dc43.storage.fs import FSContractStore

store = FSContractStore("/mnt/contracts-drafts")
vr, draft = write_with_contract(
    df=orders_df,
    contract=contract,
    path="/mnt/gold/sales/orders",
    format="delta",  # or contract.servers[0].format when the contract defines servers
    mode="append",
    enforce=False,                 # continue writing
    draft_on_mismatch=True,        # create a draft when schema diverges
    draft_store=store,             # persist the draft
    return_draft=True,
)
if draft:
    print("Draft created:", draft.id, draft.version)  # send to workflow

Demo application

A Vue-powered FastAPI application in dc43.demo_app offers a visual way to explore contracts, datasets and data quality results. Install the optional dependencies and launch the app with:

pip install ".[demo]"
dc43-demo

Visit http://localhost:8000 to:

  • Browse contracts and their versions with draft/active status.
  • Inspect dataset versions, their linked contract, validation status and detailed DQ metrics derived from contract rules.
  • Highlight datasets using draft contracts and trigger validation to promote them.

An additional Reveal.js presentation is available at http://localhost:8000/static/presentation.html to walk through the contract lifecycle and automation steps.

The application also exposes an example Spark pipeline in dc43.demo_app.pipeline used when registering new dataset versions.

Spark Flow (Mermaid)

flowchart TD
    subgraph Read
      U[User code / Notebook] --> RWC{read_with_contract}
      RWC --> SR["spark.read.format(...).load"]
      RWC --> EV["ensure_version(contract)"]
      EV --> VAL[validate_dataframe]
      VAL -->|ok| AC["apply_contract (cast/order)"]
      VAL -->|errors & enforce| E1[Raise]
      AC --> DF[DataFrame ready]
      RWC --> DQ{dq_client?}
      DQ -->|yes| GS["get_status(dataset@version, contract@version)"]
      GS -->|unknown/stale| CM[compute_metrics]
      CM --> SM[submit_metrics -> status]
      GS -->|up-to-date| SM
      SM -->|block & enforce| E2[Raise]
      SM -->|ok/warn| DF
    end

    DF --> T[Transformations / joins / aggregations]

    subgraph Write
      T --> WWC{write_with_contract}
      WWC --> V2[validate_dataframe]
      V2 -->|ok| AC2[apply_contract]
      V2 -->|errors & draft_on_mismatch| PD["propose draft (ODCS model)"]
      PD --> PS[draft_store.put]
      V2 -->|errors & enforce| E3[Raise]
      AC2 --> SW["df.write.format(...).options(...).mode(...).save(...)"]
      SW --> DELTA[Delta table / UC]
    end

    classDef err fill:#ffe5e5,stroke:#ff4d4f,color:#000
    class E1,E2,E3 err
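
The Write branch of the flowchart can be read as the following control flow. This is a minimal sketch, not the body of write_with_contract: write_flow and ContractViolation are hypothetical names, the draft and write steps are passed in as callables, and proposing a draft before raising (when both flags are set) is an assumption the diagram does not settle.

```python
from typing import Callable, List, Optional


class ContractViolation(Exception):
    """Raised when validation errors exist and enforcement is requested."""


def write_flow(
    errors: List[str],
    enforce: bool,
    draft_on_mismatch: bool,
    propose_draft: Callable[[], dict],
    write: Callable[[], None],
) -> Optional[dict]:
    """Mirror the diagram: optionally draft, optionally raise, otherwise write."""
    draft = None
    if errors and draft_on_mismatch:
        draft = propose_draft()  # stands in for "propose draft" + draft_store.put
    if errors and enforce:
        raise ContractViolation("; ".join(errors))
    write()  # stands in for apply_contract followed by the Spark write
    return draft
```

With enforce=False the write proceeds despite errors, which matches the "continue writing" comment in the Quickstart.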

Notes

  • The library uses the official ODCS package and enforces ODCS apiVersion 3.0.2 by default (configurable via DC43_ODCS_REQUIRED).
  • Validation focuses on practical checks: presence, types, nullability and common constraints (enum, regex, min/max, unique best-effort).
  • DLT helpers translate constraints into expect expressions when feasible.
  • DQ orchestration: the IO wrapper checks the schema against the contract and consults the DQ client. If the dataset version is newer than the version known to the DQ client, the wrapper computes the required metrics and submits them, then enforces the resulting status if requested.
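
The DQ orchestration note can be sketched with an in-memory client. The method names get_status and submit_metrics come from the flowchart, but their signatures, the DQStatus shape, the status values and the "any violation blocks" rule are assumptions for illustration; this is not StubDQClient.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Tuple


@dataclass
class DQStatus:
    status: str  # "ok", "block" or "unknown" in this sketch
    reason: str = ""


class InMemoryDQClient:
    """Remembers one status per (dataset version, contract version) pair."""

    def __init__(self) -> None:
        self._known: Dict[Tuple[str, str], DQStatus] = {}

    def get_status(self, dataset_version: str, contract_version: str) -> DQStatus:
        return self._known.get((dataset_version, contract_version), DQStatus("unknown"))

    def submit_metrics(
        self, dataset_version: str, contract_version: str, metrics: Dict[str, int]
    ) -> DQStatus:
        violations = sum(metrics.values())
        status = DQStatus("block", f"{violations} rule violations") if violations else DQStatus("ok")
        self._known[(dataset_version, contract_version)] = status
        return status


def check_quality(
    client: InMemoryDQClient,
    dataset_version: str,
    contract_version: str,
    compute_metrics: Callable[[], Dict[str, int]],
    enforce: bool,
) -> DQStatus:
    """Compute and submit metrics only when the client has no status yet."""
    status = client.get_status(dataset_version, contract_version)
    if status.status == "unknown":
        status = client.submit_metrics(dataset_version, contract_version, compute_metrics())
    if status.status == "block" and enforce:
        raise ValueError(f"DQ status is blocking: {status.reason}")
    return status
```

Because the status is cached per dataset and contract version, repeated reads of an unchanged dataset skip the metric computation.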

Local Dev

  • Install dependencies locally (match your Databricks runtime where possible):
pip install open-data-contract-standard==3.0.2 pyspark
  • dc43 enforces apiVersion via DC43_ODCS_REQUIRED (default 3.0.2).
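
As an illustration of the DC43_ODCS_REQUIRED setting, the sketch below reads the variable with a 3.0.2 default and rejects a contract whose apiVersion differs. The helper names are hypothetical, and how dc43 reads the variable internally (for example, at import time or per call) is an assumption.

```python
import os


def required_odcs_version() -> str:
    """Return the required ODCS version, defaulting to 3.0.2 when the variable is unset."""
    return os.environ.get("DC43_ODCS_REQUIRED", "3.0.2")


def ensure_api_version(api_version: str) -> None:
    """Raise if the contract's apiVersion differs from the required ODCS version."""
    required = required_odcs_version()
    if api_version != required:
        raise ValueError(
            f"contract apiVersion {api_version!r} does not match required ODCS version {required!r}"
        )
```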

Tests

  • Install the test extras and run the test suite:
pip install -e ".[test]"
pytest

Publishing

  • Push a tag v* on a commit in main.
  • The workflow .github/workflows/release.yml runs tests, builds the package, waits for manual approval via environment release, publishes to PyPI, and creates a GitHub release with autogenerated notes.
  • Set PYPI_TOKEN secret and configure the release environment with required reviewers.

License

Apache License 2.0
