Data contracts For Free using ODCS (Bitol)
dc43 — Data Contracts For Free!
Overview
Conceptual platform
dc43 is a governance-first toolkit that separates the concepts of data contract management from their implementations. At its core it provides:
- Contract lifecycle management primitives to draft, review, approve, and retire ODCS contracts. Note: dc43 currently targets ODCS 3.0.2; a compatibility table will follow as the spec evolves.
- Extensible interfaces for contract storage, drafting, and data quality orchestration that keep governance decisions close to the data contract owner.
- Runtime helpers to apply approved specifications in compute platforms while feeding observations back to governance workflows.
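The draft, review, approve, and retire steps listed above can be read as a small state machine. The sketch below is only an illustration of that idea: the `ContractStatus` names, the `ALLOWED` transition table, and the `transition` helper are hypothetical and are not part of the dc43 API.

```python
from enum import Enum


class ContractStatus(Enum):
    # Hypothetical status names mirroring the draft/review/approve/retire steps.
    DRAFT = "draft"
    IN_REVIEW = "in_review"
    APPROVED = "approved"
    RETIRED = "retired"


# Allowed moves between statuses; an assumption for illustration only.
ALLOWED = {
    ContractStatus.DRAFT: {ContractStatus.IN_REVIEW},
    ContractStatus.IN_REVIEW: {ContractStatus.DRAFT, ContractStatus.APPROVED},
    ContractStatus.APPROVED: {ContractStatus.RETIRED},
    ContractStatus.RETIRED: set(),
}


def transition(current: ContractStatus, target: ContractStatus) -> ContractStatus:
    """Return the target status, or raise if the move is not allowed."""
    if target not in ALLOWED[current]:
        raise ValueError(f"cannot move from {current.value} to {target.value}")
    return target


# Walk one contract through the full lifecycle.
status = ContractStatus.DRAFT
for step in (ContractStatus.IN_REVIEW, ContractStatus.APPROVED, ContractStatus.RETIRED):
    status = transition(status, step)
```

Keeping the transition table as data makes it easy for a governance owner to tighten or relax the rules without touching the code that applies them.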
Provided integrations
On top of the conceptual platform, dc43 ships opinionated integrations that you can adopt or replace:
- Spark & DLT pipelines via dc43_integrations.spark.io, with schema/metric helpers from dc43_service_backends.data_quality.backend for auto-casting and contract-aware IO.
- Storage backends such as filesystem (DBFS/UC volumes), Delta tables, and Collibra through CollibraContractStore.
- A pluggable data-quality client with a stub implementation that can be replaced by catalog-native tools.
See docs/implementations/data-quality-governance/collibra.md for end-to-end orchestration guidance when Collibra owns stewardship workflows. Component deep dives cover the contract store, contract drafter, data-quality governance interface, data-quality engine, and integration layer. Each component links to implementation catalogs under docs/implementations/ so you can pick technology-specific guides (Spark, Delta, Collibra, ...).
Component model
dc43 exposes a small set of well-defined components. Swap any of them without rewriting the rest of the stack:
| Layer | Component | Responsibility |
|---|---|---|
| Governance | Contract manager/store interface | Retrieve, version, and persist contracts from catalog-backed or file-based sources. |
| Governance | Data quality manager interface | Coordinate with an external DQ governance tool (e.g., Collibra, Unity Catalog) that records dataset↔contract alignment and approval state. |
| Authoring support | Contract drafter module | Generate ODCS drafts from observed data or schema drift events before handing them back to governance. |
| Runtime services | Data-quality metrics engine | Collect contract-driven metrics in execution engines and forward them to the governance tool for status evaluation. |
| Integration | Integration adapters | Bridge the contract, drafter, and DQ components into execution engines such as Spark or Delta Live Tables (current adapters live under dc43_integrations.spark). |
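One way to picture the "swap any component" idea from the table is a structural interface that any storage backend can satisfy. The sketch below assumes a hypothetical `ContractStore` protocol and `InMemoryStore` class; the method names and signatures are illustrative and do not reproduce the dc43 interfaces.

```python
from typing import Protocol


class ContractStore(Protocol):
    """Hypothetical minimal store interface (not the dc43 signature)."""

    def put(self, contract_id: str, version: str, document: dict) -> None: ...

    def get(self, contract_id: str, version: str) -> dict: ...

    def latest(self, contract_id: str) -> dict: ...


class InMemoryStore:
    """Toy backend; a filesystem or catalog-backed class could replace it."""

    def __init__(self) -> None:
        self._docs = {}

    def put(self, contract_id: str, version: str, document: dict) -> None:
        self._docs[(contract_id, version)] = document

    def get(self, contract_id: str, version: str) -> dict:
        return self._docs[(contract_id, version)]

    def latest(self, contract_id: str) -> dict:
        versions = [v for (cid, v) in self._docs if cid == contract_id]
        if not versions:
            raise KeyError(contract_id)
        # Compare versions numerically so that 0.10.0 sorts after 0.2.0.
        newest = max(versions, key=lambda v: tuple(int(p) for p in v.split(".")))
        return self._docs[(contract_id, newest)]


def latest_name(store: ContractStore, contract_id: str) -> str:
    """Caller code depends only on the protocol, not on a concrete backend."""
    return store.latest(contract_id)["name"]


store = InMemoryStore()
store.put("sales.orders", "0.2.0", {"name": "Orders v0.2"})
store.put("sales.orders", "0.10.0", {"name": "Orders v0.10"})
```

Because `latest_name` only relies on the protocol, replacing `InMemoryStore` with another backend would not require changes in the calling code.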
Guides for each component live under docs/:
- Contract store: component-contract-store.md
- Contract drafter: component-contract-drafter.md
- Data-quality governance interface: component-data-quality-governance.md
- Data-quality engine: component-data-quality-engine.md
- Integration layer: component-integration-layer.md; Spark & DLT adapter: implementations/integration/spark-dlt.md
- Write violation strategies: component-write-violation-strategies.md
Architecture
flowchart TD
subgraph Governance & Stewardship
Authoring["Authoring Tools\nJSON · Git · Notebooks"]
ContractStore["Contract Store Interface\nGit · Filesystem · APIs"]
DQTool["Data Quality Governance Tool\nCatalog · Collibra"]
end
subgraph Lifecycle Services
DraftModule["Contract Drafter Module"]
DQManager["Data Quality Manager Interface"]
end
subgraph Runtime Execution
DQEngine["Data Quality Engine\nmetrics generation"]
IOHelpers["Integration Adapters\nSpark · DLT"]
Pipelines["Business Pipelines"]
end
Authoring -->|publish / review| ContractStore
ContractStore -->|serve versions| IOHelpers
ContractStore -->|seed drafts| DraftModule
DraftModule -->|return proposals| Authoring
IOHelpers -->|apply contracts| Pipelines
Pipelines -->|observations| IOHelpers
IOHelpers -->|metrics & schema drift| DQEngine
DQEngine -->|metrics package| DQManager
DQManager -->|submit metrics| DQTool
DQTool -->|compatibility verdict| DQManager
DQTool -->|validated versions| ContractStore
DQManager -->|notify runtime| IOHelpers
This architecture clarifies how governance assets, lifecycle services, and runtime execution collaborate:
- Governance systems own the authoritative contract store and the data-quality tool. Labeled edges (publish / review, validated versions) highlight how those systems steer approvals.
- Lifecycle services (drafting and data-quality management) mediate between governance and runtime. The drafter turns runtime feedback into proposals while the DQ manager relays metrics, waits for a verdict, and shares compatibility context with both stewards and pipelines.
- Integration adapters inside runtime engines (Spark, DLT, …) apply contracts, emit observations, and react when governance signals change.
Node & edge glossary
- Contract store interface – pluggable storage adapters (filesystem, Delta, Collibra) that resolve authoritative contract versions.
- Data quality manager interface – the dc43 protocol that hands metrics to the governance platform and retrieves compatibility verdicts.
- Data quality governance tool – catalog or observability system (Collibra, Unity Catalog, bespoke services) that persists the compatibility matrix and performs the actual check evaluation once it receives metrics.
- Metrics package – the bundle of row counts, expectation results, and schema drift context emitted by the runtime so the governance tool can recompute the dataset↔contract status.
Variations—such as Collibra-governed contracts or bespoke storage backends—slot into the same model by substituting implementations of the interfaces described above.
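To make the metrics package from the glossary more concrete, the sketch below models it as a plain dataclass and applies a toy verdict rule on the governance side. The `MetricsPackage` fields, the `verdict` function, and the `warn_ratio` threshold are all assumptions for illustration; the actual payload and evaluation logic depend on the governance tool in use.

```python
from dataclasses import dataclass, field


@dataclass
class MetricsPackage:
    """Hypothetical shape of the bundle emitted by the runtime."""

    dataset_id: str
    dataset_version: str
    contract_id: str
    contract_version: str
    row_count: int
    # Expectation name -> number of violating rows.
    violations: dict = field(default_factory=dict)
    # Human-readable schema drift notes, e.g. "missing column: currency".
    schema_drift: list = field(default_factory=list)


def verdict(pkg: MetricsPackage, warn_ratio: float = 0.01) -> str:
    """Toy rule: block on schema drift, otherwise grade by violation ratio."""
    if pkg.schema_drift:
        return "block"
    worst = max(pkg.violations.values(), default=0)
    if worst == 0:
        return "ok"
    ratio = worst / pkg.row_count if pkg.row_count else 1.0
    return "warn" if ratio <= warn_ratio else "block"


pkg = MetricsPackage(
    dataset_id="orders",
    dataset_version="2024-01-01",
    contract_id="sales.orders",
    contract_version="0.1.0",
    row_count=1000,
    violations={"amount_not_null": 5},
)
result = verdict(pkg)
```

Here 5 violating rows out of 1000 stay under the 1% threshold, so the toy rule returns a warning rather than a block.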
Install
dc43 now ships as a family of distributions so you can install only the layers you need:
| Distribution | Imports | Responsibility | Depends on |
|---|---|---|---|
| dc43-service-clients | dc43_service_clients.* | Typed service clients, request/response models, and governance helpers that front-end applications can embed. | open-data-contract-standard |
| dc43-service-backends | dc43_service_backends.* | Reference backend implementations (filesystem store, local drafting, in-memory governance service) that orchestrate the client layer. | dc43-service-clients |
| dc43-integrations | dc43_integrations.* | Runtime adapters such as the Spark helpers that call into client APIs without requiring backend dependencies. | dc43-service-clients |
| dc43 | dc43.* | Aggregating package that wires the CLI/demo and depends on the three modules above. | all of the above |
Pip installs
- Service contracts only: pip install dc43-service-clients
- Backend reference services: pip install dc43-service-backends
- HTTP service backends: pip install "dc43-service-backends[http]" (see deploy/http-backend/README.md for a containerised deployment recipe)
- Spark integrations: pip install "dc43-integrations[spark]"
- Full stack: pip install dc43
- Spark extras for the meta package: pip install "dc43[spark]"
- Demo app: pip install "dc43[demo]"
When developing locally (Databricks Repos, workspace files, or any source checkout) the editable install automatically pulls in the sibling packages:
pip install -e .
Chain extras as needed. For example, to prepare the test environment, run:
pip install -e ".[test]"
Each distribution can now be installed independently. For example, lightweight clients can use pip install dc43-service-clients and then from dc43_service_clients.data_quality import ValidationResult without pulling in the backend or Spark helpers at runtime.
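Since the layers install independently, it can help to check which distributions are actually present in an environment. The sketch below uses only the standard library `importlib.metadata`; the `installed_versions` helper is a hypothetical name, while the distribution names come from the table above.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional

DISTRIBUTIONS = (
    "dc43-service-clients",
    "dc43-service-backends",
    "dc43-integrations",
    "dc43",
)


def installed_versions(names=DISTRIBUTIONS) -> dict:
    """Map each distribution name to its installed version, or None if absent."""
    report = {}
    for name in names:
        found: Optional[str]
        try:
            found = version(name)
        except PackageNotFoundError:
            found = None
        report[name] = found
    return report


if __name__ == "__main__":
    for name, found in installed_versions().items():
        print(f"{name}: {found or 'not installed'}")
```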
Quickstart
- Define a contract (ODCS typed model)
from open_data_contract_standard.model import (
    OpenDataContractStandard, SchemaObject, SchemaProperty, Description
)

contract = OpenDataContractStandard(
    version="0.1.0",
    kind="DataContract",
    apiVersion="3.0.2",
    id="sales.orders",
    name="Orders",
    description=Description(usage="Orders facts"),
    schema_=[
        SchemaObject(
            name="orders",
            properties=[
                SchemaProperty(name="order_id", physicalType="bigint", required=True, unique=True),
                SchemaProperty(name="customer_id", physicalType="bigint", required=True),
                SchemaProperty(name="order_ts", physicalType="timestamp", required=True),
                SchemaProperty(name="amount", physicalType="double", required=True),
                SchemaProperty(
                    name="currency", physicalType="string", required=True,
                    logicalTypeOptions={"enum": ["EUR", "USD"]}
                ),
            ],
        )
    ],
)
- Validate and write with Spark
from dc43_service_backends.contracts.backend.stores import FSContractStore
from dc43_service_clients.contracts import LocalContractServiceClient
from dc43_integrations.spark.io import (
    write_with_contract,
    ContractVersionLocator,
)

store = FSContractStore(base_path="/mnt/contracts")
contract_service = LocalContractServiceClient(store)

write_with_contract(
    df=orders_df,
    contract_id="sales.orders",
    contract_service=contract_service,
    expected_contract_version=">=0.1.0",
    dataset_locator=ContractVersionLocator(dataset_version="latest"),
    mode="append",
    enforce=True,
    auto_cast=True,
)
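The `expected_contract_version` argument above takes a constraint such as `>=0.1.0` or `==0.1.0`. How dc43 evaluates these constraints is not shown here; the sketch below is one plausible interpretation using simple numeric comparison, with `parse_version` and `satisfies` as hypothetical helper names.

```python
import operator
import re

# Supported comparison operators; a bare version is treated as "==".
_OPS = {
    "==": operator.eq,
    ">=": operator.ge,
    "<=": operator.le,
    ">": operator.gt,
    "<": operator.lt,
}
_CONSTRAINT = re.compile(r"\s*(==|>=|<=|>|<)?\s*([0-9]+(?:\.[0-9]+)*)\s*")


def parse_version(text: str) -> tuple:
    """Turn '0.10.0' into (0, 10, 0) so parts compare as numbers."""
    return tuple(int(part) for part in text.strip().split("."))


def satisfies(version: str, constraint: str) -> bool:
    """Check a dotted numeric version against a single constraint."""
    match = _CONSTRAINT.fullmatch(constraint)
    if match is None:
        raise ValueError(f"unsupported constraint: {constraint!r}")
    compare = _OPS[match.group(1) or "=="]
    return compare(parse_version(version), parse_version(match.group(2)))
```

Comparing integer tuples rather than strings avoids the classic pitfall where `"0.10.0"` sorts before `"0.9.0"`. Pre-release tags and compound constraints are deliberately out of scope for this sketch.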
- DLT usage (inside a pipeline notebook)
import dlt
from collections.abc import Mapping
from dc43_integrations.spark.dlt import apply_dlt_expectations

@dlt.table(name="orders")
def orders():
    # `spark` is provided by the DLT notebook runtime.
    df = spark.readStream.table("bronze.sales_orders_raw")
    # Retrieve predicates from your configured data-quality service.
    # `dq_status` is not defined in this snippet; obtain it from that service first.
    predicates = dq_status.details.get("expectation_predicates")
    if isinstance(predicates, Mapping):
        apply_dlt_expectations(dlt, predicates)
    return df.select("order_id", "customer_id", "order_ts", "amount", "currency")
- Store and resolve contracts
from dc43_service_backends.contracts.backend.stores import FSContractStore
store = FSContractStore(base_path="/mnt/contracts")
store.put(contract)
latest = store.latest("sales.orders")
- DQ/DO orchestration on read
from dc43_integrations.spark.io import read_with_contract, ContractVersionLocator
from dc43_service_clients.contracts import LocalContractServiceClient
from dc43_service_clients.governance import build_local_governance_service

governance = build_local_governance_service(store)
contract_service = LocalContractServiceClient(store)

df, status = read_with_contract(
    spark,
    contract_id="sales.orders",
    contract_service=contract_service,
    expected_contract_version="==0.1.0",
    governance_service=governance,
    dataset_locator=ContractVersionLocator(dataset_version="latest"),
    return_status=True,
)
print(status.status, status.reason)
- Quality status check on write
from dc43_integrations.spark.io import write_with_contract, ContractVersionLocator
from dc43_service_clients.contracts import LocalContractServiceClient

contract_service = LocalContractServiceClient(store)

vr, status = write_with_contract(
    df=orders_df,
    contract_id="sales.orders",
    contract_service=contract_service,
    expected_contract_version=">=0.1.0",
    dataset_locator=ContractVersionLocator(dataset_version="latest"),
    mode="append",
    enforce=False,  # continue writing
    governance_service=governance,
    return_status=True,
)
if status and status.status == "block":
    raise ValueError(f"DQ blocked write: {status.details}")
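The write example above reacts to a `block` status by raising. A pipeline may want a slightly richer policy that also covers warnings and the case where no governance service is configured. The sketch below uses a stand-in `Status` dataclass and a hypothetical `handle_status` helper; neither is the dc43 type, and the returned action names are assumptions.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Status:
    """Stand-in for the status object returned when return_status=True."""

    status: str  # expected values in this sketch: "ok", "warn", "block"
    reason: str = ""
    details: dict = field(default_factory=dict)


class BlockedWrite(RuntimeError):
    """Raised when governance blocks a dataset and enforcement is on."""


def handle_status(status: Optional[Status], enforce: bool) -> str:
    """Map a governance status to a pipeline action name."""
    if status is None:
        return "skipped"  # no governance service was consulted
    if status.status == "block":
        if enforce:
            raise BlockedWrite(f"DQ blocked write: {status.details}")
        return "quarantine"  # keep the data but flag it for stewards
    if status.status == "warn":
        return "proceed_with_warning"
    return "proceed"
```

Returning an action name instead of acting directly keeps the policy easy to unit test without a Spark session.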
Demo application
A Vue-powered FastAPI application packaged as dc43-demo-app (module
dc43_demo_app) offers a visual way to explore contracts, datasets and data
quality results. Install the optional dependencies and launch the app with:
pip install ".[demo]"
dc43-demo
Visit http://localhost:8000 to:
- Browse contracts and their versions with draft/active status.
- Inspect dataset versions, their linked contract, validation status and detailed DQ metrics derived from contract rules.
- Highlight datasets using draft contracts and trigger validation to promote them.
An additional Reveal.js presentation is available at
http://localhost:8000/static/presentation.html to walk through the
contract lifecycle and automation steps.
The application also exposes an example Spark pipeline in
dc43_demo_app.pipeline used when registering new dataset versions. The
preconfigured scenarios are documented in
docs/demo-pipeline-scenarios.md including
the new split strategy example that writes orders_enriched::valid and
orders_enriched::reject alongside the main dataset.
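The split strategy mentioned above can be pictured without Spark as a partition of rows by whether they pass every check. The sketch below is a plain-Python illustration: `split_rows`, the `violations` column, and the choice to keep all rows in the main dataset are assumptions, and only the `::valid` and `::reject` suffixes come from the text.

```python
from typing import Callable

Row = dict
Predicate = Callable[[Row], bool]


def split_rows(dataset: str, rows: list, predicates: dict) -> dict:
    """Return the main dataset plus its ::valid and ::reject subsets."""
    valid = []
    reject = []
    for row in rows:
        # Collect the names of every check this row fails.
        failed = [name for name, check in predicates.items() if not check(row)]
        if failed:
            reject.append({**row, "violations": failed})
        else:
            valid.append(row)
    return {
        dataset: list(rows),
        f"{dataset}::valid": valid,
        f"{dataset}::reject": reject,
    }


rows = [
    {"order_id": 1, "amount": 10.0, "currency": "EUR"},
    {"order_id": 2, "amount": -5.0, "currency": "USD"},
    {"order_id": 3, "amount": 7.5, "currency": "GBP"},
]
predicates = {
    "amount_non_negative": lambda r: r["amount"] >= 0,
    "currency_enum": lambda r: r["currency"] in {"EUR", "USD"},
}
outputs = split_rows("orders_enriched", rows, predicates)
```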
The demo now drives its contract, governance, and data-quality operations through the same HTTP clients that production pipelines use. When the UI starts it spins up an in-process instance of the service backend HTTP application, so Spark runs exercise the remote code paths without requiring an external deployment.
Running the service backend over HTTP
Pipelines that rely on the remote clients can reach a standalone backend via
the dc43_service_backends.webapp module. Set the contract directory and (if
needed) a bearer token before launching uvicorn:
export DC43_CONTRACT_STORE=/path/to/contracts
export DC43_BACKEND_TOKEN="super-secret" # optional
uvicorn dc43_service_backends.webapp:app --host 0.0.0.0 --port 8001
Contracts stored under $DC43_CONTRACT_STORE are served over the API while the
stub data-quality and governance backends keep draft information on disk. See
docs/implementations/service-backends/http-server.md
for Docker packaging notes and deployment options.
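When a bearer token is set, clients need to send it with each request. The sketch below only builds a request object with the standard library and does not contact any server. The `build_request` helper and the `/contracts/sales.orders/latest` path are hypothetical; consult the backend documentation for the real routes.

```python
import os
import urllib.request
from typing import Optional


def build_request(base_url: str, path: str, token: Optional[str]) -> urllib.request.Request:
    """Prepare a GET request, adding a bearer token header when one is set."""
    headers = {"Accept": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
    return urllib.request.Request(url, headers=headers)


# The token is read from the same variable used when launching the backend.
request = build_request(
    "http://localhost:8001",
    "/contracts/sales.orders/latest",  # hypothetical route, for illustration only
    os.environ.get("DC43_BACKEND_TOKEN"),
)
```

Passing `request` to `urllib.request.urlopen` would send it; that step is left out so the sketch runs without a backend.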
Spark Flow (Mermaid)
flowchart LR
subgraph Read
U[User code / Notebook] --> RWC{read_with_contract}
RWC --> SR["spark.read.format(...).load"]
RWC --> EV["ensure_version(contract)"]
EV --> OBS[collect_observations]
OBS --> DQS["dq_service.evaluate"]
DQS -->|ok| AC["apply_contract (cast/order)"]
DQS -->|errors & enforce| E1[Raise]
AC --> DF[DataFrame ready]
RWC --> DQ{dq_client?}
DQ -->|yes| GS["get_status(dataset@version, contract@version)"]
GS -->|unknown/stale| CM[compute_metrics]
CM --> SM[submit_metrics -> status]
GS -->|up-to-date| SM
SM -->|block & enforce| E2[Raise]
SM -->|ok/warn| DF
end
DF --> T[Transformations / joins / aggregations]
subgraph Write
T --> WWC{write_with_contract}
WWC --> OBS2[collect_observations]
OBS2 --> DQS2["dq_service.evaluate"]
DQS2 -->|ok| AC2[apply_contract]
DQS2 -->|errors & enforce| E3[Raise]
DQS2 -->|errors & !enforce| AC2
AC2 --> SW["spark.write.(format, options).mode.save"]
SW --> DELTA[Delta table / UC]
DELTA --> DQ2{dq_client?}
DQ2 -->|yes| EVAL["evaluate_dataset"]
EVAL -->|status block & enforce| E4[Raise]
EVAL -->|status ok/warn| DELTA
end
classDef err fill:#ffe5e5,stroke:#ff4d4f,color:#000
class E1,E2,E3,E4 err
Notes
- The library uses the official ODCS package and enforces $schema version 3.0.2 by default (configurable via DC43_ODCS_REQUIRED).
- Validation runs inside the Spark DQ engine: presence, types, nullability and common constraints (enum, regex, min/max, unique best-effort). The resulting ValidationResult bundles metrics and a schema snapshot so governance adapters can update compatibility matrices.
- DLT helpers translate constraints into expect expressions when feasible.
- DQ orchestration: the IO wrapper checks schema vs contract and consults the DQ client. If the dataset version is newer than DQ’s known version, it computes the required metrics and submits them, then enforces the resulting status if requested.
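As an illustration of how contract constraints might be turned into expectation expressions, the sketch below maps a few constraint kinds to SQL predicate strings. The `predicates_for` helper, its parameters, and the naming scheme of the resulting keys are assumptions and do not reflect the dc43 helpers themselves.

```python
from typing import Optional, Sequence


def predicates_for(
    column: str,
    *,
    required: bool = False,
    enum: Optional[Sequence[str]] = None,
    minimum: Optional[float] = None,
    maximum: Optional[float] = None,
) -> dict:
    """Build a name -> SQL predicate mapping for one column."""
    predicates = {}
    if required:
        predicates[f"{column}_not_null"] = f"{column} IS NOT NULL"
    if enum:
        # Escape single quotes so values remain valid SQL string literals.
        values = ", ".join("'" + value.replace("'", "''") + "'" for value in enum)
        predicates[f"{column}_enum"] = f"{column} IN ({values})"
    if minimum is not None:
        predicates[f"{column}_min"] = f"{column} >= {minimum}"
    if maximum is not None:
        predicates[f"{column}_max"] = f"{column} <= {maximum}"
    return predicates


currency_checks = predicates_for("currency", required=True, enum=["EUR", "USD"])
amount_checks = predicates_for("amount", required=True, minimum=0)
```

A mapping of this shape is the kind of input that expectation decorators in DLT accept, which is why the sketch returns names and expressions rather than evaluating anything.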
Local Dev
- Install dependencies locally (match your Databricks runtime where possible):
pip install open-data-contract-standard==3.0.2 pyspark
- dc43 enforces apiVersion via DC43_ODCS_REQUIRED (default 3.0.2).
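The environment-driven version check can be pictured as follows. This is a minimal sketch of the behaviour described above, not the library's code: `required_odcs_version` and `check_api_version` are hypothetical names, and only the `DC43_ODCS_REQUIRED` variable and its `3.0.2` default come from the text.

```python
import os
from typing import Mapping, Optional

DEFAULT_ODCS_VERSION = "3.0.2"


def required_odcs_version(env: Optional[Mapping[str, str]] = None) -> str:
    """Read the required ODCS version, falling back to the default."""
    source = os.environ if env is None else env
    return source.get("DC43_ODCS_REQUIRED", DEFAULT_ODCS_VERSION)


def check_api_version(api_version: str, env: Optional[Mapping[str, str]] = None) -> None:
    """Raise if a contract's apiVersion differs from the required one."""
    required = required_odcs_version(env)
    if api_version != required:
        raise ValueError(f"contract apiVersion {api_version!r} != required {required!r}")
```

Accepting an optional mapping instead of always reading `os.environ` makes the check straightforward to test with different settings.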
Tests
- Install the shared test extras and run the suites that matter for the area you are touching. The top-level pytest configuration automatically adds each package's src directory to sys.path, so cloning the repo and running pytest works even before the editable installs finish building.
# Option A: run everything with one helper (installs extras unless --skip-install is passed)
./scripts/test_all.sh
# Option B: manage installs yourself and invoke pytest directly
pip install -e ".[test]"
pytest -q tests packages/dc43-service-clients/tests \
packages/dc43-service-backends/tests packages/dc43-integrations/tests
Publishing
- Push a tag v* on a commit in main.
- The workflow .github/workflows/release.yml runs tests, builds the package, waits for manual approval via environment release, publishes to PyPI, and creates a GitHub release with autogenerated notes.
- Set the PYPI_TOKEN secret and configure the release environment with required reviewers.
License
Apache 2