SQLAlchemy dialects for multiple databases
ns-sqlalchemy
SQLAlchemy dialects for multiple databases.
Overview
ns-sqlalchemy is a Python package that provides SQLAlchemy dialect implementations for various databases. Currently, it supports:
- ClickHouse: Full dialect support with HTTP, Native, and asynch drivers
- Dameng (DM): Full dialect support with dmPython and dmAsync drivers
Features
ClickHouse
- Multiple Drivers: HTTP (clickhouse+http://), Native (clickhouse+native://), asynch (clickhouse+asynch://)
- Full Type Support: Int8–UInt256, Float32/64, Decimal, Date/Date32, DateTime/DateTime64, Enum8/16, Array, Nullable, LowCardinality, Tuple, Map, Nested, IPv4/IPv6, UUID, AggregateFunction, SimpleAggregateFunction
- DDL Support: MergeTree-family engines (PARTITION BY, ORDER BY, PRIMARY KEY, SAMPLE BY, TTL, SETTINGS), Replicated engines, Distributed, Buffer, MaterializedView, column CODEC/MATERIALIZED/ALIAS/AFTER
- SQL Extensions: FINAL, SAMPLE, LIMIT BY, ARRAY JOIN, WITH CUBE/ROLLUP/TOTALS, JOIN strictness/distribution, ALTER TABLE DELETE/UPDATE/ADD/DROP/MODIFY/RENAME
- Alembic Integration: Full migration support including Materialized Views
- ORM Support: Custom Query class with ClickHouse-specific methods, declarative base integration
- Reflection: Inspect tables, views, materialized views
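For orientation, the MergeTree clauses listed under DDL Support correspond to ClickHouse DDL of roughly the following shape. The sketch below assembles such a statement by hand with plain string formatting. `render_merge_tree_ddl` is a hypothetical helper written for this illustration and is not part of ns-sqlalchemy; the dialect's own DDL compiler may format its output differently.

```python
from typing import Optional


def render_merge_tree_ddl(
    table: str,
    columns: dict,
    order_by: str,
    partition_by: Optional[str] = None,
    settings: Optional[dict] = None,
) -> str:
    """Hand-assemble a CREATE TABLE statement for a MergeTree table.

    Hypothetical helper for illustration only, not part of ns-sqlalchemy.
    """
    cols = ", ".join(f"{name} {ch_type}" for name, ch_type in columns.items())
    parts = [f"CREATE TABLE {table} ({cols})", "ENGINE = MergeTree"]
    if partition_by:
        parts.append(f"PARTITION BY {partition_by}")
    parts.append(f"ORDER BY {order_by}")
    if settings:
        rendered = ", ".join(f"{key} = {value}" for key, value in settings.items())
        parts.append(f"SETTINGS {rendered}")
    return " ".join(parts)


ddl = render_merge_tree_ddl(
    "events",
    {"event_id": "String", "timestamp": "DateTime", "value": "Float32"},
    order_by="event_id",
    partition_by="toYYYYMM(timestamp)",
    settings={"index_granularity": 8192},
)
print(ddl)
```

The table and column names mirror the `events` example in the Usage section; the `index_granularity` setting is only there to show where SETTINGS lands in the statement.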
Dameng
- Synchronous Driver: dmPython driver for synchronous operations
- Asynchronous Driver: dmAsync driver for async operations
- Advanced Type Support: NUMBER, VARCHAR2, NVARCHAR2, CHAR, DATE, DATETIME, TIMESTAMP, INTERVAL, BLOB, CLOB, NCLOB, BFILE
- Vector Search: Built-in support for vector similarity search with IVF and HNSW indexes
- JSON Support: JSON data type with path indexing
- Compatibility Modes: DM, MySQL, TSQL, and Oracle compatibility modes
Installation
# ClickHouse dialect (no extra dependencies beyond sqlalchemy + httpx)
pip install ns-sqlalchemy
# With Dameng support
pip install ns-sqlalchemy[dameng]
# For ClickHouse native driver
pip install clickhouse-driver
# For ClickHouse asynch driver
pip install clickhouse-driver asyncio
Usage
Basic Usage
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, Session
from ns_sqlalchemy.dameng import dialect
# Create engine with Dameng dialect
engine = create_engine(
    "dm+dmPython://user:password@host:port/database",
    dialect_options={"driver": "dmPython"}
)
# Create declarative base
Base = declarative_base()
# Define a model
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
# Create tables
Base.metadata.create_all(engine)
# Use with session
with Session(engine) as session:
    # Insert data
    new_user = User(name="John Doe", email="john@example.com")
    session.add(new_user)
    session.commit()
    # Query data
    users = session.query(User).filter(User.name == "John Doe").all()
Vector Search
from ns_sqlalchemy.dameng import VectorWordSeek
# Create vector search instance
vector_search = VectorWordSeek(
    connection_str="dm+dmPython://user:password@host:port/database",
    table_name="documents",
    vector_dim=128,
    drop_if_existing=False,
)
# Insert vectors
ids = vector_search.insert(
    texts=["Hello world", "Goodbye world"],
    metadatas=[{"source": "test"}, {"source": "test"}]
)
# Query similar vectors
results = vector_search.query(
    DistanceMetric="COSINE",
    query_vector=[0.1, 0.2, 0.3, ...],
    count=5,
    filter={"source": "test"}
)
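To make the query parameters concrete, here is a minimal pure-Python sketch of what a filtered nearest-neighbour lookup computes: drop rows whose metadata does not match, rank the rest by cosine distance, and return the closest `count`. It is an exhaustive search over an in-memory list, not how VectorWordSeek or the database executes the query, and `cosine_distance` and `brute_force_query` are names invented for this illustration. Cosine distance is taken here as 1 minus cosine similarity, which is the usual definition but is an assumption about the COSINE metric.

```python
import math


def cosine_distance(a, b):
    """Return 1 - cosine similarity of two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def brute_force_query(rows, query_vector, count, metadata_filter=None):
    """Return the ids of the `count` rows closest to `query_vector`.

    `rows` is a list of (id, vector, metadata) tuples. Rows whose metadata
    does not contain every key/value pair in `metadata_filter` are skipped.
    Hypothetical helper for illustration only.
    """
    metadata_filter = metadata_filter or {}
    candidates = [
        (cosine_distance(vector, query_vector), row_id)
        for row_id, vector, metadata in rows
        if all(metadata.get(k) == v for k, v in metadata_filter.items())
    ]
    return [row_id for _, row_id in sorted(candidates)[:count]]


rows = [
    ("a", [1.0, 0.0], {"source": "test"}),
    ("b", [0.0, 1.0], {"source": "test"}),
    ("c", [1.0, 0.1], {"source": "other"}),
    ("d", [0.9, 0.2], {"source": "test"}),
]
nearest = brute_force_query(
    rows, [1.0, 0.0], count=2, metadata_filter={"source": "test"}
)
print(nearest)  # ['a', 'd']
```

Row "c" is close to the query vector but is excluded by the metadata filter, which is why "d" takes second place.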
ClickHouse Usage
from datetime import datetime
from sqlalchemy import create_engine, Column, DateTime, MetaData, String, func
from ns_sqlalchemy.clickhouse import engines, types
from ns_sqlalchemy.clickhouse.sql.schema import Table
from ns_sqlalchemy.clickhouse.sql.ddl import CreateTable
from ns_sqlalchemy.clickhouse.sql.selectable import Select
# Create engine with ClickHouse HTTP driver
engine = create_engine("clickhouse+http://user:password@localhost:8123/default")
# Define a table with ClickHouse-specific engine
table = Table(
    "events",
    MetaData(),
    Column("event_id", String, primary_key=True),
    Column("timestamp", DateTime),
    Column("value", types.Float32),
    engine=engines.MergeTree(
        order_by=func.tuple("event_id"),
        partition_by="toYYYYMM(timestamp)",
    ),
)
# Create table (DDL elements are executed through a connection)
with engine.connect() as conn:
    conn.execute(CreateTable(table))
# Insert data with engine
with engine.connect() as conn:
    conn.execute(
        table.insert(),
        {"event_id": "abc", "timestamp": datetime.now(), "value": 1.5},
    )
# Query with ClickHouse-specific features
s = (
    Select(table)
    .final()
    .sample(0.1)
    .limit_by(10, table.c.event_id)  # table columns are accessed through .c
)
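ClickHouse's `LIMIT n BY expr` keeps the first n rows for each distinct value of expr, unlike a plain LIMIT, which caps the whole result set. The following pure-Python sketch reproduces that behaviour on a list of dictionaries so the effect of `.limit_by(...)` is easier to picture. The `limit_by` function here is a stand-in written for this illustration and is unrelated to the method on the `Select` class.

```python
from collections import defaultdict


def limit_by(rows, n, key):
    """Keep at most `n` rows per distinct `key(row)`, preserving input order."""
    seen = defaultdict(int)
    kept = []
    for row in rows:
        group = key(row)
        if seen[group] < n:
            seen[group] += 1
            kept.append(row)
    return kept


events = [
    {"event_id": "abc", "value": 1.5},
    {"event_id": "abc", "value": 2.5},
    {"event_id": "abc", "value": 3.5},
    {"event_id": "xyz", "value": 9.0},
]
kept = limit_by(events, 2, key=lambda row: row["event_id"])
print([(row["event_id"], row["value"]) for row in kept])
# [('abc', 1.5), ('abc', 2.5), ('xyz', 9.0)]
```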
Async ClickHouse
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
engine = create_async_engine(
    "clickhouse+asynch://user:password@localhost:9000/default"
)
Alembic Integration
# In alembic/env.py:
from ns_sqlalchemy.clickhouse.alembic.dialect import (
    ClickHouseDialectImpl,
    patch_alembic_version,
    include_object,
)
# Register the ClickHouse dialect implementation
context.configure(
    connection=connection,
    target_metadata=target_metadata,
    include_object=include_object,
)
# For clustered ClickHouse:
# patch_alembic_version(context, cluster="my_cluster")
Dameng Async Usage
import asyncio
from sqlalchemy import Column, Float, Integer, String, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base
async def main():
    # Create async engine
    engine = create_async_engine(
        "dm+dmAsync://user:password@host:port/database",
        echo=True
    )
    # Create declarative base
    Base = declarative_base()
    # Define a model
    class Product(Base):
        __tablename__ = "products"
        id = Column(Integer, primary_key=True)
        name = Column(String(100))
        price = Column(Float)
    # Create tables
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    # Use with async session
    async with AsyncSession(engine) as session:
        # Insert data
        new_product = Product(name="Laptop", price=999.99)
        session.add(new_product)
        await session.commit()
        # Query data
        result = await session.execute(
            select(Product).filter(Product.price > 500)
        )
        products = result.scalars().all()
if __name__ == "__main__":
    asyncio.run(main())
Supported Databases
ClickHouse
- URL Schemes: clickhouse://, clickhouse+http://, clickhouse+native://, clickhouse+asynch://
- Drivers: HTTP (httpx), Native (clickhouse-driver), asynch (clickhouse-driver async)
- Engine Support: MergeTree, ReplacingMergeTree, AggregatingMergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree, SummingMergeTree, ReplicatedMergeTree, Distributed, Buffer, MaterializedView, and more
- SQL Extensions: FINAL, SAMPLE, LIMIT BY, ARRAY JOIN, WITH CUBE/ROLLUP/TOTALS, ALTER TABLE, ON CLUSTER
- Alembic: Full migration support for ClickHouse including Materialized Views
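The URL schemes follow SQLAlchemy's `dialect+driver://` convention, where the part before the `+` selects the dialect and the part after it selects the driver. As a rough illustration, the sketch below splits such a URL with the standard library only. `describe_url` is a hypothetical helper, not something ns-sqlalchemy exposes, and which driver the bare `clickhouse://` scheme resolves to is not stated in this README.

```python
from urllib.parse import urlsplit


def describe_url(url):
    """Split a database URL into dialect, driver, and connection parts.

    Hypothetical helper for illustration only.
    """
    parts = urlsplit(url)
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver or None,  # None: no driver named in the URL
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/") or None,
    }


info = describe_url("clickhouse+native://user:password@localhost:9000/default")
print(info)
```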
Dameng (DM)
- Driver: dmPython (synchronous) and dmAsync (asynchronous)
- Compatibility Modes: DM, MySQL, TSQL, Oracle
- Features:
  - Advanced type support
  - Vector similarity search
  - JSON support
  - High-performance operations
Type Support
ClickHouse Types
| SQLAlchemy Type | ClickHouse Type | Notes |
|---|---|---|
| types.Int8 | Int8 | Signed 8-bit |
| types.Int16 | Int16 | Signed 16-bit |
| types.Int32 | Int32 | Signed 32-bit |
| types.Int64 | Int64 | Signed 64-bit |
| types.Int128 | Int128 | Signed 128-bit |
| types.Int256 | Int256 | Signed 256-bit |
| types.UInt8 | UInt8 | Unsigned 8-bit |
| types.UInt16 | UInt16 | Unsigned 16-bit |
| types.UInt32 | UInt32 | Unsigned 32-bit |
| types.UInt64 | UInt64 | Unsigned 64-bit |
| types.Float32 | Float32 | 32-bit float |
| types.Float64 | Float64 | 64-bit float |
| types.String | String | Variable-length string |
| types.FixedString(n) | FixedString(n) | Fixed-length string |
| types.Date | Date | Date |
| types.Date32 | Date32 | Extended range date |
| types.DateTime | DateTime | DateTime |
| types.DateTime64(p) | DateTime64(p) | DateTime with precision |
| types.Decimal(p,s) | Decimal(p,s) | Fixed-point number |
| types.Boolean | Boolean | Boolean |
| types.Enum8 | Enum8 | 8-bit enum |
| types.Enum16 | Enum16 | 16-bit enum |
| types.UUID | UUID | UUID |
| types.IPv4 | IPv4 | IPv4 address |
| types.IPv6 | IPv6 | IPv6 address |
| types.Array(t) | Array(t) | Array of type |
| types.Nullable(t) | Nullable(t) | Nullable wrapper |
| types.LowCardinality(t) | LowCardinality(t) | Low cardinality wrapper |
| types.Tuple(...) | Tuple(...) | Named tuple |
| types.Map(k,v) | Map(k,v) | Key-value map |
| types.Nested(...) | Nested(...) | Nested structure |
| types.AggregateFunction(f,t) | AggregateFunction(f,t) | Aggregate function type |
| types.SimpleAggregateFunction(f,t) | SimpleAggregateFunction(f,t) | Simple aggregate type |
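When choosing among the integer types, the bit width fixes the representable range. The small sketch below computes that range from the width; it is plain arithmetic and does not touch the dialect. `int_range` is a name made up for this example.

```python
def int_range(bits, signed=True):
    """Return the (min, max) values representable by an integer of `bits` bits."""
    if signed:
        return -(2 ** (bits - 1)), 2 ** (bits - 1) - 1
    return 0, 2 ** bits - 1


print(int_range(8))                 # (-128, 127)
print(int_range(8, signed=False))   # (0, 255)
print(int_range(64, signed=False))  # (0, 18446744073709551615)
```

Python integers have arbitrary precision, so values for the 128-bit and 256-bit types can be held in an ordinary `int` on the client side.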
Dameng Scalar Types
| SQLAlchemy Type | Dameng Type | Notes |
|---|---|---|
| NUMBER | NUMBER | Supports precision and scale |
| VARCHAR2 | VARCHAR2 | Variable character |
| NVARCHAR2 | NVARCHAR2 | National character varying |
| CHAR | CHAR | Fixed character |
| DATE | DATE | Date only |
| DATETIME | DATETIME | Date and time |
| TIMESTAMP | TIMESTAMP | Timestamp with timezone support |
| INTERVAL | INTERVAL | Time interval |
| BLOB | BLOB | Binary large object |
| CLOB | CLOB | Character large object |
| NCLOB | NCLOB | National character large object |
| BFILE | BFILE | Binary file |
Vector Types
- VECTOR(dim=128, format="FLOAT32"): Vector type for similarity search
- Supports L1, L2, cosine, dot product, and Hamming distances
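For reference, the five distance measures named above have the following textbook definitions. This is a sketch using only the standard library; how Dameng defines, scales, or signs each metric internally is not specified here, so treat the formulas as the conventional ones rather than as the database's exact behaviour.

```python
import math


def l1(a, b):
    """Manhattan distance: sum of absolute coordinate differences."""
    return sum(abs(x - y) for x, y in zip(a, b))


def l2(a, b):
    """Euclidean distance: square root of the summed squared differences."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def dot(a, b):
    """Dot product: larger means more similar for normalized vectors."""
    return sum(x * y for x, y in zip(a, b))


def cosine(a, b):
    """Cosine distance: 1 minus the cosine of the angle between the vectors."""
    return 1.0 - dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


def hamming(a, b):
    """Hamming distance: number of positions where the vectors differ."""
    return sum(1 for x, y in zip(a, b) if x != y)


a, b = [3.0, 0.0], [0.0, 4.0]
print(l1(a, b), l2(a, b), dot(a, b), cosine(a, b), hamming(a, b))
# 7.0 5.0 0.0 1.0 2
```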
JSON Types
- JSON: JSON data type
- JSONIndexType: JSON path index type
- JSONPathType: JSON path type
Configuration
Connection Parameters
engine = create_engine(
    "dm+dmPython://user:password@host:port/database?charset=utf8",
    dialect_options={
        "driver": "dmPython",
        "auto_convert_lobs": True,
        "coerce_to_decimal": True,
        "arraysize": 50,
        "connection_timeout": 30,
    }
)
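One practical detail when building connection URLs: characters such as `@`, `:`, and `/` in a password must be percent-encoded, otherwise the URL is split in the wrong place. The sketch below does this with `urllib.parse.quote_plus` and then checks that the password survives a round trip. The host name and port are placeholders chosen for the example, not values this package requires.

```python
from urllib.parse import quote_plus, unquote_plus, urlsplit

password = "p@ss:word/1"

# Placeholder host and port, for illustration only.
url = f"dm+dmPython://user:{quote_plus(password)}@db.example.com:5236/database"
print(url)
# dm+dmPython://user:p%40ss%3Aword%2F1@db.example.com:5236/database

# Round trip: the encoded password decodes back to the original.
parts = urlsplit(url)
assert unquote_plus(parts.password) == password
assert parts.hostname == "db.example.com"
```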
Compatibility Modes
# MySQL compatibility
engine = create_engine(
    "dm+dmPython://user:password@host:port/database?parse_type=MYSQL",
    dialect_options={"compatible_mode": "MYSQL"}
)
# TSQL compatibility
engine = create_engine(
    "dm+dmPython://user:password@host:port/database?parse_type=TSQL",
    dialect_options={"compatible_mode": "TSQL"}
)
# Oracle compatibility
engine = create_engine(
    "dm+dmPython://user:password@host:port/database",
    dialect_options={"compatible_mode": "ORACLE"}
)
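If the mode is chosen at runtime, the URL and the option dictionary can be derived from a single value. The helper below is a hypothetical sketch that mirrors the three examples above: MySQL and TSQL add a `parse_type` query parameter, while Oracle mode uses the plain URL. Whether the query parameter, the dialect option, or both are strictly required is not stated in this README, so the sketch simply reproduces the pattern shown.

```python
from urllib.parse import urlencode

# Modes that carry a parse_type query parameter in the examples above.
PARSE_TYPE_MODES = {"MYSQL", "TSQL"}


def dameng_connection(base_url, mode):
    """Return (url, dialect_options) for a compatibility mode.

    Hypothetical helper for illustration only, not part of ns-sqlalchemy.
    """
    query = {"parse_type": mode} if mode in PARSE_TYPE_MODES else {}
    url = f"{base_url}?{urlencode(query)}" if query else base_url
    return url, {"compatible_mode": mode}


base = "dm+dmPython://user:password@host:port/database"
print(dameng_connection(base, "MYSQL"))
print(dameng_connection(base, "ORACLE"))
```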
Vector Index Operations
Create IVF Index
from ns_sqlalchemy.dameng import VectorAdaptor
vector_adaptor = VectorAdaptor(engine)
# Create IVF index
vector_adaptor.create_vector_ivf_index(
    column=Product.embedding,
    metric_name="COSINE",
    percentage_value=90,
    num_of_partitions=100,
)
Create HNSW Index
# Create HNSW index
vector_adaptor.create_vector_hnsw_index(
    column=Product.embedding,
    metric_name="COSINE",
    percentage_value=90,
    max_connection=16,
    ef_construction=200,
)
Testing
Run the tests with pytest:
pytest tests/
Or run specific test suites:
# ClickHouse tests
pytest tests/test_clickhouse/
pytest tests/test_clickhouse/test_sql.py
pytest tests/test_clickhouse/test_ddl.py
# Dameng tests
pytest tests/test_dameng/
pytest tests/test_dameng/test_dialect.py
pytest tests/test_dameng/test_vector.py
Development
Project Structure
ns-sqlalchemy/
├── src/
│   └── ns_sqlalchemy/
│       ├── __init__.py
│       ├── clickhouse/
│       │   ├── __init__.py
│       │   ├── alembic/
│       │   │   ├── comparators.py
│       │   │   ├── dialect.py
│       │   │   ├── operations.py
│       │   │   ├── renderers.py
│       │   │   └── toimpl.py
│       │   ├── drivers/
│       │   │   ├── base.py
│       │   │   ├── compilers/
│       │   │   │   ├── ddlcompiler.py
│       │   │   │   ├── sqlcompiler.py
│       │   │   │   └── typecompiler.py
│       │   │   ├── http/
│       │   │   │   └── base.py
│       │   │   ├── native/
│       │   │   │   └── base.py
│       │   │   ├── asynch/
│       │   │   │   └── base.py
│       │   │   └── reflection.py
│       │   ├── engines/
│       │   │   ├── __init__.py
│       │   │   └── base.py
│       │   ├── ext/
│       │   │   ├── clauses.py
│       │   │   └── declarative.py
│       │   ├── orm/
│       │   │   ├── query.py
│       │   │   └── session.py
│       │   ├── sql/
│       │   │   ├── ddl.py
│       │   │   ├── functions.py
│       │   │   ├── schema.py
│       │   │   └── selectable.py
│       │   └── types.py
│       └── dameng/
│           ├── __init__.py
│           ├── _compat.py
│           ├── base.py
│           ├── dmPython.py
│           ├── dmAsync.py
│           ├── globalvars.py
│           ├── json.py
│           ├── types.py
│           └── vector.py
├── tests/
│   ├── test_clickhouse/
│   │   ├── test_clickhouse_dialect.py
│   │   ├── test_ddl.py
│   │   ├── test_engines.py
│   │   ├── test_orm.py
│   │   ├── test_sql.py
│   │   ├── test_type_compiler.py
│   │   └── test_types.py
│   └── test_dameng/
│       ├── test_basic.py
│       ├── test_dialect.py
│       └── test_vector.py
├── pyproject.toml
├── README.md
└── LICENSE
Code Style
This project uses:
- Black: Code formatting
- Ruff: Linting and code style
- MyPy: Type checking
Run code quality checks:
black src/ tests/
ruff check src/ tests/
ruff check src/ns_sqlalchemy/clickhouse/ # ClickHouse should be clean
Testing
The project uses pytest for testing. All tests are located in the tests/ directory.
License
This project is licensed under the MIT License. See the LICENSE file for details.
Contributing
Contributions are welcome! Please follow the existing code style and conventions.
Support
For issues and questions, please create an issue in the GitHub repository.
Acknowledgments
- Based on the original ns-dm-sqlalchemy project
- Inspired by SQLAlchemy's dialect architecture
- Thanks to the open-source community for their contributions
Created with ❤️ for high-performance database applications