dagster-drt

Community-maintained Dagster integration for drt (data reverse tool).

Expose drt syncs as Dagster assets with full observability: metrics, dependencies, subsetting, and dry-run support.

Installation

pip install dagster-drt

Quick Start

from dagster import AssetExecutionContext, Definitions
from dagster_drt import drt_assets, DagsterDrtResource

@drt_assets(project_dir="path/to/drt-project")
def my_syncs(context: AssetExecutionContext, drt: DagsterDrtResource):
    yield from drt.run(context=context)

defs = Definitions(
    assets=[my_syncs],
    resources={"drt": DagsterDrtResource(project_dir="path/to/drt-project")},
)

API Overview

Component                 Purpose
@drt_assets               Decorator: creates a @multi_asset from drt syncs
build_drt_asset_specs()   Spec-only generation (for Pipes / custom execution)
DagsterDrtResource        Execution resource with .run()
DagsterDrtTranslator      Customise how syncs map to assets
DrtConfig                 Per-run config (dry-run) from Dagster UI

Features

@drt_assets Decorator

Creates a Dagster multi_asset with can_subset=True from drt sync definitions:

from dagster import AssetExecutionContext, DailyPartitionsDefinition
from dagster_drt import drt_assets, DagsterDrtResource

@drt_assets(
    project_dir=".",
    sync_names=["sync_a", "sync_b"],  # optional filter
    group_name="reverse_etl",         # optional group override
    partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"),  # optional partitions
    pool="drt_pool",                  # optional concurrency control
)
def my_syncs(context: AssetExecutionContext, drt: DagsterDrtResource):
    yield from drt.run(context=context)

Parameters:

Parameter                Type                          Default           Description
project_dir              str | Path                    required          Path to drt project root
sync_names               list[str] | None              None              Filter to specific syncs
dagster_drt_translator   DagsterDrtTranslator | None   None              Custom translator
name                     str | None                    None              Op name
group_name               str | None                    None              Group name override
partitions_def           PartitionsDefinition | None   None              Partitions
backfill_policy          BackfillPolicy | None         auto single_run   Backfill policy
pool                     str | None                    None              Concurrency pool

DagsterDrtResource

Execution resource that yields MaterializeResult per sync:

DagsterDrtResource(
    project_dir=".",  # optional if @drt_assets has it
    dry_run=False,    # default dry-run mode
)

  • Auto-resolves project_dir from @drt_assets metadata
  • Filters to context.selected_asset_keys for subset execution
  • Supports dry_run override per-run: drt.run(context=ctx, dry_run=True)
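
The bullets above imply two decisions on each run: which syncs to execute, and whether to run them dry. The sketch below models that logic in plain Python so it can be read without a Dagster install. It is not dagster-drt's implementation: `resolve_dry_run`, `select_syncs`, and the assumption that an asset key is simply the sync name are all hypothetical.

```python
from typing import Iterable, Optional


def resolve_dry_run(resource_default: bool, per_run: Optional[bool] = None) -> bool:
    """A per-run value wins when given; otherwise use the resource default."""
    return resource_default if per_run is None else per_run


def select_syncs(all_syncs: Iterable[str], selected_keys: set[str]) -> list[str]:
    """Keep only syncs whose (assumed) asset key was selected, preserving order."""
    return [name for name in all_syncs if name in selected_keys]


if __name__ == "__main__":
    # Resource configured with dry_run=False, but this run asks for a dry run.
    print(resolve_dry_run(False, per_run=True))
    # Only two of the three syncs were selected in the UI.
    print(select_syncs(["sync_a", "sync_b", "sync_c"], {"sync_a", "sync_c"}))
```

Treating `None` as "no override" keeps an explicit `dry_run=False` on a single run distinguishable from an unset value.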

DagsterDrtTranslator

Customise how drt syncs map to Dagster assets. Override get_asset_spec():

from dagster_drt import DagsterDrtTranslator, drt_assets

class MyTranslator(DagsterDrtTranslator):
    def get_asset_spec(self, data):
        default = super().get_asset_spec(data)
        return default.replace_attributes(
            group_name="reverse_etl",
            owners=["team:data"],
        )

@drt_assets(project_dir=".", dagster_drt_translator=MyTranslator())
def my_syncs(context, drt):
    yield from drt.run(context=context)

Legacy per-attribute methods (get_asset_key, get_group_name, etc.) still work but emit deprecation warnings. Migrate to get_asset_spec().
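
While migrating a translator, it can help to surface deprecation warnings explicitly in a test. The sketch below uses only the standard `warnings` module. `legacy_get_group_name` is a hypothetical stand-in for a legacy method, and the sketch assumes the warning category is `DeprecationWarning`, which is not confirmed above.

```python
import warnings
from typing import Callable


def legacy_get_group_name() -> str:
    """Hypothetical legacy hook that warns before returning a value."""
    warnings.warn(
        "get_group_name is deprecated; override get_asset_spec instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return "reverse_etl"


def collect_deprecations(func: Callable[[], object]) -> list[str]:
    """Call func and return the messages of any DeprecationWarning it emitted."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")  # make sure nothing is suppressed
        func()
    return [
        str(w.message)
        for w in caught
        if issubclass(w.category, DeprecationWarning)
    ]


if __name__ == "__main__":
    for message in collect_deprecations(legacy_get_group_name):
        print(message)
```

An empty list from `collect_deprecations` for every translator method would suggest the migration to get_asset_spec() is complete, under the assumption stated above.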

build_drt_asset_specs (Pipes / Custom Execution)

Generate specs without execution logic — use with Dagster Pipes for remote execution:

from dagster import multi_asset
from dagster_drt import build_drt_asset_specs

# Build asset specs only; no execution logic is attached.
specs = build_drt_asset_specs(project_dir=".", sync_names=["my_sync"])

# PipesCloudRunJobClient is not imported here; bring it in from the module
# that provides your Pipes client.
@multi_asset(specs=specs, can_subset=True)
def my_drt_assets(context, pipes: PipesCloudRunJobClient):
    return pipes.run(
        context=context,
        job_name="drt-runner",
        command=["drt", "run", "--sync", "my_sync"],
    ).get_results()

This is the same pattern as dagster-dlt's build_dlt_asset_specs().

MaterializeResult Metadata

Assets return MaterializeResult with structured metadata visible in the Dagster UI:

  • sync_name — sync identifier
  • rows_synced — successful row count
  • rows_failed — failed row count
  • rows_skipped — skipped row count
  • dry_run — whether dry-run was active
  • row_errors_count — number of row-level errors (details in logs)
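
To make the shape of this metadata concrete, the sketch below builds a dict with the six keys listed above from a hypothetical sync outcome. `SyncOutcome` and `to_metadata` are illustrative names, not part of dagster-drt, and the value types are assumptions.

```python
from dataclasses import dataclass, field


@dataclass
class SyncOutcome:
    """Hypothetical stand-in for the result of one drt sync."""

    sync_name: str
    rows_synced: int = 0
    rows_failed: int = 0
    rows_skipped: int = 0
    dry_run: bool = False
    row_errors: list[str] = field(default_factory=list)


def to_metadata(outcome: SyncOutcome) -> dict[str, object]:
    """Build a metadata dict using the keys listed above."""
    return {
        "sync_name": outcome.sync_name,
        "rows_synced": outcome.rows_synced,
        "rows_failed": outcome.rows_failed,
        "rows_skipped": outcome.rows_skipped,
        "dry_run": outcome.dry_run,
        # Only the count goes into metadata; details belong in the logs.
        "row_errors_count": len(outcome.row_errors),
    }


if __name__ == "__main__":
    outcome = SyncOutcome("sync_a", rows_synced=98, rows_failed=2, row_errors=["e1", "e2"])
    print(to_metadata(outcome))
```

A dict of this shape is the kind of value Dagster's `MaterializeResult(metadata=...)` accepts.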

Asset Kinds

Assets are tagged with kinds={"drt", "<destination_type>"} (e.g. {"drt", "rest_api"}), visible in the Dagster UI asset graph.

Usage with dagster-dbt

from dagster import Definitions
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
from dagster_drt import drt_assets, DagsterDrtResource

# Assumed setup: a DbtProject pointing at your dbt project (placeholder path).
dbt_project = DbtProject(project_dir="path/to/dbt-project")

@dbt_assets(manifest=dbt_project.manifest_path)
def my_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

@drt_assets(project_dir="path/to/drt-project")
def my_drt_syncs(context, drt: DagsterDrtResource):
    yield from drt.run(context=context)

defs = Definitions(
    assets=[my_dbt_assets, my_drt_syncs],
    resources={
        "dbt": DbtCliResource(project_dir=dbt_project),
        "drt": DagsterDrtResource(project_dir="path/to/drt-project"),
    },
)

Migration from v0.1

v0.2 introduces the @drt_assets decorator, DagsterDrtResource, and build_drt_asset_specs(). The old drt_assets() function is renamed to drt_assets_legacy() and emits a deprecation warning.

Before (v0.1):

from dagster import Definitions
from dagster_drt import drt_assets

defs = Definitions(assets=drt_assets(project_dir="."))

After (v0.2):

from dagster import Definitions
from dagster_drt import drt_assets, DagsterDrtResource

@drt_assets(project_dir=".")
def my_syncs(context, drt: DagsterDrtResource):
    yield from drt.run(context=context)

defs = Definitions(
    assets=[my_syncs],
    resources={"drt": DagsterDrtResource(project_dir=".")},
)

License

Apache-2.0
