Load metadata and asset check specifications from data contracts.

dagster-datacontract

dagster-datacontract is a Dagster integration that loads asset metadata and context information from data contracts and attaches them to Dagster assets.

Installation

Install dagster-datacontract using uv:

uv add dagster-datacontract
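
To confirm that the package is visible in the active environment after installation, a short check along these lines may help. It is a minimal sketch using only the standard library; `installed_version` is a hypothetical helper name, not part of dagster-datacontract.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


# Prints a version string if the install succeeded, otherwise None.
print(installed_version("dagster-datacontract"))
```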

Simple Example

The following example can be found inside the example directory:

from datetime import timedelta
from pathlib import Path

import dagster as dg
import requests
import polars as pl
from datacontract.data_contract import DataContract

from dagster_datacontract import DataContractLoader


asset_name = "yellow_taxi_trip_records"
data_contract = DataContractLoader(
    asset_name=asset_name,
    data_contract=DataContract(
        data_contract_file="./example/datacontract.yml",
    ),
)


@dg.asset(
    name=asset_name,
    metadata=data_contract.metadata,
    tags=data_contract.tags,
    description=data_contract.load_description(),
    owners=data_contract.owner,
    code_version=data_contract.version,
)
def yellow_taxi_trip_records(
    context: dg.AssetExecutionContext,
) -> None:
    download_path = "./example/data"
    Path(download_path).mkdir(parents=True, exist_ok=True)

    url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet"
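    # Fail fast on network stalls or HTTP errors instead of writing a bad file.
    response = requests.get(url=url, timeout=60)
    response.raise_for_status()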

    file_path = f"{download_path}/yellow_tripdata_2025-01.parquet"
    context.log.info(f"Reading data from '{url}' and writing to '{file_path}'.")
    with open(file_path, "wb") as f:
        f.write(response.content)

    df = pl.read_parquet(file_path)
    context.log.info(f"File contents downloaded:\n{df}")


asset_check_yellow_taxi_trip_records = data_contract.load_data_quality_checks()

freshness_checks = data_contract.load_freshness_checks(
    lower_bound_delta=timedelta(minutes=5)
)
freshness_checks_sensor = dg.build_sensor_for_freshness_checks(
    freshness_checks=freshness_checks,
    default_status=dg.DefaultSensorStatus.RUNNING,
)

job = dg.define_asset_job(
    name="monthly_taxi_trips",
    selection=[asset_name],
)
schedule = dg.ScheduleDefinition(
    job=job,
    cron_schedule=data_contract.cron_schedule,
    default_status=dg.DefaultScheduleStatus.RUNNING,
)


defs = dg.Definitions(
    assets=[yellow_taxi_trip_records],
    asset_checks=[asset_check_yellow_taxi_trip_records, *freshness_checks],
    schedules=[schedule],
    sensors=[freshness_checks_sensor],
)
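
The `lower_bound_delta=timedelta(minutes=5)` argument in the example is easier to reason about with a small illustration. The sketch below assumes that `load_freshness_checks` follows the semantics of Dagster's last-update freshness checks, where an asset counts as fresh if its most recent update falls within `lower_bound_delta` of the evaluation time. `is_fresh` is a hypothetical helper written for this illustration; it is not part of Dagster or dagster-datacontract and does not reproduce their implementation.

```python
from datetime import datetime, timedelta, timezone


def is_fresh(last_update: datetime, now: datetime, lower_bound_delta: timedelta) -> bool:
    """Return True if last_update is no older than lower_bound_delta relative to now."""
    return now - last_update <= lower_bound_delta


# Fixed evaluation time so the example is deterministic.
now = datetime(2025, 2, 1, 12, 0, tzinfo=timezone.utc)
window = timedelta(minutes=5)

print(is_fresh(now - timedelta(minutes=3), now, window))   # True: updated 3 minutes ago
print(is_fresh(now - timedelta(minutes=10), now, window))  # False: updated 10 minutes ago
```

Under this reading, a five-minute window is strict for an asset that is only rebuilt on the contract's cron schedule, so a larger delta may be more appropriate outside of a demo.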
