EverAlgo clustering: cluster_by_geometry / cluster_by_llm functional operators with Cluster value object.

everalgo-clustering

Online incremental clustering for EverAlgo — cluster_by_geometry (sync) and cluster_by_llm (async, LLM-refined) operating on caller-owned list[Cluster] state.

Stateless: the package never embeds, never queries storage, never holds a lock. The caller owns embedding computation, persistence (list[Cluster] serialisation), and read-modify-write coordination across concurrent writers.

See the umbrella project: EverAlgo monorepo and the architecture document at docs/concepts/architecture.md.

Install

pip install everalgo-clustering

What this distribution provides

| Symbol | Role |
| --- | --- |
| Cluster | Frozen Pydantic value object: one cluster snapshot. Caller-supplied id / members; algorithm supplies merged centroid, count, last_ts, preview. |
| cluster_by_geometry | Cosine similarity + time-window filter + threshold; no LLM; sync |
| cluster_by_llm | Top-K geometric recall → fast-path skip → LLM semantic ranking; raises on LLM failure; async |

Cluster type

class Cluster(BaseModel):
    id: str | None = None           # caller-supplied business id; algo only passes through, never mints
    centroid: np.ndarray
    count: int = 1
    last_ts: int                    # Unix epoch milliseconds
    preview: list[str] = []
    members: list[str] = []         # caller-supplied entity ids; algo appends on merge, never inspects

The caller wraps each incoming item as a size-1 Cluster (count=1) before passing it to either function. Both functions return Cluster | None: a merged snapshot when the item is assigned to an existing cluster, or None when there is no match. In the None case the caller appends the original size-1 Cluster as a brand-new entry and mints its own id.
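The merge contract above can be illustrated with a toy stand-in. This is only a sketch: `ToyCluster` and `toy_merge` are hypothetical names, plain tuples replace `np.ndarray`, and the count-weighted mean centroid, the max timestamp and the capped preview are assumptions about plausible merge behaviour rather than the package's confirmed implementation.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ToyCluster:
    """Hypothetical stand-in for Cluster (not the real class)."""
    centroid: tuple
    last_ts: int
    id: Optional[str] = None
    count: int = 1
    preview: tuple = ()
    members: tuple = ()


def toy_merge(existing: ToyCluster, new: ToyCluster, preview_cap: int = 5) -> ToyCluster:
    """Return a new snapshot; neither input is mutated (both are frozen)."""
    total = existing.count + new.count
    # Assumption: the merged centroid is the count-weighted mean of both centroids.
    centroid = tuple(
        (e * existing.count + n * new.count) / total
        for e, n in zip(existing.centroid, new.centroid)
    )
    return replace(
        existing,  # keeps the existing cluster's caller-supplied id
        centroid=centroid,
        count=total,
        last_ts=max(existing.last_ts, new.last_ts),
        preview=(existing.preview + new.preview)[:preview_cap],
        members=existing.members + new.members,
    )


old = ToyCluster(centroid=(1.0, 0.0), last_ts=1_000, id="cid_001",
                 count=3, members=("a", "b", "c"))
item = ToyCluster(centroid=(0.0, 1.0), last_ts=2_000, members=("d",))
merged = toy_merge(old, item)
```

Because the snapshot is frozen, the caller replaces the old list entry with the returned object instead of mutating it in place.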

Quick start

import numpy as np
from everalgo.clustering import Cluster, cluster_by_geometry

existing: list[Cluster] = []  # caller loads from storage; empty on first run
vector = np.random.rand(2560).astype(np.float32)
timestamp_ms = 1_700_000_000_000

new_cluster = Cluster(centroid=vector, last_ts=timestamp_ms)
merged = cluster_by_geometry(  # sync — no await
    new_cluster,
    existing,
    threshold=0.65,        # cosine similarity floor
    time_window_days=7.0,  # ignore clusters older than this window
)

if merged is not None:
    # item assigned to an existing cluster; caller updates the matching entry
    print(f"merged into cluster id={merged.id!r}, new count={merged.count}")
else:
    # no match — caller appends new_cluster and stamps its own id
    new_cluster_with_id = new_cluster.model_copy(update={"id": "cid_001"})
    existing.append(new_cluster_with_id)
    print("created new cluster")
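To make `threshold` and `time_window_days` concrete, here is a minimal pure-Python sketch of a cosine-plus-time-window match. `best_match` and `cosine` are hypothetical helpers, and two details are assumptions: the window is measured from the new item's timestamp back to each cluster's `last_ts`, and a similarity equal to the threshold counts as a match. The package's actual rules may differ.

```python
import math
from typing import Optional

MS_PER_DAY = 86_400_000


def cosine(a: list, b: list) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def best_match(
    vector: list,
    ts_ms: int,
    candidates: list,  # list of (centroid, last_ts) pairs
    threshold: float = 0.65,
    time_window_days: float = 7.0,
) -> Optional[int]:
    """Return the index of the most similar in-window candidate, or None."""
    window_ms = time_window_days * MS_PER_DAY
    best_idx, best_sim = None, threshold
    for idx, (centroid, last_ts) in enumerate(candidates):
        if ts_ms - last_ts > window_ms:
            continue  # cluster was last updated outside the time window
        sim = cosine(vector, centroid)
        if sim >= best_sim:
            best_idx, best_sim = idx, sim
    return best_idx


now = 1_700_000_000_000
candidates = [
    ([1.0, 0.0], now - 8 * MS_PER_DAY),  # identical direction, but stale
    ([0.9, 0.1], now - 1 * MS_PER_DAY),  # close and recent
    ([0.0, 1.0], now),                   # recent, but orthogonal
]
match_idx = best_match([1.0, 0.0], now, candidates)
```

In this toy data the stale cluster is skipped even though it is the closest in direction, which is the effect the time window is meant to have.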

LLM-refined clustering

cluster_by_llm adds a semantic ranking step over the top-K geometrically-nearest candidates. It raises on LLM failure — there is no internal fallback; the caller decides whether to retry or fall back to cluster_by_geometry.

Populate new_cluster.preview with the item's representative text so the LLM has something to rank against.

import asyncio
import numpy as np
from everalgo.clustering import Cluster, cluster_by_llm
from everalgo.llm.types import ChatResponse
from everalgo.testing.fake_llm import FakeLLMClient

_LLM_JSON = '{"idx": 0}'

async def main() -> None:
    fake = FakeLLMClient(responses=[ChatResponse(content=_LLM_JSON, model="fake")])
    existing: list[Cluster] = []
    vector = np.random.rand(2560).astype(np.float32)

    new_cluster = Cluster(
        centroid=vector,
        last_ts=1_700_000_000_000,
        preview=["Python async retry patterns"],  # shown to the LLM
    )
    merged = await cluster_by_llm(
        new_cluster,
        existing,
        llm=fake,
        k_candidates=30,
        llm_skip_threshold=0.85,
    )
    print(f"merged: {merged}")

asyncio.run(main())
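Since cluster_by_llm raises on LLM failure and leaves the recovery policy to the caller, one possible caller-side wrapper is sketched below. `with_fallback` is a hypothetical helper, not part of the package. The source does not say which exception type is raised, so the sketch catches `Exception`; real code should narrow that.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def with_fallback(
    primary: Callable[[], Awaitable[Optional[T]]],
    fallback: Callable[[], Optional[T]],
    retries: int = 1,
) -> Optional[T]:
    """Try the async primary up to retries + 1 times, then call the sync fallback."""
    for _ in range(retries + 1):
        try:
            return await primary()
        except Exception:  # assumption: exception type unknown; narrow in real code
            continue
    return fallback()


# Self-contained demo with stubs standing in for the two clustering calls.
attempts = []


async def failing_llm_step() -> Optional[str]:
    attempts.append(1)
    raise RuntimeError("LLM unavailable")


result = asyncio.run(with_fallback(failing_llm_step, lambda: "geometry-result", retries=2))
```

With the real functions, the two callables would be something like `lambda: cluster_by_llm(new_cluster, existing, llm=client)` and `lambda: cluster_by_geometry(new_cluster, existing)`.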

Persistence pattern

Cluster is a frozen Pydantic model — serialise with model_dump() and reconstruct with Cluster.model_validate(). The caller owns the list and the lock:

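# store, caller.lock, user_id and new_id are caller-supplied placeholders.
async with caller.lock(f"cluster:{user_id}"):
    # Load inside the lock so the whole read-modify-write cycle is serialised.
    raw_list = await store.load(user_id) or []
    clusters = [Cluster.model_validate(r) for r in raw_list]

    merged = cluster_by_geometry(new_cluster, clusters)
    if merged is not None:
        # Replace the matched entry with the merged snapshot.
        idx = next(i for i, c in enumerate(clusters) if c.id == merged.id)
        clusters[idx] = merged
    else:
        # No match: store the item as a new cluster under a caller-minted id.
        new_cluster_stamped = new_cluster.model_copy(update={"id": new_id})
        clusters.append(new_cluster_stamped)
    await store.save(user_id, [c.model_dump() for c in clusters])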
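The lock in the pattern above is whatever the caller provides. As a rough illustration for a single-process asyncio service, the sketch below keeps one `asyncio.Lock` per key. `KeyedLocks` and `append_item` are hypothetical names, and a plain dict stands in for storage. Multiple processes or hosts would need a distributed lock instead.

```python
import asyncio
from collections import defaultdict


class KeyedLocks:
    """Hypothetical in-process registry: one asyncio.Lock per key."""

    def __init__(self) -> None:
        self._locks = defaultdict(asyncio.Lock)

    def lock(self, key: str) -> asyncio.Lock:
        return self._locks[key]


async def append_item(locks: KeyedLocks, store: dict, user_id: str, item: int) -> None:
    async with locks.lock(f"cluster:{user_id}"):
        current = list(store.get(user_id, []))  # load inside the lock
        await asyncio.sleep(0)                  # simulated I/O; yields to other tasks
        current.append(item)                    # modify
        store[user_id] = current                # save before releasing the lock


async def demo() -> list:
    locks, store = KeyedLocks(), {}
    # Five concurrent writers for the same user; the lock serialises them.
    await asyncio.gather(*(append_item(locks, store, "u1", i) for i in range(5)))
    return store["u1"]


final_state = asyncio.run(demo())
```

Without the lock, each task could read the same initial list before any of them saved, and all but one update would be lost.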

API reference

def cluster_by_geometry(
    new_cluster: Cluster,
    existing_clusters: list[Cluster],
    *,
    threshold: float = 0.65,
    time_window_days: float = 7.0,
    preview_cap: int = 5,
) -> Cluster | None: ...

async def cluster_by_llm(
    new_cluster: Cluster,
    existing_clusters: list[Cluster],
    *,
    llm: LLMClient,
    k_candidates: int = 30,
    llm_skip_threshold: float = 0.85,
    prompt: str | None = None,
    preview_cap: int = 5,
) -> Cluster | None: ...

Both functions return the merged Cluster (existing cluster's id preserved, centroid/count/members updated) or None (no match — caller creates a new cluster entry and mints its own id).
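The roles of `k_candidates` and `llm_skip_threshold` can be pictured with the routing sketch below. `recall_then_route` is a hypothetical helper. It assumes that "fast-path skip" means a sufficiently high geometric similarity bypasses the LLM call entirely, which the source does not spell out, so treat it as one reading rather than the package's behaviour.

```python
import heapq


def recall_then_route(
    sims: list,
    k_candidates: int = 30,
    llm_skip_threshold: float = 0.85,
) -> tuple:
    """Decide a route from the new item's similarity to each existing cluster.

    Returns ("none", []) when there are no clusters, ("skip", [best]) when the
    best similarity is high enough to bypass the LLM (assumed semantics), and
    ("llm", top_k_indices) otherwise.
    """
    if not sims:
        return ("none", [])
    # Indices of the k most similar clusters, most similar first.
    top = heapq.nlargest(k_candidates, range(len(sims)), key=sims.__getitem__)
    if sims[top[0]] >= llm_skip_threshold:
        return ("skip", top[:1])
    return ("llm", top)


confident = recall_then_route([0.2, 0.9, 0.5])
ambiguous = recall_then_route([0.2, 0.7, 0.5], k_candidates=2)
```

Under this reading, a larger `k_candidates` gives the LLM more options to rank but a longer prompt, and a lower `llm_skip_threshold` lets more items bypass the LLM call.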

Tested embedding model: Qwen3-Embedding-4B (2560-dim float32). Any consistent-dimension embedding works; EverAlgo does not import or manage embedding SDKs.
