Skip to main content

A python SDK for OceanBase Vector Store, based on SQLAlchemy, compatible with Milvus API.

Project description

pyobvector

A python SDK for OceanBase Multimodal Store (Vector Store / Full Text Search / JSON Table), based on SQLAlchemy, compatible with Milvus API.

Downloads Downloads

Installation

  • git clone this repo, then install with:
uv sync
  • install with pip:
pip install pyobvector==0.2.28
  • for embedded SeekDB support (local SeekDB without server):
pip install pyobvector[pyseekdb]

Build Doc

You can build document locally with sphinx:

mkdir build
make html

Release Notes

For detailed release notes and changelog, see RELEASE_NOTES.md.

Usage

pyobvector supports four modes:

  • Milvus compatible mode: You can use the MilvusLikeClient class to use vector storage in a way similar to the Milvus API
  • SQLAlchemy hybrid mode: You can use the vector storage function provided by the ObVecClient class and execute the relational database statement with the SQLAlchemy library. In this mode, you can regard pyobvector as an extension of SQLAlchemy.
  • Embedded SeekDB mode: Use ObVecClient or SeekdbRemoteClient with local embedded SeekDB (no server). Same API as remote: create_table, insert, ann_search, etc. Requires optional dependency: pip install pyobvector[pyseekdb].
  • Hybrid Search mode: You can use the HybridSearch class to perform hybrid search that combines full-text search and vector similarity search, with Elasticsearch-compatible query syntax.

Milvus compatible mode

Refer to tests/test_milvus_like_client.py for more examples.

A simple workflow to perform ANN search with OceanBase Vector Store:

  • setup a client:
from pyobvector import *

client = MilvusLikeClient(uri="127.0.0.1:2881", user="test@test")
  • create a collection with vector index:
test_collection_name = "ann_test"
# define the schema of collection with optional partitions
range_part = ObRangePartition(False, range_part_infos = [
    RangeListPartInfo('p0', 100),
    RangeListPartInfo('p1', 'maxvalue'),
], range_expr='id')
schema = client.create_schema(partitions=range_part)
# define field schema of collection
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=3)
schema.add_field(field_name="meta", datatype=DataType.JSON, nullable=True)
# define index parameters
idx_params = self.client.prepare_index_params()
idx_params.add_index(
    field_name='embedding',
    index_type=VecIndexType.HNSW,
    index_name='vidx',
    metric_type="L2",
    params={"M": 16, "efConstruction": 256},
)
# create collection
client.create_collection(
    collection_name=test_collection_name,
    schema=schema,
    index_params=idx_params,
)
  • insert data to your collection:
# prepare
vector_value1 = [0.748479,0.276979,0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])
# insert data
client.insert(collection_name=test_collection_name, data=data1)
  • do ann search:
res = client.search(collection_name=test_collection_name, data=[0,0,0], anns_field='embedding', limit=5, output_fields=['id'])
# For example, the result will be:
# [{'id': 112}, {'id': 111}, {'id': 10}, {'id': 11}, {'id': 12}]

SQLAlchemy hybrid mode

  • setup a client:
from pyobvector import *
from sqlalchemy import Column, Integer, JSON
from sqlalchemy import func

client = ObVecClient(uri="127.0.0.1:2881", user="test@test")
  • create a partitioned table with vector index:
# create partitioned table
range_part = ObRangePartition(False, range_part_infos = [
    RangeListPartInfo('p0', 100),
    RangeListPartInfo('p1', 'maxvalue'),
], range_expr='id')

cols = [
    Column('id', Integer, primary_key=True, autoincrement=False),
    Column('embedding', VECTOR(3)),
    Column('meta', JSON)
]
client.create_table(test_collection_name, columns=cols, partitions=range_part)

# create vector index
client.create_index(
    test_collection_name,
    is_vec_index=True,
    index_name='vidx',
    column_names=['embedding'],
    vidx_params='distance=l2, type=hnsw, lib=vsag',
)
  • insert data to your collection:
# insert data
vector_value1 = [0.748479,0.276979,0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])
client.insert(test_collection_name, data=data1)
  • do ann search:
# perform ann search with basic column selection
res = self.client.ann_search(
    test_collection_name,
    vec_data=[0,0,0],
    vec_column_name='embedding',
    distance_func=l2_distance,
    topk=5,
    output_column_names=['id']  # Legacy parameter
)
# For example, the result will be:
# [(112,), (111,), (10,), (11,), (12,)]

# perform ann search with SQLAlchemy expressions (recommended)
from sqlalchemy import Table, text, func

table = Table(test_collection_name, client.metadata_obj, autoload_with=client.engine)
res = self.client.ann_search(
    test_collection_name,
    vec_data=[0,0,0],
    vec_column_name='embedding',
    distance_func=l2_distance,
    topk=5,
    output_columns=[
        table.c.id,
        table.c.meta,
        (table.c.id + 1000).label('id_plus_1000'),
        text("JSON_EXTRACT(meta, '$.key') as extracted_key")
    ]
)
# For example, the result will be:
# [(112, '{"key": "value"}', 1112, 'value'), ...]

