Skip to main content

Unified tabular + vector storage in a single Iceberg-compatible file

Project description

ailake — AI-Lake Format Python SDK

Unified storage for tabular data, embeddings, and HNSW vector index in a single Parquet-compatible file. 100% Apache Iceberg Spec v2 compatible.

Install

pip install ailake

Requires Python ≥ 3.9. Dependencies: pyarrow >= 14.0, numpy >= 1.24.

Quickstart

Write + search — fluent API (recommended)

import ailake
import numpy as np

# Open or create a table
table = ailake.open_table(
    "./my_table",
    dim=1536,
    metric="cosine",          # cosine | euclidean | dot_product | normalized_cosine
    pre_normalize=True,       # normalize at write time; enables fast 1-dot(a,b) path
    hnsw_m=16,                # HNSW connections per node (default 16)
    hnsw_ef_construction=150,
    embedding_model="text-embedding-3-small",  # tracked in Iceberg metadata
    embedding_model_version="v1",
)

texts = ["Document about AI", "Another document"]
embeddings = np.random.rand(2, 1536).astype(np.float32)

table.insert(texts, embeddings)   # accepts list or numpy array
snapshot_id = table.commit()

# Pattern B — auto-embed without passing embeddings explicitly
def my_embed(texts: list[str]) -> list[list[float]]:
    return np.random.rand(len(texts), 1536).tolist()  # replace with real model

table2 = ailake.open_table("./my_table2", dim=1536, embed_fn=my_embed)
table2.insert(["Document about AI", "Another document"])  # embed_fn called automatically
table2.commit()

# Pointer-only search (default — backward-compatible)
df      = table.search(embeddings[0], top_k=10).to_pandas()   # row_id, distance, file
lf      = table.search(embeddings[0]).limit(5).to_polars()
results = table.search(embeddings[0]).to_list()   # list[dict]

# Full row data — all Parquet columns + _distance
df_full = table.search(embeddings[0], top_k=10, fetch_data=True).to_pandas()

Async API

import ailake, asyncio
import numpy as np

async def main():
    table = ailake.open_table("./my_table", dim=1536)
    await table.insert_async(texts, embeddings)
    await table.commit_async()

    # fluent async chain
    df = await table.search(query_vec).limit(10).to_pandas_async()

    # parallel searches via asyncio.gather
    r1, r2 = await asyncio.gather(
        table.search(q1).to_list_async(),
        table.search(q2).to_list_async(),
    )

asyncio.run(main())

Module-level search

import ailake
import numpy as np

query = np.random.rand(1536).astype(np.float32)

df     = ailake.search("./my_table", query, top_k=10).to_pandas()
lf     = ailake.search("./my_table", query).limit(5).to_polars()
items  = ailake.search("./my_table", query).to_list()

Assemble context for LLMs

import ailake

chunks = [
    {
        "document_id": "doc-1",
        "chunk_index": 0,
        "chunk_text": "AI-Lake stores vectors and tabular data together.",
        "document_title": "AI-Lake Overview",
        "section_path": "Introduction",
        "source_uri": "s3://my-lake/docs/overview.pdf",
        "distance": 0.12,
    },
]

context_xml = ailake.assemble_context(
    chunks=chunks,
    max_tokens=4096,       # token budget (4 chars ≈ 1 token)
    dedup_threshold=0.05,  # drop near-duplicate chunks
)
# Pass context_xml directly to Claude / GPT-4 as a user message

API reference

open_table(path, *, ...) → Table

Opens or creates an AI-Lake table at path.

