
Unified tabular + vector storage in a single Iceberg-compatible file


ailake — AI-Lake Format Python SDK

Version: 0.0.27 — Unified storage for tabular data, embeddings, and HNSW vector index in a single Parquet-compatible file. Apache Iceberg Spec v2/v3 compatible.

Install

pip install ailake

Requires Python ≥ 3.9. Dependencies: pyarrow >= 14.0, numpy >= 1.24.

Quickstart

Write + search — fluent API (recommended)

import ailake
import numpy as np

# Open or create a table
table = ailake.open_table(
    "./my_table",
    dim=1536,
    metric="cosine",          # cosine | euclidean | dot_product | normalized_cosine
    pre_normalize=True,       # normalize at write time; enables fast 1-dot(a,b) path
    hnsw_m=16,                # HNSW connections per node (default 16)
    hnsw_ef_construction=150,
    embedding_model="text-embedding-3-small",  # tracked in Iceberg metadata
    embedding_model_version="v1",
)

texts = ["Document about AI", "Another document"]
embeddings = np.random.rand(2, 1536).astype(np.float32)

table.insert(texts, embeddings)   # accepts list or numpy array
snapshot_id = table.commit()

# Pattern B — auto-embed without passing embeddings explicitly
def my_embed(texts: list[str]) -> list[list[float]]:
    return np.random.rand(len(texts), 1536).tolist()  # replace with real model

table2 = ailake.open_table("./my_table2", dim=1536, embed_fn=my_embed)
table2.insert(["Document about AI", "Another document"])  # embed_fn called automatically
table2.commit()

# Pointer-only search (default — backward-compatible)
df      = table.search(embeddings[0], top_k=10).to_pandas()   # row_id, distance, file
pl_df   = table.search(embeddings[0]).limit(5).to_polars()   # eager polars DataFrame
results = table.search(embeddings[0]).to_list()   # list[dict]

# Full row data — all Parquet columns + _distance
df_full = table.search(embeddings[0], top_k=10, fetch_data=True).to_pandas()
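The pre_normalize comment in the example above relies on a standard identity. For unit-length vectors, the cosine distance 1 - dot(a, b) / (|a| * |b|) reduces to 1 - dot(a, b), because both norms are 1. The following pure-Python sketch checks that identity. It illustrates the math only and is not ailake's internal code.

```python
import math


def l2_normalize(v: list[float]) -> list[float]:
    """Scale v to unit L2 norm (conceptually what pre_normalize=True does at write time)."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def dot(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Full cosine distance: divides by both norms on every comparison."""
    return 1.0 - dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


def fast_cosine_distance(a_unit: list[float], b_unit: list[float]) -> float:
    """Fast path; only valid when both inputs are already unit-normalized."""
    return 1.0 - dot(a_unit, b_unit)


a = [3.0, 4.0, 0.0]
b = [1.0, 2.0, 2.0]
full = cosine_distance(a, b)
fast = fast_cosine_distance(l2_normalize(a), l2_normalize(b))
print(f"{full:.6f} {fast:.6f}")
```

Normalizing once at write time means the per-comparison norm computation disappears from the search loop. That saving is where the speedup claimed for pre_normalize comes from.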

Async API

import asyncio

import ailake
import numpy as np

texts = ["Document about AI", "Another document"]
embeddings = np.random.rand(2, 1536).astype(np.float32)
query_vec = embeddings[0]
q1, q2 = embeddings[0], embeddings[1]

async def main():
    table = ailake.open_table("./my_table", dim=1536)
    await table.insert_async(texts, embeddings)
    await table.commit_async()

    # fluent async chain
    df = await table.search(query_vec).limit(10).to_pandas_async()

    # parallel searches via asyncio.gather
    r1, r2 = await asyncio.gather(
        table.search(q1).to_list_async(),
        table.search(q2).to_list_async(),
    )

asyncio.run(main())

Module-level search

import ailake
import numpy as np

query = np.random.rand(1536).astype(np.float32)

df     = ailake.search("./my_table", query, top_k=10).to_pandas()
pl_df  = ailake.search("./my_table", query).limit(5).to_polars()   # eager polars DataFrame
items  = ailake.search("./my_table", query).to_list()

Assemble context for LLMs

import ailake

chunks = [
    {
        "document_id": "doc-1",
        "chunk_index": 0,
        "chunk_text": "AI-Lake stores vectors and tabular data together.",
        "document_title": "AI-Lake Overview",
        "section_path": "Introduction",
        "source_uri": "s3://my-lake/docs/overview.pdf",
        "distance": 0.12,
    },
]

context_xml = ailake.assemble_context(
    chunks=chunks,
    max_tokens=4096,       # token budget (4 chars ≈ 1 token)
    dedup_threshold=0.05,  # drop near-duplicate chunks
)
# Pass context_xml directly to Claude / GPT-4 as a user message

API reference

open_table(path, *, ...) → Table

Opens or creates an AI-Lake table at path.

Parameter | Default | Description
path | required | Table root (local path, s3://, gs://, az://)
vector_column | "embedding" | Vector column name
dim | 1536 | Embedding dimension
metric | "cosine" | cosine, euclidean, dot_product, normalized_cosine
pre_normalize | False | Normalize vectors to unit L2 norm at write time; enables the 1 - dot(a,b) fast path (~12-20 % speedup)
hnsw_m | None (=16) | HNSW connections per node
hnsw_ef_construction | None (=150) | HNSW build candidate pool size
pq_only | False | Discard raw F16 vectors after the index is built, keeping only PQ codes. ~98 % storage reduction; reranking disabled; recall@10 ~93-95 %.
ivf_residual | False | Encode vec − cluster_centroid per IVF cell (residual PQ). Same storage as standard PQ; ~2-4 pp better recall@10.
embedding_model | None | Embedding model name stored in Iceberg properties (ailake.embedding-model). Used for mismatch detection and migration tracking.
embedding_model_version | None | Optional model version. Stored as "<name>@<version>" in Iceberg properties.
fts_text_columns | None | List of text column names to index with Tantivy FTS (e.g. ["chunk_text", "document_title"]). When set, each file gets an AILK_FTS section, and search_text() uses the O(log N) Tantivy path instead of brute-force BM25.
embed_fn | None | Auto-embed callable list[str] → list[list[float]]. When set, insert(texts) and write_batch(texts) can be called without embeddings; the callable is invoked automatically.
partition_by | None | Single-column Iceberg identity partition (e.g. "agent_id"). Stored in metadata.json. Prefer partition_fields for new tables.
partition_value | None | Per-write value for partition_by. Tagged in key_metadata; used for manifest-level pruning at search time.
partition_fields | None | Multi-column Iceberg partition spec: a list of (column, transform, column_type) tuples. Supports all Iceberg transforms: "identity", "year", "month", "day", "hour", "bucket[N]", "truncate[N]". Takes precedence over partition_by. Example: [("topic_id","identity","int"),("date","month","date")].
format_version | 2 | Iceberg format version. Set to 3 to write an Iceberg v3 table.
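For reference, here is what the Iceberg transforms accepted by partition_fields compute. The minimal pure-Python sketch below follows the Apache Iceberg table spec for identity, year, month, day and truncate[W], with date inputs only for the time transforms. hour is left out because it needs timestamps, and bucket[N] is omitted because the spec defines it over a 32-bit Murmur3 hash. This illustrates the spec's semantics and is not ailake's implementation.

```python
from datetime import date

EPOCH = date(1970, 1, 1)


def identity(value):
    return value


def year(d: date) -> int:
    """Years since 1970."""
    return d.year - 1970


def month(d: date) -> int:
    """Months since 1970-01."""
    return (d.year - 1970) * 12 + (d.month - 1)


def day(d: date) -> int:
    """Days since 1970-01-01."""
    return (d - EPOCH).days


def truncate(width: int, value):
    """truncate[W]: ints round down to a multiple of W; strings keep W code points."""
    if isinstance(value, int):
        # Python's % is floor-modulo, matching the spec's v - (((v % W) + W) % W).
        return value - (value % width)
    if isinstance(value, str):
        return value[:width]
    raise TypeError(f"unsupported type for truncate: {type(value).__name__}")


# Partition tuple for the spec [("topic_id","identity","int"),("date","month","date")]
row = {"topic_id": 7, "date": date(2024, 3, 15)}
partition = (identity(row["topic_id"]), month(row["date"]))
print(partition)  # (7, 650)
```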

Table

Method | Description
insert(texts, embeddings=None) → Table | Buffer a batch. embeddings: list[list[float]] or numpy array. When embed_fn was set on open_table(), embeddings may be omitted and the callable is invoked automatically.
write_batch_auto_deferred(texts, embeddings=None) → Table | Deferred write: Parquet is persisted immediately (~200k vec/s) and the index (HNSW or IVF-PQ, auto-selected) is built in a background thread. The file is served via flat scan until its index is ready.
commit() → int | Persist buffered data as a new Iceberg snapshot; returns the snapshot ID.
search(query, top_k=10, fetch_data=False, partition_filter=None, score_fn=None, hybrid_text=None, text_column="chunk_text", bm25_weight=0.5, pruning_threshold=None, ef_search=None) → SearchQuery | Lazy, chainable search. query: list[float] or numpy array. fetch_data=True returns all Parquet columns + _distance. hybrid_text enables BM25 + vector RRF fusion. pruning_threshold skips files whose centroid is farther than this distance from the query. ef_search overrides the HNSW search pool size. Raises ModelMismatch if the query dimension differs from the table dimension.
insert_async(...) | Async variant of insert.
write_batch_auto_deferred_async(...) | Async variant of write_batch_auto_deferred.
commit_async() → int | Async variant of commit.

Table is a context manager: with ailake.open_table(...) as t: ...

In Jupyter, a Table renders as a styled HTML card showing its path and vector configuration.

SearchQuery

Lazy result set — no I/O until materialised.

Method | Description
limit(n) → SearchQuery | Cap results to the n nearest neighbours (chainable).
to_list() → list[dict] | Always pointer-only: [{"row_id": int, "distance": float, "file": str}, ...]
to_arrow() → pyarrow.Table | Full row data (all columns + _distance) when fetch_data=True; otherwise a pointer-only pyarrow.Table with columns row_id, distance, file.
to_pandas() → pd.DataFrame | Full-row DataFrame when fetch_data=True; pointer-only otherwise.
to_polars() → pl.DataFrame | Full-row DataFrame when fetch_data=True; pointer-only otherwise.
to_list_async() | Async variant of to_list().
to_arrow_async() | Async variant of to_arrow().
to_pandas_async() | Async variant of to_pandas().
to_polars_async() | Async variant of to_polars().

In Jupyter, a SearchQuery renders as an HTML table once it has been executed and shows a pending state otherwise. With fetch_data=True, the HTML table shows all Parquet columns.

Full-read mode

import ailake
import numpy as np

query = np.random.rand(1536).astype(np.float32)

# Pointer-only (default, backward-compatible)
df = ailake.search("./my_table", query, top_k=10).to_pandas()
# columns: row_id, distance, file

# Full row data: all Parquet columns + _distance
df = ailake.search("./my_table", query, top_k=10, fetch_data=True).to_pandas()
# columns: text, embedding, ..., _distance

# Same via a Table handle
table = ailake.open_table("./my_table", dim=1536)
df = table.search(query, top_k=10, fetch_data=True).to_pandas()

fetch_data=True reads each matching Parquet file once and uses arrow_select::take to extract only the matched rows — no full table scan.
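The one-read-per-file strategy amounts to grouping the pointer hits by file, then taking only the matched row positions from each file. The pure-Python sketch below shows that grouping step. It makes two assumptions: read_file is a hypothetical stand-in for a Parquet reader, and row_id is the row's position within its file. The SDK itself performs the take in Rust with arrow_select::take.

```python
from collections import defaultdict


def fetch_rows(hits, read_file):
    """Resolve pointer hits to full rows, reading each file at most once.

    hits: list of {"row_id": int, "distance": float, "file": str}
    read_file: hypothetical callable path -> list of row dicts
    """
    by_file = defaultdict(list)
    for hit in hits:
        by_file[hit["file"]].append(hit)

    results = []
    for path, file_hits in by_file.items():
        file_rows = read_file(path)  # one read per distinct file
        for hit in file_hits:
            row = dict(file_rows[hit["row_id"]])  # "take" only the matched row
            row["_distance"] = hit["distance"]
            results.append(row)
    results.sort(key=lambda r: r["_distance"])  # restore nearest-first order
    return results


# Tiny in-memory "table" plus a reader that counts how often each file is opened.
files = {
    "a.parquet": [{"text": "a0"}, {"text": "a1"}],
    "b.parquet": [{"text": "b0"}],
}
reads = defaultdict(int)


def read_file(path):
    reads[path] += 1
    return files[path]


hits = [
    {"row_id": 1, "distance": 0.10, "file": "a.parquet"},
    {"row_id": 0, "distance": 0.20, "file": "b.parquet"},
    {"row_id": 0, "distance": 0.30, "file": "a.parquet"},
]
rows = fetch_rows(hits, read_file)
print([r["text"] for r in rows], dict(reads))
```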

search(path, query, top_k=10, fetch_data=False, partition_filter=None, score_fn=None, hybrid_text=None, text_column="chunk_text", bm25_weight=0.5, pruning_threshold=None, ef_search=None) → SearchQuery

Module-level search returning the same chainable SearchQuery.

  • partition_filter: restricts the search to files whose partition_value matches; pruning happens at the manifest level, before any HNSW I/O.
  • hybrid_text: BM25 query string. When set, the search retrieves 10 × top_k HNSW candidates and fuses them with the BM25 ranking via RRF, weighted by bm25_weight.
  • pruning_threshold: geometric pruning distance; files whose centroid distance from the query exceeds this value are skipped. Default None means no pruning.
  • ef_search: HNSW search pool size. Larger values give higher recall at the cost of speed. Default None uses the table default (50).
  • score_fn: re-ranking callable (distance: float, row: Any) -> float. Requires fetch_data=True.
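Geometric pruning, as described for pruning_threshold, works as a cheap pre-filter. The query is compared with each file's centroid, and files whose centroid is farther than the threshold are skipped. The minimal sketch below assumes cosine distance and a hypothetical centroids mapping. How ailake stores centroids and which metric it applies at this step are not specified above.

```python
import math


def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (na * nb)


def files_to_search(query, centroids, pruning_threshold=None):
    """Return file paths to scan; None means no pruning (the documented default)."""
    if pruning_threshold is None:
        return list(centroids)
    return [
        path
        for path, centroid in centroids.items()
        if cosine_distance(query, centroid) <= pruning_threshold
    ]


centroids = {  # hypothetical per-file centroids
    "part-0.parquet": [1.0, 0.0],
    "part-1.parquet": [0.0, 1.0],
    "part-2.parquet": [0.7, 0.7],
}
query = [1.0, 0.1]
print(files_to_search(query, centroids, pruning_threshold=0.3))
```

A tighter threshold skips more files and saves more I/O. The trade-off is a higher risk of missing a true neighbour that sits in a file whose centroid is far from the query.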

VectorColSpec(column, dim, metric="cosine", modality=None)

Declares one vector column for multi-column writes or searches.

Arg | Description | Example
column | Parquet column name | "image_embedding"
dim | Embedding dimension | 512
metric | Distance metric | "cosine"
modality | Optional tag, stored as ailake.modality-<column> | "text" / "image" / "audio" / "video"

TableWriter.write_batch_multi(texts, columns)

Write a batch with N independent vector columns in one call. Each column gets its own HNSW index in the AILK section of the file footer.

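import numpy as np
from ailake import TableWriter, VectorColSpec

texts = ["caption one", "caption two"]
text_embeddings = np.random.rand(2, 1536).astype(np.float32)   # placeholder vectors
image_embeddings = np.random.rand(2, 512).astype(np.float32)

text_spec  = VectorColSpec("embedding",       1536, "cosine", "text")
image_spec = VectorColSpec("image_embedding",  512, "cosine", "image")

writer = TableWriter("s3://my-lake/media/", dim=1536, metric="cosine")
writer.write_batch_multi(
    texts,
    [(text_spec, text_embeddings), (image_spec, image_embeddings)],
)
snapshot_id = writer.commit()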

search_multimodal(path, queries, top_k=10) → list[dict]

Cross-modal search: fuse results from N vector columns via Reciprocal Rank Fusion.

rrf_score = Σ weight_i / (60 + rank_i) — higher is better.

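import ailake
import numpy as np

text_vec = np.random.rand(1536).astype(np.float32)   # placeholder query vectors
image_vec = np.random.rand(512).astype(np.float32)

results = ailake.search_multimodal(
    "s3://my-lake/media/",
    queries=[
        ("embedding",       text_vec,  0.7),   # 70% weight on text similarity
        ("image_embedding", image_vec, 0.3),   # 30% weight on image similarity
    ],
    top_k=20,
)
# Returns: [{"row_id": int, "rrf_score": float, "file": str}, ...]
# Ordered by descending rrf_score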

Each column is searched by its own HNSW. Per-column dimensions are auto-detected from ailake.dim-<col> Iceberg properties written at commit() time — no dim argument needed when reading tables written with write_batch_multi.
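The pure-Python sketch below makes the fusion formula concrete. It implements weighted Reciprocal Rank Fusion with the constant 60 from the formula above. Two details are assumptions, because the text does not pin them down: ranks start at 1, as in the original RRF formulation, and rows are identified by (file, row_id).

```python
from collections import defaultdict


def weighted_rrf(ranked_lists, k=60, top_k=10):
    """Fuse ranked lists with rrf_score = sum(weight_i / (k + rank_i)).

    ranked_lists: list of (weight, hits); hits are nearest-first dicts with
    "row_id" and "file". Ranks are 1-based (assumption).
    """
    scores = defaultdict(float)
    for weight, hits in ranked_lists:
        for rank, hit in enumerate(hits, start=1):
            scores[(hit["file"], hit["row_id"])] += weight / (k + rank)
    fused = [
        {"row_id": row_id, "rrf_score": score, "file": file}
        for (file, row_id), score in scores.items()
    ]
    fused.sort(key=lambda r: r["rrf_score"], reverse=True)  # higher is better
    return fused[:top_k]


text_hits = [{"row_id": 1, "file": "f"}, {"row_id": 2, "file": "f"}]
image_hits = [{"row_id": 2, "file": "f"}, {"row_id": 3, "file": "f"}]
fused = weighted_rrf([(0.7, text_hits), (0.3, image_hits)])
print([(r["row_id"], round(r["rrf_score"], 5)) for r in fused])
```

Row 2 ranks first even though it tops neither list. Appearing in both lists outweighs a single first place, which is the behaviour RRF is designed for.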

Agent(table_path, embed_fn, agent_id=None) — Phase 9 episodic memory

High-level helper for agent frameworks (LangChain, CrewAI, AutoGen). Wraps TableWriter + search + ContextAssembler with hybrid scoring (distance × recency × importance) and automatic per-agent partition isolation.

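import ailake
import numpy as np


def my_embed_fn(texts: list[str]) -> list[list[float]]:
    return np.random.rand(len(texts), 1536).tolist()  # placeholder; replace with a real model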

agent = ailake.Agent(
    table_path="s3://my-lake/agents/",
    embed_fn=my_embed_fn,         # list[str] → list[list[float]]
    agent_id="agent-uuid-here",   # isolates reads/writes to this agent's shard
)

# Store a memory with optional importance score
agent.remember("Deployment failed due to OOM on Tuesday", importance=0.9)

# Recall relevant memories — hybrid score = distance × recency × importance
results = agent.recall("deployment issues", top_k=5)

# Log a tool call for later retrieval
agent.log_tool_call(
    name="web_search",
    input={"q": "python asyncio timeout"},
    output={"hits": 5},
    outcome="success",
    latency_ms=120,
)

# Assemble context for LLM prompt (dedup + token budget)
context_xml = agent.assemble_context("why did deployment fail?", max_tokens=4096)

Method | Description
remember(text, importance=1.0) | Embeds text and stores it as an EpisodicMemorySchema row tagged with agent_id.
recall(query, top_k=5) | Embeds query, searches with partition_filter=self.agent_id, and applies the hybrid score.
log_tool_call(name, input, output, outcome="success", latency_ms=0) | Stores a ToolCallSchema row, searchable by tool name and context.
assemble_context(query, max_tokens=4096) | Runs recall() and ContextAssembler; returns prompt-ready XML.

migrate_embeddings(path, old_column, new_column, embed_fn, *, ...)

Re-embeds all chunks in a table with a new model, committing the result as a new Iceberg snapshot.

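import ailake
import numpy as np


def my_embed_fn(texts: list[str]) -> list[list[float]]:
    return np.random.rand(len(texts), 3072).tolist()  # placeholder for the new model


ailake.migrate_embeddings(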
    path         = "s3://my-lake/docs/",
    old_column   = "embedding",        # existing vector column
    new_column   = "embedding_v2",     # destination column (may be same name)
    embed_fn     = my_embed_fn,        # callable: list[str] → list[list[float]]
    text_column  = "chunk_text",       # source text column
    strategy     = "dual_write_then_cutover",  # or "atomic_replace"
    batch_size   = 512,
    new_model    = "text-embedding-3-large",
    new_model_version = "v1",
    on_progress  = lambda *, files_done, files_total, rows_migrated: print(
        f"{files_done}/{files_total} files, {rows_migrated} rows"
    ),
)

Parameter | Default | Description
path | required | Table root URI
old_column | required | Existing vector column to migrate from
new_column | required | Destination vector column
embed_fn | required | list[str] → list[list[float]] callable
text_column | "chunk_text" | Parquet column containing the source text
strategy | "dual_write_then_cutover" | "dual_write_then_cutover" (zero downtime, 2× peak storage) or "atomic_replace" (lower storage, brief mixed-model window)
batch_size | 512 | Rows passed to embed_fn per call
new_model | None | Model name written to ailake.embedding-model after migration
new_model_version | None | Optional version suffix
on_progress | None | Callable invoked after each file with keyword arguments files_done, files_total, rows_migrated
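batch_size and on_progress define a simple contract. Texts reach embed_fn in slices of at most batch_size rows, and on_progress receives keyword arguments after each file. The sketch below mimics that loop over in-memory data, so you can test an embed_fn and a progress callback before running a real migration. reembed_files is a hypothetical helper. It writes nothing and is not how migrate_embeddings is implemented.

```python
def reembed_files(files, embed_fn, batch_size=512, on_progress=None):
    """Re-embed {file_name: [text, ...]} in batches; returns {file_name: vectors}.

    Hypothetical helper mirroring the documented batch_size / on_progress contract.
    """
    out = {}
    rows_migrated = 0
    files_total = len(files)
    for files_done, (name, texts) in enumerate(files.items(), start=1):
        vectors = []
        for start in range(0, len(texts), batch_size):
            batch = texts[start:start + batch_size]
            embedded = embed_fn(batch)
            if len(embedded) != len(batch):
                raise ValueError("embed_fn must return one vector per input text")
            vectors.extend(embedded)
        out[name] = vectors
        rows_migrated += len(texts)
        if on_progress is not None:
            on_progress(files_done=files_done, files_total=files_total,
                        rows_migrated=rows_migrated)
    return out


calls = []


def fake_embed(texts):  # stand-in for a real model; records each batch size
    calls.append(len(texts))
    return [[float(len(t)), 0.0] for t in texts]


progress = []
files = {"part-0.parquet": ["a", "bb", "ccc"], "part-1.parquet": ["dddd"]}
result = reembed_files(
    files, fake_embed, batch_size=2,
    on_progress=lambda *, files_done, files_total, rows_migrated:
        progress.append((files_done, files_total, rows_migrated)),
)
print(calls, progress)
```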

TableWriter (low-level — use open_table() for most cases)

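import ailake
import numpy as np

path = "./my_table"
texts = ["Document about AI", "Another document"]
embeddings = np.random.rand(2, 1536).astype(np.float32)

# Standard HNSW write with model tracking
writer = ailake.TableWriter(
    path, dim=1536, metric="cosine",
    embedding_model="text-embedding-3-small",
    embedding_model_version="v1",
)
writer.write_batch(texts, embeddings)
snapshot_id = writer.commit()

# Pattern B: auto-embed; omit embeddings and the SDK calls embed_fn
def my_embed(batch: list[str]) -> list[list[float]]:
    return np.random.rand(len(batch), 1536).tolist()  # e.g. my_model.encode(batch).tolist()

writer = ailake.TableWriter(path, dim=1536, embed_fn=my_embed)
writer.write_batch(texts)  # no embeddings arg needed
writer.commit()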

# PQ-only — raw vectors discarded after index build (~98 % storage reduction)
writer = ailake.TableWriter(path, dim=1536, metric="cosine", pq_only=True)
writer.write_batch(texts, embeddings)
writer.commit()

# Residual PQ — per-cluster encoding for better recall
writer = ailake.TableWriter(path, dim=1536, metric="cosine", ivf_residual=True)
writer.write_batch(texts, embeddings)
writer.commit()

# Deferred write — Parquet immediate, index background (~200k vec/s)
writer = ailake.TableWriter(path, dim=1536, metric="cosine")
writer.write_batch_auto_deferred(texts, embeddings)
writer.commit()

TableWriter parameters: same as open_table() (includes pq_only, ivf_residual, pre_normalize, hnsw_m, hnsw_ef_construction, embedding_model, embedding_model_version, embed_fn, partition_by, partition_value, partition_fields, format_version).

delete_where(path, column, values) → None

Commits an Iceberg equality delete. No data files are rewritten.

ailake.delete_where("./my_table", "id", ["doc-obsolete-1", "doc-obsolete-2"])
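An Iceberg equality delete leaves data files untouched. At read time, readers drop every row whose delete column matches one of the deleted values. Per the Iceberg v2 spec, an equality delete applies only to data files whose data sequence number is strictly lower than the delete file's, so rows re-inserted later survive. The pure-Python sketch below models that read-time filter. The file structures are simplified stand-ins, not ailake's, and partition scoping is ignored.

```python
def apply_equality_deletes(data_files, delete_files):
    """Yield rows that survive equality deletes at read time.

    data_files: list of (sequence_number, rows) with rows as dicts
    delete_files: list of (sequence_number, column, set_of_values)
    """
    for data_seq, rows in data_files:
        for row in rows:
            deleted = any(
                del_seq > data_seq and row.get(column) in values
                for del_seq, column, values in delete_files
            )
            if not deleted:
                yield row


data_files = [
    (1, [{"id": "doc-obsolete-1"}, {"id": "doc-keep"}]),
    (3, [{"id": "doc-obsolete-1"}]),  # re-inserted after the delete was committed
]
delete_files = [(2, "id", {"doc-obsolete-1", "doc-obsolete-2"})]
print([row["id"] for row in apply_equality_deletes(data_files, delete_files)])
```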

add_column(path, name, col_type, *, required=False, initial_default=None) → int

Adds a column to the live table schema and returns the new schema_id. No data files are rewritten.

ailake.add_column("./my_table", "source_url", "string", required=False, initial_default="")

rename_column(path, old_name, new_name) → int

Renames a column and returns the new schema_id.

hardware_info() → dict[str, str]

Returns the hardware profile of the current machine.

info = ailake.hardware_info()
# {
#   "backend":           "cpu-simd",   # or "nvidia-cuda" / "amd-rocm"
#   "has_cuda":          "false",
#   "has_rocm":          "false",
#   "cpu_logical_cores": "16",
#   "has_avx2":          "true",
#   "has_avx512":        "false",
#   "recommend_ivf_pq":  "true",       # true when has GPU OR (cores > 8 AND n >= 5000)
# }

Call this before write_batch_auto_deferred to see which index type will be selected.
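The recommend_ivf_pq comment states a simple rule. The sketch below evaluates that rule over the string-valued dict. It reads n as the number of vectors being indexed, which the comment implies, but hardware_info() itself takes no row count, so treat this as an interpretation rather than the SDK's selection logic.

```python
def recommend_ivf_pq(info: dict[str, str], n: int) -> bool:
    """True when a GPU is present OR (cores > 8 AND n >= 5000), per the documented comment."""
    has_gpu = info.get("has_cuda") == "true" or info.get("has_rocm") == "true"
    cores = int(info.get("cpu_logical_cores", "0"))
    return has_gpu or (cores > 8 and n >= 5000)


cpu_box = {"backend": "cpu-simd", "has_cuda": "false", "has_rocm": "false",
           "cpu_logical_cores": "16"}
print(recommend_ivf_pq(cpu_box, n=1000), recommend_ivf_pq(cpu_box, n=50000))
```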

compact(path, *, min_files=4, target_size_bytes=134217728, max_files_per_pass=20, deferred=False) → dict

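Merges small files into a larger file and rebuilds the HNSW index. Returns a dict such as {"ok": True, "files_compacted": N, "output_path": ...}. It is a no-op when fewer than min_files files qualify. The default target_size_bytes of 134217728 is 128 MiB.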

result = ailake.compact("s3://my-lake/docs/", min_files=5)
# {"ok": True, "files_compacted": 1, "output_path": "data/compacted-..."}
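The compact() parameters interact as follows. At most max_files_per_pass files are merged per call, and nothing happens when fewer than min_files qualify. Which files qualify is not defined above. The sketch below assumes a file qualifies when it is smaller than target_size_bytes. plan_compaction is a hypothetical helper that only plans the merge and does not perform it.

```python
def plan_compaction(file_sizes, min_files=4, target_size_bytes=128 * 1024 * 1024,
                    max_files_per_pass=20):
    """Pick small files to merge; returns [] for a no-op.

    file_sizes: {path: size_in_bytes}. "Small" means below target_size_bytes (assumption).
    """
    small = sorted(
        (path for path, size in file_sizes.items() if size < target_size_bytes),
        key=lambda p: file_sizes[p],  # smallest first
    )
    if len(small) < min_files:
        return []
    return small[:max_files_per_pass]


MiB = 1024 * 1024
sizes = {f"part-{i}.parquet": (i + 1) * MiB for i in range(6)}
sizes["big.parquet"] = 512 * MiB
print(plan_compaction(sizes, min_files=5, max_files_per_pass=3))
```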

evolve_schema(path, *, add_columns=None, rename_columns=None) → int

Applies schema evolution in a single metadata-only call (no data files rewritten). Combines add_column + rename_column in order. Returns final schema_id.

ailake.evolve_schema(
    "s3://my-lake/docs/",
    add_columns=[{"name": "score", "type": "float", "initial_default": 0.0}],
    rename_columns=[{"from": "old_text", "to": "chunk_text"}],
)

now_ns() → int

Returns the current Unix epoch time in nanoseconds. Use it to populate created_at / last_accessed_at columns (Arrow Timestamp(ns, UTC)).

ts = ailake.now_ns()   # e.g. 1750000000000000000
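Outside the SDK, Python's standard library provides nanosecond epoch time via time.time_ns(). The sketch below converts between epoch nanoseconds and timezone-aware UTC datetimes, which helps when inspecting values stored as Arrow Timestamp(ns, UTC). It does not claim that now_ns() is implemented this way.

```python
import time
from datetime import datetime, timezone


def ns_to_utc(ns: int) -> datetime:
    """Convert Unix epoch nanoseconds to an aware UTC datetime (microsecond precision)."""
    seconds, rem_ns = divmod(ns, 1_000_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=rem_ns // 1000)


def utc_to_ns(dt: datetime) -> int:
    """Convert an aware datetime back to epoch nanoseconds (sub-microsecond digits are 0)."""
    delta = dt - datetime(1970, 1, 1, tzinfo=timezone.utc)
    return (delta.days * 86_400 + delta.seconds) * 1_000_000_000 + delta.microseconds * 1000


now = time.time_ns()
print(ns_to_utc(1750000000000000000))  # 2025-06-15 15:06:40+00:00
```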

delete_rows(path, file_path, row_ids) → None

Low-level Rust binding: physically removes rows from a specific Parquet file within the table, rebuilding the HNSW index. For logical Iceberg deletes (no file rewrite), use delete_where instead.

search_text(path, query_text, top_k=10, text_columns=None) → list[dict]

Pure BM25 full-text search; no HNSW required. By default this is an O(N) brute-force scan. Files that embed a Tantivy FTS index (written with fts_text_columns=) use the O(log N) Tantivy fast path instead.

import ailake

path = "s3://my-lake/docs/"

# BM25 search (no embedding needed)
hits = ailake.search_text(path, "rust async programming", top_k=10)
# Returns: [{"row_id": int, "score": float, "file": str}]  (higher score = more relevant)

# Restrict to specific text columns (default: ["chunk_text"])
hits = ailake.search_text(path, "query", text_columns=["chunk_text", "document_title"])
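The sketch below shows what the brute-force path computes: a compact Okapi BM25 scorer over lowercased, whitespace-tokenized text. It uses the common defaults k1=1.2 and b=0.75 and the non-negative IDF variant used by Lucene. ailake's tokenizer and BM25 parameters are not documented here and may differ.

```python
import math
from collections import Counter


def bm25_search(docs, query, top_k=10, k1=1.2, b=0.75):
    """Score every doc against query (O(N) brute force); returns [(index, score)]."""
    tokenized = [doc.lower().split() for doc in docs]
    n_docs = len(tokenized)
    avgdl = sum(len(t) for t in tokenized) / n_docs
    doc_freq = Counter(term for t in tokenized for term in set(t))

    results = []
    for i, tokens in enumerate(tokenized):
        tf = Counter(tokens)
        score = 0.0
        for term in query.lower().split():
            if term not in tf:
                continue
            n = doc_freq[term]
            idf = math.log(1 + (n_docs - n + 0.5) / (n + 0.5))
            freq = tf[term]
            # Term-frequency saturation (k1) and length normalization (b).
            score += idf * freq * (k1 + 1) / (freq + k1 * (1 - b + b * len(tokens) / avgdl))
        if score > 0:
            results.append((i, score))
    results.sort(key=lambda r: r[1], reverse=True)  # higher score = more relevant
    return results[:top_k]


docs = [
    "Rust async programming with tokio",
    "Python asyncio tutorial",
    "Iceberg table maintenance and compaction",
]
print(bm25_search(docs, "rust async programming"))
```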

info(path) → dict

Returns table metadata including per-file index status. Useful for monitoring deferred index builds.

info = ailake.info("s3://my-lake/docs/")
# {"files": [...], "ready_files": 5, "indexing_files": 1, "failed_files": 0}
# Each entry in "files" has:
#   {"path": "data/part-00000.parquet", "index_status": "ready"|"indexing"|"failed",
#    "index_error": None|"k-means did not converge", "record_count": 50000}

When index_status == "failed", the file is served via flat scan (O(N) brute-force). The next compaction run automatically rebuilds the index.
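Because info() reports indexing_files alongside ready and failed counts, a small poll loop can wait for deferred builds to finish. The helper below is a hypothetical sketch. It accepts any zero-argument callable that returns a dict shaped like the documented info() output, for example lambda: ailake.info(path), so it can be tested without a real table.

```python
import time


def wait_for_indexes(get_info, timeout_s=300.0, poll_interval_s=2.0):
    """Poll until no file is still indexing; return the final info dict.

    Raises TimeoutError if builds are still running after timeout_s seconds.
    Failed files are not retried here; they stay on flat scan until compaction.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        info = get_info()
        if info.get("indexing_files", 0) == 0:
            return info
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{info['indexing_files']} file(s) still indexing")
        time.sleep(poll_interval_s)


# Stub that reports one file indexing on the first call, then everything ready.
snapshots = iter([
    {"ready_files": 5, "indexing_files": 1, "failed_files": 0},
    {"ready_files": 6, "indexing_files": 0, "failed_files": 0},
])
final = wait_for_indexes(lambda: next(snapshots), poll_interval_s=0.01)
print(final["ready_files"], final["failed_files"])
```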

assemble_context(chunks, max_tokens=4096, dedup_threshold=0.05) → str

Assembles chunk dicts into structured XML for LLM input. Deduplicates near-identical chunks within the token budget.
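The token budget follows the quickstart's rule of thumb of 4 characters per token. The sketch below shows greedy budget packing under that estimate: chunks are taken nearest-first, and packing stops at the first chunk that would exceed max_tokens. Stopping rather than skipping ahead is a choice made for this sketch. Deduplication and the XML layout are omitted because their exact rules are not documented here.

```python
def estimate_tokens(text: str) -> int:
    """Rough estimate: 4 characters per token, rounded up."""
    return (len(text) + 3) // 4


def pack_chunks(chunks, max_tokens=4096):
    """Greedily keep nearest-first chunks while they fit within max_tokens."""
    selected, used = [], 0
    for chunk in sorted(chunks, key=lambda c: c["distance"]):
        cost = estimate_tokens(chunk["chunk_text"])
        if used + cost > max_tokens:
            break  # stop at the first chunk that does not fit
        selected.append(chunk)
        used += cost
    return selected, used


chunks = [
    {"chunk_text": "x" * 40, "distance": 0.30},
    {"chunk_text": "y" * 20, "distance": 0.10},
    {"chunk_text": "z" * 40, "distance": 0.20},
]
kept, used = pack_chunks(chunks, max_tokens=16)
print([c["chunk_text"][0] for c in kept], used)
```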

Storage modes and index types

Mode | pq_only | ivf_residual | Storage (dim=1536, 100M rows) | Reranking | Recall@10
HNSW + F16 raw (default) | False | False | ~300 GB vectors + ~30 GB HNSW | Yes (exact) | ~97 %
IVF-PQ + F16 raw | False | False | ~300 GB + ~5 GB PQ codes | Yes (exact) | ~93 % inline
IVF-PQ residual + raw | False | True | ~300 GB + ~5 GB | Yes (exact) | ~96 %
PQ-only | True | False | ~5 GB total | No | ~93-95 %
PQ-only residual | True | True | ~5 GB total | No | ~94-96 %

# Deferred write — all modes, instant Parquet commit, index in background
writer = ailake.TableWriter(path, dim=1536, pq_only=True, ivf_residual=True)
writer.write_batch_auto_deferred(texts, embeddings)
writer.commit()
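The raw-vector sizes in the storage table follow from simple arithmetic. An F16 value takes 2 bytes, so each 1536-dim vector occupies 3,072 bytes before compression. The helper below reproduces that estimate for any row count and adds the size of PQ codes. The 48-byte code length is an illustrative assumption, not a documented ailake default.

```python
def raw_f16_bytes(rows: int, dim: int = 1536) -> int:
    """Uncompressed F16 vector storage: rows * dim * 2 bytes."""
    return rows * dim * 2


def pq_code_bytes(rows: int, code_bytes_per_vector: int = 48) -> int:
    """PQ storage: one fixed-size code per vector (code length is an assumption)."""
    return rows * code_bytes_per_vector


GB = 10**9
for rows in (1_000_000, 100_000_000):
    raw = raw_f16_bytes(rows)
    pq = pq_code_bytes(rows)
    print(f"{rows:>11,} rows: raw {raw / GB:6.1f} GB, PQ {pq / GB:5.2f} GB, "
          f"reduction {1 - pq / raw:.1%}")
```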

HNSW tuning guide

Goal | hnsw_m | hnsw_ef_construction
Low latency / high QPS | 8 | 100
General purpose (default) | 16 | 150
High recall (RAG) | 24 | 200
Max recall (medical, legal) | 32 | 400

Type checking

Ships py.typed (PEP 561) and ailake/_ailake.pyi stubs. mypy and pyright work out of the box with no configuration.

Iceberg compatibility

Tables are valid Apache Iceberg tables (Spec v2 by default; v3 when written with format_version=3). Spark, Trino, DuckDB, and PyIceberg read the tabular columns normally; the HNSW index lives in an extension section that standard Parquet readers silently ignore.

License

MIT OR Apache-2.0

Download files

Source distribution

  • ailake-0.0.27.tar.gz (298.3 kB)

Built distributions

  • ailake-0.0.27-cp39-abi3-win_amd64.whl (5.1 MB): CPython 3.9+, Windows x86-64
  • ailake-0.0.27-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (5.7 MB): CPython 3.9+, manylinux (glibc 2.17+) x86-64
  • ailake-0.0.27-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (5.3 MB): CPython 3.9+, manylinux (glibc 2.17+) ARM64

