dc43 — Data Contracts For Free!
Overview
- Purpose: Generate, store, evolve and apply data contracts in Databricks pipelines, using standard Spark IO and Delta Live Tables (DLT) where possible.
- Contract style: Bitol/ODCS v3.0.2 JSON. Maps to Spark schemas and DLT expectations.
- Core features:
  - SemVer helpers for version checks (see the sketch after this overview)
  - Validation and auto-casting for Spark DataFrames
  - IO helpers for read/write with contract enforcement
  - Storage backends: filesystem (DBFS/UC volumes) and Delta table
  - DLT helpers: build expectations from contracts
  - DQ orchestration: pluggable client interface; stub implementation provided
- Bitol/ODCS support: relies on the official open-data-contract-standard models (v3.0.2). No internal stubs.
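For illustration, a version constraint such as the ==0.1.0 used in the quickstart can be checked with the standard packaging library. This is a minimal sketch independent of dc43's own SemVer helpers:

from packaging.specifiers import SpecifierSet
from packaging.version import Version

def version_matches(version: str, constraint: str) -> bool:
    # True when a contract version satisfies a pip-style constraint such as "==0.1.0".
    return Version(version) in SpecifierSet(constraint)

assert version_matches("0.1.0", "==0.1.0")
assert not version_matches("0.2.0", "==0.1.0")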
Install
- As a source lib (Databricks Repos, workspace files) or as a package. No hard dependencies by default; bring your own pyspark on Databricks clusters.
Quickstart
- Define a contract (ODCS typed model)
from open_data_contract_standard.model import (
OpenDataContractStandard, SchemaObject, SchemaProperty, Description
)
contract = OpenDataContractStandard(
version="0.1.0",
kind="DataContract",
apiVersion="3.0.2",
id="sales.orders",
name="Orders",
description=Description(usage="Orders facts"),
schema_=[
SchemaObject(
name="orders",
properties=[
SchemaProperty(name="order_id", physicalType="bigint", required=True, unique=True),
SchemaProperty(name="customer_id", physicalType="bigint", required=True),
SchemaProperty(name="order_ts", physicalType="timestamp", required=True),
SchemaProperty(name="amount", physicalType="double", required=True),
SchemaProperty(
name="currency", physicalType="string", required=True,
logicalTypeOptions={"enum": ["EUR", "USD"]}
),
],
)
],
)
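The typed model can be dumped to an ODCS JSON document for review or hand-off. A minimal sketch, assuming the open-data-contract-standard models are Pydantic v2 models (the output path is illustrative):

# Serialize the contract with ODCS field names (e.g. "schema" instead of "schema_").
contract_json = contract.model_dump_json(by_alias=True, exclude_none=True, indent=2)
with open("/tmp/sales.orders-0.1.0.json", "w", encoding="utf-8") as fh:
    fh.write(contract_json)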
- Validate and write with Spark
from dc43.integration.spark_io import write_with_contract
write_with_contract(
df=orders_df,
contract=contract,
path="/mnt/gold/sales/orders",
format="delta",  # or contract.servers[0].format if the contract defines servers
mode="append",
enforce=True,
auto_cast=True,
)
- DLT usage (inside a pipeline notebook)
import dlt
from dc43.integration.dlt_helpers import expectations_from_contract

exps = expectations_from_contract(contract)

@dlt.table(name="orders")
@dlt.expect_all(exps)
def orders():
    df = spark.readStream.table("bronze.sales_orders_raw")
    return df.select("order_id", "customer_id", "order_ts", "amount", "currency")
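expectations_from_contract produces the name-to-SQL-expression mapping that dlt.expect_all consumes. The exact keys and expressions are generated by dc43; for the contract above the shape could look roughly like this (illustrative only):

# Illustrative shape only; actual names and expressions come from the helper.
example_expectations = {
    "order_id_not_null": "order_id IS NOT NULL",
    "currency_enum": "currency IN ('EUR', 'USD')",
}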
- Store and resolve contracts
from dc43.storage.fs import FSContractStore
store = FSContractStore(base_path="/mnt/contracts")
store.put(contract)
latest = store.latest("sales.orders")
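A contract serialized as ODCS JSON (as in the sketch above) can also be loaded back into the typed model directly. A sketch assuming Pydantic v2 models and a standalone JSON file (the path is illustrative, not the store's internal layout):

from open_data_contract_standard.model import OpenDataContractStandard

with open("/tmp/sales.orders-0.1.0.json", encoding="utf-8") as fh:
    loaded = OpenDataContractStandard.model_validate_json(fh.read())
print(loaded.id, loaded.version)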
- DQ/DO orchestration on read
from dc43.integration.spark_io import read_with_contract
from dc43.dq.stub import StubDQClient
dq = StubDQClient(base_path="/mnt/dq_state")
df, status = read_with_contract(
spark,
format="delta",
path="/mnt/gold/sales/orders",
contract=contract,
expected_contract_version="==0.1.0",
dq_client=dq,
return_status=True,
)
print(status.status, status.reason)
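The returned status can drive control flow in a notebook or job. A minimal sketch using the ok/warn/block states shown in the Spark flow diagram below (the exact values come from your DQ client):

# React to the DQ verdict: fail hard on block, surface warnings, continue otherwise.
if status is not None and status.status == "block":
    raise RuntimeError(f"DQ blocked read: {status.reason}")
if status is not None and status.status == "warn":
    print(f"DQ warning: {status.reason}")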
- Draft contract proposal on mismatch (write)
from dc43.integration.spark_io import write_with_contract
from dc43.storage.fs import FSContractStore
store = FSContractStore("/mnt/contracts-drafts")
vr, draft = write_with_contract(
df=orders_df,
contract=contract,
path="/mnt/gold/sales/orders",
format="delta",  # or contract.servers[0].format if the contract defines servers
mode="append",
enforce=False, # continue writing
draft_on_mismatch=True, # create a draft when schema diverges
draft_store=store, # persist the draft
return_draft=True,
)
if draft:
print("Draft created:", draft.id, draft.version) # send to workflow
Demo application
A Vue-powered FastAPI application in dc43.demo_app offers a visual way to
explore contracts, datasets and data quality results. Install the optional
dependencies and launch the app with:
pip install ".[demo]"
dc43-demo
Visit http://localhost:8000 to:
- Browse contracts and their versions with draft/active status.
- Inspect dataset versions, their linked contract, validation status and detailed DQ metrics derived from contract rules.
- Highlight datasets using draft contracts and trigger validation to promote them.
An additional Reveal.js presentation is available at
http://localhost:8000/static/presentation.html to walk through the
contract lifecycle and automation steps.
The application also exposes an example Spark pipeline in
dc43.demo_app.pipeline used when registering new dataset versions.
Spark Flow (Mermaid)
flowchart TD
subgraph Read
U[User code / Notebook] --> RWC{read_with_contract}
RWC --> SR["spark.read.format(...).load"]
RWC --> EV["ensure_version(contract)"]
EV --> VAL[validate_dataframe]
VAL -->|ok| AC["apply_contract (cast/order)"]
VAL -->|errors & enforce| E1[Raise]
AC --> DF[DataFrame ready]
RWC --> DQ{dq_client?}
DQ -->|yes| GS["get_status(dataset@version, contract@version)"]
GS -->|unknown/stale| CM[compute_metrics]
CM --> SM[submit_metrics -> status]
GS -->|up-to-date| SM
SM -->|block & enforce| E2[Raise]
SM -->|ok/warn| DF
end
DF --> T[Transformations / joins / aggregations]
subgraph Write
T --> WWC{write_with_contract}
WWC --> V2[validate_dataframe]
V2 -->|ok| AC2[apply_contract]
V2 -->|errors & draft_on_mismatch| PD["propose draft (ODCS model)"]
PD --> PS[draft_store.put]
V2 -->|errors & enforce| E3[Raise]
AC2 --> SW["spark.write.(format, options).mode.save"]
SW --> DELTA[Delta table / UC]
end
classDef err fill:#ffe5e5,stroke:#ff4d4f,color:#000
class E1,E2,E3 err
Notes
- The library uses the official ODCS package and enforces $schema version 3.0.2 by default (configurable via DC43_ODCS_REQUIRED).
- Validation focuses on practical checks: presence, types, nullability and common constraints (enum, regex, min/max, unique on a best-effort basis).
- DLT helpers translate constraints into expect expressions when feasible.
- DQ orchestration: the IO wrapper checks the schema against the contract and consults the DQ client. If the dataset version is newer than the version the DQ client knows about, it computes the required metrics and submits them, then enforces the resulting status if requested; see the sketch after this list.
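A rough sketch of that consultation flow, with method names mirroring the diagram above (they are not guaranteed to match dc43's exact signatures):

def consult_dq(dq_client, compute_metrics, dataset_id, dataset_version, contract, df, enforce=True):
    # Ask the DQ client for the verdict it already holds for this dataset/contract pair.
    status = dq_client.get_status(dataset_id, dataset_version, contract.id, contract.version)
    if status is None or status.stale:
        # Unknown or stale: evaluate the contract's rules on the data and submit fresh metrics.
        metrics = compute_metrics(df, contract)
        status = dq_client.submit_metrics(dataset_id, dataset_version, metrics)
    if enforce and status.status == "block":
        raise RuntimeError(f"DQ blocked {dataset_id}@{dataset_version}: {status.reason}")
    return status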
Local Dev
- Install dependencies locally (match your Databricks runtime where possible):
pip install open-data-contract-standard==3.0.2 pyspark
- dc43 enforces apiVersion via DC43_ODCS_REQUIRED (default 3.0.2).
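To require a different apiVersion, set the variable before using dc43; for example (assuming it is read from the process environment):

import os
# Require a specific ODCS apiVersion (default is "3.0.2").
os.environ["DC43_ODCS_REQUIRED"] = "3.0.2"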
Tests
- Install the test extras and run:
pip install -e ".[test]"
pytest
Publishing
- Push a tag v* on a commit in main.
- The workflow .github/workflows/release.yml runs tests, builds the package, waits for manual approval via the release environment, publishes to PyPI, and creates a GitHub release with autogenerated notes.
- Set the PYPI_TOKEN secret and configure the release environment with required reviewers.
License
Apache 2