A Python SDK for OceanBase SQL, including JSON Table support and SQLAlchemy dialect extensions.
Project description
pyobsql
A Python SDK for OceanBase SQL, including JSON Table support and SQLAlchemy dialect extensions.
Installation
Install from Source
git clone https://github.com/oceanbase/ecology-plugins.git
cd ecology-plugins/pyobsql-oceanbase-plugin
pip install -e .
Install from PyPI
pip install pyobsql
Features
pyobsql provides the following features:
- OceanBase SQL Dialect Parsing: Extended SQLGlot support for OceanBase-specific SQL syntax
- SQLAlchemy Integration: Provides synchronous and asynchronous OceanBase dialect support
- Extended Data Types: Supports VECTOR, SPARSE_VECTOR, ARRAY, POINT and other OceanBase-specific types
- JSON Table Support: Virtual data types and utility functions for handling JSON tables
- Table Structure Reflection: Automatically parses OceanBase table structures
- Partition Support: Supports various partition strategies, such as Range, Hash, Key, and List.
Detailed Usage Guide
1. Connect to Database
from pyobsql.client import ObClient
client = ObClient(
uri="127.0.0.1:2881",
user="root@test",
password="password",
db_name="test"
)
2. Create Tables
2.1 Create Basic Table
from sqlalchemy import Column, Integer, String, JSON
from pyobsql.schema import VECTOR, SPARSE_VECTOR, ARRAY, POINT
columns = [
Column('id', Integer, primary_key=True),
Column('name', String(100)),
Column('embedding', VECTOR(128)),
Column('sparse_vec', SPARSE_VECTOR),
Column('tags', ARRAY(String(50))),
Column('location', POINT(srid=4326)),
Column('metadata', JSON)
]
client.create_table(
table_name='my_table',
columns=columns
)
2.2 Create Partitioned Table
from pyobsql.client.partitions import ObRangePartition, RangeListPartInfo
range_partition = ObRangePartition(
is_range_columns=False,
range_part_infos=[
RangeListPartInfo('p0', 100),
RangeListPartInfo('p1', 200),
RangeListPartInfo('p2', 'MAXVALUE')
],
range_expr='id'
)
columns = [
Column('id', Integer, primary_key=True),
Column('name', String(100)),
Column('embedding', VECTOR(128))
]
client.create_table(
table_name='partitioned_table',
columns=columns,
partitions=range_partition
)
2.3 Create Table with Indexes
from sqlalchemy import Index
indexes = [
Index('idx_name', 'name'),
Index('idx_embedding', 'embedding', postgresql_using='hnsw')
]
client.create_table(
table_name='indexed_table',
columns=columns,
indexes=indexes
)
3. Insert Data
3.1 Insert Single Record
client.insert(
table_name='my_table',
data={
'id': 1,
'name': 'example',
'embedding': [0.1, 0.2, 0.3, ...],
'sparse_vec': {1: 0.5, 5: 0.8, 10: 0.3},
'tags': ['tag1', 'tag2', 'tag3'],
'location': (116.3974, 39.9093),
'metadata': {'key': 'value'}
}
)
3.2 Batch Insert Data
data_list = [
{
'id': i,
'name': f'item_{i}',
'embedding': [0.1 * i, 0.2 * i, 0.3 * i, ...],
'tags': [f'tag_{i}'],
'metadata': {'index': i}
}
for i in range(100)
]
client.insert(
table_name='my_table',
data=data_list
)
3.3 Insert to Specified Partition
client.insert(
table_name='partitioned_table',
data={'id': 50, 'name': 'test', 'embedding': [0.1, 0.2, ...]},
partition_name='p0'
)
3.4 Use REPLACE INTO (Insert or Replace)
from pyobsql.schema import ReplaceStmt
from sqlalchemy import Table
table = Table('my_table', client.metadata_obj, autoload_with=client.engine)
with client.engine.connect() as conn:
with conn.begin():
stmt = ReplaceStmt(table).values({
'id': 1,
'name': 'updated_name',
'embedding': [0.5, 0.6, ...]
})
conn.execute(stmt)
client.upsert(
table_name='my_table',
data={'id': 1, 'name': 'updated_name', 'embedding': [0.5, 0.6, ...]}
)
4. Update Data
from sqlalchemy import Table
table = Table('my_table', client.metadata_obj, autoload_with=client.engine)
client.update(
table_name='my_table',
values_clause=[table.c.name == 'new_name'],
where_clause=[table.c.id == 1]
)
client.update(
table_name='my_table',
values_clause=[
table.c.name == 'updated_name',
table.c.metadata == {'status': 'updated'}
],
where_clause=[table.c.id.in_([1, 2, 3])]
)
client.update(
table_name='partitioned_table',
values_clause=[table.c.name == 'new_name'],
where_clause=[table.c.id == 50],
partition_name='p0'
)
5. Delete Data
client.delete(
table_name='my_table',
ids=1
)
client.delete(
table_name='my_table',
ids=[1, 2, 3]
)
from sqlalchemy import Table
table = Table('my_table', client.metadata_obj, autoload_with=client.engine)
client.delete(
table_name='my_table',
where_clause=[table.c.name == 'old_name']
)
client.delete(
table_name='partitioned_table',
ids=50,
partition_name='p0'
)
6. Query Data
6.1 Basic Queries
from sqlalchemy import Table, select
table = Table('my_table', client.metadata_obj, autoload_with=client.engine)
result = client.get(table_name='my_table')
for row in result:
print(row)
result = client.get(
table_name='my_table',
ids=1
)
result = client.get(
table_name='my_table',
ids=[1, 2, 3]
)
result = client.get(
table_name='my_table',
where_clause=[table.c.name == 'example']
)
result = client.get(
table_name='my_table',
output_column_name=['id', 'name', 'embedding']
)
result = client.get(
table_name='my_table',
n_limits=10
)
result = client.get(
table_name='partitioned_table',
partition_names=['p0', 'p1']
)
6.2 Use SQLAlchemy Native Queries
from sqlalchemy import Table, select, func, text
table = Table('my_table', client.metadata_obj, autoload_with=client.engine)
stmt = select(
table.c.id,
table.c.name,
func.json_extract(table.c.metadata, '$.key').label('extracted_key')
).where(
table.c.id > 10
).order_by(
table.c.id.desc()
).limit(10)
with client.engine.connect() as conn:
result = conn.execute(stmt)
for row in result:
print(row)
7. JSON Table Support
7.1 JSON Table Virtual Data Types
from pyobsql import (
JsonTableBool,
JsonTableInt,
JsonTableTimestamp,
JsonTableVarcharFactory,
JsonTableDecimalFactory,
val2json
)
bool_type = JsonTableBool(val=True)
int_type = JsonTableInt(val=42)
varchar_factory = JsonTableVarcharFactory(length=255)
varchar_type = varchar_factory.get_json_table_varchar_type()(val="test")
decimal_factory = JsonTableDecimalFactory(precision=10, scale=2)
decimal_type = decimal_factory.get_json_table_decimal_type()(val=123.45)
json_value = val2json(bool_type)
7.2 Use json_value Function
from pyobsql import json_value
from sqlalchemy import Table, select
table = Table('my_table', client.metadata_obj, autoload_with=client.engine)
stmt = select(
table.c.id,
json_value(table.c.metadata, '$.key', 'VARCHAR(100)').label('extracted_value')
).where(
table.c.id == 1
)
with client.engine.connect() as conn:
result = conn.execute(stmt)
for row in result:
print(row)
8. SQL Parsing (SQLGlot)
8.1 Parse OceanBase SQL
from pyobsql import OceanBase
from sqlglot import parse_one, transpile
sql = "ALTER TABLE t2 CHANGE COLUMN c2 changed_col INT"
ast = parse_one(sql, dialect=OceanBase)
print(ast)
sql = "ALTER TABLE t1 MODIFY COLUMN c1 VARCHAR(100) NOT NULL"
ast = parse_one(sql, dialect=OceanBase)
sql = "ALTER TABLE t1 DROP COLUMN c1"
ast = parse_one(sql, dialect=OceanBase)
sql = "SELECT * FROM table1"
mysql_sql = transpile(sql, read=OceanBase, write="mysql")[0]
9. Data Type Details
9.1 VECTOR (Vector Type)
from pyobsql.schema import VECTOR
from pyobsql.util import Vector
column = Column('embedding', VECTOR(128))
vector_data = [0.1, 0.2, 0.3, ...]
vector_obj = Vector(vector_data)
client.insert(
table_name='vector_table',
data={'id': 1, 'embedding': vector_data}
)
9.2 SPARSE_VECTOR (Sparse Vector Type)
from pyobsql.schema import SPARSE_VECTOR
from pyobsql.util import SparseVector
column = Column('sparse_vec', SPARSE_VECTOR)
sparse_data = {1: 0.5, 5: 0.8, 10: 0.3}
client.insert(
table_name='sparse_table',
data={'id': 1, 'sparse_vec': sparse_data}
)
9.3 ARRAY (Array Type)
from pyobsql.schema import ARRAY
from sqlalchemy import String, Integer
tags_column = Column('tags', ARRAY(String(50)))
scores_column = Column('scores', ARRAY(Integer))
nested_array = Column('matrix', ARRAY(ARRAY(Integer)))
client.insert(
table_name='array_table',
data={
'id': 1,
'tags': ['tag1', 'tag2', 'tag3'],
'scores': [100, 200, 300],
'matrix': [[1, 2], [3, 4]]
}
)
9.4 POINT (Geographic Coordinate Point Type)
from pyobsql.schema import POINT
location_column = Column('location', POINT(srid=4326))
client.insert(
table_name='location_table',
data={
'id': 1,
'location': (116.3974, 39.9093)
}
)
from pyobsql.schema import ST_GeomFromText, st_distance, st_dwithin
table = Table('location_table', client.metadata_obj, autoload_with=client.engine)
stmt = select(
table.c.id,
st_distance(
table.c.location,
ST_GeomFromText('POINT(116.3974 39.9093)', 4326)
).label('distance')
).where(
st_dwithin(
table.c.location,
ST_GeomFromText('POINT(116.3974 39.9093)', 4326),
1000
)
)
10. Table Structure Management
10.1 Drop Table
client.drop_table_if_exist('my_table')
10.2 Drop Index
client.drop_index(table_name='my_table', index_name='idx_name')
10.3 Refresh Metadata
client.refresh_metadata()
client.refresh_metadata(tables=['my_table', 'other_table'])
11. Async Operations (Optional)
from pyobsql.schema import AsyncOceanBaseDialect
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
"mysql+aiomysql://user:password@127.0.0.1:2881/dbname",
dialect=AsyncOceanBaseDialect()
)
async with engine.connect() as conn:
result = await conn.execute(select(table))
rows = result.fetchall()
Complete Example
from pyobsql.client import ObClient
from pyobsql.client.partitions import ObRangePartition, RangeListPartInfo
from sqlalchemy import Column, Integer, String, JSON, Table
from pyobsql.schema import VECTOR, ARRAY
client = ObClient(
uri="127.0.0.1:2881",
user="root@test",
password="password",
db_name="test"
)
partition = ObRangePartition(
is_range_columns=False,
range_part_infos=[
RangeListPartInfo('p0', 100),
RangeListPartInfo('p1', 'MAXVALUE')
],
range_expr='id'
)
columns = [
Column('id', Integer, primary_key=True),
Column('name', String(100)),
Column('embedding', VECTOR(128)),
Column('tags', ARRAY(String(50))),
Column('metadata', JSON)
]
client.create_table('products', columns=columns, partitions=partition)
data = [
{
'id': i,
'name': f'product_{i}',
'embedding': [0.1 * i] * 128,
'tags': [f'tag_{i}', f'category_{i % 5}'],
'metadata': {'price': i * 10, 'stock': 100 - i}
}
for i in range(50)
]
client.insert('products', data)
table = Table('products', client.metadata_obj, autoload_with=client.engine)
result = client.get(
table_name='products',
where_clause=[table.c.id < 10],
output_column_name=['id', 'name', 'tags']
)
for row in result:
print(f"ID: {row.id}, Name: {row.name}, Tags: {row.tags}")
client.update(
table_name='products',
values_clause=[table.c.metadata == {'price': 999, 'stock': 50}],
where_clause=[table.c.id == 1]
)
client.delete(table_name='products', ids=[1, 2, 3])
License
Apache-2.0
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
pyobsql-0.1.0.tar.gz (31.2 kB, view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
pyobsql-0.1.0-py3-none-any.whl (30.9 kB, view details)
File details
Details for the file pyobsql-0.1.0.tar.gz.
File metadata
- Download URL: pyobsql-0.1.0.tar.gz
- Upload date:
- Size: 31.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 | a93d4fcb847dcf119b3d96b1ecc1eddd08564e753f675068545d42923c85f821 | |
| MD5 | d8190ae92c9f455b089d0167bf4a6181 | |
| BLAKE2b-256 | e08b6b992f529d0c3c4e9c1568dadd966d7d18e9d52d034d6f6bbb07cb48e925 | |
File details
Details for the file pyobsql-0.1.0-py3-none-any.whl.
File metadata
- Download URL: pyobsql-0.1.0-py3-none-any.whl
- Upload date:
- Size: 30.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 | 8d4afadba9d91d6fa58cf79fc6976c35ac3d9039022750d18075726f306c0624 | |
| MD5 | d6d340bb28cd113060aad7c75d998d14 | |
| BLAKE2b-256 | fc4831c92ea735d8d593f652a911a37c46574478ab30ffe5382ae1b0598a9a39 | |