Skip to main content

A PySpark utility for managing partitioned Apache Iceberg tables.

This project has been archived.

The maintainers of this project have marked this project as archived. No new releases are expected.

Project description

keplerio 🧊

PyPI version

A production-grade PySpark utility for managing partitioned and optimized Apache Iceberg tables. It simplifies writing, reading, updating, and archiving data with features designed for performance and ease of use.

Key Features

  • Standardized Schema: Enforces a consistent schema for all tables, automatically adding and typing required columns.
  • Optimized Writes: Supports both Copy-on-Write (CoW) for read-heavy workloads and Merge-on-Read (MoR) for fast, frequent updates.
  • Z-Ordering: Automatically configures Z-Ordering on key columns to dramatically speed up queries.
  • Flexible Updates: Update single records with a simple dictionary or thousands of records in bulk with a DataFrame.
  • Partition Pruning: Optimized update and read functions that leverage Iceberg's date partitions for maximum performance.
  • Data Archiving: Includes a simple function to move old data to an archive table, keeping your primary tables lean and fast.

Installation

pip install keplerio

Setup: Spark Session

keplerio requires an active Spark Session configured for Apache Iceberg. Here is a typical setup:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("KeplerIO Example") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .getOrCreate()

Usage

Pushing Data (push_to_kepler)

Use push_to_kepler to write data. It will create the table on the first run with all the recommended optimizations.

import keplerio
from pyspark.sql.types import *
from datetime import date

# Create a sample DataFrame
data = [(1, 101, 1001, "95.5", date(2025, 10, 10))]
schema = StructType([
    StructField("id_kepler", LongType()),
    StructField("id_client", StringType()),
    StructField("id_transaction", StringType()),
    StructField("pourcentage", StringType()),
    StructField("date_transaction", DateType())
])
df = spark.createDataFrame(data, schema)

# Push the data
# Use 'merge-on-read' for tables you intend to update frequently
keplerio.push_to_kepler(
    spark=spark,
    usecase_id="123",
    df=df,
    mode='overwrite', # 'overwrite' or 'append'
    write_strategy='merge-on-read' # Use 'copy-on-write' for read-heavy tables
)

Reading Data (read_from_kepler)

Query your table with powerful and efficient filters.

# Read the entire table
df_all = keplerio.read_from_kepler(spark, usecase_id="123")

# Read a specific date range, selecting only certain columns
df_filtered = keplerio.read_from_kepler(
    spark=spark,
    usecase_id="123",
    start_date="2025-10-01",
    end_date="2025-10-31",
    columns=["id_kepler", "statut", "assigned_to"]
)
df_filtered.show()

Updating Data (update_kepler)

Update records efficiently. This works best on tables created with the 'merge-on-read' strategy.

Single Record Update (Convenient)

Use a Python dictionary for simple, single-row updates.

update_payload = {
    "id_kepler": 1,
    "statut": "Completed",
    "assigned_to": "analyst_a"
}

# The transaction_date makes the update much faster via partition pruning
keplerio.update_kepler(
    spark=spark,
    usecase_id="123",
    updates=update_payload,
    transaction_date="2025-10-10"
)

Bulk Record Update (Powerful)

Use a Spark DataFrame to update thousands of records at once.

bulk_updates = [
    (10, "Flagged", "investigation_team"),
    (25, "Closed", "analyst_b")
]
bulk_df = spark.createDataFrame(bulk_updates, ["id_kepler", "statut", "assigned_to"])

keplerio.update_kepler(
    spark=spark,
    usecase_id="123",
    updates=bulk_df,
    transaction_date="2025-10-10" # Assumes all updates are for this date
)

Archiving Data (archive_from_kepler)

Move old data to an archive table (uc_123_archive) to keep your main table fast.

# Archive all data before January 1st, 2025
keplerio.archive_from_kepler(
    spark=spark,
    usecase_id="123",
    archive_before_date="2025-01-01"
)

License

Distributed under the MIT License. See LICENSE for more information.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

keplerio-13.9.9.tar.gz (78.0 MB view details)

Uploaded Source

File details

Details for the file keplerio-13.9.9.tar.gz.

File metadata

  • Download URL: keplerio-13.9.9.tar.gz
  • Upload date:
  • Size: 78.0 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.3

File hashes

Hashes for keplerio-13.9.9.tar.gz
Algorithm Hash digest
SHA256 e934a594dff1bf2dd47f00cd11d54edf5631ab69fa2dd962fe04203b7c12935b
MD5 83292e85e913ca6e9008411c5ad7c3bd
BLAKE2b-256 08c2b743341cbecf0095df34d5fc7d05fa864c2cc54196eeab128f3c91c10379

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page