Skip to main content

Data contracts For Free using ODCS (Bitol)

Project description

dc43 — Data Contracts For Free!

Overview

Conceptual platform

dc43 is a governance-first toolkit that separates the concepts of data contract management from their implementations. At its core it provides:

  • Contract lifecycle management primitives to draft, review, approve, and retire ODCS contracts. Note: dc43 currently targets ODCS 3.0.2; a compatibility table will follow as the spec evolves.
  • Extensible interfaces for contract storage, drafting, and data quality orchestration that keep governance decisions close to the data contract owner.
  • Runtime helpers to apply approved specifications in compute platforms while feeding observations back to governance workflows.

Provided integrations

On top of the conceptual platform, dc43 ships opinionated integrations that you can adopt or replace:

  • Spark & DLT pipelines via dc43_integrations.spark.io with schema/metric helpers from dc43_service_backends.data_quality.backend for auto-casting and contract-aware IO.
  • Storage backends such as filesystem (DBFS/UC volumes), Delta tables, and Collibra through CollibraContractStore.
  • A pluggable data-quality client with a stub implementation that can be replaced by catalog-native tools.
  • Scenario-first getting started guides (operations setup, local Spark flows, remote integrations, and the contracts app helper) live in docs/getting-started/.

See docs/implementations/data-quality-governance/collibra.md for end-to-end orchestration guidance when Collibra owns stewardship workflows. Component deep dives cover the contract store, contract drafter, data-quality governance interface, data-quality engine, and integration layer. Each component links to implementation catalogs under docs/implementations/ so you can pick technology-specific guides (Spark, Delta, Collibra, ...).

Component model

dc43 exposes a small set of well-defined components. Swap any of them without rewriting the rest of the stack:

Layer Component Responsibility
Governance Contract manager/store interface Retrieve, version, and persist contracts from catalog-backed or file-based sources.
Governance Data quality manager interface Coordinate with an external DQ governance tool (e.g., Collibra, Unity Catalog) that records dataset↔contract alignment and approval state.
Authoring support Contract drafter module Generate ODCS drafts from observed data or schema drift events before handing them back to governance.
Runtime services Data-quality metrics engine Collect contract-driven metrics in execution engines and forward them to the governance tool for status evaluation.
Integration Integration adapters Bridge the contract, drafter, and DQ components into execution engines such as Spark or Delta Live Tables (current adapters live under dc43_integrations.spark).

Guides for each component live under docs/:

Architecture

flowchart TD
    subgraph Governance & Stewardship
        Authoring["Authoring Tools\nJSON · Git · Notebooks"]
        ContractStore["Contract Store Interface\nGit · Filesystem · APIs"]
        DQTool["Data Quality Governance Tool\nCatalog · Collibra"]
    end

    subgraph Lifecycle Services
        DraftModule["Contract Drafter Module"]
        DQManager["Data Quality Manager Interface"]
    end

    subgraph Runtime Execution
        DQEngine["Data Quality Engine\nmetrics generation"]
        IOHelpers["Integration Adapters\nSpark · DLT"]
        Pipelines["Business Pipelines"]
    end

    Authoring -->|publish / review| ContractStore
    ContractStore -->|serve versions| IOHelpers
    ContractStore -->|seed drafts| DraftModule
    DraftModule -->|return proposals| Authoring
    IOHelpers -->|apply contracts| Pipelines
    Pipelines -->|observations| IOHelpers
    IOHelpers -->|metrics & schema drift| DQEngine
    DQEngine -->|metrics package| DQManager
    DQManager -->|submit metrics| DQTool
    DQTool -->|compatibility verdict| DQManager
    DQTool -->|validated versions| ContractStore
    DQManager -->|notify runtime| IOHelpers

This architecture clarifies how governance assets, lifecycle services, and runtime execution collaborate:

  • Governance systems own the authoritative contract store and the data-quality tool. Labeled edges (publish / review, validated versions) highlight how those systems steer approvals.
  • Lifecycle services—drafting and data-quality management—mediate between governance and runtime. The drafter turns runtime feedback into proposals while the DQ manager relays metrics, waits for a verdict, and shares compatibility context with both stewards and pipelines.
  • Integration adapters inside runtime engines (Spark, DLT, …) apply contracts, emit observations, and react when governance signals change.

