Skip to main content

Load metadata and asset check spesifications from data contracts.

Project description

dagster-datacontract

Dagster Datacontract implementation for loading asset metadata and context information from data contracts to Dagster assets.

Installation

Install dagster-datacontract using uv:

uv add dagster-datacontract

Simple Example

The following example can be found inside the example directory:

from datetime import timedelta
from pathlib import Path

import dagster as dg
import polars as pl
import requests
from datacontract.data_contract import DataContract

from dagster_datacontract import DataContractLoader

asset_name = "yellow_taxi_trip_records"
data_contract = DataContractLoader(
    asset_name=asset_name,
    data_contract=DataContract(
        data_contract_file="./example/datacontract.yml",
        server="production",
    ),
)


@dg.asset(
    name=asset_name,
    metadata=data_contract.metadata,
    tags=data_contract.tags,
    description=data_contract.description,
    owners=data_contract.owner,
    code_version=data_contract.version,
)
def yellow_taxi_trip_records(
    context: dg.AssetExecutionContext,
) -> None:
    download_path = "./example/data"
    Path(download_path).mkdir(parents=True, exist_ok=True)

    url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet"
    response = requests.get(url=url)

    file_path = f"{download_path}/yellow_tripdata_2025-01.parquet"
    context.log.info(f"Reading data from '{url}' and writing to '{file_path}'.")
    with open(file_path, "wb") as f:
        f.write(response.content)

    df = pl.read_parquet(file_path)
    context.log.info(f"File contents downloaded:\n{df}")


asset_check_yellow_taxi_trip_records = data_contract.load_data_quality_checks()

freshness_checks = data_contract.load_freshness_checks(
    lower_bound_delta=timedelta(minutes=5)
)
freshness_checks_sensor = dg.build_sensor_for_freshness_checks(
    freshness_checks=freshness_checks,
    default_status=dg.DefaultSensorStatus.RUNNING,
)

job = dg.define_asset_job(
    name="monthly_taxi_trips",
    selection=[asset_name],
)
schedule = dg.ScheduleDefinition(
    job=job,
    cron_schedule=data_contract.cron_schedule,
    default_status=dg.DefaultScheduleStatus.RUNNING,
)


defs = dg.Definitions(
    assets=[yellow_taxi_trip_records],
    asset_checks=[asset_check_yellow_taxi_trip_records, *freshness_checks],
    schedules=[schedule],
    sensors=[freshness_checks_sensor],
)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dagster_datacontract-0.2.3.tar.gz (8.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dagster_datacontract-0.2.3-py3-none-any.whl (9.0 kB view details)

Uploaded Python 3

File details

Details for the file dagster_datacontract-0.2.3.tar.gz.

File metadata

File hashes

Hashes for dagster_datacontract-0.2.3.tar.gz
Algorithm Hash digest
SHA256 e042af06a04c2fa2ebabfa7f369c816cf8f3280cff0d76f2c99e1e40d7b14c5f
MD5 e6f1bc4d80287d733128865a8051288b
BLAKE2b-256 9a4dc68fbbbcfd4a53b0204ae464365c294ff85b851ca55a28855a1b2cdac92a

See more details on using hashes here.

File details

Details for the file dagster_datacontract-0.2.3-py3-none-any.whl.

File metadata

File hashes

Hashes for dagster_datacontract-0.2.3-py3-none-any.whl
Algorithm Hash digest
SHA256 effd984c9b2ea6dd1f5bc56c12f0f527fa866c30f53a2bfc2c805f8f11c8ac84
MD5 a635354faecfcb4d864066d844484c10
BLAKE2b-256 6d7d5e1dba919ecbc16c17452f2198232ba0cbe5b1271d32a59453c9a30505f1

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page