Delta Lake IO managers for Dagster with PyArrow and Polars support.

dagster-delta

A Dagster Delta Lake implementation for PyArrow & Polars, originally forked from dagster-deltalake with customizations.

The IO managers support partition mapping, custom write modes, and special metadata configuration for advanced use cases.

The supported write modes:

  • error
  • append
  • overwrite
  • ignore
  • merge
  • create_or_replace
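As a minimal sketch, selecting one of these write modes when configuring the IO manager might look like the following (the bucket URI and empty asset list are placeholders):

```python
from dagster import Definitions
from dagster_delta import WriteMode
from dagster_delta_polars import DeltaLakePolarsIOManager

# Sketch: configure the Polars IO manager with a plain write mode.
# "s3://bucket" is a placeholder; a local path or other object-store
# URI would be configured the same way.
defs = Definitions(
    assets=[],  # your assets go here
    resources={
        "io_manager": DeltaLakePolarsIOManager(
            root_uri="s3://bucket",
            mode=WriteMode.overwrite,  # or "append", "error", "ignore", ...
        )
    },
)
```

The merge and create_or_replace modes accept additional configuration, as shown in the sections below.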

Merge

dagster-delta supports MERGE execution with several pre-defined MERGE types (dagster_delta.config.MergeType):

  • deduplicate_insert <- deduplicates on write
  • update_only <- updates only matched records
  • upsert <- updates matched records and inserts unmatched records
  • replace_and_delete_unmatched <- updates matched records and deletes unmatched records
  • custom <- custom merge behavior via MergeOperationsConfig

Example:

import polars as pl
from dagster import Definitions, asset
from dagster_delta import WriteMode, MergeConfig, MergeType
from dagster_delta_polars import DeltaLakePolarsIOManager

@asset(
    key_prefix=["my_schema"]  # will be used as the schema (parent folder) in Delta Lake
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": DeltaLakePolarsIOManager(
        root_uri="s3://bucket",
        mode=WriteMode.merge, # or just "merge"
        merge_config=MergeConfig(
            merge_type=MergeType.upsert,
            predicate="s.a = t.a",
            source_alias="s",
            target_alias="t",
        )
    )}
)

Custom merge (gives full control)

import polars as pl
from dagster import Definitions, asset
from dagster_delta import (
    MergeConfig,
    MergeOperationsConfig,
    MergeType,
    WhenMatchedUpdateAll,
    WhenNotMatchedInsertAll,
    WriteMode,
)
from dagster_delta_polars import DeltaLakePolarsIOManager

@asset(
    key_prefix=["my_schema"]  # will be used as the schema (parent folder) in Delta Lake
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": DeltaLakePolarsIOManager(
        root_uri="s3://bucket",
        mode=WriteMode.merge, # or just "merge"
        merge_config=MergeConfig(
            merge_type=MergeType.custom,
            predicate="s.a = t.a",
            source_alias="s",
            target_alias="t",
            merge_operations_config=MergeOperationsConfig(
                when_not_matched_insert_all=[WhenNotMatchedInsertAll(predicate="s.price > 600")],
                when_matched_update_all=[WhenMatchedUpdateAll()],
            ),
        )
    )}
)

Special metadata configurations

Add additional table_configuration

Pass additional table configuration through to write_deltalake. The examples below assume import dagster as dg and import polars as pl.

@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"table_configuration": {
        "delta.enableChangeDataFeed": "true"
    }},
)
def my_asset() -> pl.DataFrame:
    ...

Override the write mode

Override the write mode to be used in write_deltalake.

@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"mode": "append"},
)
def my_asset() -> pl.DataFrame:
    ...

Override the custom_metadata

Override the custom_metadata to be used in write_deltalake.

@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"custom_metadata": {"owner": "John Doe"}},
)
def my_asset() -> pl.DataFrame:
    ...

Override the schema_mode

Override the schema_mode to be used in write_deltalake.

@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"schema_mode": "merge"},
)
def my_asset() -> pl.DataFrame:
    ...

Override the writer_properties

Override the writer_properties to be used in write_deltalake.

@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"writer_properties": {
        "compression": "SNAPPY",
    }},
)
def my_asset() -> pl.DataFrame:
    ...

Override the merge_predicate

Override the merge_predicate to be used with merge execution.

@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"merge_predicate": "s.foo = t.foo AND s.bar = t.bar"},
)
def my_asset() -> pl.DataFrame:
    ...

Override the schema

Override the schema in which the table will be saved.

@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"schema": "custom_db_schema"},
)
def my_asset() -> pl.DataFrame:
    ...

Set the columns that need to be read

Specify the columns so that only those columns are loaded from the upstream asset.

@dg.asset(
    io_manager_key="deltalake_io_manager",
    ins={
        "upstream_asset": dg.AssetIn(metadata={"columns": ["foo", "bar"]})
    },
)
def my_asset(upstream_asset) -> pl.DataFrame:
    ...

Override table name using root_name

Instead of using the asset name as the table name, you can set a custom table name with root_name in the asset definition metadata.

This is useful when two or more assets share the same table structure, but each asset covers a subset of the full table's partition definition and they cannot be combined into a single asset because they require different underlying op logic and/or upstream assets:

import polars as pl
import dagster as dg

@dg.asset(
    io_manager_key = "deltalake_io_manager",
    partitions_def=dg.StaticPartitionsDefinition(["a", "b"]),
    metadata={
        "partition_expr": "foo",
        "root_name": "asset_partitioned",
    },
)
def asset_partitioned_1(upstream_1: pl.DataFrame, upstream_2: pl.DataFrame) -> pl.DataFrame:
    ...

@dg.asset(
    io_manager_key="deltalake_io_manager",
    partitions_def=dg.StaticPartitionsDefinition(["c", "d"]),
    metadata={
        "partition_expr": "foo",
        "root_name": "asset_partitioned",
    },
)
def asset_partitioned_2(upstream_3: pl.DataFrame, upstream_4: pl.DataFrame) -> pl.DataFrame:
    ...

Effectively this would be the flow:


                 {static_partition_def: [a,b]}
┌───────────┐
│upstream 1 ├─┐ ┌────────────────────────┐
└───────────┘ │ │                        │            write to storage on partition (a,b)
┌───────────┐ └─►   asset_partitioned_1  ├──────────────────────┐
│upstream 2 ├───►                        │                      │
└───────────┘   └────────────────────────┘       ┌──────────────▼──────────────────┐
                                                 │                     partitions  │
                                                 │  asset_partitioned:             │
                                                 │                     [a,b,c,d]   │
┌───────────┐   ┌────────────────────────┐       └──────────────▲──────────────────┘
│upstream 3 ├──┐│                        │                      │
└───────────┘  └►   asset_partitioned_2  │                      │
┌───────────┐ ┌─►                        ├──────────────────────┘
│upstream 4 ├─┘ └────────────────────────┘            write to storage on partition (c,d)
└───────────┘
                 {static_partition_def: [c,d]}
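Wiring both assets into one repository might look like the following sketch, assuming the two asset definitions above and placeholder names for the bucket and upstream assets:

```python
import dagster as dg
from dagster_delta_polars import DeltaLakePolarsIOManager

# Sketch: because both assets set root_name="asset_partitioned",
# they write into the same Delta table, each covering its own
# slice of the static partitions ([a, b] and [c, d]).
defs = dg.Definitions(
    assets=[asset_partitioned_1, asset_partitioned_2],
    resources={
        "deltalake_io_manager": DeltaLakePolarsIOManager(
            root_uri="s3://bucket",  # placeholder URI
            mode="append",
        )
    },
)
```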
