A PySpark utility for managing partitioned Apache Iceberg tables.
This project has been archived by its maintainers. No new releases are expected.
keplerio 🧊
A production-grade PySpark utility for managing partitioned and optimized Apache Iceberg tables. It simplifies writing, reading, updating, and archiving data with features designed for performance and ease of use.
Key Features
- Standardized Schema: Enforces a consistent schema for all tables, automatically adding and typing required columns.
- Optimized Writes: Supports both Copy-on-Write (CoW) for read-heavy workloads and Merge-on-Read (MoR) for fast, frequent updates.
- Z-Ordering: Automatically configures Z-Ordering on key columns, clustering related rows together so that queries filtering on those columns can skip more data files.
- Flexible Updates: Update single records with a simple dictionary or thousands of records in bulk with a DataFrame.
- Partition Pruning: Update and read functions filter on Iceberg's date partitions, so only the partitions in the requested date range are scanned.
- Data Archiving: Includes a simple function to move old data to an archive table, keeping your primary tables lean and fast.
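As a rough illustration of the Copy-on-Write versus Merge-on-Read choice, the sketch below renders the standard Iceberg table properties that control row-level write behavior (`write.delete.mode`, `write.update.mode`, `write.merge.mode`). How keplerio sets these internally is not documented here, so the helper names and the table name `uc_123` (inferred from the archive table name used later) are assumptions.

```python
# Sketch only: helper names and the table name are hypothetical,
# the three property keys are standard Iceberg table properties.
VALID_STRATEGIES = ("copy-on-write", "merge-on-read")


def write_mode_properties(write_strategy: str) -> dict:
    """Return the Iceberg row-level write-mode properties for a strategy."""
    if write_strategy not in VALID_STRATEGIES:
        raise ValueError(
            f"write_strategy must be one of {VALID_STRATEGIES}, got {write_strategy!r}"
        )
    return {
        "write.delete.mode": write_strategy,
        "write.update.mode": write_strategy,
        "write.merge.mode": write_strategy,
    }


def alter_table_sql(table: str, properties: dict) -> str:
    """Render an ALTER TABLE ... SET TBLPROPERTIES statement as a string."""
    props = ", ".join(f"'{k}' = '{v}'" for k, v in sorted(properties.items()))
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({props})"


print(alter_table_sql("uc_123", write_mode_properties("merge-on-read")))
```

With a live session the resulting string could be passed to `spark.sql(...)`. Z-Ordering is a separate concern and is not covered by this sketch.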
Installation
pip install keplerio
Setup: Spark Session
keplerio requires an active Spark Session configured for Apache Iceberg. Here is a typical setup:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("KeplerIO Example") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .getOrCreate()
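If a session is created elsewhere (a notebook or a job launcher), it can help to check that the three Iceberg settings from the setup above are present before calling keplerio. The helper below is a hypothetical sketch that works on a plain dictionary of settings, so it runs without Spark; it is not part of keplerio.

```python
# Settings copied from the setup above; the helper itself is hypothetical.
REQUIRED_ICEBERG_SETTINGS = {
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
    "spark.sql.catalog.spark_catalog.type": "hive",
}


def missing_iceberg_settings(conf: dict) -> list:
    """Return the required keys that are absent or hold a different value."""
    missing = []
    for key, expected in REQUIRED_ICEBERG_SETTINGS.items():
        actual = conf.get(key, "")
        # spark.sql.extensions may hold several comma-separated class names.
        values = [v.strip() for v in actual.split(",")]
        if expected not in values:
            missing.append(key)
    return missing


example_conf = {
    "spark.sql.extensions": REQUIRED_ICEBERG_SETTINGS["spark.sql.extensions"],
    "spark.sql.catalog.spark_catalog.type": "hive",
}
print(missing_iceberg_settings(example_conf))
```

With a live session, `dict(spark.sparkContext.getConf().getAll())` should produce a dictionary of this shape, although settings changed after the session started may not appear there.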
Usage
Pushing Data (push_to_kepler)
Use push_to_kepler to write data. It will create the table on the first run with all the recommended optimizations.
import keplerio
from pyspark.sql.types import StructType, StructField, LongType, StringType, DateType
from datetime import date
# Create a sample DataFrame; the values match the declared column types
data = [(1, "101", "1001", "95.5", date(2025, 10, 10))]
schema = StructType([
    StructField("id_kepler", LongType()),
    StructField("id_client", StringType()),
    StructField("id_transaction", StringType()),
    StructField("pourcentage", StringType()),
    StructField("date_transaction", DateType())
])
df = spark.createDataFrame(data, schema)
# Push the data
# Use 'merge-on-read' for tables you intend to update frequently
keplerio.push_to_kepler(
    spark=spark,
    usecase_id="123",
    df=df,
    mode='overwrite',  # 'overwrite' or 'append'
    write_strategy='merge-on-read'  # Use 'copy-on-write' for read-heavy tables
)
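The pushed DataFrame has five columns, yet the read and update examples further down refer to `statut` and `assigned_to`. One plausible reading is that these are among the required columns that the standardized schema adds automatically. The sketch below shows how such an alignment could be planned from column names and types alone. The contents of `STANDARD_SCHEMA` are an assumption made for illustration, not keplerio's documented schema.

```python
# Hypothetical standard schema: column name -> Spark SQL type name.
STANDARD_SCHEMA = {
    "id_kepler": "bigint",
    "date_transaction": "date",
    "statut": "string",
    "assigned_to": "string",
}


def plan_schema_alignment(actual: dict, standard: dict = STANDARD_SCHEMA) -> dict:
    """Work out which standard columns to add (as NULL) and which to cast."""
    to_add = {name: typ for name, typ in standard.items() if name not in actual}
    to_cast = {
        name: typ
        for name, typ in standard.items()
        if name in actual and actual[name] != typ
    }
    return {"add": to_add, "cast": to_cast}


# Column types of the sample DataFrame pushed above.
pushed = {
    "id_kepler": "bigint",
    "id_client": "string",
    "id_transaction": "string",
    "pourcentage": "string",
    "date_transaction": "date",
}
print(plan_schema_alignment(pushed))
```

In PySpark, `dict(df.dtypes)` yields a name-to-type mapping of this form, so the same function could be fed a real DataFrame's schema.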
Reading Data (read_from_kepler)
Read a whole table, or narrow the read with a date range and a list of columns.
# Read the entire table
df_all = keplerio.read_from_kepler(spark, usecase_id="123")
# Read a specific date range, selecting only certain columns
df_filtered = keplerio.read_from_kepler(
    spark=spark,
    usecase_id="123",
    start_date="2025-10-01",
    end_date="2025-10-31",
    columns=["id_kepler", "statut", "assigned_to"]
)
df_filtered.show()
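A date-range read benefits from partition pruning when the filter is expressed directly on the partition column. The sketch below builds such a predicate as a SQL string. It assumes the table is partitioned on `date_transaction` and that both bounds are inclusive; neither is confirmed by the documentation above, and the function name is hypothetical.

```python
from datetime import date
from typing import Optional


def date_range_predicate(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    column: str = "date_transaction",  # assumed partition column
) -> Optional[str]:
    """Build a SQL predicate for an inclusive date range, or None if unbounded."""
    # Parsing validates the input and normalizes it to YYYY-MM-DD.
    start = date.fromisoformat(start_date) if start_date is not None else None
    end = date.fromisoformat(end_date) if end_date is not None else None
    if start is not None and end is not None and start > end:
        raise ValueError("start_date must not be after end_date")
    clauses = []
    if start is not None:
        clauses.append(f"{column} >= DATE '{start.isoformat()}'")
    if end is not None:
        clauses.append(f"{column} <= DATE '{end.isoformat()}'")
    return " AND ".join(clauses) if clauses else None


print(date_range_predicate("2025-10-01", "2025-10-31"))
```

A predicate of this form can be passed to `DataFrame.filter`, which accepts a SQL expression string.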
Updating Data (update_kepler)
Update records efficiently. This works best on tables created with the 'merge-on-read' strategy.
Single Record Update (Convenient)
Use a Python dictionary for simple, single-row updates.
update_payload = {
    "id_kepler": 1,
    "statut": "Completed",
    "assigned_to": "analyst_a"
}
# The transaction_date makes the update much faster via partition pruning
keplerio.update_kepler(
    spark=spark,
    usecase_id="123",
    updates=update_payload,
    transaction_date="2025-10-10"
)
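To see why the transaction date helps, it can be useful to picture the dictionary as a row-level `UPDATE` whose `WHERE` clause includes the partition column. The sketch below renders such a statement. It is not keplerio's implementation: the table name `uc_123`, the key column `id_kepler`, the partition column `date_transaction`, and both helper names are assumptions, and string escaping relies on Spark SQL's default handling of backslash in string literals.

```python
from datetime import date


def sql_literal(value) -> str:
    """Render a Python value as a SQL literal (minimal, illustrative)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # check bool before int, since bool is an int
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return repr(value)
    text = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{text}'"


def build_update_sql(
    table: str,
    updates: dict,
    transaction_date: str,
    key: str = "id_kepler",                  # assumed record key
    date_column: str = "date_transaction",   # assumed partition column
) -> str:
    """Turn an update dictionary into a partition-aware UPDATE statement."""
    if key not in updates:
        raise ValueError(f"updates must contain the key column {key!r}")
    assignments = {c: v for c, v in updates.items() if c != key}
    if not assignments:
        raise ValueError("updates must contain at least one column to set")
    day = date.fromisoformat(transaction_date).isoformat()
    set_clause = ", ".join(f"{c} = {sql_literal(v)}" for c, v in assignments.items())
    return (
        f"UPDATE {table} SET {set_clause} "
        f"WHERE {key} = {sql_literal(updates[key])} AND {date_column} = DATE '{day}'"
    )


payload = {"id_kepler": 1, "statut": "Completed", "assigned_to": "analyst_a"}
print(build_update_sql("uc_123", payload, "2025-10-10"))
```

Because the date appears as an equality on the partition column, the engine only needs to consider files in that one partition.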
Bulk Record Update (Powerful)
Use a Spark DataFrame to update thousands of records at once.
bulk_updates = [
    (10, "Flagged", "investigation_team"),
    (25, "Closed", "analyst_b")
]
bulk_df = spark.createDataFrame(bulk_updates, ["id_kepler", "statut", "assigned_to"])
keplerio.update_kepler(
    spark=spark,
    usecase_id="123",
    updates=bulk_df,
    transaction_date="2025-10-10"  # Assumes all updates are for this date
)
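A bulk update of this kind maps naturally onto Iceberg's `MERGE INTO` statement, with the update DataFrame registered as a temporary view. The sketch below renders one possible statement and places the shared transaction date in the join condition so that only one partition is touched. The table name `uc_123`, the view name `bulk_updates`, the column names, and the function name are assumptions; keplerio may work differently.

```python
from datetime import date


def build_merge_sql(
    target: str,
    source_view: str,
    update_columns: list,
    transaction_date: str,
    key: str = "id_kepler",                  # assumed record key
    date_column: str = "date_transaction",   # assumed partition column
) -> str:
    """Render a MERGE INTO statement that updates matched rows for one date."""
    if not update_columns:
        raise ValueError("update_columns must not be empty")
    if key in update_columns:
        raise ValueError("the key column is used for matching and cannot be updated")
    day = date.fromisoformat(transaction_date).isoformat()
    assignments = ", ".join(f"t.{c} = s.{c}" for c in update_columns)
    return " ".join([
        f"MERGE INTO {target} t USING {source_view} s",
        f"ON t.{key} = s.{key} AND t.{date_column} = DATE '{day}'",
        f"WHEN MATCHED THEN UPDATE SET {assignments}",
    ])


print(build_merge_sql("uc_123", "bulk_updates", ["statut", "assigned_to"], "2025-10-10"))
```

With a live session, `bulk_df.createOrReplaceTempView("bulk_updates")` would make the DataFrame visible under that view name before the statement is run through `spark.sql(...)`.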
Archiving Data (archive_from_kepler)
Move old data to an archive table (uc_123_archive) to keep your main table fast.
# Archive all data before January 1st, 2025
keplerio.archive_from_kepler(
    spark=spark,
    usecase_id="123",
    archive_before_date="2025-01-01"
)
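Conceptually, archiving is a copy followed by a delete, both restricted to rows older than the cutoff. The sketch below renders the two statements such a move might use. It assumes the main table is named `uc_123` (by analogy with `uc_123_archive`), that the archive table already exists with the same columns, and that the date column is `date_transaction`. The function name is hypothetical and this is not keplerio's actual implementation.

```python
from datetime import date


def build_archive_statements(
    table: str,
    archive_before_date: str,
    date_column: str = "date_transaction",  # assumed partition column
) -> list:
    """Return the copy and delete statements for rows strictly before the cutoff."""
    cutoff = date.fromisoformat(archive_before_date).isoformat()
    archive_table = f"{table}_archive"
    predicate = f"{date_column} < DATE '{cutoff}'"
    return [
        # Copy first, so that a failure leaves the main table untouched.
        f"INSERT INTO {archive_table} SELECT * FROM {table} WHERE {predicate}",
        f"DELETE FROM {table} WHERE {predicate}",
    ]


for statement in build_archive_statements("uc_123", "2025-01-01"):
    print(statement)
```

The two statements are not atomic as a pair, so a production version would need to handle a failure between the copy and the delete.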
License
Distributed under the MIT License. See LICENSE for more information.
Source Distribution
File details
Details for the file keplerio-13.9.9.tar.gz.
File metadata
- Download URL: keplerio-13.9.9.tar.gz
- Upload date:
- Size: 78.0 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e934a594dff1bf2dd47f00cd11d54edf5631ab69fa2dd962fe04203b7c12935b |
| MD5 | 83292e85e913ca6e9008411c5ad7c3bd |
| BLAKE2b-256 | 08c2b743341cbecf0095df34d5fc7d05fa864c2cc54196eeab128f3c91c10379 |