Utilities for loading data into Iceberg tables using PyArrow
Project description
iceberg-loader
Utilities for loading data into Iceberg tables using PyArrow. Focus on robust ingestion (messy JSON, schema evolution, idempotent loads, upsert) with a simple facade and configurable defaults.
Why iceberg-loader?
- Handles messy JSON: auto-serializes dict/list/mixed fields to strings so writes don’t fail.
- Schema evolution: add columns on the fly (opt-in), preserves field ids.
- Safe writes: append/overwrite, idempotent replace via
replace_filter, upsert. - Stream friendly: commit intervals, batches, IPC streams.
- One config (
LoaderConfig) to set defaults; override per-call if needed.
Features
- Arrow-first:
pa.Table,RecordBatch, IPC. - Messy JSON friendly: dict/list/mixed -> JSON strings.
- Schema evolution (opt-in).
- Idempotent replace (
replace_filter) and upsert. - Commit interval for long streams.
- Maintenance helpers (expire snapshots).
Quickstart
pip install "iceberg-loader[all]"
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from iceberg_loader import LoaderConfig, load_data_to_iceberg
catalog = load_catalog("default")
data = pa.Table.from_pydict({"id": [1, 2], "signup_date": ["2023-01-01", "2023-01-02"]})
config = LoaderConfig(write_mode="append", partition_col="signup_date", schema_evolution=True)
load_data_to_iceberg(data, ("db", "users"), catalog, config=config)
Compatibility
- Python: 3.10, 3.11, 3.12, 3.13, 3.14
- PyArrow: >= 18.0.0
- PyIceberg: >= 0.7.1 (use extras
hive,s3fs,pyiceberg-coreas needed)
Usage
Basic Example
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from iceberg_loader import LoaderConfig, load_data_to_iceberg
# 1. Connect to your catalog
catalog = load_catalog("default")
# 2. Prepare data
data = pa.Table.from_pydict({
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"created_at": [1672531200000, 1672617600000, 1672704000000],
"signup_date": ["2023-01-01", "2023-01-01", "2023-01-02"]
})
# 3. Load data
config = LoaderConfig(write_mode="append", partition_col="signup_date", schema_evolution=True)
result = load_data_to_iceberg(
table_data=data,
table_identifier=("my_db", "my_table"),
catalog=catalog,
config=config,
)
print(result)
# {'rows_loaded': 3, 'write_mode': 'append', 'partition_col': 'signup_date', ...}
Idempotent Load (Replace Partition)
To safely re-load data for a specific day (avoiding duplicates):
config = LoaderConfig(
write_mode="append",
replace_filter="signup_date == '2023-01-01'",
partition_col="signup_date",
)
load_data_to_iceberg(table_data=data, table_identifier=("my_db", "my_table"), catalog=catalog, config=config)
Upsert (Merge Into)
Perform a merge operation (update existing rows, insert new ones) based on key columns. Requires PyIceberg >= 0.7.0.
config = LoaderConfig(write_mode="upsert", join_cols=["id"])
load_data_to_iceberg(table_data=data, table_identifier=("my_db", "my_table"), catalog=catalog, config=config)
Batch Loading
For large datasets, use load_batches_to_iceberg with an iterator of RecordBatches:
from iceberg_loader import load_batches_to_iceberg
def batch_generator():
for i in range(10):
yield some_record_batch
result = load_batches_to_iceberg(
batch_iterator=batch_generator(),
table_identifier=("my_db", "large_table"),
catalog=catalog,
config=LoaderConfig(write_mode="append", commit_interval=100),
)
Stream Loading (Arrow IPC)
Load data directly from an Apache Arrow IPC stream (e.g., from a file or network socket):
from iceberg_loader import load_ipc_stream_to_iceberg
# stream_source can be a file path or a file-like object (BytesIO)
result = load_ipc_stream_to_iceberg(
stream_source="data.arrow",
table_identifier=("my_db", "stream_table"),
catalog=catalog,
config=LoaderConfig(write_mode="append"),
)
Custom Settings
You can override default table properties:
custom_props = {
'write.parquet.compression-codec': 'snappy',
'history.expire.min-snapshots-to-keep': 5,
}
config = LoaderConfig(table_properties=custom_props, write_mode="append")
load_data_to_iceberg(..., config=config)
Maintenance helper
from iceberg_loader import expire_snapshots
table = catalog.load_table(("db", "users"))
expire_snapshots(table, keep_last=2)
API Reference
load_batches_to_iceberg()
Main function for loading a stream of batches into an Iceberg table.
def load_batches_to_iceberg(
batch_iterator: Iterator[pa.RecordBatch] | pa.RecordBatchReader,
table_identifier: tuple[str, str],
catalog: Catalog,
write_mode: Literal['overwrite', 'append', 'upsert'] = 'overwrite',
partition_col: str | None = None,
replace_filter: str | None = None,
schema_evolution: bool = False,
table_properties: dict[str, Any] | None = None,
commit_interval: int = 0,
join_cols: list[str] | None = None,
) -> dict[str, Any]
Parameters
batch_iterator (Iterator[pa.RecordBatch] | pa.RecordBatchReader)
- Iterator or reader that returns PyArrow RecordBatch objects
- Can be a generator, list of batches, or
pa.RecordBatchReader - Enables processing large volumes of data without loading the entire dataset into memory
table_identifier (tuple[str, str])
- Table identifier in the format
(namespace, table_name) - Example:
("my_database", "my_table") - If the table doesn't exist, it will be created automatically
catalog (Catalog)
- PyIceberg catalog instance (from
pyiceberg.catalog.load_catalog()) - Manages metadata and connection to the table storage
- Supports Hive, REST, Glue, and other catalog types
write_mode (Literal['overwrite', 'append', 'upsert'], default='overwrite')
- Data write mode:
'overwrite': First batch overwrites the table, subsequent batches are appended'append': All batches are appended to existing data'upsert': Merge operation (update/insert) based onjoin_cols
- Can be combined with
replace_filterfor idempotent writes (only for'append')
partition_col (str | None, default=None)
- Column name (and optional transform) for table partitioning
- Used only when creating a new table
- Supported syntax:
"col_name"(Identity transform)"year(col_name)""month(col_name)""day(col_name)""hour(col_name)""bucket(N, col_name)"(e.g.,"bucket(16, id)")"truncate(W, col_name)"(e.g.,"truncate(4, name)")
replace_filter (str | None, default=None)
- SQL-like filter for idempotent writes (works only with
write_mode='append') - Deletes existing rows matching the filter before the first write
- Example:
"event_date == '2023-01-01'"or"year == 2023 AND month == 1" - Used for safe partition reloading
join_cols (list[str] | None, default=None)
- List of column names to use as keys for upsert operations
- Required when
write_mode='upsert' - Example:
["id"]or["user_id", "date"]
schema_evolution (bool, default=False)
- Enables automatic table schema evolution
- When
True: new columns from incoming data are automatically added to the table - When
False: incoming data must match the existing schema - When schema changes, buffer is flushed and committed
table_properties (dict[str, Any] | None, default=None)
- Additional Iceberg table properties
- Applied only when creating a new table
- Examples:
{ 'write.parquet.compression-codec': 'zstd', 'write.metadata.compression-codec': 'gzip', 'history.expire.min-snapshots-to-keep': 10 }
commit_interval (int, default=0)
- Transaction commit frequency (number of batches)
0: all batches are written in a single transaction (default)> 0: commit is performed every N batches- Useful for long data streams to manage memory and create checkpoints
- Example:
commit_interval=100will create a snapshot every 100 batches
Return Value
Dictionary with loading results:
{
'rows_loaded': int, # Total number of rows loaded
'batches_processed': int, # Number of batches processed
'write_mode': str, # Write mode used
'partition_col': str | None, # Partitioning column
'schema_evolution': bool, # Whether schema_evolution was enabled
'commit_interval': int # Commit interval used
}
Usage Examples
Basic stream loading:
def generate_batches():
for i in range(100):
data = {"id": [i], "value": [f"row_{i}"]}
yield pa.RecordBatch.from_pydict(data)
result = load_batches_to_iceberg(
batch_iterator=generate_batches(),
table_identifier=("db", "table"),
catalog=catalog,
write_mode="append"
)
With commits for memory management:
result = load_batches_to_iceberg(
batch_iterator=large_batch_stream,
table_identifier=("db", "large_table"),
catalog=catalog,
commit_interval=50, # Commit every 50 batches
schema_evolution=True
)
Idempotent partition loading:
result = load_batches_to_iceberg(
batch_iterator=daily_batches,
table_identifier=("db", "events"),
catalog=catalog,
write_mode="append",
replace_filter="event_date == '2023-12-09'",
partition_col="event_date"
)
Development
This project uses Hatch as build backend. For local workflows prefer uv.
Run Tests
uv run python -m pytest
Linting
uv run ruff check
uv run ruff format --check
uv run mypy
Release
# Build artifacts
hatch build
# (Optional) check README/metadata
twine check dist/*
# Upload to TestPyPI
twine upload --repository testpypi dist/*
# Upload to PyPI
twine upload dist/*
Status / TestPyPI
Work in progress. Test build:
pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple "iceberg-loader[all]==0.0.1"
Package page: https://test.pypi.org/project/iceberg-loader/0.0.1/
GitHub Actions release runs on tag v* (see .github/workflows/release.yml). Required secrets:
PYPI_API_TOKENfor PyPITEST_PYPI_API_TOKEN(optional) for TestPyPI
Examples
See examples/ directory for runnable scripts (requires local docker environment):
cd examples && docker-compose up -d
hatch run python examples/load_example.py
hatch run python examples/advanced_scenarios.py
hatch run python examples/load_complex_json.py
hatch run python examples/load_stream.py
hatch run python examples/load_with_commits.py
hatch run python examples/load_from_api.py
Documentation
Rendered docs (MkDocs) live in docs/ (serve locally with mkdocs serve). The homepage mirrors the README for quick onboarding.
Contributing
We welcome contributions! See CONTRIBUTING.md for setup, coding style, and PR guidelines. Quick checks:
hatch run lint
hatch run test
Contributors
Thanks to all contributors who have helped make this project better!
Made with contrib.rocks.
License
iceberg-loader is distributed under the terms of the MIT license.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file iceberg_loader-0.0.4.tar.gz.
File metadata
- Download URL: iceberg_loader-0.0.4.tar.gz
- Upload date:
- Size: 7.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
54523ff4b504b0198aafccc9a1eec8ce78871fa7a1a89dedc163feeee6f24070
|
|
| MD5 |
b36b746b922262a9e2eb6887dc1c8c54
|
|
| BLAKE2b-256 |
252929f222df91ad6ed11e0edc94670ab0b17c7876aa08de430a941a7b969c97
|
File details
Details for the file iceberg_loader-0.0.4-py3-none-any.whl.
File metadata
- Download URL: iceberg_loader-0.0.4-py3-none-any.whl
- Upload date:
- Size: 19.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
26e85e67e2707b8ca93740a3bcf4376d47a082ef11d3d60894e2b0ff623a689e
|
|
| MD5 |
b75c294f1f958667aad8f66085315cf6
|
|
| BLAKE2b-256 |
c9a76e8a7f7a7443db2faf2b3e9ab8450c886929860274dcbc5d398d09cadf26
|