
Load metadata and asset check specifications from data contracts.

Project description

dagster-datacontract

A Dagster integration that loads asset metadata and context information from data contracts into Dagster assets.
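To illustrate the kind of mapping this involves, the standard-library sketch below turns fields of an already-parsed contract into the asset properties that the example further down passes to `dg.asset` (description, code version, owners, tags), plus a cron string for a schedule. The field names (`info.version`, `info.owner`, `tags`, `servicelevels.frequency.cron`) follow the Data Contract Specification. The helper `contract_to_asset_properties`, the sample values and the `team:` owner prefix are hypothetical, and the real `DataContractLoader` may map fields differently.

```python
from typing import Any

# Hypothetical, already-parsed contract. In practice this would be read from a
# YAML file such as ./example/datacontract.yml; every value here is made up.
CONTRACT: dict[str, Any] = {
    "info": {
        "title": "Yellow taxi trip records",
        "version": "1.0.0",
        "description": "Monthly NYC yellow taxi trip records.",
        "owner": "data-team",
    },
    "tags": ["taxi", "nyc"],
    "servicelevels": {"frequency": {"cron": "0 0 1 * *"}},
}


def contract_to_asset_properties(contract: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical mapping from contract fields to Dagster asset properties."""
    info = contract.get("info", {})
    owner = info.get("owner")
    frequency = contract.get("servicelevels", {}).get("frequency", {})
    return {
        "description": info.get("description"),
        "code_version": info.get("version"),
        # Assumption: the contract owner is a team, expressed as "team:<name>".
        "owners": [f"team:{owner}"] if owner else [],
        # Dagster tags are string key/value pairs; bare tags get an empty value.
        "tags": {tag: "" for tag in contract.get("tags", [])},
        # A cron string that a ScheduleDefinition could use.
        "cron_schedule": frequency.get("cron"),
    }


props = contract_to_asset_properties(CONTRACT)
print(props["owners"])         # ['team:data-team']
print(props["cron_schedule"])  # 0 0 1 * *
```

Missing fields fall back to `None` or empty collections, so a sparse contract still yields a usable set of properties.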

Installation

Install dagster-datacontract using uv:

uv add dagster-datacontract

Simple Example

The following example can be found in the example directory:

from datetime import timedelta
from pathlib import Path

import dagster as dg
import polars as pl
import requests
from datacontract.data_contract import DataContract

from dagster_datacontract import DataContractLoader

asset_name = "yellow_taxi_trip_records"
data_contract = DataContractLoader(
    asset_name=asset_name,
    data_contract=DataContract(
        data_contract_file="./example/datacontract.yml",
        server="production",
    ),
)


@dg.asset(
    name=asset_name,
    metadata=data_contract.metadata,
    tags=data_contract.tags,
    description=data_contract.description,
    owners=data_contract.owner,
    code_version=data_contract.version,
)
def yellow_taxi_trip_records(
    context: dg.AssetExecutionContext,
) -> None:
    download_path = "./example/data"
    Path(download_path).mkdir(parents=True, exist_ok=True)

    url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet"
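    response = requests.get(url=url, timeout=60)
    # Fail the run on HTTP errors instead of writing an error page to disk.
    response.raise_for_status()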

    file_path = f"{download_path}/yellow_tripdata_2025-01.parquet"
    context.log.info(f"Reading data from '{url}' and writing to '{file_path}'.")
    with open(file_path, "wb") as f:
        f.write(response.content)

    df = pl.read_parquet(file_path)
    context.log.info(f"File contents downloaded:\n{df}")


asset_check_yellow_taxi_trip_records = data_contract.load_data_quality_checks()

freshness_checks = data_contract.load_freshness_checks(
    lower_bound_delta=timedelta(minutes=5)
)
freshness_checks_sensor = dg.build_sensor_for_freshness_checks(
    freshness_checks=freshness_checks,
    default_status=dg.DefaultSensorStatus.RUNNING,
)

job = dg.define_asset_job(
    name="monthly_taxi_trips",
    selection=[asset_name],
)
schedule = dg.ScheduleDefinition(
    job=job,
    cron_schedule=data_contract.cron_schedule,
    default_status=dg.DefaultScheduleStatus.RUNNING,
)


defs = dg.Definitions(
    assets=[yellow_taxi_trip_records],
    asset_checks=[asset_check_yellow_taxi_trip_records, *freshness_checks],
    schedules=[schedule],
    sensors=[freshness_checks_sensor],
)
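`load_freshness_checks` accepts a `lower_bound_delta`, the same parameter name used by Dagster's `build_last_update_freshness_checks`. Assuming the loaded checks follow those last-update semantics (this README does not state it), a check passes when the asset's most recent materialization is no older than `lower_bound_delta` at evaluation time. The standard-library sketch below models only that comparison; `is_fresh` is a hypothetical helper and ignores deadline crons, severities, and the sensor that evaluates the checks.

```python
from datetime import datetime, timedelta, timezone


def is_fresh(last_updated: datetime, now: datetime, lower_bound_delta: timedelta) -> bool:
    """Hypothetical model of a last-update freshness check (no deadline cron).

    The asset counts as fresh if it was updated at or after now - lower_bound_delta.
    """
    return last_updated >= now - lower_bound_delta


now = datetime(2025, 2, 1, 12, 0, tzinfo=timezone.utc)
window = timedelta(minutes=5)  # same bound as in the example above

print(is_fresh(now - timedelta(minutes=3), now, window))   # True: updated 3 minutes ago
print(is_fresh(now - timedelta(minutes=10), now, window))  # False: updated 10 minutes ago
```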



Download files

Download the file for your platform.

Source Distribution

dagster_datacontract-0.3.0.tar.gz (11.8 kB)

Built Distribution

dagster_datacontract-0.3.0-py3-none-any.whl (13.1 kB, Python 3)

File details

Hashes for dagster_datacontract-0.3.0.tar.gz:

SHA256: cbdf26a5d3a7d791e9ca63471f675d1172c3f55d2e8e75cbc0251419a1be7e14
MD5: 1fe99618418737d48ede657275b4df73
BLAKE2b-256: b6265aeb73a74db6276f3360fa59560cbdce2fa0b0d32d030b33a711df50f787

Hashes for dagster_datacontract-0.3.0-py3-none-any.whl:

SHA256: 7ee1834d4b4c0543564e6e164bc3c41844da742dd18dc4abbb6b368b90f7675c
MD5: 305e3c68db81a8d00c99b8937bea4e31
BLAKE2b-256: 1c95ecadafb1ada10ed134137ffd85d60a25e3361dbed4993ca7c70c856062bb
