Polars-based persistent tabular store backed by Parquet or Arrow IPC files

shackleton

Persistent tabular store backed by Parquet or Arrow IPC, built on polars.

  • Partitioned writes — by column value or xxhash bucket
  • Sorted merge — maintain sorted order on extend via id_col
  • Lazy reads — returns pl.LazyFrame so polars can push down predicates
  • Schema evolution — diagonal concat handles mismatched columns transparently
  • Thread-safe — per-directory stdlib locks; polars I/O releases the GIL

Single dependency: polars.


Install

pip install shackleton

Quick start

from pathlib import Path
import polars as pl
from shackleton import TableRepo

repo = TableRepo(Path("/tmp/mydata"))

# Write
repo.extend(pl.DataFrame({"id": [1, 2, 3], "value": [10.0, 20.0, 30.0]}))
repo.extend(pl.DataFrame({"id": [4, 5], "value": [40.0, 50.0]}))

# Read
df = repo.get_full_df()         # pl.DataFrame — all rows
lf = repo.get_full_lf()         # pl.LazyFrame — push down filters yourself

# Overwrite everything
repo.replace_all(pl.DataFrame({"id": [99], "value": [0.0]}))

# Delete all files
repo.purge()

Column partitioning

Data is split into hive-style directories (col=val/data.parquet).

repo = TableRepo(Path("/tmp/events"), partition_cols=["region", "env"])

repo.extend(pl.DataFrame({
    "region": ["us", "us", "eu"],
    "env":    ["prod", "dev", "prod"],
    "count":  [100, 50, 80],
}))

# Read only us/prod — no full scan
df = repo.get_partition_df({"region": "us", "env": "prod"})

# Wildcard: all eu partitions
df_eu = repo.get_partition_df({"region": "eu"})
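The hive-style layout and the wildcard lookup above can be pictured with a small stdlib sketch. The helper names partition_path and matches are hypothetical and are not part of shackleton; this only illustrates how col=val directories make it possible to select a partition without opening every file.

```python
from pathlib import Path


def partition_path(root: Path, partition_cols: list[str], row: dict) -> Path:
    """Build a hive-style directory path: root/col1=val1/col2=val2."""
    path = root
    for col in partition_cols:
        path = path / f"{col}={row[col]}"
    return path


def matches(path: Path, root: Path, wanted: dict) -> bool:
    """True if every requested col=val pair appears in the path.

    Columns missing from `wanted` act as wildcards.
    """
    parts = dict(p.split("=", 1) for p in path.relative_to(root).parts)
    return all(parts.get(col) == str(val) for col, val in wanted.items())


root = Path("/tmp/events")
cols = ["region", "env"]
rows = [
    {"region": "us", "env": "prod"},
    {"region": "us", "env": "dev"},
    {"region": "eu", "env": "prod"},
]
dirs = [partition_path(root, cols, r) for r in rows]

# Exact match on both columns, then a wildcard on env.
us_prod = [d for d in dirs if matches(d, root, {"region": "us", "env": "prod"})]
all_eu = [d for d in dirs if matches(d, root, {"region": "eu"})]
```

Only the directory names are inspected here, which is why a partition lookup can avoid a full scan.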

Hash partitioning

When partition values have unbounded cardinality, hash into fixed buckets instead. Uses polars' native xxhash — vectorised, GIL-free.

from shackleton import HashPartitioner

hp = HashPartitioner(col="user_id", num_groups=128)
repo = TableRepo(Path("/tmp/users"), partition_cols=hp)

repo.extend(pl.DataFrame({
    "user_id": ["alice", "bob", "carol", "dave"],
    "score":   [0.9, 0.7, 0.8, 0.6],
}))

df = repo.get_full_df()
# The hash key column is stripped — only original columns are stored
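As a rough illustration of hashing into fixed buckets, the sketch below maps each key to one of num_groups buckets. It is an assumption-laden stand-in: shackleton uses polars' xxhash, while this version uses hashlib.blake2b from the standard library, and bucket_for is a hypothetical name.

```python
import hashlib


def bucket_for(key: str, num_groups: int) -> int:
    """Map a key to a stable bucket index in [0, num_groups)."""
    # blake2b is a stdlib stand-in; the library itself hashes with polars' xxhash.
    digest = hashlib.blake2b(key.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "big") % num_groups


users = ["alice", "bob", "carol", "dave"]
buckets = {user: bucket_for(user, 128) for user in users}
```

A stable hash matters here: Python's built-in hash() is randomised per process for strings, so it could send the same key to different buckets on different runs.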

Sorted merge with id_col

When id_col is set, each extend keeps the file sorted by that column and replace_records does an upsert: matching IDs are updated, new IDs appended.

repo = TableRepo(Path("/tmp/prices"), id_col="ticker")

repo.extend(pl.DataFrame({
    "ticker": ["AAPL", "GOOG", "MSFT"],
    "price":  [180.0, 140.0, 380.0],
}))

# Update GOOG, add TSLA
repo.replace_records(pl.DataFrame({
    "ticker": ["GOOG", "TSLA"],
    "price":  [145.0, 220.0],
}))

repo.get_full_df().sort("ticker")
# ┌────────┬───────┐
# │ ticker ┆ price │
# ╞════════╪═══════╡
# │ AAPL   ┆ 180.0 │
# │ GOOG   ┆ 145.0 │
# │ MSFT   ┆ 380.0 │
# │ TSLA   ┆ 220.0 │
# └────────┴───────┘
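The upsert semantics can be sketched in plain Python as a two-pointer merge over rows sorted by the ID column. This is not shackleton's implementation; upsert_sorted is a hypothetical helper, and it assumes the existing rows are already sorted and that IDs are unique within each input.

```python
def upsert_sorted(existing: list[dict], updates: list[dict], key: str) -> list[dict]:
    """Merge `updates` into `existing`, keeping the result sorted by `key`.

    Assumes `existing` is sorted by `key` and keys are unique within each list.
    On a matching key, the row from `updates` wins.
    """
    updates = sorted(updates, key=lambda row: row[key])
    merged = []
    i = j = 0
    while i < len(existing) and j < len(updates):
        old, new = existing[i], updates[j]
        if old[key] < new[key]:
            merged.append(old)
            i += 1
        elif old[key] > new[key]:
            merged.append(new)
            j += 1
        else:
            # Same ID: keep the updated row and skip the old one.
            merged.append(new)
            i += 1
            j += 1
    merged.extend(existing[i:])
    merged.extend(updates[j:])
    return merged


prices = [
    {"ticker": "AAPL", "price": 180.0},
    {"ticker": "GOOG", "price": 140.0},
    {"ticker": "MSFT", "price": 380.0},
]
changes = [
    {"ticker": "TSLA", "price": 220.0},
    {"ticker": "GOOG", "price": 145.0},
]
result = upsert_sorted(prices, changes, "ticker")
```

With the same data as the example above, result holds AAPL, GOOG (at 145.0), MSFT and TSLA in ticker order.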

File size limits with max_records

Cap each file at N rows. Useful when downstream tools (Spark, DuckDB) benefit from many smaller files, or when partial reads matter.

repo = TableRepo(Path("/tmp/logs"), max_records=100_000)

# my_log_stream() is a placeholder for any iterable of pl.DataFrame batches
for batch in my_log_stream():
    repo.extend(batch)
# Creates file-00000000000000000000.parquet, file-00000000000000000001.parquet, …
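To make the row cap concrete, here is a minimal sketch of how a stream of rows could be split into capped files with zero-padded names. The function split_into_files is hypothetical, the 20-digit counter width is inferred from the file names shown above, and how shackleton tops up a partially filled last file is not stated in this README, so the sketch treats all rows as a single stream.

```python
def split_into_files(total_rows: int, max_records: int) -> list[tuple[str, int]]:
    """Return (file name, row count) pairs for `total_rows` capped at `max_records` each."""
    files = []
    for index, start in enumerate(range(0, total_rows, max_records)):
        count = min(max_records, total_rows - start)
        # Zero-padded counter so names sort lexicographically in write order.
        files.append((f"file-{index:020d}.parquet", count))
    return files


plan = split_into_files(250_000, 100_000)
row_counts = [count for _, count in plan]
```

For 250,000 rows and a cap of 100,000, the plan contains three files, the last one only half full.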

Streaming ingestion with batch writers

For row-at-a-time or small-DataFrame pipelines: accumulate in memory, flush in batches.

# Dict record writer — for APIs / scrapers
with repo.get_extending_record_writer(batch_size=50_000) as writer:
    for event in event_stream():
        writer.add_to_batch({"ts": event.ts, "value": event.value})
# Final partial batch is flushed on context exit

# DataFrame writer — for chunked ETL
with repo.get_extending_df_writer(batch_size=500_000) as writer:
    for chunk_df in chunked_source():
        writer.add_to_batch(chunk_df)

# Replacing variants — upsert on each flush
with repo.get_replacing_record_writer(batch_size=10_000) as writer:
    for record in updates():
        writer.add_to_batch(record)
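The accumulate-then-flush pattern behind these writers can be sketched with a small context manager. BatchWriter is a hypothetical class, not shackleton's; the flush callback stands in for repo.extend or repo.replace_records, and flushing the remainder even when the block raises is an assumption of this sketch.

```python
class BatchWriter:
    """Collect records in memory and hand them to `flush_fn` in batches."""

    def __init__(self, flush_fn, batch_size: int):
        self._flush_fn = flush_fn
        self._batch_size = batch_size
        self._batch = []

    def add_to_batch(self, record) -> None:
        self._batch.append(record)
        if len(self._batch) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        # Skip empty flushes so no empty batch is ever written.
        if self._batch:
            self._flush_fn(self._batch)
            self._batch = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Write whatever is left; returning False lets exceptions propagate.
        self.flush()
        return False


flushed = []
with BatchWriter(flushed.append, batch_size=2) as writer:
    for i in range(5):
        writer.add_to_batch({"ts": i, "value": i * 1.5})

batch_sizes = [len(batch) for batch in flushed]
```

Five records with a batch size of two produce two full batches during the loop and one partial batch on exit.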

Schema evolution

When a later write adds a column, rows from earlier files read back with null in that column, and rows from the later write get null for any column they lack. No migration step is needed.

repo = TableRepo(Path("/tmp/evolving"))
repo.extend(pl.DataFrame({"a": [1, 2], "b": [3, 4]}))
repo.extend(pl.DataFrame({"a": [5], "c": [6]}))        # new column c

df = repo.get_full_df()
# shape: (3, 3) — columns a, b, c — nulls where data didn't exist
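The effect of a diagonal concat can be shown without polars by working on plain lists of row dicts. This sketch only mirrors the semantics described above; diagonal_concat is a hypothetical helper, and the real library works on polars frames rather than Python dicts.

```python
def diagonal_concat(frames: list[list[dict]]) -> list[dict]:
    """Stack rows from all frames, using the union of columns and None for gaps."""
    columns = []
    for frame in frames:
        for row in frame:
            for col in row:
                if col not in columns:
                    columns.append(col)  # keep first-seen column order
    return [
        {col: row.get(col) for col in columns}
        for frame in frames
        for row in frame
    ]


first = [{"a": 1, "b": 3}, {"a": 2, "b": 4}]
second = [{"a": 5, "c": 6}]  # introduces column c, lacks column b
combined = diagonal_concat([first, second])
```

The combined result has three rows and columns a, b and c, with None wherever a write did not supply a value.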

Arrow IPC format

Slightly faster reads for local workloads that don't need Parquet compatibility.

repo = TableRepo(Path("/tmp/fast"), ipc=True)   # writes .arrow files
repo.extend(pl.DataFrame({"id": [1, 2], "value": [10.0, 20.0]}))
df = repo.get_full_df()

Lazy reads and predicate pushdown

get_full_lf() and get_partition_lf() return pl.LazyFrame. Polars will push predicates and column selections into the scan.

# Assumes a repo whose rows have region, ticker and price columns
result = (
    repo.get_full_lf()
    .filter(pl.col("region") == "us")
    .select(["ticker", "price"])
    .sort("price", descending=True)
    .limit(10)
    .collect()
)

Parallel partition processing with map_partitions

Apply a function to each partition's collected DataFrame in parallel. All files within a partition are combined before your function is called, so it always receives one complete DataFrame per partition.

repo = TableRepo(Path("/tmp/events"), partition_cols=["region"])
repo.extend(pl.DataFrame({
    "region": ["us", "us", "eu", "eu", "ap"],
    "revenue": [100.0, 200.0, 150.0, 50.0, 300.0],
}))

# Sequential (workers=None, default)
totals = repo.map_partitions(lambda df: df["revenue"].sum())
# e.g. [300.0, 200.0, 300.0]  — one value per partition

# Parallel with auto-detected thread count
totals = repo.map_partitions(lambda df: df["revenue"].sum(), workers=0)

# Parallel with fixed thread pool
totals = repo.map_partitions(lambda df: df["revenue"].sum(), workers=4)

The workers parameter controls parallelism:

workers   Behaviour
None      Sequential, no thread pool
0         Thread pool sized to os.cpu_count()
> 0       Thread pool of exactly that size

Because polars I/O releases the GIL, worker threads can overlap their reads instead of serialising on the interpreter lock.

The function can return any type; results preserve partition order.

# Compute per-partition summary stats
def summarise(df: pl.DataFrame) -> dict:
    return {"n": len(df), "mean": df["revenue"].mean()}

stats = repo.map_partitions(summarise, workers=0)
# [{"n": 2, "mean": 150.0}, {"n": 2, "mean": 100.0}, {"n": 1, "mean": 300.0}]

# Collect into a single DataFrame (one row per partition)
result = pl.DataFrame(repo.map_partitions(summarise, workers=0))
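The three workers modes and the ordering guarantee can be sketched with the standard library. This is an illustration under assumptions, not shackleton's code: map_in_order is a hypothetical function, and plain lists of numbers stand in for per-partition DataFrames.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def map_in_order(func, items, workers=None):
    """Apply `func` to each item, returning results in input order."""
    if workers is None:
        # Sequential: no thread pool at all.
        return [func(item) for item in items]
    # 0 means "size the pool to the machine"; a positive value is used as given.
    pool_size = workers if workers > 0 else (os.cpu_count() or 1)
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        # Executor.map yields results in the order of the inputs,
        # regardless of which task finishes first.
        return list(pool.map(func, items))


partitions = [[100.0, 200.0], [150.0, 50.0], [300.0]]
sequential = map_in_order(sum, partitions)
auto_sized = map_in_order(sum, partitions, workers=0)
fixed_pool = map_in_order(sum, partitions, workers=4)
```

All three calls return the same list, since only the scheduling changes and not the order of results.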

Performance

Benchmarked on a single core, NVMe SSD, 1 million rows (4 columns: i64, f64, str, str). Run uv run bench.py to reproduce.

Scenario                               Throughput
extend: single file                    ~33M rows/s
extend: sorted merge (id_col)          ~49M rows/s
extend: 26 column partitions           ~11M rows/s
extend: 64 hash partitions             ~7M rows/s
extend: max_records=10k                ~7M rows/s
extend: IPC format                     ~23M rows/s
read full (collect)                    ~131M rows/s
read full (lazy + filter)              ~170M rows/s
read single partition (1 of 26)        ~18M rows/s
read full: IPC format                  ~102M rows/s
replace_records (10k upserts)          ~107k rows/s

replace_records does a full read-merge-rewrite, so its cost grows with the size of the existing file rather than the size of the batch. Use partitioned repos to bound the merge scope.
