Community-maintained Dagster integration for drt (data reverse tool)
Project description
dagster-drt
Community-maintained Dagster integration for drt (data reverse tool).
Expose drt syncs as Dagster assets with full observability — metrics, dependencies, subsetting, and dry-run support.
Installation
pip install dagster-drt
Quick Start
from dagster import AssetExecutionContext, Definitions
from dagster_drt import drt_assets, DagsterDrtResource
@drt_assets(project_dir="path/to/drt-project")
def my_syncs(context: AssetExecutionContext, drt: DagsterDrtResource):
yield from drt.run(context=context)
defs = Definitions(
assets=[my_syncs],
resources={"drt": DagsterDrtResource(project_dir="path/to/drt-project")},
)
API Overview
| Component | Purpose |
|---|---|
@drt_assets |
Decorator — creates @multi_asset from drt syncs |
build_drt_asset_specs() |
Spec-only generation (for Pipes / custom execution) |
DagsterDrtResource |
Execution resource with .run() |
DagsterDrtTranslator |
Customise how syncs map to assets |
DrtConfig |
Per-run config (dry-run) from Dagster UI |
Features
@drt_assets Decorator
Creates a Dagster multi_asset with can_subset=True from drt sync definitions:
@drt_assets(
project_dir=".",
sync_names=["sync_a", "sync_b"], # optional filter
group_name="reverse_etl", # optional group override
partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"),
pool="drt_pool", # optional concurrency control
)
def my_syncs(context: AssetExecutionContext, drt: DagsterDrtResource):
yield from drt.run(context=context)
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
project_dir |
str | Path |
required | Path to drt project root |
sync_names |
list[str] | None |
None |
Filter to specific syncs |
dagster_drt_translator |
DagsterDrtTranslator | None |
None |
Custom translator |
name |
str | None |
None |
Op name |
group_name |
str | None |
None |
Group name override |
partitions_def |
PartitionsDefinition | None |
None |
Partitions |
backfill_policy |
BackfillPolicy | None |
auto single_run |
Backfill policy |
pool |
str | None |
None |
Concurrency pool |
DagsterDrtResource
Execution resource that yields MaterializeResult per sync:
DagsterDrtResource(
project_dir=".", # optional if @drt_assets has it
dry_run=False, # default dry-run mode
)
- Auto-resolves
project_dirfrom@drt_assetsmetadata - Filters to
context.selected_asset_keysfor subset execution - Supports
dry_runoverride per-run:drt.run(context=ctx, dry_run=True)
DagsterDrtTranslator
Customise how drt syncs map to Dagster assets. Override get_asset_spec():
from dagster_drt import DagsterDrtTranslator, drt_assets
class MyTranslator(DagsterDrtTranslator):
def get_asset_spec(self, data):
default = super().get_asset_spec(data)
return default.replace_attributes(
group_name="reverse_etl",
owners=["team:data"],
)
@drt_assets(project_dir=".", dagster_drt_translator=MyTranslator())
def my_syncs(context, drt):
yield from drt.run(context=context)
Legacy per-attribute methods (get_asset_key, get_group_name, etc.) still work but emit deprecation warnings. Migrate to get_asset_spec().
build_drt_asset_specs (Pipes / Custom Execution)
Generate specs without execution logic — use with Dagster Pipes for remote execution:
from dagster import multi_asset
from dagster_drt import build_drt_asset_specs
specs = build_drt_asset_specs(project_dir=".", sync_names=["my_sync"])
@multi_asset(specs=specs, can_subset=True)
def my_drt_assets(context, pipes: PipesCloudRunJobClient):
return pipes.run(
context=context,
job_name="drt-runner",
command=["drt", "run", "--sync", "my_sync"],
).get_results()
This is the same pattern as dagster-dlt's build_dlt_asset_specs().
MaterializeResult Metadata
Assets return MaterializeResult with structured metadata visible in the Dagster UI:
sync_name— sync identifierrows_synced— successful row countrows_failed— failed row countrows_skipped— skipped row countdry_run— whether dry-run was activerow_errors_count— number of row-level errors (details in logs)
Asset Kinds
Assets are tagged with kinds={"drt", "<destination_type>"} (e.g. {"drt", "rest_api"}), visible in the Dagster UI asset graph.
Usage with dagster-dbt
from dagster import Definitions
from dagster_dbt import dbt_assets, DbtCliResource
from dagster_drt import drt_assets, DagsterDrtResource
@dbt_assets(manifest=dbt_project.manifest_path)
def my_dbt_assets(context, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
@drt_assets(project_dir="path/to/drt-project")
def my_drt_syncs(context, drt: DagsterDrtResource):
yield from drt.run(context=context)
defs = Definitions(
assets=[my_dbt_assets, my_drt_syncs],
resources={
"dbt": DbtCliResource(project_dir=dbt_project),
"drt": DagsterDrtResource(project_dir="path/to/drt-project"),
},
)
Migration from v0.1
v0.2 introduces the @drt_assets decorator, DagsterDrtResource, and build_drt_asset_specs(). The old drt_assets() function is renamed to drt_assets_legacy() and emits a deprecation warning.
Before (v0.1):
from dagster_drt import drt_assets
defs = Definitions(assets=drt_assets(project_dir="."))
After (v0.2):
from dagster_drt import drt_assets, DagsterDrtResource
@drt_assets(project_dir=".")
def my_syncs(context, drt: DagsterDrtResource):
yield from drt.run(context=context)
defs = Definitions(
assets=[my_syncs],
resources={"drt": DagsterDrtResource(project_dir=".")},
)
License
Apache-2.0
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dagster_drt-0.3.0.tar.gz.
File metadata
- Download URL: dagster_drt-0.3.0.tar.gz
- Upload date:
- Size: 11.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
84e9262703a4882b9586e602b81e8bd2905609d21a48e125a88a62d4510a41dd
|
|
| MD5 |
fb3477e0c7fd8f96632582b34d4d0a95
|
|
| BLAKE2b-256 |
917d02e9fdac9a15c6a3c61edf25187f08e053299c89048c844903e3a2bf9afb
|
Provenance
The following attestation bundles were made for dagster_drt-0.3.0.tar.gz:
Publisher:
publish-dagster-drt.yml on drt-hub/drt
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
dagster_drt-0.3.0.tar.gz -
Subject digest:
84e9262703a4882b9586e602b81e8bd2905609d21a48e125a88a62d4510a41dd - Sigstore transparency entry: 1315945445
- Sigstore integration time:
-
Permalink:
drt-hub/drt@a0d102a98287e14d9509ba56193b818bcf77d2a5 -
Branch / Tag:
refs/tags/dagster-drt-v0.3.0 - Owner: https://github.com/drt-hub
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish-dagster-drt.yml@a0d102a98287e14d9509ba56193b818bcf77d2a5 -
Trigger Event:
push
-
Statement type:
File details
Details for the file dagster_drt-0.3.0-py3-none-any.whl.
File metadata
- Download URL: dagster_drt-0.3.0-py3-none-any.whl
- Upload date:
- Size: 10.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b463f34ff643add5c1acd1028f0e9c532539297ce738c6f6e8a05431a8892603
|
|
| MD5 |
de83cc762cdcde3380712e2123bf48d7
|
|
| BLAKE2b-256 |
a2741c5d149ce80d6139e7d953612ac23eba4d95af95cf1da09a7ec87668295b
|
Provenance
The following attestation bundles were made for dagster_drt-0.3.0-py3-none-any.whl:
Publisher:
publish-dagster-drt.yml on drt-hub/drt
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
dagster_drt-0.3.0-py3-none-any.whl -
Subject digest:
b463f34ff643add5c1acd1028f0e9c532539297ce738c6f6e8a05431a8892603 - Sigstore transparency entry: 1315945473
- Sigstore integration time:
-
Permalink:
drt-hub/drt@a0d102a98287e14d9509ba56193b818bcf77d2a5 -
Branch / Tag:
refs/tags/dagster-drt-v0.3.0 - Owner: https://github.com/drt-hub
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish-dagster-drt.yml@a0d102a98287e14d9509ba56193b818bcf77d2a5 -
Trigger Event:
push
-
Statement type: