
pyobsql

A Python SDK for OceanBase SQL, including JSON Table support and SQLAlchemy dialect extensions.

Installation

Install from Source

git clone https://github.com/oceanbase/ecology-plugins.git
cd ecology-plugins/pyobsql-oceanbase-plugin
pip install -e .

Install from PyPI

pip install pyobsql

Features

pyobsql provides the following features:

  • OceanBase SQL Dialect Parsing: Extended SQLGlot support for OceanBase-specific SQL syntax
  • SQLAlchemy Integration: Provides synchronous and asynchronous OceanBase dialect support
  • Extended Data Types: Supports VECTOR, SPARSE_VECTOR, ARRAY, POINT and other OceanBase-specific types
  • JSON Table Support: Virtual data types and utility functions for handling JSON tables
  • Table Structure Reflection: Automatically parses OceanBase table structures
  • Partition Support: Supports various partition strategies including Range, Hash, Key, List, etc.

Detailed Usage Guide

1. Connect to Database

from pyobsql.client import ObClient

client = ObClient(
    uri="127.0.0.1:2881",
    user="root@test",
    password="password",
    db_name="test"
)

2. Create Tables

2.1 Create Basic Table

from sqlalchemy import Column, Integer, String, JSON
from pyobsql.schema import VECTOR, SPARSE_VECTOR, ARRAY, POINT

columns = [
    Column('id', Integer, primary_key=True),
    Column('name', String(100)),
    Column('embedding', VECTOR(128)),
    Column('sparse_vec', SPARSE_VECTOR),
    Column('tags', ARRAY(String(50))),
    Column('location', POINT(srid=4326)),
    Column('metadata', JSON)
]

client.create_table(
    table_name='my_table',
    columns=columns
)

2.2 Create Partitioned Table

from pyobsql.client.partitions import ObRangePartition, RangeListPartInfo

range_partition = ObRangePartition(
    is_range_columns=False,
    range_part_infos=[
        RangeListPartInfo('p0', 100),
        RangeListPartInfo('p1', 200),
        RangeListPartInfo('p2', 'MAXVALUE')
    ],
    range_expr='id'
)

columns = [
    Column('id', Integer, primary_key=True),
    Column('name', String(100)),
    Column('embedding', VECTOR(128))
]

client.create_table(
    table_name='partitioned_table',
    columns=columns,
    partitions=range_partition
)
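Conceptually, RANGE partitioning routes each row to the first partition whose upper bound exceeds the value of the partition expression (`id` above). The following plain-Python sketch illustrates that routing rule; it is not part of pyobsql, and `route_to_partition` is a hypothetical helper whose bounds mirror the example above:

```python
def route_to_partition(value, bounds):
    """Return the name of the first partition whose upper bound
    is greater than `value`, mimicking RANGE partition routing."""
    for name, upper in bounds:
        if upper == 'MAXVALUE' or value < upper:
            return name
    raise ValueError(f"no partition accepts value {value!r}")

# same partitions as the ObRangePartition example above
bounds = [('p0', 100), ('p1', 200), ('p2', 'MAXVALUE')]

print(route_to_partition(50, bounds))    # p0
print(route_to_partition(150, bounds))   # p1
print(route_to_partition(5000, bounds))  # p2 (MAXVALUE catches the rest)
```

Rows with `id` below 100 land in `p0`, below 200 in `p1`, and everything else in the `MAXVALUE` partition.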

2.3 Create Table with Indexes

from sqlalchemy import Index

indexes = [
    Index('idx_name', 'name'),
    Index('idx_embedding', 'embedding', postgresql_using='hnsw')
]

client.create_table(
    table_name='indexed_table',
    columns=columns,
    indexes=indexes
)

3. Insert Data

3.1 Insert Single Record

client.insert(
    table_name='my_table',
    data={
        'id': 1,
        'name': 'example',
        'embedding': [0.1, 0.2, 0.3, ...],
        'sparse_vec': {1: 0.5, 5: 0.8, 10: 0.3},
        'tags': ['tag1', 'tag2', 'tag3'],
        'location': (116.3974, 39.9093),
        'metadata': {'key': 'value'}
    }
)

3.2 Batch Insert Data

data_list = [
    {
        'id': i,
        'name': f'item_{i}',
        'embedding': [0.1 * i, 0.2 * i, 0.3 * i, ...],
        'tags': [f'tag_{i}'],
        'metadata': {'index': i}
    }
    for i in range(100)
]

client.insert(
    table_name='my_table',
    data=data_list
)
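For very large batches it is common to split the payload into fixed-size chunks and issue one insert per chunk, keeping each statement a manageable size. A generic chunking helper in plain Python (the `chunked` function is an illustrative pattern, not a pyobsql API):

```python
def chunked(rows, size):
    """Yield successive slices of `rows` with at most `size` items each."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]

data_list = [{'id': i, 'name': f'item_{i}'} for i in range(100)]

# hypothetical usage: one client.insert call per chunk
for batch in chunked(data_list, 30):
    pass  # client.insert(table_name='my_table', data=batch)
```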

3.3 Insert into a Specified Partition

client.insert(
    table_name='partitioned_table',
    data={'id': 50, 'name': 'test', 'embedding': [0.1, 0.2, ...]},
    partition_name='p0'
)

3.4 Use REPLACE INTO (Insert or Replace)

from pyobsql.schema import ReplaceStmt
from sqlalchemy import Table

table = Table('my_table', client.metadata_obj, autoload_with=client.engine)
with client.engine.connect() as conn:
    with conn.begin():
        stmt = ReplaceStmt(table).values({
            'id': 1,
            'name': 'updated_name',
            'embedding': [0.5, 0.6, ...]
        })
        conn.execute(stmt)

client.upsert(
    table_name='my_table',
    data={'id': 1, 'name': 'updated_name', 'embedding': [0.5, 0.6, ...]}
)
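Assuming OceanBase's MySQL-compatible semantics, REPLACE INTO deletes any existing row with the same primary key and inserts the new row as-is, so columns the new row omits are lost; an upsert merges the new values into the existing row instead. A plain-Python sketch of the difference (the dict `store` stands in for a table keyed by primary key; both helpers are illustrative, not pyobsql APIs):

```python
def replace_into(store, row, pk='id'):
    """REPLACE semantics: delete any row with the same key,
    then insert the new row as-is (omitted columns are lost)."""
    store[row[pk]] = dict(row)

def upsert(store, row, pk='id'):
    """Upsert semantics: merge new values into the existing row,
    keeping columns the new row does not mention."""
    store.setdefault(row[pk], {}).update(row)

store = {1: {'id': 1, 'name': 'old', 'metadata': {'k': 'v'}}}

replace_into(store, {'id': 1, 'name': 'new'})
print(store[1])  # metadata is gone after REPLACE

upsert(store, {'id': 1, 'metadata': {'k2': 'v2'}})
print(store[1])  # name survives the upsert
```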

4. Update Data

from sqlalchemy import Table

table = Table('my_table', client.metadata_obj, autoload_with=client.engine)

client.update(
    table_name='my_table',
    values_clause=[table.c.name == 'new_name'],
    where_clause=[table.c.id == 1]
)

client.update(
    table_name='my_table',
    values_clause=[
        table.c.name == 'updated_name',
        table.c.metadata == {'status': 'updated'}
    ],
    where_clause=[table.c.id.in_([1, 2, 3])]
)

client.update(
    table_name='partitioned_table',
    values_clause=[table.c.name == 'new_name'],
    where_clause=[table.c.id == 50],
    partition_name='p0'
)

5. Delete Data

client.delete(
    table_name='my_table',
    ids=1
)

client.delete(
    table_name='my_table',
    ids=[1, 2, 3]
)

from sqlalchemy import Table

table = Table('my_table', client.metadata_obj, autoload_with=client.engine)
client.delete(
    table_name='my_table',
    where_clause=[table.c.name == 'old_name']
)

client.delete(
    table_name='partitioned_table',
    ids=50,
    partition_name='p0'
)

6. Query Data

6.1 Basic Queries

from sqlalchemy import Table, select

table = Table('my_table', client.metadata_obj, autoload_with=client.engine)

result = client.get(table_name='my_table')
for row in result:
    print(row)

result = client.get(
    table_name='my_table',
    ids=1
)

result = client.get(
    table_name='my_table',
    ids=[1, 2, 3]
)

result = client.get(
    table_name='my_table',
    where_clause=[table.c.name == 'example']
)

result = client.get(
    table_name='my_table',
    output_column_name=['id', 'name', 'embedding']
)

result = client.get(
    table_name='my_table',
    n_limits=10
)

result = client.get(
    table_name='partitioned_table',
    partition_names=['p0', 'p1']
)

6.2 Use SQLAlchemy Native Queries

from sqlalchemy import Table, select, func, text

table = Table('my_table', client.metadata_obj, autoload_with=client.engine)

stmt = select(
    table.c.id,
    table.c.name,
    func.json_extract(table.c.metadata, '$.key').label('extracted_key')
).where(
    table.c.id > 10
).order_by(
    table.c.id.desc()
).limit(10)

with client.engine.connect() as conn:
    result = conn.execute(stmt)
    for row in result:
        print(row)

7. JSON Table Support

7.1 JSON Table Virtual Data Types

from pyobsql import (
    JsonTableBool,
    JsonTableInt,
    JsonTableTimestamp,
    JsonTableVarcharFactory,
    JsonTableDecimalFactory,
    val2json
)

bool_type = JsonTableBool(val=True)
int_type = JsonTableInt(val=42)
varchar_factory = JsonTableVarcharFactory(length=255)
varchar_type = varchar_factory.get_json_table_varchar_type()(val="test")
decimal_factory = JsonTableDecimalFactory(precision=10, scale=2)
decimal_type = decimal_factory.get_json_table_decimal_type()(val=123.45)

json_value = val2json(bool_type)

7.2 Use json_value Function

from pyobsql import json_value
from sqlalchemy import Table, select

table = Table('my_table', client.metadata_obj, autoload_with=client.engine)

stmt = select(
    table.c.id,
    json_value(table.c.metadata, '$.key', 'VARCHAR(100)').label('extracted_value')
).where(
    table.c.id == 1
)

with client.engine.connect() as conn:
    result = conn.execute(stmt)
    for row in result:
        print(row)
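The `json_value` function extracts the value at a JSON path and casts it to the requested type. For intuition, a plain-Python equivalent of the simple dotted-path case (`json_value_py` is a hypothetical helper handling only `$.a.b`-style paths; real JSON path syntax supports far more):

```python
import json

def json_value_py(doc, path):
    """Follow a dotted path like '$.a.b' through a JSON document
    and return the value found there, or None if absent."""
    node = json.loads(doc) if isinstance(doc, str) else doc
    for key in path.removeprefix('$.').split('.'):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node

print(json_value_py('{"key": "value"}', '$.key'))  # value
print(json_value_py({'a': {'b': 3}}, '$.a.b'))     # 3
```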

8. SQL Parsing (SQLGlot)

8.1 Parse OceanBase SQL

from pyobsql import OceanBase
from sqlglot import parse_one, transpile

sql = "ALTER TABLE t2 CHANGE COLUMN c2 changed_col INT"
ast = parse_one(sql, dialect=OceanBase)
print(ast)

sql = "ALTER TABLE t1 MODIFY COLUMN c1 VARCHAR(100) NOT NULL"
ast = parse_one(sql, dialect=OceanBase)

sql = "ALTER TABLE t1 DROP COLUMN c1"
ast = parse_one(sql, dialect=OceanBase)

sql = "SELECT * FROM table1"
mysql_sql = transpile(sql, read=OceanBase, write="mysql")[0]

9. Data Type Details

9.1 VECTOR (Vector Type)

from pyobsql.schema import VECTOR
from pyobsql.util import Vector

column = Column('embedding', VECTOR(128))

vector_data = [0.1, 0.2, 0.3, ...]
vector_obj = Vector(vector_data)

client.insert(
    table_name='vector_table',
    data={'id': 1, 'embedding': vector_data}
)

9.2 SPARSE_VECTOR (Sparse Vector Type)

from pyobsql.schema import SPARSE_VECTOR
from pyobsql.util import SparseVector

column = Column('sparse_vec', SPARSE_VECTOR)

sparse_data = {1: 0.5, 5: 0.8, 10: 0.3}

client.insert(
    table_name='sparse_table',
    data={'id': 1, 'sparse_vec': sparse_data}
)
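A sparse vector stored as an `{index: weight}` dict keeps only the non-zero entries, so a dot product between two sparse vectors only has to visit the indices they share. A plain-Python illustration (not a pyobsql API):

```python
def sparse_dot(a, b):
    """Dot product of two {index: weight} sparse vectors."""
    if len(a) > len(b):
        a, b = b, a  # iterate over the smaller dict
    return sum(w * b[i] for i, w in a.items() if i in b)

a = {1: 0.5, 5: 0.8, 10: 0.3}
b = {5: 1.0, 10: 2.0, 99: 7.0}
print(sparse_dot(a, b))  # 0.8*1.0 + 0.3*2.0 = 1.4
```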

9.3 ARRAY (Array Type)

from pyobsql.schema import ARRAY
from sqlalchemy import String, Integer

tags_column = Column('tags', ARRAY(String(50)))
scores_column = Column('scores', ARRAY(Integer))
nested_array = Column('matrix', ARRAY(ARRAY(Integer)))

client.insert(
    table_name='array_table',
    data={
        'id': 1,
        'tags': ['tag1', 'tag2', 'tag3'],
        'scores': [100, 200, 300],
        'matrix': [[1, 2], [3, 4]]
    }
)

9.4 POINT (Geographic Coordinate Point Type)

from pyobsql.schema import POINT

location_column = Column('location', POINT(srid=4326))

client.insert(
    table_name='location_table',
    data={
        'id': 1,
        'location': (116.3974, 39.9093)
    }
)

from pyobsql.schema import ST_GeomFromText, st_distance, st_dwithin

table = Table('location_table', client.metadata_obj, autoload_with=client.engine)
stmt = select(
    table.c.id,
    st_distance(
        table.c.location,
        ST_GeomFromText('POINT(116.3974 39.9093)', 4326)
    ).label('distance')
).where(
    st_dwithin(
        table.c.location,
        ST_GeomFromText('POINT(116.3974 39.9093)', 4326),
        1000
    )
)
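With SRID 4326 a point is a longitude/latitude pair, and geographic distance functions return great-circle distance. For intuition, a haversine sketch in plain Python (an approximation on a spherical Earth; the exact semantics of `st_distance` depend on the server):

```python
import math

def haversine_m(lon1, lat1, lon2, lat2, radius_m=6371008.8):
    """Great-circle distance in meters between two lon/lat points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    return 2 * radius_m * math.asin(math.sqrt(a))

# distance from the example point to itself is 0
print(haversine_m(116.3974, 39.9093, 116.3974, 39.9093))
```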

10. Table Structure Management

10.1 Drop Table

client.drop_table_if_exist('my_table')

10.2 Drop Index

client.drop_index(table_name='my_table', index_name='idx_name')

10.3 Refresh Metadata

client.refresh_metadata()

client.refresh_metadata(tables=['my_table', 'other_table'])

11. Async Operations (Optional)

import asyncio

from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine

from pyobsql.schema import AsyncOceanBaseDialect

engine = create_async_engine(
    "mysql+aiomysql://user:password@127.0.0.1:2881/dbname",
    dialect=AsyncOceanBaseDialect()
)

async def fetch_all(table):
    # `table` is a reflected or explicitly declared sqlalchemy Table
    async with engine.connect() as conn:
        result = await conn.execute(select(table))
        return result.fetchall()

# rows = asyncio.run(fetch_all(table))

Complete Example

from pyobsql.client import ObClient
from pyobsql.client.partitions import ObRangePartition, RangeListPartInfo
from sqlalchemy import Column, Integer, String, JSON, Table
from pyobsql.schema import VECTOR, ARRAY

client = ObClient(
    uri="127.0.0.1:2881",
    user="root@test",
    password="password",
    db_name="test"
)

partition = ObRangePartition(
    is_range_columns=False,
    range_part_infos=[
        RangeListPartInfo('p0', 100),
        RangeListPartInfo('p1', 'MAXVALUE')
    ],
    range_expr='id'
)

columns = [
    Column('id', Integer, primary_key=True),
    Column('name', String(100)),
    Column('embedding', VECTOR(128)),
    Column('tags', ARRAY(String(50))),
    Column('metadata', JSON)
]

client.create_table('products', columns=columns, partitions=partition)

data = [
    {
        'id': i,
        'name': f'product_{i}',
        'embedding': [0.1 * i] * 128,
        'tags': [f'tag_{i}', f'category_{i % 5}'],
        'metadata': {'price': i * 10, 'stock': 100 - i}
    }
    for i in range(50)
]
client.insert('products', data)

table = Table('products', client.metadata_obj, autoload_with=client.engine)
result = client.get(
    table_name='products',
    where_clause=[table.c.id < 10],
    output_column_name=['id', 'name', 'tags']
)

for row in result:
    print(f"ID: {row.id}, Name: {row.name}, Tags: {row.tags}")

client.update(
    table_name='products',
    values_clause=[table.c.metadata == {'price': 999, 'stock': 50}],
    where_clause=[table.c.id == 1]
)

client.delete(table_name='products', ids=[1, 2, 3])

License

Apache-2.0