# perform ann search with distance threshold (filter results by distance)
res = self.client.ann_search(
    test_collection_name,
    vec_data=[0,0,0],
    vec_column_name='embedding',
    distance_func=l2_distance,
    with_dist=True,
    topk=10,
    output_column_names=['id'],
    distance_threshold=0.5  # Only return results where distance <= 0.5
)
# Only returns results with distance <= 0.5
# For example, the result will be:
# [(10, 0.0), (11, 0.0), ...]  # Only includes results with distance <= 0.5

ann_search Parameters

The ann_search method supports flexible output column selection through the output_columns parameter:

  • output_columns (recommended): Accepts SQLAlchemy Column objects, expressions, or a mix of both

    • Column objects: table.c.id, table.c.name
    • Expressions: (table.c.age + 10).label('age_plus_10')
    • JSON queries: text("JSON_EXTRACT(meta, '$.key') as extracted_key")
    • String functions: func.concat(table.c.name, ' (', table.c.age, ')').label('name_age')
  • output_column_names (legacy): Accepts list of column name strings

    • Example: ['id', 'name', 'meta']
  • Parameter Priority: output_columns takes precedence over output_column_names when both are provided

  • distance_threshold (optional): Filter results by distance threshold

    • Type: Optional[float]
    • Only returns results where distance <= threshold
    • Example: distance_threshold=0.5 returns only results with distance <= 0.5
    • Use case: Quality control for similarity search, only return highly similar results
  • If you want to use pure SQLAlchemy API with OceanBase dialect, you can just get an SQLAlchemy.engine via client.engine. The engine can also be created as following:

import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy import create_engine

uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.oceanbase", "pyobvector.schema.dialect", "OceanBaseDialect")
connection_str = (
    f"mysql+oceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_engine(connection_str, **kwargs)
  • Async engine is also supported:
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy.ext.asyncio import create_async_engine

uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.aoceanbase", "pyobvector", "AsyncOceanBaseDialect")
connection_str = (
    f"mysql+aoceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_async_engine(connection_str)
  • For further usage in pure SQLAlchemy mode, please refer to SQLAlchemy

Embedded SeekDB mode

Use the same ObClient/ObVecClient API with embedded SeekDB (local file, no server). Install the optional dependency:

pip install pyobvector[pyseekdb]
  • connect with path or with an existing pyseekdb.Client:
from pyobvector import SeekdbRemoteClient, ObVecClient
from pyobvector.client.ob_client import ObClient

# Option 1: path to SeekDB data directory
client = SeekdbRemoteClient(path="./seekdb_data", database="test")

# Option 2: use an existing pyseekdb.Client
import pyseekdb
pyseekdb_client = pyseekdb.Client(path="./seekdb_data", database="test")
client = SeekdbRemoteClient(pyseekdb_client=pyseekdb_client)

# Option 3: ObVecClient directly
client = ObVecClient(path="./seekdb_data", db_name="test")

assert isinstance(client, ObVecClient)
assert isinstance(client, ObClient)
  • create table, insert, and ann search (same API as remote):
from sqlalchemy import Column, Integer, VARCHAR
from pyobvector import VECTOR, VectorIndex, l2_distance

client.drop_table_if_exist("vec_table")
client.create_table(
    table_name="vec_table",
    columns=[
        Column("id", Integer, primary_key=True),
        Column("title", VARCHAR(255)),
        Column("vec", VECTOR(3)),
    ],
    indexes=[VectorIndex("vec_idx", "vec", params="distance=l2, type=hnsw, lib=vsag")],
    mysql_organization="heap",
)
client.insert("vec_table", data=[
    {"id": 1, "title": "doc A", "vec": [1.0, 1.0, 1.0]},
    {"id": 2, "title": "doc B", "vec": [1.0, 2.0, 3.0]},
])
res = client.ann_search(
    "vec_table",
    vec_data=[1.0, 2.0, 3.0],
    vec_column_name="vec",
    distance_func=l2_distance,
    with_dist=True,
    topk=5,
    output_column_names=["id", "title"],
)
client.drop_table_if_exist("vec_table")
  • See tests/test_seekdb_embedded.py for more examples.

Hybrid Search Mode

pyobvector supports hybrid search that combines full-text search and vector similarity search, with query syntax compatible with Elasticsearch. This allows you to perform semantic search with both keyword matching and vector similarity in a single query.

  • setup a client:
from pyobvector import *
from pyobvector.client.hybrid_search import HybridSearch
from sqlalchemy import Column, Integer, VARCHAR

client = HybridSearch(uri="127.0.0.1:2881", user="test@test")

Note: Hybrid search requires OceanBase version >= 4.4.1.0, or SeekDB.

  • create a table with both vector index and full-text index:
test_table_name = "hybrid_search_test"

# create table with vector and text columns
client.create_table(
    table_name=test_table_name,
    columns=[
        Column("id", Integer, primary_key=True, autoincrement=False),
        Column("source_id", VARCHAR(32)),
        Column("enabled", Integer),
        Column("vector", VECTOR(3)),  # vector column
        Column("title", VARCHAR(255)),  # text column for full-text search
        Column("content", VARCHAR(255)),  # text column for full-text search
    ],
    indexes=[
        VectorIndex("vec_idx", "vector", params="distance=l2, type=hnsw, lib=vsag"),
    ],
    mysql_charset='utf8mb4',
    mysql_collate='utf8mb4_unicode_ci',
)

# create full-text indexes for text columns
from pyobvector import FtsIndexParam, FtsParser

for col in ["title", "content"]:
    client.create_fts_idx_with_fts_index_param(
        table_name=test_table_name,
        fts_idx_param=FtsIndexParam(
            index_name=f"fts_idx_{col}",
            field_names=[col],
            parser_type=FtsParser.IK,  # or other parser types
        ),
    )
  • insert data:
client.insert(
    table_name=test_table_name,
    data=[
        {
            "id": 1,
            "source_id": "3b767712b57211f09c170242ac130008",
            "enabled": 1,
            "vector": [1, 1, 1],
            "title": "企业版和社区版的功能差异",
            "content": "OceanBase 数据库提供企业版和社区版两种形态。",
        },
        {
            "id": 2,
            "vector": [1, 2, 3],
            "enabled": 1,
            "source_id": "3b791472b57211f09c170242ac130008",
            "title": "快速体验 OceanBase 社区版",
            "content": "本文根据使用场景详细介绍如何快速部署 OceanBase 数据库。",
        },
        # ... more data
    ]
)
  • perform hybrid search with Elasticsearch-compatible query syntax:
# build query body (compatible with Elasticsearch syntax)
query = {
    "bool": {
        "must": [
            {
                "query_string": {
                    "fields": ["title^10", "content"],  # field weights
                    "type": "best_fields",
                    "query": "oceanbase 数据 迁移",
                    "minimum_should_match": "30%",
                    "boost": 1
                }
            }
        ],
        "filter": [
            {
                "terms": {
                    "source_id": [
                        "3b791472b57211f09c170242ac130008",
                        "3b7af31eb57211f09c170242ac130008"
                    ]
                }
            },
            {
                "bool": {
                    "must_not": [
                        {
                            "range": {
                                "enabled": {"lt": 1}
                            }
                        }
                    ]
                }
            }
        ],
        "boost": 0.7
    }
}

body = {
    "query": query,
    "knn": {  # vector similarity search
        "field": "vector",
        "k": 1024,
        "num_candidates": 1024,
        "query_vector": [1, 2, 3],
        "filter": query,  # optional: apply same filter to KNN
        "similarity": 0.2  # similarity threshold
    },
    "from": 0,  # pagination offset
    "size": 60  # pagination size
}

# execute hybrid search
results = client.search(index=test_table_name, body=body)
# results is a list of matching documents

Supported Query Types

The hybrid search supports Elasticsearch-compatible query syntax:

  • bool query: Combine multiple queries with must, must_not, should, filter
  • query_string: Full-text search with field weights, boost, and matching options
  • terms: Exact match filtering for multiple values
  • range: Range queries (lt, lte, gt, gte)
  • knn: Vector similarity search (KNN) with:
    • field: Vector field name
    • query_vector: Query vector
    • k: Number of results to return
    • num_candidates: Number of candidates to consider
    • filter: Optional filter to apply to KNN search
    • similarity: Similarity threshold
  • Pagination: from and size parameters

Get SQL Query

You can also get the actual SQL that will be executed:

sql = client.get_sql(index=test_table_name, body=body)
print(sql)  # prints the SQL query

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyobvector-0.2.28.tar.gz (81.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyobvector-0.2.28-py3-none-any.whl (66.6 kB view details)

Uploaded Python 3

File details

Details for the file pyobvector-0.2.28.tar.gz.

File metadata

  • Download URL: pyobvector-0.2.28.tar.gz
  • Upload date:
  • Size: 81.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pyobvector-0.2.28.tar.gz
Algorithm Hash digest
SHA256 d394d66f4a7dc97c221b7584e52b329e3fe61ed96b6f248d7360e3e5a53581ce
MD5 49b7910019b4800617629b49dc936690
BLAKE2b-256 e5e7397eaeab4d52126675a30bcbfe1b74b6f3d3512a9e714966a93f08f3da2e

See more details on using hashes here.

Provenance

The following attestation bundles were made for pyobvector-0.2.28.tar.gz:

Publisher: python-publish.yml on oceanbase/pyobvector

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pyobvector-0.2.28-py3-none-any.whl.

File metadata

  • Download URL: pyobvector-0.2.28-py3-none-any.whl
  • Upload date:
  • Size: 66.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pyobvector-0.2.28-py3-none-any.whl
Algorithm Hash digest
SHA256 36708fc0020307890b9001035904ff14719b81e20e58f2a97f7f3739dad53c72
MD5 cc2fb6a990921a195f62115adf1f6112
BLAKE2b-256 dda68acb9821c78bafc3ff9db5bbea05a8b67ff871b7590b7d25cc91c34fc2f0

See more details on using hashes here.

Provenance

The following attestation bundles were made for pyobvector-0.2.28-py3-none-any.whl:

Publisher: python-publish.yml on oceanbase/pyobvector

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page