EverAlgo clustering: cluster_by_geometry / cluster_by_llm functional operators with Cluster value object.

everalgo-clustering

Online incremental clustering for EverAlgo — cluster_by_geometry (sync) and cluster_by_llm (async, LLM-refined) operating on caller-owned list[Cluster] state.

Stateless: the package never embeds, never queries storage, never holds a lock. The caller owns embedding computation, persistence (list[Cluster] serialisation), and read-modify-write coordination across concurrent writers.

See the umbrella project: EverAlgo monorepo and the architecture document at docs/concepts/architecture.md.

Install

pip install everalgo-clustering

What this distribution provides

| Symbol | Role |
| --- | --- |
| Cluster | Frozen Pydantic value object: one cluster snapshot. Caller-supplied id / members; algorithm supplies merged centroid, count, last_ts, preview. |
| cluster_by_geometry | Cosine similarity + time-window filter + threshold; no LLM; sync |
| cluster_by_llm | Top-K geometric recall → fast-path skip → LLM semantic ranking; raises on LLM failure; async |

Cluster type

class Cluster(BaseModel):
    id: str | None = None           # caller-supplied business id; algo only passes through, never mints
    centroid: np.ndarray
    count: int = 1
    last_ts: int                    # Unix epoch milliseconds
    preview: list[str] = []
    members: list[str] = []         # caller-supplied entity ids; algo appends on merge, never inspects

The caller wraps each incoming item as a size-1 Cluster (count=1) before passing it to either function. Both functions return Cluster | None: a merged snapshot when the item is assigned to an existing cluster, or None when nothing matches. In the None case the caller mints its own id and appends the original size-1 Cluster as a brand-new entry.

Quick start

import numpy as np
from everalgo.clustering import Cluster, cluster_by_geometry

existing: list[Cluster] = []  # caller loads from storage; empty on first run
vector = np.random.rand(2560).astype(np.float32)
timestamp_ms = 1_700_000_000_000

new_cluster = Cluster(centroid=vector, last_ts=timestamp_ms)
merged = cluster_by_geometry(  # sync — no await
    new_cluster,
    existing,
    threshold=0.65,        # cosine similarity floor
    time_window_days=7.0,  # ignore clusters older than this window
)

if merged is not None:
    # item assigned to an existing cluster; caller updates the matching entry
    print(f"merged into cluster id={merged.id!r}, new count={merged.count}")
else:
    # no match — caller appends new_cluster and stamps its own id
    new_cluster_with_id = new_cluster.model_copy(update={"id": "cid_001"})
    existing.append(new_cluster_with_id)
    print("created new cluster")
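
To make the threshold and time_window_days parameters concrete, here is a minimal pure-Python sketch of a cosine-plus-time-window match. It is an illustration under stated assumptions, not the package's implementation: best_match, the tuple layout, and the choice to measure the window back from the new item's timestamp are all hypothetical, and plain lists stand in for NumPy arrays.

```python
from __future__ import annotations

import math

MS_PER_DAY = 86_400_000


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def best_match(
    new_vec: list[float],
    new_ts: int,
    candidates: list[tuple[str, list[float], int]],  # (id, centroid, last_ts in ms)
    threshold: float = 0.65,
    time_window_days: float = 7.0,
) -> str | None:
    """Return the id of the most similar in-window candidate, or None."""
    window_ms = time_window_days * MS_PER_DAY
    best_id, best_sim = None, threshold
    for cid, centroid, last_ts in candidates:
        if new_ts - last_ts > window_ms:  # assumed: stale clusters are skipped
            continue
        sim = cosine(new_vec, centroid)
        if sim >= best_sim:  # must reach the floor and beat the current best
            best_id, best_sim = cid, sim
    return best_id


now = 1_700_000_000_000
candidates = [
    ("recent", [1.0, 0.1], now - 1 * MS_PER_DAY),
    ("stale", [1.0, 0.0], now - 30 * MS_PER_DAY),
]
print(best_match([1.0, 0.0], now, candidates))  # "recent": the stale cluster is filtered out
print(best_match([0.0, 1.0], now, candidates))  # None: similarity is below the threshold
```

The "stale" candidate is a perfect cosine match for the first query but is never considered, which is the effect a time window is meant to have.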

LLM-refined clustering

cluster_by_llm adds a semantic ranking step over the top-K geometrically-nearest candidates. It raises on LLM failure — there is no internal fallback; the caller decides whether to retry or fall back to cluster_by_geometry.
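
Because the retry-or-fall-back decision is left to the caller, a small wrapper is one way to express it. The sketch below is a hedged example, not part of the package: retry_then_fallback and its parameters are hypothetical names, and the two stand-in functions take the place of real cluster_by_llm and cluster_by_geometry calls so the block runs on its own.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_then_fallback(
    primary: Callable[[], Awaitable[T]],
    fallback: Callable[[], T],
    *,
    attempts: int = 2,
    base_delay_s: float = 0.0,
) -> T:
    """Await primary() up to `attempts` times, then return fallback()."""
    for attempt in range(attempts):
        try:
            return await primary()
        except Exception:  # in real code, narrow this to the client's error types
            if attempt + 1 < attempts:
                # Exponential backoff between attempts (no sleep after the last one).
                await asyncio.sleep(base_delay_s * 2**attempt)
    return fallback()


calls = {"primary": 0}


async def flaky_llm_path() -> str:
    # Stand-in for `cluster_by_llm(new_cluster, existing, llm=...)`.
    calls["primary"] += 1
    raise RuntimeError("simulated LLM outage")


def geometry_path() -> str:
    # Stand-in for `cluster_by_geometry(new_cluster, existing)`.
    return "geometry-result"


result = asyncio.run(retry_then_fallback(flaky_llm_path, geometry_path))
print(result, calls["primary"])  # geometry-result 2
```

With the real functions, the two callables would typically be lambdas closing over new_cluster and the existing list, so both paths see the same inputs.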

Populate new_cluster.preview with the item's representative text so the LLM has something to rank against.

import asyncio
import numpy as np
from everalgo.clustering import Cluster, cluster_by_llm
from everalgo.llm.types import ChatResponse
from everalgo.testing.fake_llm import FakeLLMClient

_LLM_JSON = '{"idx": 0}'

async def main() -> None:
    fake = FakeLLMClient(responses=[ChatResponse(content=_LLM_JSON, model="fake")])
    existing: list[Cluster] = []
    vector = np.random.rand(2560).astype(np.float32)

    new_cluster = Cluster(
        centroid=vector,
        last_ts=1_700_000_000_000,
        preview=["Python async retry patterns"],  # shown to the LLM
    )
    merged = await cluster_by_llm(
        new_cluster,
        existing,
        llm=fake,
        k_candidates=30,
        llm_skip_threshold=0.85,
    )
    print(f"merged: {merged}")

asyncio.run(main())
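
The three stages named for cluster_by_llm (top-K recall, fast-path skip, LLM ranking) can be pictured with the control-flow sketch below. Everything in it is an assumption made for illustration: pick_cluster and the rank callable are hypothetical, the fast path is assumed to fire when the best similarity reaches llm_skip_threshold, and the ranker is assumed to return an index into the candidate list or None. The package's actual behaviour may differ.

```python
from __future__ import annotations

from typing import Callable, Optional


def pick_cluster(
    scored: list[tuple[str, float]],  # (cluster id, cosine similarity)
    rank: Callable[[list[str]], Optional[int]],  # stand-in for the LLM ranking call
    *,
    k_candidates: int = 30,
    llm_skip_threshold: float = 0.85,
) -> Optional[str]:
    """Return the chosen cluster id, or None when nothing is selected."""
    # 1. Geometric recall: keep the k most similar candidates.
    top = sorted(scored, key=lambda pair: pair[1], reverse=True)[:k_candidates]
    if not top:
        return None
    # 2. Fast path (assumed): a very close match is accepted without ranking.
    best_id, best_sim = top[0]
    if best_sim >= llm_skip_threshold:
        return best_id
    # 3. Semantic ranking: the ranker picks an index or declines with None.
    idx = rank([cid for cid, _ in top])
    if idx is None or not 0 <= idx < len(top):
        return None
    return top[idx][0]


rank_calls: list[list[str]] = []


def fake_rank(ids: list[str]) -> Optional[int]:
    rank_calls.append(ids)
    return 1  # always choose the second candidate


fast = pick_cluster([("a", 0.91), ("b", 0.40)], fake_rank)
slow = pick_cluster([("a", 0.70), ("b", 0.72)], fake_rank)
print(fast, slow, rank_calls)  # a a [['b', 'a']]
```

In the first call the ranker is never invoked; in the second it sees the candidates ordered by similarity and its choice overrides the geometric ordering.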

Persistence pattern

Cluster is a frozen Pydantic model — serialise with model_dump() and reconstruct with Cluster.model_validate(). The caller owns the list and the lock:

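# store, caller.lock and new_id are caller-supplied placeholders.
async with caller.lock(f"cluster:{user_id}"):
    # Load inside the lock so concurrent writers cannot act on a stale list.
    raw_list = await store.load(user_id) or []
    clusters = [Cluster.model_validate(r) for r in raw_list]

    merged = cluster_by_geometry(new_cluster, clusters)
    if merged is not None:
        # Replace the matched entry; the merged snapshot keeps the existing id.
        idx = next(i for i, c in enumerate(clusters) if c.id == merged.id)
        clusters[idx] = merged
    else:
        # No match: stamp a caller-minted id and append as a new cluster.
        new_cluster_stamped = new_cluster.model_copy(update={"id": new_id})
        clusters.append(new_cluster_stamped)
    await store.save(user_id, [c.model_dump() for c in clusters])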
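
For a runnable picture of the read-modify-write coordination, the sketch below swaps in an in-memory dict for the store and a per-key asyncio.Lock for the caller's lock. These stand-ins are assumptions for illustration only; an asyncio.Lock coordinates tasks within a single event loop, so writers in separate processes would need a shared lock instead. The load, the update, and the save all happen while the lock is held.

```python
import asyncio
from collections import defaultdict

store = {}  # hypothetical in-memory storage: user_id -> list of cluster dicts
locks = defaultdict(asyncio.Lock)  # one lock per key, created on first use


async def add_item(user_id: str, item_id: str) -> None:
    async with locks[f"cluster:{user_id}"]:
        clusters = list(store.get(user_id, []))  # load inside the lock
        await asyncio.sleep(0)  # yield to other tasks, as real I/O would
        clusters.append({"id": item_id})  # stand-in for merge-or-append
        store[user_id] = clusters  # save before releasing the lock


async def main() -> int:
    # Fifty concurrent writers for the same user.
    await asyncio.gather(*(add_item("u1", f"cid_{i:03d}") for i in range(50)))
    return len(store["u1"])


total = asyncio.run(main())
print(total)  # 50
```

Without the lock, every task would read the same empty list before any of them saved, and all but one update would be lost.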

API reference

def cluster_by_geometry(
    new_cluster: Cluster,
    existing_clusters: list[Cluster],
    *,
    threshold: float = 0.65,
    time_window_days: float = 7.0,
    preview_cap: int = 5,
) -> Cluster | None: ...

async def cluster_by_llm(
    new_cluster: Cluster,
    existing_clusters: list[Cluster],
    *,
    llm: LLMClient,
    k_candidates: int = 30,
    llm_skip_threshold: float = 0.85,
    prompt: str | None = None,
    preview_cap: int = 5,
) -> Cluster | None: ...

Both functions return the merged Cluster (existing cluster's id preserved, centroid/count/members updated) or None (no match — caller creates a new cluster entry and mints its own id).
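
The source does not say how the merged centroid is computed. As a hedged illustration of what "centroid/count updated" could mean, the sketch below uses a count-weighted running mean, which is one common choice for incremental clustering; merge_centroid is a hypothetical helper, and plain lists stand in for NumPy arrays.

```python
from __future__ import annotations


def merge_centroid(
    old: list[float],
    old_count: int,
    new: list[float],
    new_count: int = 1,
) -> tuple[list[float], int]:
    """Count-weighted mean of two centroids, plus the combined count."""
    total = old_count + new_count
    merged = [(o * old_count + n * new_count) / total for o, n in zip(old, new)]
    return merged, total


# A cluster of 3 items absorbs one new item.
centroid, count = merge_centroid([1.0, 0.0], 3, [0.0, 1.0])
print(centroid, count)  # [0.75, 0.25] 4
```

Under this scheme a large cluster moves only slightly when it absorbs a single item, which keeps established centroids stable.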

Tested embedding model: Qwen3-Embedding-4B (2560-dim float32). Any consistent-dimension embedding works; EverAlgo does not import or manage embedding SDKs.
