
EverAlgo clustering: cluster_by_geometry / cluster_by_llm functional operators with a Cluster value object.

Project description

everalgo-clustering

Online incremental clustering for EverAlgo — two async functions (cluster_by_geometry / cluster_by_llm) operating on caller-owned list[Cluster] state.

Stateless: the package never embeds, never queries storage, never holds a lock. The caller owns embedding computation, persistence (list[Cluster] serialisation), and read-modify-write coordination across concurrent writers.

See the umbrella project: EverAlgo monorepo and the architecture document at docs/concepts/architecture.md.

Install

pip install everalgo-clustering

What this distribution provides

Symbol               Role
Cluster              Frozen Pydantic value object: one cluster snapshot. Caller-supplied id / members; algorithm supplies merged centroid, count, last_ts, preview.
cluster_by_geometry  Cosine similarity + time-window filter + threshold; no LLM; async.
cluster_by_llm       Top-K geometric recall → fast-path skip → LLM semantic ranking; raises on LLM failure; async.
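
To make the "cosine similarity + time-window filter + threshold" description concrete, here is a minimal, dependency-free sketch of that kind of matching. It is not the package's implementation: the helper names (cosine_similarity, best_match), the inclusive threshold comparison, and the symmetric time window are all assumptions made for illustration.

```python
from __future__ import annotations

import math
from typing import Sequence

MS_PER_DAY = 86_400_000  # last_ts is documented as Unix epoch milliseconds


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero norm."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def best_match(
    vector: Sequence[float],
    ts_ms: int,
    candidates: list[tuple[Sequence[float], int]],  # (centroid, last_ts) pairs
    threshold: float = 0.65,
    time_window_days: float = 7.0,
) -> int | None:
    """Index of the most similar in-window candidate at or above threshold, else None."""
    window_ms = time_window_days * MS_PER_DAY
    best_idx: int | None = None
    best_sim = threshold
    for idx, (centroid, last_ts) in enumerate(candidates):
        # Assumption: the window is measured symmetrically around the new item.
        if abs(ts_ms - last_ts) > window_ms:
            continue
        sim = cosine_similarity(vector, centroid)
        # Assumption: a similarity equal to the threshold counts as a match.
        if sim >= best_sim:
            best_idx, best_sim = idx, sim
    return best_idx
```

With real embeddings the same logic would normally be vectorised with NumPy; plain sequences are used here only to keep the sketch self-contained.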

Cluster type

class Cluster(BaseModel):
    id: str | None = None           # caller-supplied business id; algo only passes through, never mints
    centroid: np.ndarray
    count: int = 1
    last_ts: int                    # Unix epoch milliseconds
    preview: list[str] = []
    members: list[str] = []         # caller-supplied entity ids; algo appends on merge, never inspects

The caller wraps each incoming item as a size-1 Cluster (count=1) before passing to either function. Both functions return Cluster | None: a merged snapshot when the item is assigned to an existing cluster, or None when no match — the caller then appends the original size-1 Cluster as a brand-new entry and mints its own id.

Quick start

import asyncio
import numpy as np
from everalgo.clustering import Cluster, cluster_by_geometry

async def main() -> None:
    existing: list[Cluster] = []  # caller loads from storage; empty on first run
    vector = np.random.rand(2560).astype(np.float32)
    timestamp_ms = 1_700_000_000_000

    new_cluster = Cluster(centroid=vector, last_ts=timestamp_ms)
    merged = await cluster_by_geometry(
        new_cluster,
        existing,
        threshold=0.65,        # cosine similarity floor
        time_window_days=7.0,  # ignore clusters older than this window
    )

    if merged is not None:
        # item assigned to an existing cluster; caller updates the matching entry
        print(f"merged into cluster id={merged.id!r}, new count={merged.count}")
    else:
        # no match — caller appends new_cluster and stamps its own id
        new_cluster_with_id = new_cluster.model_copy(update={"id": "cid_001"})
        existing.append(new_cluster_with_id)
        print("created new cluster")

asyncio.run(main())

LLM-refined clustering

cluster_by_llm adds a semantic ranking step over the top-K geometrically-nearest candidates. It raises on LLM failure — there is no internal fallback; the caller decides whether to retry or fall back to cluster_by_geometry.

Populate new_cluster.preview with the item's representative text so the LLM has something to rank against.

import asyncio
import numpy as np
from everalgo.clustering import Cluster, cluster_by_llm
from everalgo.llm.types import ChatResponse
from everalgo.testing.fake_llm import FakeLLMClient

_LLM_JSON = '{"idx": 0}'

async def main() -> None:
    fake = FakeLLMClient(responses=[ChatResponse(content=_LLM_JSON, model="fake")])
    existing: list[Cluster] = []
    vector = np.random.rand(2560).astype(np.float32)

    new_cluster = Cluster(
        centroid=vector,
        last_ts=1_700_000_000_000,
        preview=["Python async retry patterns"],  # shown to the LLM
    )
    merged = await cluster_by_llm(
        new_cluster,
        existing,
        llm=fake,
        k_candidates=30,
        llm_skip_threshold=0.85,
    )
    print(f"merged: {merged}")

asyncio.run(main())
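
Because cluster_by_llm raises on LLM failure and leaves the recovery policy to the caller, one possible caller-side policy is "retry a few times, then fall back to geometry". The sketch below is a generic helper, not part of the package; retry_then_fallback, attempts, and delay_s are hypothetical names, and the broad except clause should be narrowed to whatever error types the LLM client actually raises.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_then_fallback(
    primary: Callable[[], Awaitable[T]],
    fallback: Callable[[], Awaitable[T]],
    *,
    attempts: int = 2,
    delay_s: float = 0.0,
) -> T:
    """Call `primary` up to `attempts` times; if every call raises, use `fallback`."""
    for attempt in range(attempts):
        try:
            return await primary()
        except Exception:  # assumption: narrow this to the LLM client's errors
            # Sleep only between attempts, not after the last one.
            if attempt + 1 < attempts and delay_s > 0:
                await asyncio.sleep(delay_s)
    return await fallback()


# Hypothetical usage with the functions documented above:
#   merged = await retry_then_fallback(
#       lambda: cluster_by_llm(new_cluster, existing, llm=llm),
#       lambda: cluster_by_geometry(new_cluster, existing),
#   )
```

Passing zero-argument callables rather than coroutine objects matters here: a coroutine can only be awaited once, so each retry needs a fresh call.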

Persistence pattern

Cluster is a frozen Pydantic model — serialise with model_dump() and reconstruct with Cluster.model_validate(). The caller owns the list and the lock:

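# store, caller.lock, user_id and new_id are caller-provided placeholders.
async with caller.lock(f"cluster:{user_id}"):
    # Load inside the lock so the whole read-modify-write cycle is serialised;
    # loading before acquiring it would let a concurrent writer's update be lost.
    raw_list = await store.load(user_id) or []
    clusters = [Cluster.model_validate(r) for r in raw_list]

    merged = await cluster_by_geometry(new_cluster, clusters)
    if merged is not None:
        # Replace the matched entry; merged keeps the existing cluster's id.
        idx = next(i for i, c in enumerate(clusters) if c.id == merged.id)
        clusters[idx] = merged
    else:
        # No match: stamp a caller-minted id and append as a new cluster.
        new_cluster_stamped = new_cluster.model_copy(update={"id": new_id})
        clusters.append(new_cluster_stamped)
    await store.save(user_id, [c.model_dump() for c in clusters])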
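
The caller.lock(...) call in the pattern above is left to the caller. For a single-process service, one way it might be provided is a per-key asyncio.Lock registry, sketched below. KeyedLocks is a hypothetical name, not something the package ships, and this approach does not coordinate across processes or hosts; multiple workers would need a lock in the shared store instead.

```python
from __future__ import annotations

import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import AsyncIterator


class KeyedLocks:
    """One asyncio.Lock per key; coordinates tasks within one event loop only."""

    def __init__(self) -> None:
        # Locks are created lazily on first use and never evicted in this sketch,
        # so a long-running service with many keys would want some cleanup.
        self._locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    @asynccontextmanager
    async def lock(self, key: str) -> AsyncIterator[None]:
        async with self._locks[key]:
            yield


async def demo() -> list[str]:
    """Two tasks contend for the same key; their critical sections do not interleave."""
    locks = KeyedLocks()
    order: list[str] = []

    async def worker(name: str) -> None:
        async with locks.lock("cluster:u1"):
            order.append(f"{name}:in")
            await asyncio.sleep(0.01)  # stands in for load / cluster / save
            order.append(f"{name}:out")

    await asyncio.gather(worker("a"), worker("b"))
    return order
```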

API reference

async def cluster_by_geometry(
    new_cluster: Cluster,
    existing_clusters: list[Cluster],
    *,
    threshold: float = 0.65,
    time_window_days: float = 7.0,
    preview_cap: int = 5,
) -> Cluster | None: ...

async def cluster_by_llm(
    new_cluster: Cluster,
    existing_clusters: list[Cluster],
    *,
    llm: LLMClient,
    k_candidates: int = 30,
    llm_skip_threshold: float = 0.85,
    prompt: str | None = None,
    preview_cap: int = 5,
) -> Cluster | None: ...

Both functions return the merged Cluster (existing cluster's id preserved, centroid/count/members updated) or None (no match — caller creates a new cluster entry and mints its own id).
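
The document does not spell out how the merged snapshot's fields are computed. As an illustration only, the sketch below assumes a count-weighted mean for the centroid, the later of the two timestamps for last_ts, concatenated members, and a preview truncated to preview_cap entries. Snapshot and merge are hypothetical stand-ins, not the package's Cluster or its internal logic.

```python
from __future__ import annotations

from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical stand-in for Cluster, with a plain list as the centroid."""

    centroid: list[float]
    last_ts: int
    id: str | None = None
    count: int = 1
    preview: list[str] = field(default_factory=list)
    members: list[str] = field(default_factory=list)


def merge(existing: Snapshot, new: Snapshot, preview_cap: int = 5) -> Snapshot:
    """Return a new snapshot; neither input is mutated."""
    total = existing.count + new.count
    # Assumption: the centroid is the count-weighted mean of the two centroids.
    centroid = [
        (e * existing.count + n * new.count) / total
        for e, n in zip(existing.centroid, new.centroid)
    ]
    return replace(
        existing,  # keeps the existing cluster's id
        centroid=centroid,
        count=total,
        last_ts=max(existing.last_ts, new.last_ts),
        # Assumption: the preview keeps the earliest entries up to the cap.
        preview=(existing.preview + new.preview)[:preview_cap],
        members=existing.members + new.members,
    )
```

Returning a fresh object rather than mutating in place mirrors the frozen-value-object design described above: the caller decides when the new snapshot replaces the old one in its list.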

Tested embedding model: Qwen3-Embedding-4B (2560-dim float32). Any consistent-dimension embedding works; EverAlgo does not import or manage embedding SDKs.
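
Since the caller owns embedding computation, a cheap guard before calling either function can catch a mixed-dimension state, for example after switching embedding models. This is a hypothetical caller-side helper; whether the package performs its own dimension check is not stated here.

```python
from __future__ import annotations

from typing import Sequence


def check_dimensions(
    vector: Sequence[float],
    centroids: Sequence[Sequence[float]],
) -> int:
    """Return the shared dimension, or raise ValueError on the first mismatch."""
    dim = len(vector)
    for idx, centroid in enumerate(centroids):
        if len(centroid) != dim:
            raise ValueError(
                f"centroid {idx} has dimension {len(centroid)}, expected {dim}"
            )
    return dim
```

The helper relies only on len(), so it accepts plain lists as well as 1-D NumPy arrays.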
