
Load metadata and asset check specifications from data contracts.

Project description

dagster-datacontract

A Dagster integration that loads asset metadata and context information from data contracts into Dagster assets.
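To illustrate the idea, here is a minimal, stdlib-only sketch of how fields from a parsed contract might be mapped onto the keyword arguments of a Dagster asset. The dictionary layout (info.title, info.version, info.owner, info.description, tags, servicelevels.frequency.cron) is an assumption modelled on the Data Contract Specification, and the helper contract_to_asset_kwargs is hypothetical. It is not dagster-datacontract's actual implementation.

```python
from typing import Any


def contract_to_asset_kwargs(contract: dict[str, Any]) -> dict[str, Any]:
    """Map a parsed data contract to asset-style keyword arguments.

    Hypothetical helper for illustration only; the contract layout is an
    assumption based on the Data Contract Specification.
    """
    info = contract.get("info", {})
    frequency = contract.get("servicelevels", {}).get("frequency", {})
    owner = info.get("owner")
    return {
        "description": info.get("description"),
        "code_version": info.get("version"),
        # Dagster takes a list of owners, so a single owner string is wrapped.
        "owners": [owner] if owner else [],
        # Contract tags are plain strings while Dagster tags are key/value
        # pairs; this sketch assumes each tag becomes a key with an empty value.
        "tags": {tag: "" for tag in contract.get("tags", [])},
        "metadata": {"title": info.get("title")},
        # A cron expression, if present, could drive a ScheduleDefinition.
        "cron_schedule": frequency.get("cron"),
    }


# Usage with a hand-written contract dictionary (hypothetical values).
contract = {
    "info": {
        "title": "Yellow taxi trip records",
        "version": "1.0.0",
        "owner": "team:mobility",
        "description": "Monthly NYC yellow taxi trips.",
    },
    "tags": ["taxi", "nyc"],
    "servicelevels": {"frequency": {"cron": "0 0 1 * *"}},
}
kwargs = contract_to_asset_kwargs(contract)
print(kwargs["owners"], kwargs["cron_schedule"])  # ['team:mobility'] 0 0 1 * *
```

Missing sections simply fall back to empty values, so a sparse contract still produces valid arguments.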

Installation

Install dagster-datacontract using uv:

uv add dagster-datacontract

Simple Example

The following example is included in the project's example directory:

from datetime import timedelta
from pathlib import Path

import dagster as dg
import requests
import polars as pl
from datacontract.data_contract import DataContract

from dagster_datacontract import DataContractLoader


asset_name = "yellow_taxi_trip_records"
data_contract = DataContractLoader(
    asset_name=asset_name,
    data_contract=DataContract(
        data_contract_file="./example/datacontract.yml",
    ),
)


@dg.asset(
    name=asset_name,
    metadata=data_contract.metadata,
    tags=data_contract.tags,
    description=data_contract.load_description(),
    owners=data_contract.owner,
    code_version=data_contract.version,
)
def yellow_taxi_trip_records(
    context: dg.AssetExecutionContext,
) -> None:
    download_path = "./example/data"
    Path(download_path).mkdir(parents=True, exist_ok=True)

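    url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet"
    # Fail fast on stalled connections and HTTP errors instead of
    # writing an error page to disk as if it were a Parquet file.
    response = requests.get(url=url, timeout=60)
    response.raise_for_status()

    file_path = f"{download_path}/yellow_tripdata_2025-01.parquet"
    context.log.info(f"Reading data from '{url}' and writing to '{file_path}'.")
    with open(file_path, "wb") as f:
        f.write(response.content)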

    df = pl.read_parquet(file_path)
    context.log.info(f"File contents downloaded:\n{df}")


asset_check_yellow_taxi_trip_records = data_contract.load_data_quality_checks()

freshness_checks = data_contract.load_freshness_checks(
    lower_bound_delta=timedelta(minutes=5)
)
freshness_checks_sensor = dg.build_sensor_for_freshness_checks(
    freshness_checks=freshness_checks,
    default_status=dg.DefaultSensorStatus.RUNNING,
)

job = dg.define_asset_job(
    name="monthly_taxi_trips",
    selection=[asset_name],
)
schedule = dg.ScheduleDefinition(
    job=job,
    cron_schedule=data_contract.cron_schedule,
    default_status=dg.DefaultScheduleStatus.RUNNING,
)


defs = dg.Definitions(
    assets=[yellow_taxi_trip_records],
    asset_checks=[asset_check_yellow_taxi_trip_records, *freshness_checks],
    schedules=[schedule],
    sensors=[freshness_checks_sensor],
)
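The freshness checks in the example are configured with lower_bound_delta=timedelta(minutes=5). As a rough mental model, and assuming load_freshness_checks builds on Dagster's last-update freshness checks, a check passes when the asset was last updated no earlier than a reference time minus that delta. The stdlib sketch below models only this comparison; the function is_fresh is hypothetical, and reference_time stands in for "now" or, when a deadline cron is configured, the most recent cron tick, which Dagster resolves itself.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_fresh(
    last_updated: Optional[datetime],
    lower_bound_delta: timedelta,
    reference_time: datetime,
) -> bool:
    """Simplified last-update freshness rule (illustrative only).

    Passes when the asset was updated no earlier than
    reference_time - lower_bound_delta.
    """
    if last_updated is None:
        # Never materialized: treated as stale in this sketch.
        return False
    return last_updated >= reference_time - lower_bound_delta


now = datetime(2025, 2, 1, 12, 0, tzinfo=timezone.utc)
delta = timedelta(minutes=5)
print(is_fresh(now - timedelta(minutes=3), delta, now))   # True
print(is_fresh(now - timedelta(minutes=10), delta, now))  # False
```

A tighter lower_bound_delta makes the check stricter, so it should be chosen with the contract's update frequency in mind.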



Download files


Source Distribution

dagster_datacontract-0.2.1.tar.gz (8.3 kB)


Built Distribution


dagster_datacontract-0.2.1-py3-none-any.whl (8.8 kB)


File details

Details for the file dagster_datacontract-0.2.1.tar.gz.


File hashes

Hashes for dagster_datacontract-0.2.1.tar.gz
Algorithm     Hash digest
SHA256        e844b96f866f53199c417a40bb74e544772c685516476ee80c679c5349066229
MD5           17a8d485e8e4ed1c462e9b82ec7b8929
BLAKE2b-256   15530947c2ab914172f8a46e680e1e7e2cedaeda34ae1eb2043956403595a6f8


File details

Details for the file dagster_datacontract-0.2.1-py3-none-any.whl.


File hashes

Hashes for dagster_datacontract-0.2.1-py3-none-any.whl
Algorithm     Hash digest
SHA256        922a668866d9c57f9a59e2bbe789b68f589fa3d784be1932f8d86727cd057019
MD5           3dde5f167edb25e473c0a87aa8dfbb0e
BLAKE2b-256   c31666068b33ad554cc231f9258954fbf3ac8be2bca0955efcc68b3dd83859ad

