
dlt-iceberg

A dlt destination for Apache Iceberg tables with atomic multi-file commits via REST catalogs.

Features

  • Atomic Multi-File Commits: Multiple parquet files are committed as a single Iceberg snapshot per table
  • REST Catalog Support: Works with Nessie, Polaris, AWS Glue, Unity Catalog
  • Credential Vending: Most REST catalogs vend storage credentials automatically
  • Partitioning: Full support for Iceberg partition transforms via iceberg_adapter()
  • Merge Strategies: Delete-insert and upsert with hard delete support
  • DuckDB Integration: Query loaded data via pipeline.dataset()
  • Schema Evolution: Automatic schema updates when adding columns

Installation

pip install dlt-iceberg

Or with uv:

uv add dlt-iceberg

Quick Start

import dlt
from dlt_iceberg import iceberg_rest

@dlt.resource(name="events", write_disposition="append")
def generate_events():
    yield {"event_id": 1, "value": 100}

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination=iceberg_rest(
        catalog_uri="https://my-catalog.example.com/api/catalog",
        namespace="analytics",
        warehouse="my_warehouse",
        credential="client-id:client-secret",
        oauth2_server_uri="https://my-catalog.example.com/oauth/tokens",
    ),
)

pipeline.run(generate_events())

Query Loaded Data

# Query data via DuckDB
dataset = pipeline.dataset()

# Access as dataframe
df = dataset["events"].df()

# Run SQL queries
result = dataset.query("SELECT * FROM events WHERE value > 50").fetchall()

# Get Arrow table
arrow_table = dataset["events"].arrow()

Merge/Upsert

@dlt.resource(
    name="users",
    write_disposition="merge",
    primary_key="user_id"
)
def generate_users():
    yield {"user_id": 1, "name": "Alice", "status": "active"}

pipeline.run(generate_users())

Configuration

Required Options

iceberg_rest(
    catalog_uri="...",    # REST catalog endpoint (or sqlite:// for local)
    namespace="...",      # Iceberg namespace (database)
)

Authentication

Choose based on your catalog:

Catalog              Auth Method
Polaris, Lakekeeper  credential + oauth2_server_uri
Unity Catalog        token
AWS Glue             sigv4_enabled + signing_region
Local SQLite         None needed

Most REST catalogs (Polaris, Lakekeeper, etc.) vend storage credentials automatically via the catalog API. You typically don't need to configure S3/GCS/Azure credentials manually.

Advanced Options
iceberg_rest(
    # ... required options ...

    # Manual storage credentials (usually not needed with credential vending)
    s3_endpoint="...",
    s3_access_key_id="...",
    s3_secret_access_key="...",
    s3_region="...",

    # Performance tuning
    max_retries=5,               # Retry attempts for transient failures
    retry_backoff_base=2.0,      # Exponential backoff multiplier
    merge_batch_size=500000,     # Rows per batch for merge operations
    strict_casting=False,        # When True, fail on casts that could lose data

    # Table management
    table_location_layout=None,  # Custom table location pattern
    register_new_tables=False,   # Register tables found in storage
    hard_delete_column="_dlt_deleted_at",  # Column for hard deletes
)
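The retry settings follow a standard exponential backoff pattern: the wait before retry n grows as retry_backoff_base ** n. A minimal sketch of that pattern (illustrative only; the destination's actual logic lives in error_handling.py and may differ in detail):

```python
import time

def with_retries(operation, max_retries=5, retry_backoff_base=2.0):
    """Retry a callable on transient errors with exponential backoff.

    Hypothetical helper for illustration, not the package's API.
    """
    for attempt in range(max_retries):
        try:
            return operation()
        except ConnectionError:
            if attempt == max_retries - 1:
                raise
            # Wait base**attempt seconds: 1s, 2s, 4s, ... for base=2.0
            time.sleep(retry_backoff_base ** attempt)

# A flaky operation that fails twice before succeeding
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"
```

With the defaults, with_retries(flaky) succeeds on the third attempt after two backoff waits.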

Catalog Examples

Lakekeeper (Docker)
iceberg_rest(
    catalog_uri="http://localhost:8282/catalog/",
    warehouse="test-warehouse",
    namespace="my_namespace",
    s3_endpoint="http://localhost:9000",
    s3_access_key_id="minioadmin",
    s3_secret_access_key="minioadmin",
    s3_region="us-east-1",
)

Start Lakekeeper + MinIO with docker compose up -d. Lakekeeper supports credential vending in production.

Polaris
iceberg_rest(
    catalog_uri="https://polaris.example.com/api/catalog",
    warehouse="my_warehouse",
    namespace="production",
    credential="client-id:client-secret",
    oauth2_server_uri="https://polaris.example.com/api/catalog/v1/oauth/tokens",
)

Storage credentials are vended automatically by the catalog.

Unity Catalog (Databricks)
iceberg_rest(
    catalog_uri="https://<workspace>.cloud.databricks.com/api/2.1/unity-catalog/iceberg-rest",
    warehouse="<catalog-name>",
    namespace="<schema-name>",
    token="<databricks-token>",
)
AWS Glue
iceberg_rest(
    catalog_uri="https://glue.us-east-1.amazonaws.com/iceberg",
    warehouse="<account-id>:s3tablescatalog/<bucket>",
    namespace="my_database",
    sigv4_enabled=True,
    signing_region="us-east-1",
)

Requires AWS credentials in environment (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY).

Local SQLite Catalog
iceberg_rest(
    catalog_uri="sqlite:///catalog.db",
    warehouse="file:///path/to/warehouse",
    namespace="my_namespace",
)

Great for local development and testing.

Nessie (Docker)
iceberg_rest(
    catalog_uri="http://localhost:19120/iceberg/main",
    namespace="my_namespace",
    s3_endpoint="http://localhost:9000",
    s3_access_key_id="minioadmin",
    s3_secret_access_key="minioadmin",
    s3_region="us-east-1",
)

Start Nessie + MinIO with docker compose up -d (see docker-compose.yml in repo).

Partitioning

Using iceberg_adapter (Recommended)

The iceberg_adapter function provides a clean API for configuring Iceberg partitioning:

from dlt_iceberg import iceberg_adapter, iceberg_partition

@dlt.resource(name="events")
def events():
    yield {"event_date": "2024-01-01", "user_id": 123, "region": "US"}

# Single partition
adapted = iceberg_adapter(events, partition="region")

# Multiple partitions with transforms
adapted = iceberg_adapter(
    events,
    partition=[
        iceberg_partition.month("event_date"),
        iceberg_partition.bucket(10, "user_id"),
        "region",  # identity partition
    ]
)

pipeline.run(adapted)

Partition Transforms

# Temporal transforms (for timestamp/date columns)
iceberg_partition.year("created_at")
iceberg_partition.month("created_at")
iceberg_partition.day("created_at")
iceberg_partition.hour("created_at")

# Identity (no transformation)
iceberg_partition.identity("region")

# Bucket (hash into N buckets)
iceberg_partition.bucket(10, "user_id")

# Truncate (truncate to width)
iceberg_partition.truncate(4, "email")

# Custom partition field names
iceberg_partition.month("created_at", "event_month")
iceberg_partition.bucket(8, "user_id", "user_bucket")

Using Column Hints

You can also use dlt column hints for partitioning:

@dlt.resource(
    name="events",
    columns={
        "event_date": {
            "data_type": "date",
            "partition": True,
            "partition_transform": "day",
        },
        "user_id": {
            "data_type": "bigint",
            "partition": True,
            "partition_transform": "bucket[10]",
        }
    }
)
def events():
    ...
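The partition_transform hint accepts plain transform names like "day" and parameterized forms like "bucket[10]" or "truncate[4]". A hypothetical parser (not the package's actual implementation, which lives in partition_builder.py) makes the string format concrete:

```python
import re

def parse_transform(hint: str):
    """Split a partition_transform hint into (name, parameter).

    "day"        -> ("day", None)
    "bucket[10]" -> ("bucket", 10)
    """
    match = re.fullmatch(r"(\w+)(?:\[(\d+)\])?", hint)
    if match is None:
        raise ValueError(f"unrecognized transform hint: {hint!r}")
    name, param = match.groups()
    return name, int(param) if param is not None else None
```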

Write Dispositions

Append

write_disposition="append"

Adds new data without modifying existing rows.

Replace

write_disposition="replace"

Truncates table and inserts new data.

Merge

Delete-Insert Strategy (Default)

@dlt.resource(
    write_disposition={"disposition": "merge", "strategy": "delete-insert"},
    primary_key="user_id"
)

Deletes matching rows, then inserts the new data, in a single atomic transaction.

Upsert Strategy

@dlt.resource(
    write_disposition={"disposition": "merge", "strategy": "upsert"},
    primary_key="user_id"
)

Updates existing rows and inserts new rows.
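The two strategies can be illustrated on plain Python lists of rows (a toy model of the semantics, not the destination's implementation):

```python
def delete_insert(table, batch, key):
    """Delete every row whose key appears in the batch, then insert the batch."""
    incoming = {row[key] for row in batch}
    kept = [row for row in table if row[key] not in incoming]
    return kept + list(batch)

def upsert(table, batch, key):
    """Update rows whose key matches an incoming row; insert the rest."""
    merged = {row[key]: row for row in table}
    for row in batch:
        merged[row[key]] = row
    return list(merged.values())

users = [{"user_id": 1, "name": "alice"}, {"user_id": 2, "name": "bob"}]
batch = [{"user_id": 2, "name": "bobby"}, {"user_id": 3, "name": "carol"}]
```

With a unique primary key both strategies produce the same end state; they diverge when a batch carries duplicate keys, where delete-insert keeps every incoming row while upsert keeps only the last.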

Hard Deletes

Mark rows for deletion by setting the _dlt_deleted_at column:

@dlt.resource(
    write_disposition={"disposition": "merge", "strategy": "delete-insert"},
    primary_key="user_id"
)
def users_with_deletes():
    from datetime import datetime
    yield {"user_id": 1, "name": "alice", "_dlt_deleted_at": None}  # Keep
    yield {"user_id": 2, "name": "bob", "_dlt_deleted_at": datetime.now()}  # Delete

Development

Run Tests

# Start Docker services (for Nessie tests)
docker compose up -d

# Run all tests
uv run pytest tests/ -v

# Run only unit tests (no Docker required)
uv run pytest tests/ --ignore=tests/nessie -v

# Run Nessie integration tests
uv run pytest tests/nessie/ -v

Project Structure

dlt-iceberg/
├── src/dlt_iceberg/
│   ├── __init__.py           # Public API
│   ├── destination_client.py # Class-based destination (atomic commits)
│   ├── destination.py        # Function-based destination (legacy)
│   ├── adapter.py            # iceberg_adapter() for partitioning
│   ├── sql_client.py         # DuckDB integration for dataset()
│   ├── schema_converter.py   # dlt → Iceberg schema conversion
│   ├── schema_casting.py     # Arrow table casting
│   ├── schema_evolution.py   # Schema updates
│   ├── partition_builder.py  # Partition specs
│   └── error_handling.py     # Retry logic
├── tests/
│   ├── test_adapter.py       # iceberg_adapter tests
│   ├── test_capabilities.py  # Hard delete, partition names tests
│   ├── test_dataset.py       # DuckDB integration tests
│   ├── test_merge_disposition.py
│   ├── test_schema_evolution.py
│   └── ...
├── examples/
│   ├── incremental_load.py   # CSV incremental loading
│   ├── merge_load.py         # CSV merge/upsert
│   └── data/                 # Sample CSV files
└── docker-compose.yml        # Nessie + MinIO for testing

How It Works

The class-based destination uses dlt's JobClientBase interface to accumulate parquet files during a load and commit them atomically in complete_load():

  1. dlt extracts data and writes parquet files
  2. Each file is registered in module-level global state
  3. After all files complete, complete_load() is called
  4. All files for a table are combined and committed as a single Iceberg snapshot
  5. Each table gets one snapshot per load

This ensures atomic commits even though dlt creates multiple client instances.

License

MIT License - see LICENSE file
