A Well-Encapsulated ClickHouse Database APIs Lib
Project description
quantchdb: A Well-Encapsulated ClickHouse Database APIs Lib
Quick Start
Install quantchdb:
pip install quantchdb==0.1.11 -i https://pypi.org/simple
An example of how to use quantchdb:
1. Import quantchdb
from quantchdb import ClickHouseDatabase
import pandas as pd
import numpy as np
import os
from dotenv import load_dotenv
load_dotenv()
2. Configure ClickHouseDatabase instance
# To connect your clickhouse database, you need to setup your config, in which the '.env' method is recommmended for security
config = {
"host": os.getenv("DB_HOST", "localhost"),
"port": int(os.getenv("DB_PORT", 9000)),
"user": os.getenv("DB_USER", "default"),
"password": os.getenv("DB_PASSWORD", ""),
"database": os.getenv("DB_DATABASE", "default")
}
✅ 推荐用法:使用上下文管理器(Context Manager)
这是最安全的用法,连接会在退出 with 块时自动关闭:
# 使用 with 语句,连接会自动关闭,避免资源泄漏
with ClickHouseDatabase(config=config, terminal_log=True, file_log=False) as db:
df = db.fetch("SELECT * FROM your_table LIMIT 10")
# 在 with 块内进行所有数据库操作
db.insert_dataframe(df, "another_table")
# 退出 with 块后,连接自动关闭
⚠️ 手动管理连接(需要显式关闭)
如果不使用上下文管理器,必须 手动调用 close() 方法:
db = ClickHouseDatabase(config=config, terminal_log=True, file_log=False)
try:
df = db.fetch("SELECT * FROM your_table LIMIT 10")
finally:
db.close() # 必须调用!否则会导致套接字泄漏
🚨 危险用法警告
以下用法可能导致 套接字/连接泄漏,请务必避免:
# ❌ 危险:在循环中创建多个实例而不关闭
for i in range(1000):
db = ClickHouseDatabase(config=config)
df = db.fetch(f"SELECT * FROM table_{i}")
# 没有调用 db.close(),套接字泄漏!
# ❌ 危险:在函数中创建实例但不关闭
def get_data():
db = ClickHouseDatabase(config=config)
return db.fetch("SELECT * FROM table") # db 没有被关闭!
# ❌ 危险:作为类属性但不在 __del__ 中清理
class MyClass:
def __init__(self):
self.db = ClickHouseDatabase(config=config) # 可能泄漏
正确的做法:
# ✅ 正确:循环中使用上下文管理器
for i in range(1000):
with ClickHouseDatabase(config=config) as db:
df = db.fetch(f"SELECT * FROM table_{i}")
# ✅ 正确:复用单个实例
db = ClickHouseDatabase(config=config)
try:
for i in range(1000):
df = db.fetch(f"SELECT * FROM table_{i}")
finally:
db.close()
# ✅ 正确:函数内使用上下文管理器
def get_data():
with ClickHouseDatabase(config=config) as db:
return db.fetch("SELECT * FROM table")
# ✅ 正确:类中正确管理连接生命周期
class MyClass:
def __init__(self):
self.db = ClickHouseDatabase(config=config)
def __del__(self):
if hasattr(self, 'db'):
self.db.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.db.close()
3. Functions
注意:以下示例假设使用上下文管理器或已正确管理连接生命周期。
with ClickHouseDatabase(config=config, terminal_log=True) as db:
# Fetch data from clickhouse database
sql = "SELECT * FROM stocks.snap ORDER BY date DESC LIMIT 5"
df = db.fetch(sql)
# Execute SQL sentence
sql = f"""
CREATE TABLE IF NOT EXISTS etf.kline_1m(
`exg` UInt8 NOT NULL COMMENT '交易所标识,沪市为1,深市为0, 北交所为2',
`code` String NOT NULL COMMENT '股票代码',
`date` Date NOT NULL COMMENT '日期',
`date_time` DateTime('Asia/Shanghai') NOT NULL COMMENT '日期时间,最高精度为秒',
`time_int` UInt32 NOT NULL COMMENT '从当日开始至当前时刻的毫秒数',
`open` Float32 NULL COMMENT 'K线开始价格',
`high` Float32 NULL COMMENT 'K线内最高价',
`low` Float32 NULL COMMENT 'K线内最低价',
`close` Float32 NULL COMMENT 'K线结束价格',
`volume` UInt64 NULL COMMENT 'K线内成交量',
`amount` Float32 NULL COMMENT 'K线内成交额'
)Engine = ReplacingMergeTree()
ORDER BY (code, date_time);
"""
db.execute(sql)
# Insert dataframe into clickhouse database.
# Before you insert your dataframe, you need to make sure the corresponding database and table are existed.
# Make sure the dtypes of DataFrame is consistent with dtypes of clickhouse table, or else insert_dataframe may failed.
# Note: insert_dataframe() will NOT modify the original DataFrame.
file_path = "Your/Data/Path/kline_1m.csv"
dtype_dict = {
'exg' : int,
'code' : str,
'open' : np.float32,
'close' : np.float32,
'high' : np.float32,
'low' : np.float32,
'amount' : np.float32
}
df = pd.read_csv(file_path, dtype=dtype_dict)
# Int type with NA need to deal with seperately
df['volume'] = pd.to_numeric(df['volume'], errors='coerce').astype('UInt64')
# convert_tz defaults to True, will auto-convert timezone to Asia/Shanghai
db.insert_dataframe(
df=df,
table_name="etf.kline_1m",
datetime_cols=['date','date_time']
# convert_tz=True is the default
)
# Create table from DataFrame and insert data into table automatically.
# This method is not recommanded, because data type inferred may be not suitable or even the sentence failed.
# You can use dtypes to make sure some columns have corrected dtypes and use other params to control the create sql sentence.
db.create_table_from_df(df=df,
table_name='test.etf_kline_1m',
dtypes={'code': 'String',
'date':'Date',
'date_time' :'DateTime'},
engine='ReplacingMergeTree()',
orderby='(code,date_time)',
other='PARTITION BY toYYYYMM(code)')
4. API Reference
属性
| 属性 | 类型 | 描述 |
|---|---|---|
is_connected |
bool |
检查是否已建立连接 |
config |
Dict |
数据库配置 |
client |
Client |
ClickHouse 客户端实例 |
方法
| 方法 | 描述 |
|---|---|
connect() |
建立数据库连接 |
close() |
关闭数据库连接(可安全多次调用) |
reconnect() |
重新建立连接 |
execute(sql) |
执行 SQL 语句 |
fetch(sql, as_df=True) |
查询数据并返回 DataFrame 或原始结果 |
insert_dataframe(df, table_name, ...) |
将 DataFrame 插入表中 |
create_table_from_df(df, table_name, ...) |
根据 DataFrame 创建表并插入数据 |
5. Best Practices (最佳实践)
连接管理
- 优先使用上下文管理器 (
with语句),它能确保连接在任何情况下都被正确关闭 - 复用连接:如果需要执行多个操作,在同一个
with块内完成 - 避免在循环中创建新实例:这会导致套接字快速耗尽
性能优化
- 批量插入:使用
insert_dataframe()一次性插入大量数据 - 合理设置日志级别:生产环境建议关闭
terminal_log和file_log
错误处理
from clickhouse_driver.errors import Error as ClickHouseError
with ClickHouseDatabase(config=config) as db:
try:
df = db.fetch("SELECT * FROM non_existent_table")
except ClickHouseError as e:
print(f"Database error: {e}")
except ConnectionError as e:
print(f"Connection error: {e}")
6. Changelog
v0.2.0 (Breaking Changes)
- ✅ 添加上下文管理器支持(
with语句) - ✅ 添加
is_connected属性 - ✅ 添加
reconnect()方法 - ✅ 添加程序退出时自动清理所有连接的机制
- ✅ 改进错误处理和日志记录
- ✅ 修复套接字泄漏问题
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
quantchdb-0.2.0.tar.gz
(14.0 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
quantchdb-0.2.0-py3-none-any.whl
(12.2 kB
view details)
File details
Details for the file quantchdb-0.2.0.tar.gz.
File metadata
- Download URL: quantchdb-0.2.0.tar.gz
- Upload date:
- Size: 14.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b597b4f110bec0bae28fbf71cfb028bc1167cb08a3f339419c70ae542eed2cb5
|
|
| MD5 |
5f8f7395dfa62842300a330dede9e250
|
|
| BLAKE2b-256 |
8b546136b60ca5606e7256c22b83b6802ecd9d1ad9ebc9ec19194e674165ec5a
|
File details
Details for the file quantchdb-0.2.0-py3-none-any.whl.
File metadata
- Download URL: quantchdb-0.2.0-py3-none-any.whl
- Upload date:
- Size: 12.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f2f22f8628353f1d1db523af63176fc7e6ebfef4b7284a6a4b27cf8486040e76
|
|
| MD5 |
62e37d5a691d3291d70d0018bb473f91
|
|
| BLAKE2b-256 |
8e1f19c4c4881faea81cf261d9917a3ec682db3daf540b7cdc47fb62b6bf599f
|