Skip to main content

Load metadata and asset check spesifications from data contracts.

Project description

dagster-datacontract

Dagster Datacontract implementation for loading asset metadata and context information from data contracts to Dagster assets.

Installation

Install dagster-datacontract using uv:

uv add dagster-datacontract

Simple Example

The following example can be found inside the example directory:

from datetime import timedelta
from pathlib import Path

import dagster as dg
import polars as pl
import requests
from datacontract.data_contract import DataContract

from dagster_datacontract import DataContractLoader

asset_name = "yellow_taxi_trip_records"
data_contract = DataContractLoader(
    asset_name=asset_name,
    data_contract=DataContract(
        data_contract_file="./example/datacontract.yml",
        server="production",
    ),
)


@dg.asset(
    name=asset_name,
    metadata=data_contract.metadata,
    tags=data_contract.tags,
    description=data_contract.description,
    owners=data_contract.owner,
    code_version=data_contract.version,
)
def yellow_taxi_trip_records(
    context: dg.AssetExecutionContext,
) -> None:
    download_path = "./example/data"
    Path(download_path).mkdir(parents=True, exist_ok=True)

    url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet"
    response = requests.get(url=url)

    file_path = f"{download_path}/yellow_tripdata_2025-01.parquet"
    context.log.info(f"Reading data from '{url}' and writing to '{file_path}'.")
    with open(file_path, "wb") as f:
        f.write(response.content)

    df = pl.read_parquet(file_path)
    context.log.info(f"File contents downloaded:\n{df}")


asset_check_yellow_taxi_trip_records = data_contract.load_data_quality_checks()

freshness_checks = data_contract.load_freshness_checks(
    lower_bound_delta=timedelta(minutes=5)
)
freshness_checks_sensor = dg.build_sensor_for_freshness_checks(
    freshness_checks=freshness_checks,
    default_status=dg.DefaultSensorStatus.RUNNING,
)

job = dg.define_asset_job(
    name="monthly_taxi_trips",
    selection=[asset_name],
)
schedule = dg.ScheduleDefinition(
    job=job,
    cron_schedule=data_contract.cron_schedule,
    default_status=dg.DefaultScheduleStatus.RUNNING,
)


defs = dg.Definitions(
    assets=[yellow_taxi_trip_records],
    asset_checks=[asset_check_yellow_taxi_trip_records, *freshness_checks],
    schedules=[schedule],
    sensors=[freshness_checks_sensor],
)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dagster_datacontract-0.4.0.tar.gz (13.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dagster_datacontract-0.4.0-py3-none-any.whl (15.4 kB view details)

Uploaded Python 3

File details

Details for the file dagster_datacontract-0.4.0.tar.gz.

File metadata

File hashes

Hashes for dagster_datacontract-0.4.0.tar.gz
Algorithm Hash digest
SHA256 690952830dc7449b3c7f5e126eadc29ff01070301c99cce2f0d3567a7f7da99a
MD5 7a02752a7262eb6327249648b68d81cc
BLAKE2b-256 fdba5c0015c9801994eea3ad9e9f15982b3cdaa201b7286ac613edcca315635b

See more details on using hashes here.

File details

Details for the file dagster_datacontract-0.4.0-py3-none-any.whl.

File metadata

File hashes

Hashes for dagster_datacontract-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 8c7948ac8cfd25610a1ad446ae6b40bd6f34cb2f3b029448cdf0620ca73c37c1
MD5 35394687af521f97734e26a23cad348e
BLAKE2b-256 e2ac94428cbc24c5d782efefc1e41efc3cdba4028ec8f395d6fb81a2d0567f77

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page