dlt-iceberg

A dlt destination for Apache Iceberg tables using REST catalogs.

Features

  • Atomic Multi-File Commits: Multiple parquet files committed as a single Iceberg snapshot per table
  • REST Catalog Support: Works with Nessie, Polaris, AWS Glue, Unity Catalog
  • Credential Vending: Most REST catalogs vend storage credentials automatically
  • Partitioning: Full support for Iceberg partition transforms via iceberg_adapter()
  • Merge Strategies: Delete-insert and upsert with hard delete support
  • DuckDB Integration: Query loaded data via pipeline.dataset()
  • Schema Evolution: Automatic schema updates when adding columns

Installation

pip install dlt-iceberg

Or with uv:

uv add dlt-iceberg

Quick Start

import dlt
from dlt_iceberg import iceberg_rest

@dlt.resource(name="events", write_disposition="append")
def generate_events():
    yield {"event_id": 1, "value": 100}

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination=iceberg_rest(
        catalog_uri="https://my-catalog.example.com/api/catalog",
        namespace="analytics",
        warehouse="my_warehouse",
        credential="client-id:client-secret",
        oauth2_server_uri="https://my-catalog.example.com/oauth/tokens",
    ),
)

pipeline.run(generate_events())

Query Loaded Data

# Query data via DuckDB
dataset = pipeline.dataset()

# Access as dataframe
df = dataset["events"].df()

# Run SQL queries
result = dataset.query("SELECT * FROM events WHERE value > 50").fetchall()

# Get Arrow table
arrow_table = dataset["events"].arrow()

Merge/Upsert

@dlt.resource(
    name="users",
    write_disposition="merge",
    primary_key="user_id"
)
def generate_users():
    yield {"user_id": 1, "name": "Alice", "status": "active"}

pipeline.run(generate_users())

Configuration

Required Options

iceberg_rest(
    catalog_uri="...",    # REST catalog endpoint (or sqlite:// for local)
    namespace="...",      # Iceberg namespace (database)
)

Authentication

Choose based on your catalog:

Catalog               Auth Method
Polaris, Lakekeeper   credential + oauth2_server_uri
Unity Catalog         token
AWS Glue              sigv4_enabled + signing_region
Local SQLite          None needed

Most REST catalogs (Polaris, Lakekeeper, etc.) vend storage credentials automatically via the catalog API. You typically don't need to configure S3/GCS/Azure credentials manually.

Advanced Options
iceberg_rest(
    # ... required options ...

    # Manual storage credentials (usually not needed with credential vending)
    s3_endpoint="...",
    s3_access_key_id="...",
    s3_secret_access_key="...",
    s3_region="...",

    # Performance tuning
    max_retries=5,               # Retry attempts for transient failures
    retry_backoff_base=2.0,      # Exponential backoff multiplier
    merge_batch_size=500000,     # Rows per batch for merge operations
    strict_casting=False,        # If True, fail on casts that could lose data

    # Table management
    table_location_layout=None,  # Custom table location pattern
    register_new_tables=False,   # If True, register existing tables found in storage
    hard_delete_column="_dlt_deleted_at",  # Column for hard deletes
)
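The retry settings above combine into an exponential backoff schedule. A sketch of the resulting delays, assuming delay = retry_backoff_base ** attempt (the exact formula in error_handling.py may differ):

```python
def backoff_delays(max_retries: int, base: float) -> list[float]:
    """Seconds to wait before each retry attempt: base ** attempt."""
    return [base ** attempt for attempt in range(max_retries)]

# With the defaults above (max_retries=5, retry_backoff_base=2.0),
# the delays grow as 1, 2, 4, 8, 16 seconds.
```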

Catalog Examples

Lakekeeper (Docker)
iceberg_rest(
    catalog_uri="http://localhost:8282/catalog/",
    warehouse="test-warehouse",
    namespace="my_namespace",
    s3_endpoint="http://localhost:9000",
    s3_access_key_id="minioadmin",
    s3_secret_access_key="minioadmin",
    s3_region="us-east-1",
)

Start Lakekeeper + MinIO with docker compose up -d. Lakekeeper supports credential vending in production.

Polaris
iceberg_rest(
    catalog_uri="https://polaris.example.com/api/catalog",
    warehouse="my_warehouse",
    namespace="production",
    credential="client-id:client-secret",
    oauth2_server_uri="https://polaris.example.com/api/catalog/v1/oauth/tokens",
)

Storage credentials are vended automatically by the catalog.

Unity Catalog (Databricks)
iceberg_rest(
    catalog_uri="https://<workspace>.cloud.databricks.com/api/2.1/unity-catalog/iceberg-rest",
    warehouse="<catalog-name>",
    namespace="<schema-name>",
    token="<databricks-token>",
)
AWS Glue
iceberg_rest(
    catalog_uri="https://glue.us-east-1.amazonaws.com/iceberg",
    warehouse="<account-id>:s3tablescatalog/<bucket>",
    namespace="my_database",
    sigv4_enabled=True,
    signing_region="us-east-1",
)

Requires AWS credentials in environment (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY).
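For example (placeholder values, not real credentials):

```shell
# Substitute your real AWS credentials before running the pipeline.
export AWS_ACCESS_KEY_ID="<your-access-key-id>"
export AWS_SECRET_ACCESS_KEY="<your-secret-access-key>"
```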

Local SQLite Catalog
iceberg_rest(
    catalog_uri="sqlite:///catalog.db",
    warehouse="file:///path/to/warehouse",
    namespace="my_namespace",
)

Great for local development and testing.

Nessie (Docker)
iceberg_rest(
    catalog_uri="http://localhost:19120/iceberg/main",
    namespace="my_namespace",
    s3_endpoint="http://localhost:9000",
    s3_access_key_id="minioadmin",
    s3_secret_access_key="minioadmin",
    s3_region="us-east-1",
)

Start Nessie + MinIO with docker compose up -d (see docker-compose.yml in repo).

Partitioning

Using iceberg_adapter (Recommended)

The iceberg_adapter function provides a clean API for configuring Iceberg partitioning:

from dlt_iceberg import iceberg_adapter, iceberg_partition

@dlt.resource(name="events")
def events():
    yield {"event_date": "2024-01-01", "user_id": 123, "region": "US"}

# Single partition
adapted = iceberg_adapter(events, partition="region")

# Multiple partitions with transforms
adapted = iceberg_adapter(
    events,
    partition=[
        iceberg_partition.month("event_date"),
        iceberg_partition.bucket(10, "user_id"),
        "region",  # identity partition
    ]
)

pipeline.run(adapted)

Partition Transforms

# Temporal transforms (for timestamp/date columns)
iceberg_partition.year("created_at")
iceberg_partition.month("created_at")
iceberg_partition.day("created_at")
iceberg_partition.hour("created_at")

# Identity (no transformation)
iceberg_partition.identity("region")

# Bucket (hash into N buckets)
iceberg_partition.bucket(10, "user_id")

# Truncate (truncate to width)
iceberg_partition.truncate(4, "email")

# Custom partition field names
iceberg_partition.month("created_at", "event_month")
iceberg_partition.bucket(8, "user_id", "user_bucket")
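To build intuition for bucket and truncate, here are conceptual stand-ins. These are not the package's implementation: Iceberg's real bucket transform hashes with 32-bit Murmur3, and Python's built-in hash() below is only an illustration of the routing idea.

```python
def bucket(num_buckets: int, value) -> int:
    """Route a value into one of num_buckets partitions by hashing it."""
    # Iceberg uses (murmur3_32(value) & MAX_INT) % num_buckets; hash() is a stand-in.
    return (hash(value) & 0x7FFFFFFF) % num_buckets

def truncate(width: int, value: str) -> str:
    """Truncate a string value to the given width, as in truncate[W]."""
    return value[:width]
```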

Using Column Hints

You can also use dlt column hints for partitioning:

@dlt.resource(
    name="events",
    columns={
        "event_date": {
            "data_type": "date",
            "partition": True,
            "partition_transform": "day",
        },
        "user_id": {
            "data_type": "bigint",
            "partition": True,
            "partition_transform": "bucket[10]",
        }
    }
)
def events():
    ...

Write Dispositions

Append

write_disposition="append"

Adds new data without modifying existing rows.

Replace

write_disposition="replace"

Truncates table and inserts new data.

Merge

Delete-Insert Strategy (Default)

@dlt.resource(
    write_disposition={"disposition": "merge", "strategy": "delete-insert"},
    primary_key="user_id"
)

Deletes matching rows then inserts new data. Single atomic transaction.

Upsert Strategy

@dlt.resource(
    write_disposition={"disposition": "merge", "strategy": "upsert"},
    primary_key="user_id"
)

Updates existing rows, inserts new rows.

Hard Deletes

Mark rows for deletion by setting the _dlt_deleted_at column:

@dlt.resource(
    write_disposition={"disposition": "merge", "strategy": "delete-insert"},
    primary_key="user_id"
)
def users_with_deletes():
    from datetime import datetime
    yield {"user_id": 1, "name": "alice", "_dlt_deleted_at": None}  # Keep
    yield {"user_id": 2, "name": "bob", "_dlt_deleted_at": datetime.now()}  # Delete
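Conceptually, the merge treats a non-null _dlt_deleted_at as a delete marker. A minimal dictionary-based sketch of that behavior (illustrative only; the actual destination operates on Iceberg tables, not dicts):

```python
def merge_with_hard_deletes(target: dict, batch: list[dict], key: str) -> dict:
    """Apply a merge batch: marked rows are removed, others replace or insert."""
    merged = dict(target)
    for row in batch:
        if row.get("_dlt_deleted_at") is not None:
            merged.pop(row[key], None)  # hard delete: drop the existing row
        else:
            merged[row[key]] = row      # normal merge: replace or insert
    return merged
```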

Development

Run Tests

# Start Docker services (for Nessie tests)
docker compose up -d

# Run all tests
uv run pytest tests/ -v

# Run only unit tests (no Docker required)
uv run pytest tests/ --ignore=tests/nessie -v

# Run Nessie integration tests
uv run pytest tests/nessie/ -v

Project Structure

dlt-iceberg/
├── src/dlt_iceberg/
│   ├── __init__.py           # Public API
│   ├── destination_client.py # Class-based destination (atomic commits)
│   ├── destination.py        # Function-based destination (legacy)
│   ├── adapter.py            # iceberg_adapter() for partitioning
│   ├── sql_client.py         # DuckDB integration for dataset()
│   ├── schema_converter.py   # dlt → Iceberg schema conversion
│   ├── schema_casting.py     # Arrow table casting
│   ├── schema_evolution.py   # Schema updates
│   ├── partition_builder.py  # Partition specs
│   └── error_handling.py     # Retry logic
├── tests/
│   ├── test_adapter.py       # iceberg_adapter tests
│   ├── test_capabilities.py  # Hard delete, partition names tests
│   ├── test_dataset.py       # DuckDB integration tests
│   ├── test_merge_disposition.py
│   ├── test_schema_evolution.py
│   └── ...
├── examples/
│   ├── incremental_load.py   # CSV incremental loading
│   ├── merge_load.py         # CSV merge/upsert
│   └── data/                 # Sample CSV files
└── docker-compose.yml        # Nessie + MinIO for testing

How It Works

The class-based destination uses dlt's JobClientBase interface to accumulate parquet files during a load and commit them atomically in complete_load():

  1. dlt extracts data and writes parquet files
  2. Each file is registered in module-level global state
  3. After all files complete, complete_load() is called
  4. All files for a table are combined and committed as a single Iceberg snapshot
  5. Each table gets one snapshot per load

This ensures atomic commits even though dlt creates multiple client instances.
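The accumulate-then-commit pattern described above can be sketched with module-level state. Function names here are illustrative, not the package's actual API:

```python
from collections import defaultdict

# Module-level state survives across the multiple client instances dlt creates.
_pending_files: dict = defaultdict(list)

def register_file(table: str, parquet_path: str) -> None:
    """Record one finished parquet file for a table (step 2)."""
    _pending_files[table].append(parquet_path)

def commit_all() -> dict:
    """Run once from complete_load(): one commit per table (steps 3-5)."""
    committed = {}
    for table, files in _pending_files.items():
        # The real destination appends all files to the Iceberg table in a
        # single transaction here, producing exactly one snapshot.
        committed[table] = len(files)
    _pending_files.clear()
    return committed
```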

License

MIT License - see LICENSE file