Node & edge glossary

  • Contract store interface – pluggable storage adapters (filesystem, Delta, Collibra) that resolve authoritative contract versions.
  • Data quality manager interface – the dc43 protocol that hands metrics to the governance platform and retrieves compatibility verdicts.
  • Data quality governance tool – catalog or observability system (Collibra, Unity Catalog, bespoke services) that persists the compatibility matrix and performs the actual check evaluation once it receives metrics.
  • Metrics package – the bundle of row counts, expectation results, and schema drift context emitted by the runtime so the governance tool can recompute the dataset↔contract status.

Variations—such as Collibra-governed contracts or bespoke storage backends—slot into the same model by substituting implementations of the interfaces described above.

Install

dc43 now ships as a family of distributions so you can install only the layers you need:

Distribution Imports Responsibility Depends on
dc43-service-clients dc43_service_clients.* Typed service clients, request/response models, and governance helpers that front-end applications can embed. open-data-contract-standard
dc43-service-backends dc43_service_backends.* Reference backend implementations (filesystem store, local drafting, in-memory governance service) that orchestrate the client layer. dc43-service-clients
dc43-integrations dc43_integrations.* Runtime adapters such as the Spark helpers that call into client APIs without requiring backend dependencies. dc43-service-clients
dc43 dc43.* Aggregating package that wires the CLI/demo and depends on the three modules above. all of the above

Pip installs

  • Service contracts only: pip install dc43-service-clients
  • Backend reference services: pip install dc43-service-backends
  • HTTP service backends: pip install "dc43-service-backends[http]" (see deploy/http-backend/README.md for a containerised deployment recipe)
  • Spark integrations: pip install "dc43-integrations[spark]"
  • Full stack: pip install dc43
  • Spark extras for the meta package: pip install "dc43[spark]"
  • Demo app: pip install "dc43[demo]"

When developing locally (Databricks Repos, workspace files, or any source checkout) the editable install automatically pulls in the sibling packages:

pip install -e .

Chain extras as needed—for example to prepare the test environment run:

pip install -e ".[test]"

Each distribution can now be installed independently. For example, lightweight clients can use pip install dc43-service-clients and then from dc43_service_clients.data_quality import ValidationResult without pulling in the backend or Spark helpers at runtime.

Quickstart

  1. Define a contract (ODCS typed model)
from open_data_contract_standard.model import (
    OpenDataContractStandard, SchemaObject, SchemaProperty, Description
)

contract = OpenDataContractStandard(
    version="0.1.0",
    kind="DataContract",
    apiVersion="3.0.2",
    id="sales.orders",
    name="Orders",
    description=Description(usage="Orders facts"),
    schema_=[
        SchemaObject(
            name="orders",
            properties=[
                SchemaProperty(name="order_id", physicalType="bigint", required=True, unique=True),
                SchemaProperty(name="customer_id", physicalType="bigint", required=True),
                SchemaProperty(name="order_ts", physicalType="timestamp", required=True),
                SchemaProperty(name="amount", physicalType="double", required=True),
                SchemaProperty(
                    name="currency", physicalType="string", required=True,
                    logicalTypeOptions={"enum": ["EUR", "USD"]}
                ),
            ],
        )
    ],
)
  1. Validate and write with Spark
from dc43_service_backends.contracts.backend.stores import FSContractStore
from dc43_service_clients.contracts import LocalContractServiceClient
from dc43_integrations.spark.io import (
    write_with_contract,
    ContractVersionLocator,
)

store = FSContractStore(base_path="/mnt/contracts")
contract_service = LocalContractServiceClient(store)

write_with_contract(
    df=orders_df,
    contract_id="sales.orders",
    contract_service=contract_service,
    expected_contract_version=">=0.1.0",
    dataset_locator=ContractVersionLocator(dataset_version="latest"),
    mode="append",
    enforce=True,
    auto_cast=True,
)
  1. DLT usage (inside a pipeline notebook)
import dlt
from collections.abc import Mapping
from dc43_integrations.spark.dlt import apply_dlt_expectations

