Skip to main content

Load metadata and asset check spesifications from data contracts.

Project description

dagster-datacontract

Dagster Datacontract implementation for loading asset metadata and context information from data contracts to Dagster assets.

Installation

Install dagster-datacontract using uv:

uv add dagster-datacontract

Simple Example

The following example can be found inside the example directory:

from datetime import timedelta
from pathlib import Path

import dagster as dg
import requests
import polars as pl
from datacontract.data_contract import DataContract

from dagster_datacontract import DataContractLoader


asset_name = "yellow_taxi_trip_records"
data_contract = DataContractLoader(
    asset_name=asset_name,
    data_contract=DataContract(
        data_contract_file="./example/datacontract.yml",
    ),
)


@dg.asset(
    name=asset_name,
    metadata=data_contract.metadata,
    tags=data_contract.tags,
    description=data_contract.description,
    owners=data_contract.owner,
    code_version=data_contract.version,
)
def yellow_taxi_trip_records(
    context: dg.AssetExecutionContext,
) -> None:
    download_path = "./example/data"
    Path(download_path).mkdir(parents=True, exist_ok=True)

    url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet"
    response = requests.get(url=url)

    file_path = f"{download_path}/yellow_tripdata_2025-01.parquet"
    context.log.info(f"Reading data from '{url}' and writing to '{file_path}'.")
    with open(file_path, "wb") as f:
        f.write(response.content)

    df = pl.read_parquet(file_path)
    context.log.info(f"File contents downloaded:\n{df}")


asset_check_yellow_taxi_trip_records = data_contract.load_data_quality_checks()

freshness_checks = data_contract.load_freshness_checks(
    lower_bound_delta=timedelta(minutes=5)
)
freshness_checks_sensor = dg.build_sensor_for_freshness_checks(
    freshness_checks=freshness_checks,
    default_status=dg.DefaultSensorStatus.RUNNING,
)

job = dg.define_asset_job(
    name="monthly_taxi_trips",
    selection=[asset_name],
)
schedule = dg.ScheduleDefinition(
    job=job,
    cron_schedule=data_contract.cron_schedule,
    default_status=dg.DefaultScheduleStatus.RUNNING,
)


defs = dg.Definitions(
    assets=[yellow_taxi_trip_records],
    asset_checks=[asset_check_yellow_taxi_trip_records, *freshness_checks],
    schedules=[schedule],
    sensors=[freshness_checks_sensor],
)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dagster_datacontract-0.1.0.tar.gz (3.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dagster_datacontract-0.1.0-py3-none-any.whl (3.6 kB view details)

Uploaded Python 3

File details

Details for the file dagster_datacontract-0.1.0.tar.gz.

File metadata

File hashes

Hashes for dagster_datacontract-0.1.0.tar.gz
Algorithm Hash digest
SHA256 59872df2541dbaa24205ead9452b101d00bab964e1abf29775dac8fcb207d7bb
MD5 8887f89c50d001765b19a1b77de7c9c5
BLAKE2b-256 c0bfd957141da807e893f9b4158a52717925bbd96bc95a5ddb424f7b0b488f10

See more details on using hashes here.

File details

Details for the file dagster_datacontract-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for dagster_datacontract-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 9c8cc8111ba69cf534d00498dbf7524d18d8a7d4598610159c49bc7c793dd4b7
MD5 33b043e5b41ebbe180844c17946ecae1
BLAKE2b-256 e013c8653d3e9dbd59d6ef8a4fe7f1a0d1174ab7cda0c72b53cec903c99944b7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page