
Polars-based persistent tabular store backed by Parquet or Arrow IPC files


shackleton


Persistent tabular store backed by Parquet or Arrow IPC, built on polars.

  • Partitioned writes — by column value or xxhash bucket
  • Sorted merge — maintain sorted order on extend via id_col
  • Lazy reads — returns pl.LazyFrame so polars can push down predicates
  • Schema evolution — diagonal concat handles mismatched columns transparently
  • Thread-safe — per-directory stdlib locks; polars I/O releases the GIL
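The thread-safety bullet mentions per-directory stdlib locks. Below is a minimal sketch of that general pattern: a registry that hands out one threading.Lock per absolute directory path. It is not shackleton's code, and `lock_for` and `_LOCKS` are hypothetical names. Stdlib threading locks only coordinate threads inside one process, not separate processes.

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical registry: absolute directory path -> threading.Lock
_REGISTRY_GUARD = threading.Lock()
_LOCKS = {}


def lock_for(directory: str):
    """Return the lock for a directory, creating it on first use."""
    key = os.path.abspath(directory)  # normalises "..", so equivalent paths share a lock
    # Guard the dict itself so two threads never create two locks for one path.
    with _REGISTRY_GUARD:
        lock = _LOCKS.get(key)
        if lock is None:
            lock = _LOCKS[key] = threading.Lock()
        return lock


counter = {"n": 0}


def write_once(directory: str) -> None:
    # Writers to the same directory run one at a time; other directories are not blocked.
    with lock_for(directory):
        counter["n"] += 1  # stands in for a read-merge-rewrite of that directory's files


with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(write_once, ["/tmp/mydata"] * 1000))

print(counter["n"])  # 1000
```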

Single dependency: polars.


Install

pip install shackleton

Quick start

from pathlib import Path
import polars as pl
from shackleton import TableRepo

repo = TableRepo(Path("/tmp/mydata"))

# Write
repo.extend(pl.DataFrame({"id": [1, 2, 3], "value": [10.0, 20.0, 30.0]}))
repo.extend(pl.DataFrame({"id": [4, 5], "value": [40.0, 50.0]}))

# Read
df = repo.get_full_df()         # pl.DataFrame — all rows
lf = repo.get_full_lf()         # pl.LazyFrame: filters you add are pushed down into the scan

# Overwrite everything
repo.replace_all(pl.DataFrame({"id": [99], "value": [0.0]}))

# Delete all files
repo.purge()

Column partitioning

Data is split into hive-style directories (col=val/data.parquet).

repo = TableRepo(Path("/tmp/events"), partition_cols=["region", "env"])

repo.extend(pl.DataFrame({
    "region": ["us", "us", "eu"],
    "env":    ["prod", "dev", "prod"],
    "count":  [100, 50, 80],
}))

# Read only us/prod — no full scan
df = repo.get_partition_df({"region": "us", "env": "prod"})

# Omitted keys act as wildcards: every eu partition, any env
df_eu = repo.get_partition_df({"region": "eu"})
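The directory layout and the wildcard lookup can be sketched with pathlib. This assumes one col=val directory level per partition column, in the order given to partition_cols, and the data.parquet file name from the description above. `hive_dir` and `matches` are hypothetical helpers, not part of shackleton.

```python
from pathlib import PurePosixPath


def hive_dir(row: dict, partition_cols: list[str]) -> PurePosixPath:
    """Directory for a row: one col=val level per partition column."""
    return PurePosixPath(*(f"{col}={row[col]}" for col in partition_cols))


def matches(directory: PurePosixPath, filters: dict) -> bool:
    """True if every filtered column has the requested value; omitted columns match anything."""
    parts = dict(part.split("=", 1) for part in directory.parts)
    return all(parts.get(col) == str(val) for col, val in filters.items())


rows = [
    {"region": "us", "env": "prod", "count": 100},
    {"region": "us", "env": "dev", "count": 50},
    {"region": "eu", "env": "prod", "count": 80},
]
dirs = sorted({hive_dir(r, ["region", "env"]) for r in rows})

print([str(d / "data.parquet") for d in dirs])
# ['region=eu/env=prod/data.parquet', 'region=us/env=dev/data.parquet', 'region=us/env=prod/data.parquet']
print([str(d) for d in dirs if matches(d, {"region": "eu"})])
# ['region=eu/env=prod']
```

Only directories whose names match the filter need to be opened, which is why a partition read avoids a full scan.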

Hash partitioning

When partition values have high or unbounded cardinality, hash them into a fixed number of buckets instead. Bucket assignment uses polars' native xxhash, which is vectorised and runs without holding the GIL.

from shackleton import HashPartitioner

hp = HashPartitioner(col="user_id", num_groups=128)
repo = TableRepo(Path("/tmp/users"), partition_cols=hp)

repo.extend(pl.DataFrame({
    "user_id": ["alice", "bob", "carol", "dave"],
    "score":   [0.9, 0.7, 0.8, 0.6],
}))

df = repo.get_full_df()
# Only the original columns (user_id, score) are stored; the computed hash bucket column is dropped
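Bucket assignment amounts to hash(key) mod num_groups. The sketch below uses hashlib's BLAKE2b only as a stdlib stand-in for polars' xxhash, so its bucket numbers will not match the ones shackleton writes. `bucket_of` is a hypothetical helper.

```python
import hashlib
from collections import defaultdict


def bucket_of(key: str, num_groups: int) -> int:
    """Map a key to one of num_groups buckets via a stable 64-bit hash."""
    digest = hashlib.blake2b(key.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "little") % num_groups


buckets = defaultdict(list)
for user_id, score in [("alice", 0.9), ("bob", 0.7), ("carol", 0.8), ("dave", 0.6)]:
    buckets[bucket_of(user_id, 128)].append((user_id, score))

# Same key, same bucket, across runs and processes. Python's built-in hash() would not
# work here because str hashes are randomised per process (PYTHONHASHSEED).
```

A fixed num_groups bounds the number of directories no matter how many distinct user_id values arrive.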

Deduplication with dedup_cols

When dedup_cols is set, each extend merges the new rows with the existing data and then drops any row that duplicates an earlier row on those columns; only the first occurrence is kept. Combine it with id_col for a sorted-merge dedup, or use it alone for an unordered dedup. dedup_cols is not supported together with max_records > 0: TableRepo raises an error at construction time.

repo = TableRepo(Path("/tmp/events"), dedup_cols=["event_id"])

repo.extend(pl.DataFrame({"event_id": [1, 2, 3], "val": [10, 20, 30]}))
# Re-ingest with overlapping IDs — existing rows are kept, new ones appended
repo.extend(pl.DataFrame({"event_id": [2, 4], "val": [99, 40]}))

repo.get_full_df().shape  # (4, 2) — event_id 2 kept once
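The keep-first rule can be sketched in plain Python. This assumes existing rows are placed ahead of the incoming batch before deduplication, which is what lets the original val=20 for event_id 2 survive, per the comment above. `dedup_keep_first` is a hypothetical helper.

```python
def dedup_keep_first(rows: list[dict], dedup_cols: list[str]) -> list[dict]:
    """Drop rows whose dedup_cols values were already seen, keeping the first occurrence."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[c] for c in dedup_cols)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


existing = [{"event_id": 1, "val": 10}, {"event_id": 2, "val": 20}, {"event_id": 3, "val": 30}]
incoming = [{"event_id": 2, "val": 99}, {"event_id": 4, "val": 40}]

merged = dedup_keep_first(existing + incoming, ["event_id"])
print([(r["event_id"], r["val"]) for r in merged])
# [(1, 10), (2, 20), (3, 30), (4, 40)]
```

In polars the same rule is `df.unique(subset=dedup_cols, keep="first", maintain_order=True)`. The README does not say whether shackleton uses exactly that call.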

Sorted merge with id_col

When id_col is set, each extend keeps the file sorted by that column (when max_records == 0) and replace_records does an upsert: matching IDs are updated, new IDs appended.

repo = TableRepo(Path("/tmp/prices"), id_col="ticker")

repo.extend(pl.DataFrame({
    "ticker": ["AAPL", "GOOG", "MSFT"],
    "price":  [180.0, 140.0, 380.0],
}))

# Update GOOG, add TSLA
repo.replace_records(pl.DataFrame({
    "ticker": ["GOOG", "TSLA"],
    "price":  [145.0, 220.0],
}))

repo.get_full_df().sort("ticker")
# ┌────────┬───────┐
# │ ticker ┆ price │
# ╞════════╪═══════╡
# │ AAPL   ┆ 180.0 │
# │ GOOG   ┆ 145.0 │
# │ MSFT   ┆ 380.0 │
# │ TSLA   ┆ 220.0 │
# └────────┴───────┘
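The upsert semantics behind the table above can be sketched as "index by id, let incoming rows win, sort by id". This sketch assumes ids are unique in the existing data and that an update replaces the whole row. `upsert_sorted` is hypothetical and says nothing about how shackleton merges on disk.

```python
def upsert_sorted(existing: list[dict], updates: list[dict], id_col: str) -> list[dict]:
    """Replace rows whose id matches, append new ids, return rows sorted by id_col."""
    by_id = {row[id_col]: row for row in existing}
    for row in updates:
        by_id[row[id_col]] = row  # incoming rows win on matching ids
    return sorted(by_id.values(), key=lambda r: r[id_col])


prices = [
    {"ticker": "AAPL", "price": 180.0},
    {"ticker": "GOOG", "price": 140.0},
    {"ticker": "MSFT", "price": 380.0},
]
updated = upsert_sorted(
    prices,
    [{"ticker": "GOOG", "price": 145.0}, {"ticker": "TSLA", "price": 220.0}],
    "ticker",
)
print([(r["ticker"], r["price"]) for r in updated])
# [('AAPL', 180.0), ('GOOG', 145.0), ('MSFT', 380.0), ('TSLA', 220.0)]
```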

File size limits with max_records

Cap each file at N rows. Useful when downstream tools (Spark, DuckDB) benefit from many smaller files, or when partial reads matter.

repo = TableRepo(Path("/tmp/logs"), max_records=100_000)

for batch in my_log_stream():  # your own iterator of pl.DataFrame batches
    repo.extend(batch)
# Creates file-00000000000000000000.parquet, file-00000000000000000001.parquet, …
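The file names above suggest a zero-padded, 20-digit counter. The following sketch splits one batch into files of at most max_records rows under that assumption. The README does not say whether shackleton first tops up a partially filled last file, and `plan_files` is hypothetical.

```python
import math


def plan_files(n_rows: int, max_records: int, start_index: int = 0,
               suffix: str = "parquet") -> list[tuple[str, int]]:
    """Return (file name, row count) pairs for n_rows split into max_records-sized files."""
    n_files = math.ceil(n_rows / max_records)
    plan = []
    for i in range(n_files):
        rows_in_file = min(max_records, n_rows - i * max_records)
        plan.append((f"file-{start_index + i:020d}.{suffix}", rows_in_file))
    return plan


for name, rows in plan_files(250_000, 100_000):
    print(name, rows)
# file-00000000000000000000.parquet 100000
# file-00000000000000000001.parquet 100000
# file-00000000000000000002.parquet 50000
```

Zero-padding makes lexicographic order match numeric order, so a plain directory listing returns files in write order.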

Streaming ingestion with batch writers

For row-at-a-time or small-DataFrame pipelines: accumulate in memory, flush in batches.

# Dict record writer: for APIs / scrapers
with repo.get_extending_record_writer(batch_size=50_000) as writer:
    for event in event_stream():  # your own source of events
        writer.add_to_batch({"ts": event.ts, "value": event.value})
# Final partial batch is flushed on context exit

# DataFrame writer: for chunked ETL
with repo.get_extending_df_writer(batch_size=500_000) as writer:
    for chunk_df in chunked_source():  # your own iterator of pl.DataFrame chunks
        writer.add_to_batch(chunk_df)

# Replacing variants: upsert on each flush (see replace_records)
with repo.get_replacing_record_writer(batch_size=10_000) as writer:
    for record in updates():  # your own iterator of dict records
        writer.add_to_batch(record)
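The accumulate-and-flush behaviour of these writers can be sketched as a small context manager. `BatchWriter` and its `flush_fn` callback are hypothetical. In shackleton the flush presumably builds a DataFrame and hands it to extend or replace_records, but that wiring is an assumption. This sketch skips the final flush if the with-block raised; the README does not say what shackleton does in that case.

```python
from typing import Callable


class BatchWriter:
    """Buffers items, calls flush_fn every batch_size items,
    and flushes the remainder when the with-block exits normally."""

    def __init__(self, flush_fn: Callable[[list], None], batch_size: int) -> None:
        if batch_size <= 0:
            raise ValueError("batch_size must be positive")
        self.flush_fn = flush_fn
        self.batch_size = batch_size
        self._buffer = []

    def add_to_batch(self, item) -> None:
        self._buffer.append(item)
        if len(self._buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self.flush_fn(batch)

    def __enter__(self) -> "BatchWriter":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None:
            self.flush()  # final partial batch
        return False  # never swallow exceptions


flushed_sizes = []
with BatchWriter(lambda batch: flushed_sizes.append(len(batch)), batch_size=3) as writer:
    for i in range(7):
        writer.add_to_batch({"ts": i, "value": i * 10})
print(flushed_sizes)  # [3, 3, 1]
```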

Schema evolution

Writes don't need matching columns. A column added by a later write is null in rows from earlier writes, and a column that a later write omits is null in that write's rows. No migration step is needed.

repo = TableRepo(Path("/tmp/evolving"))
repo.extend(pl.DataFrame({"a": [1, 2], "b": [3, 4]}))
repo.extend(pl.DataFrame({"a": [5], "c": [6]}))        # new column c

df = repo.get_full_df()
# shape: (3, 3), columns a, b, c; c is null in rows from the first write, b in the row from the second
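A "diagonal" concat stacks frames whose columns differ by taking the union of the columns and filling the gaps with null. The stdlib sketch below models that on lists of dicts, with columns in first-seen order. `diagonal_concat` is hypothetical. The real polars operation is `pl.concat(frames, how="diagonal")`.

```python
def diagonal_concat(frames: list[list[dict]]) -> list[dict]:
    """Stack row lists with differing columns: union columns (first-seen order), fill gaps with None."""
    columns = []
    for rows in frames:
        for row in rows:
            for col in row:
                if col not in columns:
                    columns.append(col)
    return [{col: row.get(col) for col in columns} for rows in frames for row in rows]


first = [{"a": 1, "b": 3}, {"a": 2, "b": 4}]
second = [{"a": 5, "c": 6}]
for row in diagonal_concat([first, second]):
    print(row)
# {'a': 1, 'b': 3, 'c': None}
# {'a': 2, 'b': 4, 'c': None}
# {'a': 5, 'b': None, 'c': 6}
```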

Partition-level overwrite

replace_partition atomically clears and rewrites only the partitions covered by the DataFrame. purge_partition deletes matching files without rewriting.

repo = TableRepo(Path("/tmp/events"), partition_cols=["date"])
repo.extend(pl.DataFrame({
    "date": ["2024-01-01", "2024-01-01", "2024-01-02"],
    "val":  [1, 2, 3],
}))

# Replace just the 2024-01-01 partition with corrected data
repo.replace_partition(pl.DataFrame({
    "date": ["2024-01-01"],
    "val":  [99],
}))

# Or wipe it without rewriting
repo.purge_partition({"date": "2024-01-01"})
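The partition-scoped semantics can be modelled with an in-memory mapping from partition key to rows. `group_by_partition`, `replace_partitions` and `purge_partitions` are hypothetical helpers. The sketch returns a new mapping instead of mutating the old one, which loosely mirrors the all-or-nothing behaviour described above. How shackleton achieves atomicity on disk is not described here.

```python
from collections import defaultdict


def group_by_partition(rows: list[dict], partition_cols: list[str]) -> dict:
    """Group rows into {partition key tuple: [rows]}."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[c] for c in partition_cols)].append(row)
    return dict(groups)


def replace_partitions(store: dict, new_rows: list[dict], partition_cols: list[str]) -> dict:
    """Return a new store in which only the partitions present in new_rows are overwritten."""
    updated = dict(store)
    updated.update(group_by_partition(new_rows, partition_cols))
    return updated


def purge_partitions(store: dict, filters: dict, partition_cols: list[str]) -> dict:
    """Return a new store without the partitions whose key matches every filter."""
    def hit(key: tuple) -> bool:
        values = dict(zip(partition_cols, key))
        return all(values.get(col) == val for col, val in filters.items())
    return {key: rows for key, rows in store.items() if not hit(key)}


store = group_by_partition(
    [{"date": "2024-01-01", "val": 1}, {"date": "2024-01-01", "val": 2}, {"date": "2024-01-02", "val": 3}],
    ["date"],
)
after_replace = replace_partitions(store, [{"date": "2024-01-01", "val": 99}], ["date"])
after_purge = purge_partitions(after_replace, {"date": "2024-01-01"}, ["date"])

print({k: [r["val"] for r in v] for k, v in after_replace.items()})
# {('2024-01-01',): [99], ('2024-01-02',): [3]}
print(list(after_purge))  # [('2024-01-02',)]
```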

Compression

Control the codec and, for Parquet, the compression level.

repo = TableRepo(Path("/tmp/data"), compression="zstd", compression_level=3)
repo_ipc = TableRepo(Path("/tmp/fast"), ipc=True, compression="lz4")

Arrow IPC format

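Arrow IPC stores data in Arrow's own columnar layout, so reads skip Parquet decoding; it suits local workloads that don't need Parquet compatibility. Measure on your own data, though: in the Performance benchmarks below, IPC was slower than Parquet both for writes (~23M vs ~33M rows/s) and for full reads (~102M vs ~131M rows/s).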

repo = TableRepo(Path("/tmp/fast"), ipc=True)   # writes .arrow files
repo.extend(pl.DataFrame({"id": [1, 2], "value": [10.0, 20.0]}))
df = repo.get_full_df()

Lazy reads and predicate pushdown

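get_full_lf() and get_partition_lf() return pl.LazyFrame. Polars pushes filter predicates and column selections down into the scan, so unselected columns are never read and, for Parquet, row groups whose statistics rule out the filter can be skipped.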

# Assumes a repo whose rows have region, ticker and price columns
result = (
    repo.get_full_lf()
    .filter(pl.col("region") == "us")
    .select(["ticker", "price"])
    .sort("price", descending=True)
    .limit(10)
    .collect()
)
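Parquet files carry per-row-group min/max statistics, and these statistics are what let a pushed-down filter skip data without decoding it. Below is a stdlib sketch of that pruning step for a `price > threshold` filter. The statistics are hypothetical illustrative values, and `RowGroupStats` and `groups_to_read` are hypothetical names, not polars' reader.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RowGroupStats:
    """Min/max statistics for one column in one row group."""
    index: int
    min_value: float
    max_value: float


def groups_to_read(stats: list[RowGroupStats], threshold: float) -> list[int]:
    """Keep only row groups that could contain a value > threshold."""
    return [s.index for s in stats if s.max_value > threshold]


price_stats = [
    RowGroupStats(0, 10.0, 90.0),    # max <= 150: skipped without decoding
    RowGroupStats(1, 85.0, 160.0),   # may contain matches: read
    RowGroupStats(2, 150.0, 400.0),  # may contain matches: read
]
print(groups_to_read(price_stats, 150.0))  # [1, 2]
```

To see what polars actually pushes down for a given query, call `.explain()` on the LazyFrame before `.collect()`.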

Parallel partition processing with map_partitions

Apply a function to each partition's collected DataFrame, either sequentially (the default) or in parallel on a thread pool. All files within a partition are combined before your function is called, so it always receives one complete DataFrame per partition.

repo = TableRepo(Path("/tmp/events"), partition_cols=["region"])
repo.extend(pl.DataFrame({
    "region": ["us", "us", "eu", "eu", "ap"],
    "revenue": [100.0, 200.0, 150.0, 50.0, 300.0],
}))

# Sequential (workers=None, default)
totals = repo.map_partitions(lambda df: df["revenue"].sum())
# e.g. [300.0, 200.0, 300.0]  — one value per partition

# Parallel with auto-detected thread count
totals = repo.map_partitions(lambda df: df["revenue"].sum(), workers=0)

# Parallel with fixed thread pool
totals = repo.map_partitions(lambda df: df["revenue"].sum(), workers=4)

The workers parameter controls parallelism:

| workers | Behaviour                           |
|---------|-------------------------------------|
| None    | Sequential; no thread pool          |
| 0       | Thread pool sized to os.cpu_count() |
| > 0     | Thread pool of exactly that size    |

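Because polars I/O releases the GIL, threads can overlap on I/O without contending for the GIL. Pure-Python work inside your function still holds the GIL, so the speed-up is largest when the function spends its time in polars operations.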

The function can return any type; results preserve partition order.

# Compute per-partition summary stats
def summarise(df: pl.DataFrame) -> dict:
    return {"n": len(df), "mean": df["revenue"].mean()}

stats = repo.map_partitions(summarise, workers=0)
# [{"n": 2, "mean": 150.0}, {"n": 2, "mean": 100.0}, {"n": 1, "mean": 300.0}]

# Collect into a single DataFrame
import polars as pl
result = pl.DataFrame(repo.map_partitions(summarise, workers=0))
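The workers dispatch rule from the table above can be sketched with concurrent.futures. `map_in_order` is hypothetical, and partitions are modelled as plain lists of revenue values rather than DataFrames. Rejecting negative workers is this sketch's own choice; the README does not say how shackleton handles them.

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def map_in_order(fn: Callable[[T], R], items: list[T], workers: Optional[int] = None) -> list[R]:
    """Apply fn to each item: None = sequential, 0 = os.cpu_count() threads, n > 0 = n threads."""
    if workers is None:
        return [fn(item) for item in items]
    if workers < 0:
        raise ValueError("workers must be None, 0, or positive")
    size = workers if workers > 0 else (os.cpu_count() or 1)  # cpu_count() may return None
    with ThreadPoolExecutor(max_workers=size) as pool:
        # Executor.map yields results in input order, whatever order the tasks finish in.
        return list(pool.map(fn, items))


partitions = {"us": [100.0, 200.0], "eu": [150.0, 50.0], "ap": [300.0]}
totals = map_in_order(sum, list(partitions.values()), workers=0)
print(totals)  # [300.0, 200.0, 300.0]
```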

Performance

Benchmarked on a single core, NVMe SSD, 1 million rows (4 columns: i64, f64, str, str). Run uv run bench.py to reproduce.

| Scenario                          | Throughput    |
|-----------------------------------|---------------|
| extend: single file               | ~33M rows/s   |
| extend: sorted merge (id_col)     | ~49M rows/s   |
| extend: 26 column partitions      | ~11M rows/s   |
| extend: 64 hash partitions        | ~7M rows/s    |
| extend: max_records=10k           | ~7M rows/s    |
| extend: IPC format                | ~23M rows/s   |
| read full (collect)               | ~131M rows/s  |
| read full (lazy + filter)         | ~170M rows/s  |
| read single partition (1 of 26)   | ~18M rows/s   |
| read full: IPC format             | ~102M rows/s  |
| replace_records (10k upserts)     | ~107k rows/s  |

replace_records does a full read-merge-rewrite, so its cost grows with the size of the existing files rather than with the batch size, and throughput per upserted row falls as the table grows. Use partitioned repos to bound the merge scope.
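Two quick calculations put the table above in context. The first converts throughput figures into wall time. The second is a simple cost model, assumed here and not taken from shackleton: rows are spread evenly across partitions, and a call rewrites only the partitions its batch touches. Both helpers are hypothetical.

```python
def seconds_for(rows: int, rows_per_s: float) -> float:
    """Wall time implied by a throughput figure."""
    return rows / rows_per_s


# Figures from the table above (1M-row benchmark).
print(f"extend, single file:  {seconds_for(1_000_000, 33e6) * 1e3:.1f} ms")  # 30.3 ms
print(f"replace_records, 10k: {seconds_for(10_000, 107e3) * 1e3:.1f} ms")    # 93.5 ms


def rows_rewritten(existing_rows: int, batch_rows: int, touched: int = 1, partitions: int = 1) -> float:
    """Rows a read-merge-rewrite writes back under the even-spread assumption."""
    return existing_rows * touched / partitions + batch_rows


print(rows_rewritten(1_000_000, 10_000))                          # 1010000.0
print(rows_rewritten(1_000_000, 10_000, touched=1, partitions=26))  # ~48461.5
```

Under this model the gain depends on locality: a batch that touches 1 of 26 partitions rewrites about 1/21 as many rows, while a batch spread over all 26 partitions saves nothing.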



Download files


Source Distribution

shackleton-1.2.0.tar.gz (22.0 kB)

Uploaded Source

Built Distribution


shackleton-1.2.0-py3-none-any.whl (14.9 kB)

Uploaded Python 3

File details

Details for the file shackleton-1.2.0.tar.gz.

File metadata

  • Download URL: shackleton-1.2.0.tar.gz
  • Upload date:
  • Size: 22.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for shackleton-1.2.0.tar.gz
Algorithm Hash digest
SHA256 6bc9bc6ce016acd681d29c8c4d46c7a81c763823fc569655135405ea9d094098
MD5 b940df369cebd21011ebfa42a18290f2
BLAKE2b-256 03c8157f7876c4840480258e877f45e088fa6534fe8d6a853184cae8bf3ae5c0


File details

Details for the file shackleton-1.2.0-py3-none-any.whl.

File metadata

  • Download URL: shackleton-1.2.0-py3-none-any.whl
  • Upload date:
  • Size: 14.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for shackleton-1.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2fd8656bfc89a53d3836d1b5483235147cecdc15569d82ad68b16bf090a8170a
MD5 84656011d3e5d897e3442cd2543aeebb
BLAKE2b-256 1fd477ed0b6baddb3e75d8862d2dbc04d012a91377b53dc5d1a9a13597046dd0

