A Python client for interacting with Ahnlich DB and AI

Ahnlich Client PY

A Python client that interacts with both ahnlich DB and AI

Usage Overview

Installation

  • Using Poetry
poetry add ahnlich-client-py
  • Using pip
pip3 install ahnlich-client-py

Package Information

The Ahnlich client builds on a noteworthy module that provides context for the examples in this document:

  • grpclib

Server Response

Every DB query type has an associated server response. These responses can be found in:

from ahnlich_client_py.grpc.db import server

For the AI server:

from ahnlich_client_py.grpc.ai import server

Initialization

Client

from grpclib.client import Channel
from ahnlich_client_py.grpc.services import db_service


channel = Channel(host="127.0.0.1", port=1369)
client = db_service.DbServiceStub(channel)
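
The request snippets in this document use `async with` and `await`, which Python only accepts inside a coroutine. The following is a minimal sketch of the wrapper they need. `FakeChannel` is a hypothetical stand-in (not part of ahnlich or grpclib) so the sketch runs without a server; in real code you would use `grpclib.client.Channel` and a service stub in its place.

```python
import asyncio


class FakeChannel:
    """Hypothetical stand-in for grpclib.client.Channel (illustration only)."""

    def __init__(self, host: str, port: int) -> None:
        self.host = host
        self.port = port
        self.closed = False

    async def __aenter__(self) -> "FakeChannel":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # A real channel would release its connection here.
        self.closed = True


async def main() -> FakeChannel:
    # `async with` is only valid inside a coroutine such as this one.
    async with FakeChannel(host="127.0.0.1", port=1369) as channel:
        # Build the service stub and await requests here.
        pass
    return channel


# asyncio.run starts an event loop, runs main() to completion, then closes the loop.
channel = asyncio.run(main())
```

Leaving the `async with` block closes the channel, so no explicit cleanup call is needed.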

Requests - DB

Ping

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query

# Initialize client
async with Channel(host="127.0.0.1", port=1369) as channel:
    db_client = DbServiceStub(channel)
    
    # Prepare tracing metadata
    tracing_id = "00-80e1afed08e019fc1110464cfa66635c-7a085853722dc6d2-01"
    metadata = {"ahnlich-trace-id": tracing_id}
    
    # Make request with metadata
    response = await db_client.ping(
        db_query.Ping(),
        metadata=metadata
    )
    
    print(response)  # Returns Pong message

Info Server

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)
    response = await client.info_server(db_query.InfoServer())
    # response contains server version and type
    print(f"Server version: {response.info.version}")

List Connected Clients

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)
    response = await client.list_clients(db_query.ListClients())
    print(f"Connected clients: {[c.id for c in response.clients]}")

List Stores

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)
    tracing_id = "00-80e1afed08e019fc1110464cfa66635c-7a085853722dc6d2-01"
    response = await client.list_stores(
        db_query.ListStores(),
        metadata={"ahnlich-trace-id": tracing_id}
    )
    print(f"Stores: {[store.name for store in response.stores]}")

Create Store

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)
    response = await client.create_store(
        db_query.CreateStore(
            store="test store",
            dimension=5,  # Fixed vector dimension
            predicates=["job"],  # Index these metadata fields
            error_if_exists=True
        )
    )
    # response is Unit() on success
    
    # All store_keys must match this dimension
    # Example valid key:
    valid_key = [1.0, 2.0, 3.0, 4.0, 5.0]  # length = 5

Once a store's dimension is fixed, every store key must conform to it. Only one-dimensional arrays (vectors) of length N are accepted, where N is the store's dimension.
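
The dimension rule can be checked on the client before sending a Set request. The sketch below assumes a hypothetical helper, `validate_store_key`, that is not part of the ahnlich client; it only illustrates what "a one-dimensional vector of length N" means.

```python
from numbers import Real
from typing import Sequence


def validate_store_key(key: Sequence[float], dimension: int) -> None:
    """Hypothetical client-side check; raises ValueError for an invalid key."""
    for item in key:
        # Nested lists (2-D input) and non-numeric items are rejected.
        # bool is a Real subclass in Python, so it is excluded explicitly.
        if isinstance(item, bool) or not isinstance(item, Real):
            raise ValueError(f"key must be a flat sequence of numbers, got {item!r}")
    if len(key) != dimension:
        raise ValueError(f"key has length {len(key)}, store expects {dimension}")


# Matches a store created with dimension=5, so no error is raised.
validate_store_key([1.0, 2.0, 3.0, 4.0, 5.0], dimension=5)
```

A key such as `[1.0, 2.0]` or `[[1.0, 2.0, 3.0, 4.0, 5.0]]` would raise `ValueError` under this check.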

Set

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc import keyval, metadata
from ahnlich_client_py.grpc.db import query as db_query

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)
    
    store_key = keyval.StoreKey(key=[5.0, 3.0, 4.0, 3.9, 4.9])
    store_value = keyval.StoreValue(
        value={"rank": metadata.MetadataValue(raw_string="chunin")}
    )
    
    response = await client.set(
        db_query.Set(
            store="test store",
            inputs=[keyval.DbStoreEntry(key=store_key, value=store_value)]
        )
    )
    # response contains upsert counts (inserted, updated)

Drop store

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)
    
    response = await client.drop_store(
        db_query.DropStore(
            store="test store",
            error_if_not_exists=True
        )
    )
    # response contains deleted_count

Get Sim N

Returns an array of (store_key, store_value) tuples containing at most the specified N entries.

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query
from ahnlich_client_py.grpc.shared.algorithm import Algorithm
from ahnlich_client_py.grpc import keyval  # needed for keyval.StoreKey below

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)
    
    search_key = keyval.StoreKey(key=[...])  # Your query vector
    
    response = await client.get_sim_n(
        db_query.GetSimN(
            store="test store",
            search_input=search_key,
            closest_n=3,  # Must be > 0
            algorithm=Algorithm.CosineSimilarity
        )
    )
    # response.entries contains (key, value, similarity) tuples

closest_n must be a nonzero integer.
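
To make the roles of `closest_n` and cosine similarity concrete, here is a local, pure-Python sketch of ranking stored keys against a query vector. It is an illustration only, not the server's implementation, and the function names are hypothetical.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    # Assumes both vectors are non-zero and have the same length.
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def closest_n(
    query: Sequence[float], keys: List[List[float]], n: int
) -> List[Tuple[List[float], float]]:
    if n <= 0:
        raise ValueError("n must be a nonzero positive integer")
    scored = [(key, cosine_similarity(query, key)) for key in keys]
    # Highest similarity first, truncated to at most n results.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:n]


keys = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
top = closest_n([1.0, 0.1], keys, n=2)
```

Here `top` holds the two keys pointing most nearly in the same direction as the query, each paired with its score.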

Get Key

Returns an array of (store_key, store_value) tuples.

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query
from ahnlich_client_py.grpc import keyval  # needed for keyval.StoreKey below

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)
    
    lookup_key = keyval.StoreKey(key=[...])  # Your lookup vector
    
    response = await client.get_key(
        db_query.GetKey(
            store="test store",
            keys=[lookup_key]
        )
    )
    # response.entries contains matching (key, value) pairs

Get By Predicate

Same as Get Key, but returns results based on defined conditions.

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query
from ahnlich_client_py.grpc import predicates, metadata

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)
    
    condition = predicates.PredicateCondition(
        value=predicates.Predicate(
            equals=predicates.Equals(
                key="job",
                value=metadata.MetadataValue(raw_string="sorcerer")
            )
        )
    )
    
    response = await client.get_pred(
        db_query.GetPred(
            store="test store",
            condition=condition
        )
    )
    # response.entries contains matching items

Create Predicate Index

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)
    
    response = await client.create_pred_index(
        db_query.CreatePredIndex(
            store="test store",
            predicates=["job", "rank"]
        )
    )
    # response.created_indexes shows how many indexes were created

Drop Predicate Index

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)
    
    response = await client.drop_pred_index(
        db_query.DropPredIndex(
            store="test store",
            predicates=["job"],
            error_if_not_exists=True
        )
    )
    # response.deleted_count shows how many indexes were removed

Create Non Linear Algorithm Index

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query
from ahnlich_client_py.grpc.algorithm.nonlinear import NonLinearAlgorithm, NonLinearIndex, KDTreeConfig, HNSWConfig

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)

    # Create a KDTree index
    response = await client.create_non_linear_algorithm_index(
        db_query.CreateNonLinearAlgorithmIndex(
            store="test store",
            non_linear_indices=[NonLinearIndex(kdtree=KDTreeConfig())]
        )
    )

    # Or create an HNSW index (with optional config)
    response = await client.create_non_linear_algorithm_index(
        db_query.CreateNonLinearAlgorithmIndex(
            store="test store",
            non_linear_indices=[NonLinearIndex(hnsw=HNSWConfig())]
        )
    )
    # response.created_indexes shows how many indexes were created

Drop Non Linear Algorithm Index

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query
from ahnlich_client_py.grpc.algorithm.nonlinear import NonLinearAlgorithm

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)

    response = await client.drop_non_linear_algorithm_index(
        db_query.DropNonLinearAlgorithmIndex(
            store="test store",
            non_linear_indices=[NonLinearAlgorithm.KDTree],
            error_if_not_exists=True
        )
    )
    # response.deleted_count shows how many indexes were removed

Delete Key

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query
from ahnlich_client_py.grpc import keyval

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)
    
    store_key = keyval.StoreKey(key=[5.0, 3.0, 4.0, 3.9, 4.9])
    
    response = await client.del_key(
        db_query.DelKey(
            store="test store",
            keys=[store_key]
        )
    )
    # response.deleted_count shows how many items were deleted

Delete Predicate

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.db_service import DbServiceStub
from ahnlich_client_py.grpc.db import query as db_query
from ahnlich_client_py.grpc import predicates, metadata

async with Channel(host="127.0.0.1", port=1369) as channel:
    client = DbServiceStub(channel)
    
    condition = predicates.PredicateCondition(
        value=predicates.Predicate(
            equals=predicates.Equals(
                key="job",
                value=metadata.MetadataValue(raw_string="sorcerer")
            )
        )
    )
    
    response = await client.del_pred(
        db_query.DelPred(
            store="test store",
            condition=condition
        )
    )
    # response.deleted_count shows how many items were deleted

Requests - AI

Ping

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)
    response = await client.ping(ai_query.Ping())

Info Server

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)
    response = await client.info_server(ai_query.InfoServer())

List Stores

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)
    response = await client.list_stores(ai_query.ListStores())

Get Store

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)
    # Returns AiStoreInfo with name, query_model, index_model, embedding_size,
    # predicate_indices, dimension, and optional db_info (DB store info when AI is connected to DB).
    response = await client.get_store(ai_query.GetStore(store="my_store"))

Create Store

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query
from ahnlich_client_py.grpc.ai.models import AiModel

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)
    response = await client.create_store(
        ai_query.CreateStore(
            store="test store",
            query_model=AiModel.ALL_MINI_LM_L6_V2,
            index_model=AiModel.ALL_MINI_LM_L6_V2,
            predicates=["job"],
            error_if_exists=True,
            # store_original controls whether the raw inputs are stored in
            # the DB so that the originals can be retrieved at query time;
            # otherwise only store values are returned
            store_original=True
        )
    )

Set

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query
from ahnlich_client_py.grpc import keyval, metadata
from ahnlich_client_py.grpc.ai import preprocess

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)
    response = await client.set(
        ai_query.Set(
            store="test store",
            inputs=[
                keyval.AiStoreEntry(
                    key=keyval.StoreInput(raw_string="Jordan One"),
                    value=keyval.StoreValue(
                        value={"brand": metadata.MetadataValue(raw_string="Nike")}
                    ),
                ),
                keyval.AiStoreEntry(
                    key=keyval.StoreInput(raw_string="Yeezey"),
                    value=keyval.StoreValue(
                        value={"brand": metadata.MetadataValue(raw_string="Adidas")}
                    ),
                )
            ],
            preprocess_action=preprocess.PreprocessAction.NoPreprocessing
        )
    )

Drop store

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)
    response = await client.drop_store(
        ai_query.DropStore(
            store="test store",
            error_if_not_exists=True
        )
    )

Get Sim N

Returns an array of (store_key, store_value) tuples containing at most the specified N entries.

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query
from ahnlich_client_py.grpc import keyval, algorithms

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)
    
    response = await client.get_sim_n(
        ai_query.GetSimN(
            store="test store",
            search_input=keyval.StoreInput(raw_string="Jordan"),
            closest_n=3,
            algorithm=algorithms.Algorithm.COSINE_SIMILARITY,
            condition=None,  # Optional predicate condition
            execution_provider=None  # Optional execution provider
        )
    )
    
    # Response contains entries with similarity scores
    for entry in response.entries:
        print(f"Key: {entry.key.raw_string}")
        print(f"Score: {entry.score}")
        print(f"Value: {entry.value}")

closest_n must be a nonzero integer.

Get By Predicate

Same as Get Key, but returns results based on defined conditions.

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query
from ahnlich_client_py.grpc import predicates, metadata

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)
    condition = predicates.PredicateCondition(
        value=predicates.Predicate(
            equals=predicates.Equals(
                key="brand", 
                value=metadata.MetadataValue(raw_string="Nike")
            )
        )
    )
    response = await client.get_pred(
        ai_query.GetPred(
            store="test store",
            condition=condition
        )
    )

Create Predicate Index

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)
    response = await client.create_pred_index(
        ai_query.CreatePredIndex(
            store="test store",
            predicates=["job", "rank"]
        )
    )

Drop Predicate Index

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)
    response = await client.drop_pred_index(
        ai_query.DropPredIndex(
            store="test store",
            predicates=["job"],
            error_if_not_exists=True
        )
    )

Create Non Linear Algorithm Index

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query
from ahnlich_client_py.grpc.algorithm.nonlinear import NonLinearAlgorithm, NonLinearIndex, KDTreeConfig, HNSWConfig

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)

    # Create a KDTree index
    response = await client.create_non_linear_algorithm_index(
        ai_query.CreateNonLinearAlgorithmIndex(
            store="test store",
            non_linear_indices=[NonLinearIndex(kdtree=KDTreeConfig())]
        )
    )

    # Or create an HNSW index (with optional config)
    response = await client.create_non_linear_algorithm_index(
        ai_query.CreateNonLinearAlgorithmIndex(
            store="test store",
            non_linear_indices=[NonLinearIndex(hnsw=HNSWConfig())]
        )
    )

Drop Non Linear Algorithm Index

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query
from ahnlich_client_py.grpc.algorithm.nonlinear import NonLinearAlgorithm

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)
    response = await client.drop_non_linear_algorithm_index(
        ai_query.DropNonLinearAlgorithmIndex(
            store="test store",
            non_linear_indices=[NonLinearAlgorithm.KDTree],
            error_if_not_exists=True
        )
    )

Delete Key

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query
from ahnlich_client_py.grpc import keyval

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)
    response = await client.del_key(
        ai_query.DelKey(
            store="test store",
            key=keyval.StoreInput(raw_string="Custom Made Jordan 4")
        )
    )

Tracing ID example:

from grpclib.client import Channel
from ahnlich_client_py.grpc.services.ai_service import AiServiceStub
from ahnlich_client_py.grpc.ai import query as ai_query

async with Channel(host="127.0.0.1", port=1370) as channel:
    client = AiServiceStub(channel)
    
    # Prepare tracing metadata
    tracing_id = "00-80e1afed08e019fc1110464cfa66635c-7a085853722dc6d2-01"
    metadata = {"ahnlich-trace-id": tracing_id}
    
    # Make request with metadata
    response = await client.ping(
        ai_query.Ping(),
        metadata=metadata
    )
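
The sample tracing ID appears to follow the W3C Trace Context `traceparent` layout: a 2-character version, a 32-character trace ID, a 16-character parent ID, and 2-character flags, all lowercase hex. Assuming that layout (whether the server requires it is not stated here), a fresh ID could be generated as follows; `new_trace_id` is a hypothetical helper, not part of the client.

```python
import re
import secrets

# version-traceid-parentid-flags, all lowercase hex
TRACEPARENT_RE = re.compile(r"^[0-9a-f]{2}-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}$")


def new_trace_id(sampled: bool = True) -> str:
    """Hypothetical helper: build a traceparent-style ID from random bytes."""
    trace_id = secrets.token_hex(16)  # 16 random bytes -> 32 hex characters
    parent_id = secrets.token_hex(8)  # 8 random bytes -> 16 hex characters
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{parent_id}-{flags}"


# Passed to a request in the same way as the hard-coded ID.
metadata = {"ahnlich-trace-id": new_trace_id()}
```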

Bulk Requests

Clients can send multiple requests at once; these requests are handled sequentially. The builder class takes care of this, and the response is a list of the individual request responses.

from ahnlich_client_py import AhnlichDBClient
client = AhnlichDBClient(address="127.0.0.1", port=port)

request_builder = client.pipeline()
request_builder.ping()
request_builder.info_server()
request_builder.list_clients()
request_builder.list_stores()

response: server_response.ServerResult = client.exec()

The same applies to the AI client.
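
The "queued, handled sequentially, one response per request" behaviour can be pictured with a small local sketch. The `Pipeline` class and the `ping` and `list_stores` coroutines below are hypothetical stand-ins written for illustration; they are not the ahnlich builder or its requests.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


class Pipeline:
    """Hypothetical illustration of a request builder (not the ahnlich class)."""

    def __init__(self) -> None:
        self._queued: List[Callable[[], Awaitable[Any]]] = []

    def add(self, request: Callable[[], Awaitable[Any]]) -> "Pipeline":
        # Queue the request; nothing is sent yet.
        self._queued.append(request)
        return self

    async def exec(self) -> List[Any]:
        responses = []
        for request in self._queued:
            # Each request finishes before the next one starts.
            responses.append(await request())
        return responses


async def ping() -> str:
    return "Pong"  # stand-in response


async def list_stores() -> list:
    return []  # stand-in response: no stores


responses = asyncio.run(Pipeline().add(ping).add(list_stores).exec())
```

The responses come back in the same order in which the requests were queued.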

Deploy to Artifactory

Replace the contents of the MSG_TAG file with your new tag message.

From a feature branch, either use the Makefile:

make bump-py-client BUMP_RULE=[major, minor, patch] 

or

poetry run bumpversion [major, minor, patch] 

Once your PR is made, changes to the client version file trigger a release build to PyPI.
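
For readers unfamiliar with bump rules, the sketch below shows what `major`, `minor`, and `patch` conventionally do to a version string, assuming the usual semantic-versioning behaviour. The `bump` function is hypothetical and only illustrative; the bumpversion tool performs the actual change.

```python
def bump(version: str, rule: str) -> str:
    """Hypothetical illustration of semantic-version bump rules."""
    major, minor, patch = (int(part) for part in version.split("."))
    if rule == "major":
        return f"{major + 1}.0.0"  # lower components reset to zero
    if rule == "minor":
        return f"{major}.{minor + 1}.0"
    if rule == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown bump rule: {rule!r}")


next_patch = bump("0.2.0", "patch")
next_minor = bump("0.2.0", "minor")
next_major = bump("0.2.0", "major")
```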

Type Meanings

  • Store Key: A one-dimensional vector
  • Store Value: A dictionary of text or binary values associated with a store key
  • Store Predicates: Also called predicate indices; indices that improve the filtering of store values
  • Predicates: Operations that can be used to filter data (Equals, NotEquals, Contains, etc.)
  • PredicateConditions: Conditions that either wrap a single predicate (Value) or tie multiple predicates together using the AND or OR operation. Example of Value:
condition = predicates.PredicateCondition(
            value=predicates.Predicate(
                equals=predicates.Equals(
                    key="job", value=metadata.MetadataValue(raw_string="sorcerer")
                )
            )
        )

A MetadataValue can also be binary (a list of u8 values):

condition = predicates.PredicateCondition(
            value=predicates.Predicate(
                equals=predicates.Equals(
                    key="rank", value=metadata.MetadataValue(image=[2,2,3,4,5,6,7])
                )
            )
        )

AND

condition = predicates.PredicateCondition(
    and_=predicates.AndCondition(
        left=predicates.PredicateCondition(
            value=predicates.Predicate(
                equals=predicates.Equals(
                    key="job",
                    value=metadata.MetadataValue(raw_string="sorcerer")
                )
            )
        ),
        right=predicates.PredicateCondition(
            value=predicates.Predicate(
                equals=predicates.Equals(
                    key="rank",
                    value=metadata.MetadataValue(raw_string="chunin")
                )
            )
        )
    )
)
  • Search Input: A string or binary file that can be stored by the AI proxy. Which binary files are accepted depends on the models used in a store and supported by Ahnlich AI

  • AIModels: Supported AI models used by Ahnlich AI

  • AIStoreType: The type of store to be created, either Binary or String
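
How Value, AND, and OR conditions combine can be illustrated with a small local evaluator. This is a sketch under stated assumptions: conditions are encoded as plain dicts (a hypothetical encoding, not the `predicates` message types), only Equals is modelled, and the server's actual filtering logic may differ.

```python
from typing import Any, Dict


def matches(condition: Dict[str, Any], value: Dict[str, Any]) -> bool:
    """Evaluate a hypothetical dict-encoded condition against one store value."""
    if "value" in condition:
        # A Value condition wraps a single predicate (only Equals here).
        key, expected = condition["value"]["equals"]
        return value.get(key) == expected
    if "and" in condition:
        left, right = condition["and"]
        return matches(left, value) and matches(right, value)
    if "or" in condition:
        left, right = condition["or"]
        return matches(left, value) or matches(right, value)
    raise ValueError(f"unknown condition: {condition!r}")


job_is_sorcerer = {"value": {"equals": ("job", "sorcerer")}}
rank_is_chunin = {"value": {"equals": ("rank", "chunin")}}
both = {"and": (job_is_sorcerer, rank_is_chunin)}
either = {"or": (job_is_sorcerer, rank_is_chunin)}

entry = {"job": "sorcerer", "rank": "genin"}
```

For this `entry`, the single-predicate condition and the OR condition match, while the AND condition does not because the rank differs.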

Change Log

Version  Description
0.0.0    Base Python clients (async and sync) to connect to Ahnlich DB and AI, with connection pooling and Bincode serialization and deserialization
1.0.0    Rewrite of the underlying communication using gRPC
0.2.0    HNSW + AI model support