Parameter Default Description
path required Table root (local, s3://, gs://, az://)
vector_column "embedding" Vector column name
dim 1536 Embedding dimension
metric "cosine" cosine, euclidean, dot_product, normalized_cosine
pre_normalize False Normalize to unit L2 at write; enables 1-dot(a,b) fast path (~12-20 % speedup)
hnsw_m None (=16) HNSW connections per node
hnsw_ef_construction None (=150) HNSW build pool size
pq_only False Discard raw F16 vectors after index build — only PQ codes stored. ~98 % storage reduction; reranking disabled; recall@10 ~93-95 %.
ivf_residual False Encode vec − cluster_centroid per IVF cell (residual PQ). Same storage as standard PQ; ~2-4 pp better recall@10.
embedding_model None Embedding model name stored in Iceberg properties (ailake.embedding-model). Used for mismatch detection and migration tracking.
embedding_model_version None Optional model version. Stored as "<name>@<version>" in Iceberg properties.
embed_fn None Auto-embed callable list[str] → list[list[float]]. When set, insert(texts) and write_batch(texts) can be called without passing embeddings — the callable is invoked automatically.
partition_by None Single-column Iceberg identity partition (e.g. "agent_id"). Stored in metadata.json. Prefer partition_fields for new tables.
partition_value None Per-write value for partition_by. Tagged in key_metadata; used for manifest-level pruning at search time.
partition_fields None Multi-column Iceberg partition spec. List of (column, transform, column_type) tuples. Supports all Iceberg transforms: "identity", "year", "month", "day", "hour", "bucket[N]", "truncate[N]". Takes precedence over partition_by. Example: [("topic_id","identity","int"),("date","month","date")].
format_version 2 Iceberg format version. Set to 3 to write an Iceberg v3 table.

Table

Method Description
insert(texts, embeddings=None) → Table Buffer a batch. embeddings: list[list[float]] or numpy array. When embed_fn was set on open_table(), embeddings may be omitted — the callable is invoked automatically.
write_batch_auto_deferred(texts, embeddings=None) → Table Deferred write — Parquet persisted immediately (~200k vec/s); index (HNSW or IVF-PQ, auto-selected) built in a background thread. Shard served via flat scan until index ready.
commit() → int Persist as a new Iceberg snapshot; returns snapshot ID.
search(query, top_k=10, fetch_data=False) → SearchQuery Lazy, chainable search. query: list[float] or numpy array. Set fetch_data=True to return full row data. Raises ModelMismatch if query dim ≠ table dim.
insert_async(...) Async variant of insert.
write_batch_auto_deferred_async(...) Async variant of write_batch_auto_deferred.
commit_async() → int Async variant of commit.

Table is a context manager: with ailake.open_table(...) as t: ...

In Jupyter, table renders a styled HTML card showing path and vector config.

SearchQuery

Lazy result set — no I/O until materialised.

Method Description
limit(n) → SearchQuery Cap to n nearest neighbours (chainable).
to_list() → list[dict] Always pointer-only: [{"row_id": int, "distance": float, "file": str}, ...]
to_arrow() → pyarrow.Table Full row data (all columns + _distance) when fetch_data=True; pointer-only pyarrow.Table with columns row_id, distance, file otherwise.
to_pandas() → pd.DataFrame Full row DataFrame when fetch_data=True; pointer-only otherwise.
to_polars() → pl.DataFrame Full row DataFrame when fetch_data=True; pointer-only otherwise.
to_list_async() Async variant.
to_arrow_async() Async variant.
to_pandas_async() Async variant.
to_polars_async() Async variant.

In Jupyter, results renders as an HTML table when executed, pending state otherwise. When fetch_data=True, the HTML table shows all Parquet columns.

Full-read mode

# Pointer-only (default — backward-compatible)
df = ailake.search("./my_table", query, top_k=10).to_pandas()
# columns: row_id, distance, file

# Full row data — all Parquet columns + _distance
df = ailake.search("./my_table", query, top_k=10, fetch_data=True).to_pandas()
# columns: text, embedding, ..., _distance

# Same via Table handle
df = table.search(query, top_k=10, fetch_data=True).to_pandas()

fetch_data=True reads each matching Parquet file once and uses arrow_select::take to extract only the matched rows — no full table scan.

search(path, query, top_k=10, fetch_data=False, partition_filter=None) → SearchQuery

Module-level search returning the same chainable SearchQuery. Pass partition_filter to restrict results to files written with a specific partition_value — pruning happens at the manifest level before any HNSW I/O.

VectorColSpec(column, dim, metric="cosine", modality=None)

Declares one vector column for multi-column writes or searches.

Arg Description Example
column Parquet column name "image_embedding"
dim Embedding dimension 512
metric Distance metric "cosine"
modality Optional tag — stored as ailake.modality-<column> "text" / "image" / "audio" / "video"

TableWriter.write_batch_multi(texts, columns)

Write a batch with N independent vector columns in one call. Each column gets its own HNSW index in the AILK section of the file footer.

from ailake import TableWriter, VectorColSpec

text_spec  = VectorColSpec("embedding",       1536, "cosine", "text")
image_spec = VectorColSpec("image_embedding",  512, "cosine", "image")

writer = TableWriter("s3://my-lake/media/", dim=1536, metric="cosine")
writer.write_batch_multi(
    texts,
    [(text_spec, text_embeddings), (image_spec, image_embeddings)],
)
snapshot_id = writer.commit()

search_multimodal(path, queries, top_k=10) → list[dict]

Cross-modal search: fuse results from N vector columns via Reciprocal Rank Fusion.

rrf_score = Σ weight_i / (60 + rank_i) — higher is better.

results = ailake.search_multimodal(
    "s3://my-lake/media/",
    queries=[
        ("embedding",       text_vec,  0.7),   # 70% weight on text similarity
        ("image_embedding", image_vec, 0.3),   # 30% weight on image similarity
    ],
    top_k=20,
)
# Returns: [{"row_id": int, "rrf_score": float, "file": str}, ...]
# Ordered by descending rrf_score

Each column is searched by its own HNSW. Per-column dimensions are auto-detected from ailake.dim-<col> Iceberg properties written at commit() time — no dim argument needed when reading tables written with write_batch_multi.

Agent(table_path, embed_fn, agent_id=None) — Phase 9 episodic memory

High-level helper for agent frameworks (LangChain, CrewAI, AutoGen). Wraps TableWriter + search + ContextAssembler with hybrid scoring (distance × recency × importance) and automatic per-agent partition isolation.

import ailake

agent = ailake.Agent(
    table_path="s3://my-lake/agents/",
    embed_fn=my_embed_fn,         # list[str] → list[list[float]]
    agent_id="agent-uuid-here",   # isolates reads/writes to this agent's shard
)

# Store a memory with optional importance score
agent.remember("Deployment failed due to OOM on Tuesday", importance=0.9)

# Recall relevant memories — hybrid score = distance × recency × importance
results = agent.recall("deployment issues", top_k=5)

# Log a tool call for later retrieval
agent.log_tool_call(
    name="web_search",
    input={"q": "python asyncio timeout"},
    output={"hits": 5},
    outcome="success",
    latency_ms=120,
)

# Assemble context for LLM prompt (dedup + token budget)
context_xml = agent.assemble_context("why did deployment fail?", max_tokens=4096)
Method Description
remember(text, importance=1.0) Embeds text and stores it as an EpisodicMemorySchema row tagged with agent_id.
recall(query, top_k=5) Embeds query, searches with partition_filter=self.agent_id, applies hybrid score.
log_tool_call(name, input, output, outcome="success", latency_ms=0) Stores a ToolCallSchema row — searchable by tool name and context.
assemble_context(query, max_tokens=4096) recall() + ContextAssembler — returns prompt-ready XML.

migrate_embeddings(path, old_column, new_column, embed_fn, *, ...)

Re-embeds all chunks in a table with a new model, committing the result as a new Iceberg snapshot.

ailake.migrate_embeddings(
    path         = "s3://my-lake/docs/",
    old_column   = "embedding",        # existing vector column
    new_column   = "embedding_v2",     # destination column (may be same name)
    embed_fn     = my_embed_fn,        # callable: list[str] → list[list[float]]
    text_column  = "chunk_text",       # source text column
    strategy     = "dual_write_then_cutover",  # or "atomic_replace"
    batch_size   = 512,
    new_model    = "text-embedding-3-large",
    new_model_version = "v1",
    on_progress  = lambda *, files_done, files_total, rows_migrated: print(
        f"{files_done}/{files_total} files, {rows_migrated} rows"
    ),
)
Parameter Default Description
path required Table root URI
old_column required Existing vector column to migrate from
new_column required Destination vector column
embed_fn required list[str] → list[list[float]] callable
text_column "chunk_text" Parquet column containing the source text
strategy "dual_write_then_cutover" "dual_write_then_cutover" (zero downtime, 2× peak storage) or "atomic_replace" (lower storage, brief mixed-model window)
batch_size 512 Rows passed to embed_fn per call
new_model None Model name written to ailake.embedding-model after migration
new_model_version None Optional version suffix
on_progress None Callable invoked after each file with keyword args files_done, files_total, rows_migrated

TableWriter (low-level — use open_table() for most cases)

# Standard HNSW write with model tracking
writer = ailake.TableWriter(
    path, dim=1536, metric="cosine",
    embedding_model="text-embedding-3-small",
    embedding_model_version="v1",
)
writer.write_batch(texts, embeddings)
snapshot_id = writer.commit()

# Pattern B — auto-embed: omit embeddings, SDK calls embed_fn
writer = ailake.TableWriter(
    path, dim=1536,
    embed_fn=lambda texts: my_model.encode(texts).tolist(),
)
writer.write_batch(texts)  # no embeddings arg needed
writer.commit()

# PQ-only — raw vectors discarded after index build (~98 % storage reduction)
writer = ailake.TableWriter(path, dim=1536, metric="cosine", pq_only=True)
writer.write_batch(texts, embeddings)
writer.commit()

# Residual PQ — per-cluster encoding for better recall
writer = ailake.TableWriter(path, dim=1536, metric="cosine", ivf_residual=True)
writer.write_batch(texts, embeddings)
writer.commit()

# Deferred write — Parquet immediate, index background (~200k vec/s)
writer = ailake.TableWriter(path, dim=1536, metric="cosine")
writer.write_batch_auto_deferred(texts, embeddings)
writer.commit()

TableWriter parameters: same as open_table() (includes pq_only, ivf_residual, pre_normalize, hnsw_m, hnsw_ef_construction, embedding_model, embedding_model_version, embed_fn, partition_by, partition_value, partition_fields, format_version).

delete_where(path, column, values) → None

Commits an Iceberg equality delete. No data files are rewritten.

ailake.delete_where("./my_table", "id", ["doc-obsolete-1", "doc-obsolete-2"])

add_column(path, name, col_type, *, required=False, initial_default=None) → int

Adds column to live table schema. Returns new schema_id. No data files rewritten.

ailake.add_column("./my_table", "source_url", "string", required=False, initial_default="")

rename_column(path, old_name, new_name) → int

Renames column. Returns new schema_id.

hardware_info() → dict[str, str]

Returns hardware profile of current machine.

info = ailake.hardware_info()
# {
#   "backend":           "cpu-simd",   # or "nvidia-cuda" / "amd-rocm"
#   "has_cuda":          "false",
#   "has_rocm":          "false",
#   "cpu_logical_cores": "16",
#   "has_avx2":          "true",
#   "has_avx512":        "false",
#   "recommend_ivf_pq":  "true",       # true when has GPU OR (cores > 8 AND n >= 5000)
# }

Call before write_batch_auto_deferred to understand what index type will be selected.

assemble_context(chunks, max_tokens=4096, dedup_threshold=0.05) → str

Assembles chunk dicts into structured XML for LLM input. Deduplicates near-identical chunks within the token budget.

Storage modes and index types

Mode pq_only ivf_residual Storage (dim=1536, 1M rows) Reranking Recall@10
HNSW + F16 raw (default) False False ~300 GB vectors + ~30 GB HNSW Yes (exact) ~97 %
IVF-PQ + F16 raw False False ~300 GB + ~5 GB PQ codes Yes (exact) ~93 % inline
IVF-PQ residual + raw False True ~300 GB + ~5 GB Yes (exact) ~96 %
PQ-only True False ~5 GB total No ~93-95 %
PQ-only residual True True ~5 GB total No ~94-96 %
# Deferred write — all modes, instant Parquet commit, index in background
writer = ailake.TableWriter(path, dim=1536, pq_only=True, ivf_residual=True)
writer.write_batch_auto_deferred(texts, embeddings)
writer.commit()

HNSW tuning guide

Goal hnsw_m hnsw_ef_construction
Low latency / high QPS 8 100
General purpose (default) 16 150
High recall (RAG) 24 200
Max recall (medical, legal) 32 400

Type checking

Ships py.typed (PEP 561) and ailake/_ailake.pyi stubs. mypy and pyright work out of the box with no configuration.

Iceberg compatibility

Tables are valid Apache Iceberg Spec v2. Spark, Trino, DuckDB, and PyIceberg read tabular columns normally; the HNSW index lives in an extension section that standard Parquet readers silently ignore.

License

MIT OR Apache-2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ailake-0.0.20.tar.gz (272.0 kB view details)

Uploaded Source

Built Distributions

If you're not sure about the file name format, learn more about wheel file names.

ailake-0.0.20-cp39-abi3-win_amd64.whl (3.8 MB view details)

Uploaded CPython 3.9+Windows x86-64

ailake-0.0.20-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.2 MB view details)

