Data contracts For Free using ODCS (Bitol)

dc43 — Data Contracts For Free!

Overview

Conceptual platform

dc43 is a governance-first toolkit that separates the concepts of data contract management from their implementations. At its core it provides:

  • Contract lifecycle management primitives to draft, review, approve, and retire ODCS contracts. Note: dc43 currently targets ODCS 3.0.2; a compatibility table will follow as the spec evolves.
  • Extensible interfaces for contract storage, drafting, and data quality orchestration that keep governance decisions close to the data contract owner.
  • Runtime helpers to apply approved specifications in compute platforms while feeding observations back to governance workflows.
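
The draft, review, approve, and retire steps listed above can be pictured as a small state machine. The sketch below is illustrative only: `ContractState`, `ALLOWED`, and `transition` are hypothetical names rather than dc43 APIs, and the transition table is an assumption about a typical governance flow.

```python
from enum import Enum


class ContractState(Enum):
    """Hypothetical lifecycle states; dc43's own status names may differ."""
    DRAFT = "draft"
    IN_REVIEW = "in_review"
    APPROVED = "approved"
    RETIRED = "retired"


# Assumed transition table: each state maps to the states it may move to.
ALLOWED = {
    ContractState.DRAFT: {ContractState.IN_REVIEW},
    ContractState.IN_REVIEW: {ContractState.DRAFT, ContractState.APPROVED},
    ContractState.APPROVED: {ContractState.RETIRED},
    ContractState.RETIRED: set(),
}


def transition(current: ContractState, target: ContractState) -> ContractState:
    """Return the target state, or raise if the move is not allowed."""
    if target not in ALLOWED[current]:
        raise ValueError(f"cannot move from {current.value} to {target.value}")
    return target


# Walk a contract from draft to approved.
state = ContractState.DRAFT
state = transition(state, ContractState.IN_REVIEW)
state = transition(state, ContractState.APPROVED)
```

Keeping the allowed moves in one table makes it easy to audit which approvals a governance tool has to grant before a contract reaches runtime.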

Provided integrations

On top of the conceptual platform, dc43 ships opinionated integrations that you can adopt or replace:

  • Spark & DLT pipelines via dc43_integrations.spark.io with schema/metric helpers from dc43_service_backends.data_quality.backend for auto-casting and contract-aware IO.
  • Storage backends such as filesystem (DBFS/UC volumes), Delta tables, and Collibra through CollibraContractStore.
  • A pluggable data-quality client with a stub implementation that can be replaced by catalog-native tools.

See docs/implementations/data-quality-governance/collibra.md for end-to-end orchestration guidance when Collibra owns stewardship workflows. Component deep dives cover the contract store, contract drafter, data-quality governance interface, data-quality engine, and integration layer. Each component links to implementation catalogs under docs/implementations/ so you can pick technology-specific guides (Spark, Delta, Collibra, ...).

Component model

dc43 exposes a small set of well-defined components. Swap any of them without rewriting the rest of the stack:

Layer | Component | Responsibility
Governance | Contract manager/store interface | Retrieve, version, and persist contracts from catalog-backed or file-based sources.
Governance | Data quality manager interface | Coordinate with an external DQ governance tool (e.g., Collibra, Unity Catalog) that records dataset↔contract alignment and approval state.
Authoring support | Contract drafter module | Generate ODCS drafts from observed data or schema drift events before handing them back to governance.
Runtime services | Data-quality metrics engine | Collect contract-driven metrics in execution engines and forward them to the governance tool for status evaluation.
Integration | Integration adapters | Bridge the contract, drafter, and DQ components into execution engines such as Spark or Delta Live Tables (current adapters live under dc43_integrations.spark).
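
To make the "swap any component" idea concrete, here is a minimal sketch of an interface-based store. Everything in it is hypothetical: `ContractStoreLike`, `InMemoryStore`, and `resolve_latest` are illustration names, and the method signatures are simplified assumptions, not the dc43 store API.

```python
from typing import Protocol


class ContractStoreLike(Protocol):
    """Hypothetical minimal store interface (illustration only)."""

    def put(self, contract_id: str, version: str, body: dict) -> None: ...
    def get(self, contract_id: str, version: str) -> dict: ...
    def latest(self, contract_id: str) -> str: ...


class InMemoryStore:
    """One possible implementation; a file- or catalog-backed one could replace it."""

    def __init__(self) -> None:
        self._data: dict = {}

    def put(self, contract_id: str, version: str, body: dict) -> None:
        self._data[(contract_id, version)] = body

    def get(self, contract_id: str, version: str) -> dict:
        return self._data[(contract_id, version)]

    def latest(self, contract_id: str) -> str:
        versions = [v for (cid, v) in self._data if cid == contract_id]
        if not versions:
            raise KeyError(contract_id)
        # Compare numerically so that 0.10.0 sorts after 0.9.0.
        return max(versions, key=lambda v: tuple(int(p) for p in v.split(".")))


def resolve_latest(store: ContractStoreLike, contract_id: str) -> dict:
    """Callers depend on the interface only, never on a concrete store."""
    return store.get(contract_id, store.latest(contract_id))


demo_store = InMemoryStore()
demo_store.put("sales.orders", "0.9.0", {"name": "Orders v0.9"})
demo_store.put("sales.orders", "0.10.0", {"name": "Orders v0.10"})
latest_body = resolve_latest(demo_store, "sales.orders")
```

Because `resolve_latest` is typed against the protocol, any class with the same three methods can be passed in without changes to the calling code.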

Guides for each component live under docs/.

Architecture

flowchart TD
    subgraph Governance & Stewardship
        Authoring["Authoring Tools\nJSON · Git · Notebooks"]
        ContractStore["Contract Store Interface\nGit · Filesystem · APIs"]
        DQTool["Data Quality Governance Tool\nCatalog · Collibra"]
    end

    subgraph Lifecycle Services
        DraftModule["Contract Drafter Module"]
        DQManager["Data Quality Manager Interface"]
    end

    subgraph Runtime Execution
        DQEngine["Data Quality Engine\nmetrics generation"]
        IOHelpers["Integration Adapters\nSpark · DLT"]
        Pipelines["Business Pipelines"]
    end

    Authoring -->|publish / review| ContractStore
    ContractStore -->|serve versions| IOHelpers
    ContractStore -->|seed drafts| DraftModule
    DraftModule -->|return proposals| Authoring
    IOHelpers -->|apply contracts| Pipelines
    Pipelines -->|observations| IOHelpers
    IOHelpers -->|metrics & schema drift| DQEngine
    DQEngine -->|metrics package| DQManager
    DQManager -->|submit metrics| DQTool
    DQTool -->|compatibility verdict| DQManager
    DQTool -->|validated versions| ContractStore
    DQManager -->|notify runtime| IOHelpers

This architecture clarifies how governance assets, lifecycle services, and runtime execution collaborate:

  • Governance systems own the authoritative contract store and the data-quality tool. Labeled edges (publish / review, validated versions) highlight how those systems steer approvals.
  • Lifecycle services—drafting and data-quality management—mediate between governance and runtime. The drafter turns runtime feedback into proposals while the DQ manager relays metrics, waits for a verdict, and shares compatibility context with both stewards and pipelines.
  • Integration adapters inside runtime engines (Spark, DLT, …) apply contracts, emit observations, and react when governance signals change.

Node & edge glossary

  • Contract store interface – pluggable storage adapters (filesystem, Delta, Collibra) that resolve authoritative contract versions.
  • Data quality manager interface – the dc43 protocol that hands metrics to the governance platform and retrieves compatibility verdicts.
  • Data quality governance tool – catalog or observability system (Collibra, Unity Catalog, bespoke services) that persists the compatibility matrix and performs the actual check evaluation once it receives metrics.
  • Metrics package – the bundle of row counts, expectation results, and schema drift context emitted by the runtime so the governance tool can recompute the dataset↔contract status.

Variations—such as Collibra-governed contracts or bespoke storage backends—slot into the same model by substituting implementations of the interfaces described above.
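
The glossary describes a metrics package that the runtime hands to the governance tool, which then records a verdict in its compatibility matrix. A minimal sketch of that exchange follows. `MetricsPackage` and `StubGovernanceTool` are hypothetical names, and the verdict policy (drift blocks, violations warn) is an assumption chosen for illustration, not the rule any real governance tool applies.

```python
from dataclasses import dataclass, field


@dataclass
class MetricsPackage:
    """Hypothetical bundle of observations emitted by the runtime."""
    row_count: int
    violations: dict = field(default_factory=dict)    # expectation name -> failing rows
    schema_drift: list = field(default_factory=list)  # human-readable drift notes


class StubGovernanceTool:
    """Stand-in for the governance tool; keeps a compatibility matrix in memory."""

    def __init__(self) -> None:
        self.matrix = {}

    def submit(self, dataset: str, contract: str, package: MetricsPackage) -> str:
        # Assumed policy: schema drift blocks, rule violations warn, otherwise ok.
        if package.schema_drift:
            verdict = "block"
        elif any(count > 0 for count in package.violations.values()):
            verdict = "warn"
        else:
            verdict = "ok"
        self.matrix[(dataset, contract)] = verdict
        return verdict


tool = StubGovernanceTool()
clean = tool.submit("orders@v1", "sales.orders@0.1.0", MetricsPackage(row_count=100))
noisy = tool.submit(
    "orders@v2",
    "sales.orders@0.1.0",
    MetricsPackage(row_count=100, violations={"currency_enum": 3}),
)
```

The runtime never decides the verdict itself in this sketch; it only ships observations, which mirrors the separation between the DQ engine and the governance tool in the diagram.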

Install

dc43 now ships as a family of distributions so you can install only the layers you need:

Distribution | Imports | Responsibility | Depends on
dc43-service-clients | dc43_service_clients.* | Typed service clients, request/response models, and governance helpers that front-end applications can embed. | open-data-contract-standard
dc43-service-backends | dc43_service_backends.* | Reference backend implementations (filesystem store, local drafting, in-memory governance service) that orchestrate the client layer. | dc43-service-clients
dc43-integrations | dc43_integrations.* | Runtime adapters such as the Spark helpers that call into client APIs without requiring backend dependencies. | dc43-service-clients
dc43 | dc43.* | Aggregating package that wires the CLI/demo and depends on the three modules above. | all of the above

Pip installs

  • Service contracts only: pip install dc43-service-clients
  • Backend reference services: pip install dc43-service-backends
  • Spark integrations: pip install "dc43-integrations[spark]"
  • Full stack: pip install dc43
  • Spark extras for the meta package: pip install "dc43[spark]"
  • Demo app: pip install "dc43[demo]"

When developing locally (Databricks Repos, workspace files, or any source checkout) the editable install automatically pulls in the sibling packages:

pip install -e .

Chain extras as needed. For example, to prepare the test environment, run:

pip install -e ".[test]"

Each distribution can now be installed independently. For example, lightweight clients can use pip install dc43-service-clients and then from dc43_service_clients.data_quality import ValidationResult without pulling in the backend or Spark helpers at runtime.
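
Since the layers install independently, it can help to check which ones are importable in a given environment. The sketch below uses only the standard library and the import names from the table above; `installed_layers` is a hypothetical helper name, not part of dc43.

```python
from importlib.util import find_spec

# Top-level import names taken from the distribution table.
LAYERS = (
    "dc43_service_clients",
    "dc43_service_backends",
    "dc43_integrations",
    "dc43",
)


def installed_layers() -> dict:
    """Map each import name to True when it can be found on sys.path."""
    return {name: find_spec(name) is not None for name in LAYERS}


for layer, present in installed_layers().items():
    print(f"{layer}: {'installed' if present else 'missing'}")
```

`find_spec` locates a package without importing it, so the check stays cheap even when the Spark extras are installed.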

Quickstart

  1. Define a contract (ODCS typed model)
from open_data_contract_standard.model import (
    OpenDataContractStandard, SchemaObject, SchemaProperty, Description
)

contract = OpenDataContractStandard(
    version="0.1.0",
    kind="DataContract",
    apiVersion="3.0.2",
    id="sales.orders",
    name="Orders",
    description=Description(usage="Orders facts"),
    schema_=[
        SchemaObject(
            name="orders",
            properties=[
                SchemaProperty(name="order_id", physicalType="bigint", required=True, unique=True),
                SchemaProperty(name="customer_id", physicalType="bigint", required=True),
                SchemaProperty(name="order_ts", physicalType="timestamp", required=True),
                SchemaProperty(name="amount", physicalType="double", required=True),
                SchemaProperty(
                    name="currency", physicalType="string", required=True,
                    logicalTypeOptions={"enum": ["EUR", "USD"]}
                ),
            ],
        )
    ],
)
  2. Validate and write with Spark
from dc43_service_backends.contracts.backend.stores import FSContractStore
from dc43_service_clients.contracts import LocalContractServiceClient
from dc43_integrations.spark.io import (
    write_with_contract,
    ContractVersionLocator,
)

store = FSContractStore(base_path="/mnt/contracts")
contract_service = LocalContractServiceClient(store)

write_with_contract(
    df=orders_df,
    contract_id="sales.orders",
    contract_service=contract_service,
    expected_contract_version=">=0.1.0",
    dataset_locator=ContractVersionLocator(dataset_version="latest"),
    mode="append",
    enforce=True,
    auto_cast=True,
)
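
The `expected_contract_version` argument above takes a constraint such as ">=0.1.0". As a rough mental model only, the sketch below shows how such a constraint could be checked against a concrete version. `parse_version` and `satisfies` are hypothetical helpers, and the matching rules (numeric dotted versions, bare version means exact match) are assumptions; dc43's own resolution logic may differ.

```python
import operator

# Supported comparison operators (assumed set, for illustration).
_OPS = {
    ">=": operator.ge,
    "<=": operator.le,
    "==": operator.eq,
    ">": operator.gt,
    "<": operator.lt,
}


def parse_version(text: str) -> tuple:
    """Turn '0.10.0' into (0, 10, 0) so versions compare numerically."""
    return tuple(int(part) for part in text.strip().split("."))


def satisfies(version: str, spec: str) -> bool:
    """Return True when `version` meets a constraint such as '>=0.1.0'."""
    # Try two-character operators first so '>=' is not read as '>'.
    for symbol in sorted(_OPS, key=len, reverse=True):
        if spec.startswith(symbol):
            wanted = parse_version(spec[len(symbol):])
            return _OPS[symbol](parse_version(version), wanted)
    # Assumption: a bare version means an exact match.
    return parse_version(version) == parse_version(spec)


newer_ok = satisfies("0.10.0", ">=0.9.0")
pinned_ok = satisfies("0.1.1", "==0.1.0")
```

Comparing integer tuples instead of raw strings avoids the classic mistake of ranking "0.9.0" above "0.10.0".
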
  3. DLT usage (inside a pipeline notebook)
import dlt
from collections.abc import Mapping
from dc43_integrations.spark.dlt import apply_dlt_expectations

@dlt.table(name="orders")
def orders():
    df = spark.read.stream.table("bronze.sales_orders_raw")
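    # Retrieve predicates from your configured data-quality service.
    # dq_status is assumed to be a status object obtained earlier, for example
    # from read_with_contract(..., return_status=True).
    predicates = dq_status.details.get("expectation_predicates")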
    if isinstance(predicates, Mapping):
        apply_dlt_expectations(dlt, predicates)
    return df.select("order_id", "customer_id", "order_ts", "amount", "currency")
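
The snippet above expects the predicates as a mapping. One plausible shape, similar to the dict accepted by DLT's `expect_all` decorators, is expectation name to SQL boolean expression. The sketch below derives such a mapping from simplified property dicts. `build_predicates` and the dict layout are hypothetical; this is not how dc43 generates its predicates.

```python
def build_predicates(properties: list) -> dict:
    """Derive name -> SQL predicate pairs from simplified property dicts.

    Each property is a plain dict with 'name' and optional 'required' and
    'enum' keys (an assumed, simplified stand-in for ODCS schema properties).
    """
    predicates = {}
    for prop in properties:
        name = prop["name"]
        if prop.get("required"):
            predicates[f"{name}_not_null"] = f"{name} IS NOT NULL"
        enum = prop.get("enum")
        if enum:
            # Double any single quote so the literal stays valid SQL.
            values = ", ".join("'" + str(v).replace("'", "''") + "'" for v in enum)
            predicates[f"{name}_enum"] = f"{name} IN ({values})"
    return predicates


example = build_predicates(
    [
        {"name": "order_id", "required": True},
        {"name": "currency", "required": True, "enum": ["EUR", "USD"]},
    ]
)
```

Naming each predicate after its column and rule keeps expectation metrics easy to trace back to the contract clause that produced them.
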
  4. Store and resolve contracts
from dc43_service_backends.contracts.backend.stores import FSContractStore

store = FSContractStore(base_path="/mnt/contracts")
store.put(contract)
latest = store.latest("sales.orders")
  5. DQ/DO orchestration on read
from dc43_integrations.spark.io import read_with_contract, ContractVersionLocator
from dc43_service_clients.contracts import LocalContractServiceClient
from dc43_service_clients.governance import build_local_governance_service

governance = build_local_governance_service(store)
contract_service = LocalContractServiceClient(store)
df, status = read_with_contract(
    spark,
    contract_id="sales.orders",
    contract_service=contract_service,
    expected_contract_version="==0.1.0",
    governance_service=governance,
    dataset_locator=ContractVersionLocator(dataset_version="latest"),
    return_status=True,
)
print(status.status, status.reason)
  6. Quality status check on write
from dc43_integrations.spark.io import write_with_contract, ContractVersionLocator
from dc43_service_clients.contracts import LocalContractServiceClient

contract_service = LocalContractServiceClient(store)
vr, status = write_with_contract(
    df=orders_df,
    contract_id="sales.orders",
    contract_service=contract_service,
    expected_contract_version=">=0.1.0",
    dataset_locator=ContractVersionLocator(dataset_version="latest"),
    mode="append",
    enforce=False,                 # continue writing
    governance_service=governance,
    return_status=True,
)
if status and status.status == "block":
    raise ValueError(f"DQ blocked write: {status.details}")

Demo application

A Vue-powered FastAPI application in dc43.demo_app offers a visual way to explore contracts, datasets and data quality results. Install the optional dependencies and launch the app with:

pip install ".[demo]"
dc43-demo

Visit http://localhost:8000 to:

  • Browse contracts and their versions with draft/active status.
  • Inspect dataset versions, their linked contract, validation status and detailed DQ metrics derived from contract rules.
  • Highlight datasets using draft contracts and trigger validation to promote them.

An additional Reveal.js presentation is available at http://localhost:8000/static/presentation.html to walk through the contract lifecycle and automation steps.

The application also exposes an example Spark pipeline in dc43.demo_app.pipeline used when registering new dataset versions. The preconfigured scenarios are documented in docs/demo-pipeline-scenarios.md including the new split strategy example that writes orders_enriched::valid and orders_enriched::reject alongside the main dataset.

Spark Flow (Mermaid)

flowchart LR
    subgraph Read
      U[User code / Notebook] --> RWC{read_with_contract}
      RWC --> SR["spark.read.format(...).load"]
      RWC --> EV["ensure_version(contract)"]
      EV --> OBS[collect_observations]
      OBS --> DQS["dq_service.evaluate"]
      DQS -->|ok| AC["apply_contract (cast/order)"]
      DQS -->|errors & enforce| E1[Raise]
      AC --> DF[DataFrame ready]
      RWC --> DQ{dq_client?}
      DQ -->|yes| GS["get_status(dataset@version, contract@version)"]
      GS -->|unknown/stale| CM[compute_metrics]
      CM --> SM[submit_metrics -> status]
      GS -->|up-to-date| SM
      SM -->|block & enforce| E2[Raise]
      SM -->|ok/warn| DF
    end

    DF --> T[Transformations / joins / aggregations]

    subgraph Write
      T --> WWC{write_with_contract}
      WWC --> OBS2[collect_observations]
      OBS2 --> DQS2["dq_service.evaluate"]
      DQS2 -->|ok| AC2[apply_contract]
      DQS2 -->|errors & enforce| E3[Raise]
      DQS2 -->|errors & !enforce| AC2
      AC2 --> SW["spark.write.(format, options).mode.save"]
      SW --> DELTA[Delta table / UC]
      DELTA --> DQ2{dq_client?}
      DQ2 -->|yes| EVAL["evaluate_dataset"]
      EVAL -->|status block & enforce| E4[Raise]
      EVAL -->|status ok/warn| DELTA
    end

    classDef err fill:#ffe5e5,stroke:#ff4d4f,color:#000
    class E1,E2,E3,E4 err

Notes

  • The library uses the official ODCS package and enforces apiVersion 3.0.2 by default (configurable via DC43_ODCS_REQUIRED).
  • Validation runs inside the Spark DQ engine: presence, types, nullability and common constraints (enum, regex, min/max, unique best-effort). The resulting ValidationResult bundles metrics and a schema snapshot so governance adapters can update compatibility matrices.
  • DLT helpers translate constraints into expect expressions when feasible.
  • DQ orchestration: the IO wrapper checks the schema against the contract and consults the DQ client. If the dataset version is newer than the version known to the DQ tool, the wrapper computes the required metrics, submits them, and then enforces the resulting status if requested.
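
The DQ orchestration note can be read as "look up the status, compute and submit metrics only when the status is unknown, then enforce". A minimal sketch under that reading follows. `resolve_status` and its parameters are hypothetical, and the status cache is a plain dict standing in for the governance tool's records.

```python
def resolve_status(known, dataset_version, compute_metrics, submit_metrics, enforce):
    """Return the DQ status for a dataset version, computing metrics on demand.

    known:           dict of dataset version -> status already recorded
    compute_metrics: callable returning a metrics payload
    submit_metrics:  callable taking the payload and returning a status string
    """
    status = known.get(dataset_version)
    if status is None:
        # Unknown or stale: compute metrics and let the tool decide.
        status = submit_metrics(compute_metrics())
        known[dataset_version] = status
    if status == "block" and enforce:
        raise ValueError(f"DQ status blocks dataset version {dataset_version}")
    return status


calls = []


def compute():
    calls.append("compute")
    return {"row_count": 3}


def submit(metrics):
    calls.append("submit")
    return "warn"


known_statuses = {}
first = resolve_status(known_statuses, "2024-01-01", compute, submit, enforce=True)
second = resolve_status(known_statuses, "2024-01-01", compute, submit, enforce=True)
```

The second call reuses the recorded status, so metrics are computed once per dataset version in this sketch.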

Local Dev

  • Install dependencies locally (match your Databricks runtime where possible):
pip install open-data-contract-standard==3.0.2 pyspark
  • dc43 enforces apiVersion via DC43_ODCS_REQUIRED (default 3.0.2).

Tests

  • Install the shared test extras and run the suites that matter for the area you are touching. The top-level pytest configuration automatically adds each package's src directory to sys.path, so cloning the repo and running pytest works even before the editable installs finish building.
# Option A: run everything with one helper (installs extras unless --skip-install is passed)
./scripts/test_all.sh

# Option B: manage installs yourself and invoke pytest directly
pip install -e ".[test]"
pytest -q tests packages/dc43-service-clients/tests \
  packages/dc43-service-backends/tests packages/dc43-integrations/tests

Publishing

  • Push a tag v* on a commit in main.
  • The workflow .github/workflows/release.yml runs tests, builds the package, waits for manual approval via environment release, publishes to PyPI, and creates a GitHub release with autogenerated notes.
  • Set PYPI_TOKEN secret and configure the release environment with required reviewers.

License

Apache 2
