Skip to main content

Load metadata and asset check spesifications from data contracts.

Project description

dagster-datacontract

Dagster Datacontract implementation for loading asset metadata and context information from data contracts to Dagster assets.

Installation

Install dagster-datacontract using uv:

uv add dagster-datacontract

Simple Example

The following example can be found inside the example directory:

from datetime import timedelta
from pathlib import Path

import dagster as dg
import polars as pl
import requests
from datacontract.data_contract import DataContract

from dagster_datacontract import DataContractLoader

asset_name = "yellow_taxi_trip_records"
data_contract = DataContractLoader(
    asset_name=asset_name,
    data_contract=DataContract(
        data_contract_file="./example/datacontract.yml",
        server="production",
    ),
)


@dg.asset(
    name=asset_name,
    metadata=data_contract.metadata,
    tags=data_contract.tags,
    description=data_contract.description,
    owners=data_contract.owner,
    code_version=data_contract.version,
)
def yellow_taxi_trip_records(
    context: dg.AssetExecutionContext,
) -> None:
    download_path = "./example/data"
    Path(download_path).mkdir(parents=True, exist_ok=True)

    url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet"
    response = requests.get(url=url)

    file_path = f"{download_path}/yellow_tripdata_2025-01.parquet"
    context.log.info(f"Reading data from '{url}' and writing to '{file_path}'.")
    with open(file_path, "wb") as f:
        f.write(response.content)

    df = pl.read_parquet(file_path)
    context.log.info(f"File contents downloaded:\n{df}")


asset_check_yellow_taxi_trip_records = data_contract.load_data_quality_checks()

freshness_checks = data_contract.load_freshness_checks(
    lower_bound_delta=timedelta(minutes=5)
)
freshness_checks_sensor = dg.build_sensor_for_freshness_checks(
    freshness_checks=freshness_checks,
    default_status=dg.DefaultSensorStatus.RUNNING,
)

job = dg.define_asset_job(
    name="monthly_taxi_trips",
    selection=[asset_name],
)
schedule = dg.ScheduleDefinition(
    job=job,
    cron_schedule=data_contract.cron_schedule,
    default_status=dg.DefaultScheduleStatus.RUNNING,
)


defs = dg.Definitions(
    assets=[yellow_taxi_trip_records],
    asset_checks=[asset_check_yellow_taxi_trip_records, *freshness_checks],
    schedules=[schedule],
    sensors=[freshness_checks_sensor],
)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dagster_datacontract-0.2.4.tar.gz (8.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dagster_datacontract-0.2.4-py3-none-any.whl (9.1 kB view details)

Uploaded Python 3

File details

Details for the file dagster_datacontract-0.2.4.tar.gz.

File metadata

File hashes

Hashes for dagster_datacontract-0.2.4.tar.gz
Algorithm Hash digest
SHA256 944cccdd6f2f6280c273ea307e02559525e83ea57d59de36ede867fc7a6e9c52
MD5 1dcb821b9b4b82e72eed54f2434383d3
BLAKE2b-256 e68e20623497908b446442e1ea5270c21d79d1c5d080e85d35306d20cc4b19af

See more details on using hashes here.

File details

Details for the file dagster_datacontract-0.2.4-py3-none-any.whl.

File metadata

File hashes

Hashes for dagster_datacontract-0.2.4-py3-none-any.whl
Algorithm Hash digest
SHA256 3fce16d066f8e10c24dbaad2244050aa4dc4f1de1b30eb9cde252fded8810f50
MD5 3c420e37564f4645f1011a0dd2bd4a68
BLAKE2b-256 1ccc11400f26902f4bed8377d575b0f58c1db63853bf4662411f286046ee9c86

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page