Delta Lake IO managers for Dagster with PyArrow and Polars support.
dagster-delta
Dagster deltalake implementation for PyArrow and Polars. Originally forked from dagster-deltalake, with customizations.
The IO managers support partition mapping, custom write modes, and special metadata configuration for advanced use cases.
The supported write modes:
- error
- append
- overwrite
- ignore
- merge
- create_or_replace
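The sketch below shows how the four basic modes differ. It assumes a table is just a list of row dicts stored by name. The `write` function and `TableExistsError` are hypothetical illustrations and not part of dagster-delta. `merge` and `create_or_replace` are deliberately not modelled.

```python
# Hypothetical in-memory model of the basic write modes; not dagster-delta code.
# "merge" and "create_or_replace" are intentionally left out of this sketch.
from typing import Dict, List

Row = Dict[str, object]


class TableExistsError(Exception):
    """Raised when mode="error" targets a table that already exists."""


def write(tables: Dict[str, List[Row]], name: str, rows: List[Row], mode: str) -> None:
    """Write `rows` to the table `name` in `tables` using `mode`."""
    exists = name in tables
    if mode == "error":
        # Refuse to touch an existing table.
        if exists:
            raise TableExistsError(name)
        tables[name] = list(rows)
    elif mode == "append":
        # Add rows, creating the table if needed.
        tables.setdefault(name, []).extend(rows)
    elif mode == "overwrite":
        # Replace the table contents.
        tables[name] = list(rows)
    elif mode == "ignore":
        # Skip the write silently if the table already exists.
        if not exists:
            tables[name] = list(rows)
    else:
        raise ValueError(f"mode not modelled in this sketch: {mode}")


tables: Dict[str, List[Row]] = {}
write(tables, "t", [{"a": 1}], "error")   # creates "t"
write(tables, "t", [{"a": 2}], "append")  # "t" now holds two rows
write(tables, "t", [{"a": 3}], "ignore")  # no-op because "t" exists
```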
Merge
dagster-delta supports MERGE execution with several predefined MERGE types (dagster_delta.config.MergeType):
- deduplicate_insert <- deduplicates on write
- update_only <- updates only the matched records
- upsert <- updates existing matches and inserts unmatched records
- replace_and_delete_unmatched <- updates existing matches and deletes unmatched records
- custom <- custom MERGE with MergeOperationsConfig
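The predefined merge types can be approximated in plain Python. This sketch rests on several assumptions: rows are dicts, the predicate is a single-key equality such as `s.a = t.a`, source keys are unique, `deduplicate_insert` means "insert only rows whose key is not yet in the target", and "unmatched" in `replace_and_delete_unmatched` refers to target rows with no source match. The `merge` function is hypothetical. The real operation runs as a Delta Lake MERGE.

```python
# Hypothetical model of the predefined MergeType behaviours on lists of dicts,
# assuming the predicate "s.<key> = t.<key>" and unique keys in the source.
from typing import Dict, List

Row = Dict[str, object]

KNOWN_TYPES = {"deduplicate_insert", "update_only", "upsert", "replace_and_delete_unmatched"}


def merge(target: List[Row], source: List[Row], key: str, merge_type: str) -> List[Row]:
    """Return the target rows after merging `source` into `target` on `key`."""
    if merge_type not in KNOWN_TYPES:
        raise ValueError(f"merge type not modelled: {merge_type}")
    src = {row[key]: row for row in source}
    target_keys = {row[key] for row in target}
    updates_matches = merge_type in ("update_only", "upsert", "replace_and_delete_unmatched")
    result: List[Row] = []
    for row in target:
        matched = row[key] in src
        if matched and updates_matches:
            result.append(dict(src[row[key]]))  # update all columns from the source row
        elif not matched and merge_type == "replace_and_delete_unmatched":
            continue  # target row without a source match is deleted
        else:
            result.append(row)  # left unchanged
    if merge_type in ("deduplicate_insert", "upsert"):
        # Insert source rows whose key does not exist in the target yet.
        result.extend(dict(r) for r in source if r[key] not in target_keys)
    return result


target = [{"a": 1, "v": "old"}, {"a": 2, "v": "old"}]
source = [{"a": 2, "v": "new"}, {"a": 3, "v": "new"}]
print(merge(target, source, "a", "upsert"))
```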
Example:
import polars as pl
from dagster import Definitions, asset
from dagster_delta import WriteMode, MergeConfig, MergeType
from dagster_delta_polars import DeltaLakePolarsIOManager

@asset(
    key_prefix=["my_schema"],  # will be used as the schema (parent folder) in Delta Lake
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": DeltaLakePolarsIOManager(
            root_uri="s3://bucket",
            mode=WriteMode.merge,  # or just "merge"
            merge_config=MergeConfig(
                merge_type=MergeType.upsert,
                predicate="s.a = t.a",
                source_alias="s",
                target_alias="t",
            ),
        )
    },
)
Custom merge (gives full control)
import polars as pl
from dagster import Definitions, asset
from dagster_delta import WriteMode, MergeConfig, MergeType, MergeOperationsConfig
from dagster_delta.config import WhenMatchedUpdateAll, WhenNotMatchedInsertAll  # assumed import location
from dagster_delta_polars import DeltaLakePolarsIOManager

@asset(
    key_prefix=["my_schema"],  # will be used as the schema (parent folder) in Delta Lake
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": DeltaLakePolarsIOManager(
            root_uri="s3://bucket",
            mode=WriteMode.merge,  # or just "merge"
            merge_config=MergeConfig(
                merge_type=MergeType.custom,
                predicate="s.a = t.a",
                source_alias="s",
                target_alias="t",
                merge_operations_config=MergeOperationsConfig(
                    when_not_matched_insert_all=[WhenNotMatchedInsertAll(predicate="s.price > 600")],
                    when_matched_update_all=[WhenMatchedUpdateAll()],
                ),
            ),
        )
    },
)
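A custom merge is essentially a set of clauses, and each clause is gated by an optional predicate. The hypothetical sketch below mimics the configuration above, using Python callables in place of SQL predicates. To keep it simple, predicates here see only the source row, whereas real predicates can reference both `s` and `t`. `Clauses`, `fires` and `custom_merge` are illustrative names and do not belong to dagster-delta.

```python
# Hypothetical model of a custom merge: the clause lists mirror
# when_matched_update_all / when_not_matched_insert_all, and each predicate
# (None means "always") is evaluated against the source row only.
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional

Row = Dict[str, object]
Predicate = Optional[Callable[[Row], bool]]


@dataclass
class Clauses:
    when_matched_update_all: List[Predicate] = field(default_factory=list)
    when_not_matched_insert_all: List[Predicate] = field(default_factory=list)


def fires(clauses: List[Predicate], source_row: Row) -> bool:
    """True if any clause applies; a None predicate always applies."""
    return any(p is None or p(source_row) for p in clauses)


def custom_merge(target: List[Row], source: List[Row], key: str, clauses: Clauses) -> List[Row]:
    src = {row[key]: row for row in source}
    target_keys = {row[key] for row in target}
    result: List[Row] = []
    for row in target:
        s = src.get(row[key])
        if s is not None and fires(clauses.when_matched_update_all, s):
            result.append(dict(s))  # matched: update all columns
        else:
            result.append(row)
    for s in source:
        if s[key] not in target_keys and fires(clauses.when_not_matched_insert_all, s):
            result.append(dict(s))  # not matched: insert if the predicate holds
    return result


# Rough equivalent of the MergeOperationsConfig in the example above.
clauses = Clauses(
    when_matched_update_all=[None],
    when_not_matched_insert_all=[lambda s: s["price"] > 600],  # "s.price > 600"
)
```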
Special metadata configurations
Add table_configuration
Specify additional table configuration, passed as the configuration argument of write_deltalake.
import dagster as dg
import polars as pl

@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"table_configuration": {
        "delta.enableChangeDataFeed": "true",
    }},
)
def my_asset() -> pl.DataFrame:
    ...
Override the write mode
Override the write mode passed to write_deltalake.
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"mode": "append"},
)
def my_asset() -> pl.DataFrame:
    ...
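Conceptually, each override key in this section replaces the IO manager's own setting for that one asset. Here is a minimal sketch of that precedence. `resolve_write_options` and `OVERRIDABLE_KEYS` are hypothetical names. `table_configuration` is omitted because it is described as additional configuration, not an override.

```python
# Hypothetical helper: asset-level metadata overrides the IO manager's
# settings for that asset only; unrelated metadata keys are ignored.
from typing import Any, Dict, Mapping

OVERRIDABLE_KEYS = ("mode", "schema_mode", "custom_metadata", "writer_properties", "merge_predicate")


def resolve_write_options(io_manager_settings: Mapping[str, Any], asset_metadata: Mapping[str, Any]) -> Dict[str, Any]:
    options = dict(io_manager_settings)  # start from the IO manager defaults
    for key in OVERRIDABLE_KEYS:
        if key in asset_metadata:
            options[key] = asset_metadata[key]  # asset metadata wins
    return options


print(resolve_write_options({"mode": "overwrite"}, {"mode": "append"}))
```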
Override the custom_metadata
Override the custom_metadata passed to write_deltalake.
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"custom_metadata": {"owner": "John Doe"}},
)
def my_asset() -> pl.DataFrame:
    ...
Override the schema_mode
Override the schema_mode passed to write_deltalake.
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"schema_mode": "merge"},
)
def my_asset() -> pl.DataFrame:
    ...
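The following is a simplified, hypothetical model of what `schema_mode="merge"` permits: incoming columns that the table does not have yet are added to the table schema, and the write does not fail. Type compatibility checks and other schema changes are not modelled. `evolve_columns` is an illustrative name only.

```python
# Simplified, hypothetical model of schema evolution with schema_mode="merge":
# new incoming columns are appended to the table's columns; without "merge"
# they are treated as a schema mismatch. Type checks are not modelled.
from typing import List, Optional


def evolve_columns(table_columns: List[str], incoming_columns: List[str], schema_mode: Optional[str] = None) -> List[str]:
    new_columns = [c for c in incoming_columns if c not in table_columns]
    if new_columns and schema_mode != "merge":
        raise ValueError(f"new columns {new_columns} require schema_mode='merge'")
    return table_columns + new_columns


print(evolve_columns(["a", "b"], ["a", "b", "c"], schema_mode="merge"))
```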
Override the writer_properties
Override the writer_properties passed to write_deltalake.
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"writer_properties": {
        "compression": "SNAPPY",
    }},
)
def my_asset() -> pl.DataFrame:
    ...
Override the merge_predicate
Override the merge predicate used for merge execution.
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"merge_predicate": "s.foo = t.foo AND s.bar = t.bar"},
)
def my_asset() -> pl.DataFrame:
    ...
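Composite-key predicates like the one above follow a regular pattern, so they can be generated from a list of key columns. `build_merge_predicate` is a hypothetical helper. It assumes the default aliases `s` (source) and `t` (target) used throughout this README.

```python
# Hypothetical helper that builds an equality merge predicate over key columns.
from typing import Sequence


def build_merge_predicate(keys: Sequence[str], source_alias: str = "s", target_alias: str = "t") -> str:
    if not keys:
        raise ValueError("at least one key column is required")
    # One equality per key column, combined with AND.
    return " AND ".join(f"{source_alias}.{k} = {target_alias}.{k}" for k in keys)


print(build_merge_predicate(["foo", "bar"]))
```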
Override the schema
Override the schema (parent folder) in which the table is saved.
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"schema": "custom_db_schema"},
)
def my_asset() -> pl.DataFrame:
    ...
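The table location combines the IO manager's root_uri, a schema folder, and the table name. In the sketch below, the schema defaults to the last element of the asset's key_prefix and can be replaced by the "schema" metadata key. `table_uri` is a hypothetical helper, and the exact path rules used by dagster-delta are assumed rather than confirmed.

```python
# Hypothetical helper: <root_uri>/<schema>/<table>. The schema comes from the
# "schema" metadata key if present, else from the last key_prefix element.
from typing import Mapping, Sequence


def table_uri(root_uri: str, asset_key_path: Sequence[str], metadata: Mapping[str, str]) -> str:
    *prefix, table = asset_key_path  # the asset name is the table name
    schema = metadata.get("schema") or (prefix[-1] if prefix else None)
    if schema is None:
        raise ValueError("no schema: add a key_prefix or a 'schema' metadata entry")
    return f"{root_uri.rstrip('/')}/{schema}/{table}"


print(table_uri("s3://bucket", ["my_schema", "my_asset"], {"schema": "custom_db_schema"}))
```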
Set the columns to read
Load only the listed columns from an upstream asset.
@dg.asset(
    io_manager_key="deltalake_io_manager",
    ins={
        "upstream_asset": dg.AssetIn(metadata={"columns": ["foo", "bar"]}),
    },
)
def my_asset(upstream_asset: pl.DataFrame) -> pl.DataFrame:
    ...
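The effect of the "columns" input metadata is a projection: the downstream asset receives only the requested columns. This hypothetical sketch applies the projection to row dicts. A real Delta or Parquet reader would typically push the selection down into the scan instead of filtering in Python.

```python
# Hypothetical illustration of column selection on loaded rows.
from typing import Dict, List, Optional, Sequence

Row = Dict[str, object]


def select_columns(rows: List[Row], columns: Optional[Sequence[str]] = None) -> List[Row]:
    if columns is None:
        return [dict(r) for r in rows]  # no "columns" metadata: load everything
    return [{c: r[c] for c in columns} for r in rows]


print(select_columns([{"foo": 1, "bar": 2, "baz": 3}], ["foo", "bar"]))
```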
Override the table name using root_name
Instead of using the asset name as the table name, you can set a custom table name with root_name in the asset definition metadata.
This is useful when two or more assets share the same table structure but each covers only a subset of the table's partitions, and they cannot be combined into a single asset because they require different underlying op logic and/or upstream assets:
import polars as pl
import dagster as dg

@dg.asset(
    io_manager_key="deltalake_io_manager",
    partitions_def=dg.StaticPartitionsDefinition(["a", "b"]),
    metadata={
        "partition_expr": "foo",
        "root_name": "asset_partitioned",
    },
)
def asset_partitioned_1(upstream_1: pl.DataFrame, upstream_2: pl.DataFrame) -> pl.DataFrame:
    ...

@dg.asset(
    io_manager_key="deltalake_io_manager",
    partitions_def=dg.StaticPartitionsDefinition(["c", "d"]),
    metadata={
        "partition_expr": "foo",
        "root_name": "asset_partitioned",
    },
)
def asset_partitioned_2(upstream_3: pl.DataFrame, upstream_4: pl.DataFrame) -> pl.DataFrame:
    ...
The resulting flow looks like this:
{static_partition_def: [a,b]}
┌───────────┐
│upstream 1 ├─┐   ┌─────────────────────┐
└───────────┘ │   │                     │   write to storage on partition (a,b)
┌───────────┐ └──►│ asset_partitioned_1 ├──────────────────────┐
│upstream 2 ├────►│                     │                      │
└───────────┘     └─────────────────────┘       ┌──────────────▼──────────────┐
                                                │         partitions          │
                                                │     asset_partitioned:      │
                                                │          [a,b,c,d]          │
┌───────────┐     ┌─────────────────────┐       └──────────────▲──────────────┘
│upstream 3 ├──┐  │                     │                      │
└───────────┘  └─►│ asset_partitioned_2 │                      │
┌───────────┐  ┌─►│                     ├──────────────────────┘
│upstream 4 ├──┘  └─────────────────────┘   write to storage on partition (c,d)
└───────────┘
{static_partition_def: [c,d]}
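The flow above can be simulated in memory. The sketch assumes that each partitioned materialization overwrites only the rows whose partition_expr column ("foo") equals the partition key, which leaves the other partitions untouched. `write_partition` and the row layout are hypothetical. Because both assets share root_name="asset_partitioned", they write into the same table.

```python
# Hypothetical simulation: two assets write disjoint partitions into one table
# (shared root_name), each write replacing only its own partition's rows.
from typing import Dict, List

Row = Dict[str, object]


def write_partition(table: List[Row], rows: List[Row], partition_expr: str, partition_key: str) -> List[Row]:
    """Replace the rows of `table` whose `partition_expr` column equals `partition_key`."""
    if any(r[partition_expr] != partition_key for r in rows):
        raise ValueError("rows must belong to the partition being written")
    kept = [r for r in table if r[partition_expr] != partition_key]
    return kept + list(rows)


ROOT_NAME = "asset_partitioned"
tables: Dict[str, List[Row]] = {}
for asset_name, partitions in [("asset_partitioned_1", ["a", "b"]), ("asset_partitioned_2", ["c", "d"])]:
    for key in partitions:
        rows = [{"foo": key, "source_asset": asset_name}]
        tables[ROOT_NAME] = write_partition(tables.get(ROOT_NAME, []), rows, "foo", key)

print(sorted(r["foo"] for r in tables[ROOT_NAME]))
```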