@dlt.table(name="orders")
def orders():
    df = spark.read.stream.table("bronze.sales_orders_raw")
    # Retrieve predicates from your configured data-quality service.
    predicates = dq_status.details.get("expectation_predicates")
    if isinstance(predicates, Mapping):
        apply_dlt_expectations(dlt, predicates)
    return df.select("order_id", "customer_id", "order_ts", "amount", "currency")
  1. Store and resolve contracts
from dc43_service_backends.contracts.backend.stores import FSContractStore

store = FSContractStore(base_path="/mnt/contracts")
store.put(contract)
latest = store.latest("sales.orders")
  1. DQ/DO orchestration on read
from dc43_integrations.spark.io import read_with_contract, ContractVersionLocator
from dc43_service_clients.contracts import LocalContractServiceClient
from dc43_service_clients.governance import build_local_governance_service

governance = build_local_governance_service(store)
contract_service = LocalContractServiceClient(store)
df, status = read_with_contract(
    spark,
    contract_id="sales.orders",
    contract_service=contract_service,
    expected_contract_version="==0.1.0",
    governance_service=governance,
    dataset_locator=ContractVersionLocator(dataset_version="latest"),
    return_status=True,
)
print(status.status, status.reason)
  1. Quality status check on write
from dc43_integrations.spark.io import write_with_contract, ContractVersionLocator
from dc43_service_clients.contracts import LocalContractServiceClient

contract_service = LocalContractServiceClient(store)
vr, status = write_with_contract(
    df=orders_df,
    contract_id="sales.orders",
    contract_service=contract_service,
    expected_contract_version=">=0.1.0",
    dataset_locator=ContractVersionLocator(dataset_version="latest"),
    mode="append",
    enforce=False,                 # continue writing
    governance_service=governance,
    return_status=True,
)
if status and status.status == "block":
    raise ValueError(f"DQ blocked write: {status.details}")

Demo application

A Vue-powered FastAPI application packaged as dc43-demo-app (module dc43_demo_app) offers a visual way to explore contracts, datasets and data quality results. Install the optional dependencies and launch the app with:

pip install ".[demo]"
dc43-demo

Visit http://localhost:8000 to:

  • Browse contracts and their versions with draft/active status.
  • Inspect dataset versions, their linked contract, validation status and detailed DQ metrics derived from contract rules.
  • Highlight datasets using draft contracts and trigger validation to promote them.

An additional Reveal.js presentation is available at http://localhost:8000/static/presentation.html to walk through the contract lifecycle and automation steps.

The application also exposes an example Spark pipeline in dc43_demo_app.pipeline used when registering new dataset versions. The preconfigured scenarios are documented in docs/demo-pipeline-scenarios.md including the new split strategy example that writes orders_enriched::valid and orders_enriched::reject alongside the main dataset.

The demo now drives its contract, governance, and data-quality operations through the same HTTP clients that production pipelines use. When the UI starts it spins up an in-process instance of the service backend HTTP application, so Spark runs exercise the remote code paths without requiring an external deployment.

Running the service backend over HTTP

Pipelines that rely on the remote clients can reach a standalone backend via the dc43_service_backends.webapp module. Set the contract directory and (if needed) a bearer token before launching uvicorn:

export DC43_CONTRACT_STORE=/path/to/contracts
export DC43_BACKEND_TOKEN="super-secret"  # optional
uvicorn dc43_service_backends.webapp:app --host 0.0.0.0 --port 8001

Contracts stored under $DC43_CONTRACT_STORE are served over the API while the stub data-quality and governance backends keep draft information on disk. See docs/implementations/service-backends/http-server.md for Docker packaging notes and deployment options.

Spark Flow (Mermaid)

flowchart LR
    subgraph Read
      U[User code / Notebook] --> RWC{read_with_contract}
      RWC --> SR["spark.read.format(...).load"]
      RWC --> EV["ensure_version(contract)"]
      EV --> OBS[collect_observations]
      OBS --> DQS["dq_service.evaluate"]
      DQS -->|ok| AC["apply_contract (cast/order)"]
      DQS -->|errors & enforce| E1[Raise]
      AC --> DF[DataFrame ready]
      RWC --> DQ{dq_client?}
      DQ -->|yes| GS["get_status(dataset@version, contract@version)"]
      GS -->|unknown/stale| CM[compute_metrics]
      CM --> SM[submit_metrics -> status]
      GS -->|up-to-date| SM
      SM -->|block & enforce| E2[Raise]
      SM -->|ok/warn| DF
    end

    DF --> T[Transformations / joins / aggregations]

    subgraph Write
      T --> WWC{write_with_contract}
      WWC --> OBS2[collect_observations]
      OBS2 --> DQS2["dq_service.evaluate"]
      DQS2 -->|ok| AC2[apply_contract]
      DQS2 -->|errors & enforce| E3[Raise]
      DQS2 -->|errors & !enforce| AC2
      AC2 --> SW["spark.write.(format, options).mode.save"]
      SW --> DELTA[Delta table / UC]
      DELTA --> DQ2{dq_client?}
      DQ2 -->|yes| EVAL["evaluate_dataset"]
      EVAL -->|status block & enforce| E4[Raise]
      EVAL -->|status ok/warn| DELTA
    end

    classDef err fill:#ffe5e5,stroke:#ff4d4f,color:#000
    class E1,E2,E3,E4 err

Notes

  • The library uses the official ODCS package and enforces $schema version 3.0.2 by default (configurable via DC43_ODCS_REQUIRED).
  • Validation runs inside the Spark DQ engine: presence, types, nullability and common constraints (enum, regex, min/max, unique best-effort). The resulting ValidationResult bundles metrics and a schema snapshot so governance adapters can update compatibility matrices.
  • DLT helpers translate constraints into expect expressions when feasible.
  • DQ orchestration: the IO wrapper checks schema vs contract and consults the DQ client. If dataset version is newer than DQ’s known version, it computes the required metrics and submits them, then enforces the resulting status if requested.

Local Dev

  • Install dependencies locally (match your Databricks runtime where possible):
pip install open-data-contract-standard==3.0.2 pyspark
  • dc43 enforces apiVersion via DC43_ODCS_REQUIRED (default 3.0.2).

Tests

  • Install the shared test extras and run the suites that matter for the area you are touching. The top-level pytest configuration automatically adds each package's src directory to sys.path, so cloning the repo and running pytest works even before the editable installs finish building.
# Option A: run everything with one helper (installs extras unless --skip-install is passed)
./scripts/test_all.sh

# Option B: manage installs yourself and invoke pytest directly
pip install -e ".[test]"
pytest -q tests packages/dc43-service-clients/tests \
  packages/dc43-service-backends/tests packages/dc43-integrations/tests

Publishing

  • Push a tag v* on a commit in main.
  • The workflow .github/workflows/release.yml runs tests, builds the package, waits for manual approval via environment release, publishes to PyPI, and creates a GitHub release with autogenerated notes.
  • Set PYPI_TOKEN secret and configure the release environment with required reviewers.

License

Apache 2

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dc43-0.16.0.tar.gz (41.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dc43-0.16.0-py3-none-any.whl (20.2 kB view details)

Uploaded Python 3

File details

Details for the file dc43-0.16.0.tar.gz.

File metadata

  • Download URL: dc43-0.16.0.tar.gz
  • Upload date:
  • Size: 41.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for dc43-0.16.0.tar.gz
Algorithm Hash digest
SHA256 e2530e65234da623a00e4d842301547027094440e89b30f07b1f68c4dd0713fa
MD5 d0d61521079042a3f6e9d9b696c97041
BLAKE2b-256 5268e5bea7edd26c475b85b04e37e19eeeaf2ff04a57931816d6dc5d94b6ba00

See more details on using hashes here.

File details

Details for the file dc43-0.16.0-py3-none-any.whl.

File metadata

  • Download URL: dc43-0.16.0-py3-none-any.whl
  • Upload date:
  • Size: 20.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for dc43-0.16.0-py3-none-any.whl
Algorithm Hash digest
SHA256 7a9ff9e899549edee13fab87bfdff8c96ac11b4f52d4b70f81725f98d6e59903
MD5 20e503cffb553681a6f9de49a2838935
BLAKE2b-256 98e4ba1d4a96667a63a0a0ccc85f2dd0407250e8c13369c7f6ee946b5a950c7c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page