Uploaded CPython 3.9+manylinux: glibc 2.17+ x86-64

ailake-0.0.20-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (3.9 MB view details)

Uploaded CPython 3.9+manylinux: glibc 2.17+ ARM64

File details

Details for the file ailake-0.0.20.tar.gz.

File metadata

  • Download URL: ailake-0.0.20.tar.gz
  • Upload date:
  • Size: 272.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.25

File hashes

Hashes for ailake-0.0.20.tar.gz
Algorithm Hash digest
SHA256 9a7617f5a43ec3aeb1d3809ee6f045d9c363edd90e13dab205aa7df1a6104367
MD5 28fc02e74e7d20cd9801461c45aa5489
BLAKE2b-256 49fe283923e70774aa731d440d87124f3559bf0b18db7b9a749bffe66dc39107

See more details on using hashes here.

File details

Details for the file ailake-0.0.20-cp39-abi3-win_amd64.whl.

File metadata

  • Download URL: ailake-0.0.20-cp39-abi3-win_amd64.whl
  • Upload date:
  • Size: 3.8 MB
  • Tags: CPython 3.9+, Windows x86-64
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.25

File hashes

Hashes for ailake-0.0.20-cp39-abi3-win_amd64.whl
Algorithm Hash digest
SHA256 4559d0f5831b62571c4bf637193aac75a8eb818db3135d90c6a9dfd8f454ef5e
MD5 7c07601a265cc77da05aaca0673aa42d
BLAKE2b-256 2eeab1adae8876ad6083539bf7c14fc651abadfc99525afea040ea4565163244

See more details on using hashes here.

File details

Details for the file ailake-0.0.20-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.

File metadata

File hashes

Hashes for ailake-0.0.20-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm Hash digest
SHA256 d7d580d5f88163f427e69cfb1ffbea3582884a4fe1ca8b2e78172947f58bed28
MD5 41a1dafdb762d03c79e04c1f1b96e755
BLAKE2b-256 23bd4649820c4a039b48c7d722769f21dc0e411991e8dc76e24cdef5b37f9af3

See more details on using hashes here.

File details

Details for the file ailake-0.0.20-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.

File metadata

File hashes

Hashes for ailake-0.0.20-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
Algorithm Hash digest
SHA256 0ba199e8c67313337e445bdf2da65351699b10ef3e5529aba68afeca05b3dbc6
MD5 a502169f48f1fb84e7cd53747193dcdd
BLAKE2b-256 17ec9c7a51b306d7f481f8889f483a59c42ff6fa90bc76bacd111b8b7e16e4aa

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page