K2data内部的数据分析工具包
Project description
K2Magic
K2Magic是K2Assets提供的数据分析开发包(以下简称SDK),用于简化Python里访问各类数据库的操作。
一、安装
1.1 安装SDK
安装SDK最新版本:
pip install -U k2magic
1.2 安装数据库驱动
SDK兼容多种数据库的方言,用户通过连接字符串
参数配置目标数据库的地址等信息,同时需要确保已安装此数据库的驱动包,例如对MySQL数据库需要pip install pymysql
。
常见的数据库的驱动包和连接字符串如下:
数据库 | 驱动包 | 连接字符串 | 备注 |
---|---|---|---|
PostgreSQL | psycopg2 | postgresql+psycopg2://user:password@hostname:port/db_name | |
MySQL | pymysql | mysql+pymysql://user:password@hostname:port/db_name | 未测试 |
Oracle | cx_oracle | oracle+cx_oracle://user:password@hostname:port/db_name | 配置说明 |
SQL Server | pymssql | mssql+pymssql://user:password@hostname:port/db_name | |
TDengine | taospy | tdengine+taospy://user:password@hostname:port/db_name | 需安装同版本驱动 |
Repo (Native) | 底层库对应驱动 | k2assets+repo://user:password_md5@hostname:port/repo_name | 只读 |
Repo (REST) | 无需驱动 | k2assets+repo://user:password_md5@hostname:port/repo_name | 只读 |
表中未包含的数据库请参考:https://docs.sqlalchemy.org/en/20/dialects/
二、使用SDK
2.1 访问数据库
在python代码里使用SDK可以直接连接到指定数据库,并读写其中数据表的数据。假设数据库里有数据表table1
的结构如下:
CREATE TABLE table1 (
k_device VARCHAR(255) PRIMARY KEY,
col1 FLOAT,
col2 FLOAT,
col3 FLOAT
);
对上述数据表操作的示例代码:
import pandas as pd
from k2magic.dataframe_db import DataFrameDB
db = DataFrameDB('postgresql+psycopg2://...') # 此为连接字符串
df = pd.DataFrame({'k_device': ['a', 'b', 'c'], 'col1': [1, 2, 3], 'col2': [4, 5, 6]})
db.insert('table1', df)
db.update('table1', df, index_keys=['k_device'])
db.upsert('table1', df, index_keys=['k_device'])
df = db.select('table1', condition='col1 > 1')
df = db.select('table1', limit=3, order_by=['k_device DESC'])
df = db.sql_select('select * from table1')
db.delete('table1')
db.create_table(df, 'table2', primary_keys=['k_device'])
db.drop_table('table2')
2.2 读取K2Assets数据
除了能够直接访问各类数据库外,SDK还支持访问K2Assets平台的数据,即将K2Assets中的每个Repo视为数据库。
入口类是K2DataFrameDB
,查询数据使用get_repo_data
函数(不支持写操作)。
目前支持postgresql
、TDengine_3
作为底层存储的repo,获取数据时将直接查询底层数据库以提高效率,后续计划支持ysdb
存储引擎。
同时也支持通过REST接口访问repo数据,在构造方法里指定rest=True
即可,对于不支持直接获取数据的repo类型可使用此方式。
注:对于tsf
(文件)类型的repo,为了降低服务器配置复杂度,目前统一通过REST接口获取文件内容。
下面是一个示例:
import pandas as pd
from k2magic.k2_dataframe_db import K2DataFrameDB
# 连接到dev环境K2Assets的指定Repo
db = K2DataFrameDB(
'k2assets+repo://zhangsan1:4669526643b35bf8affc1761e1155b6a@192.168.132.167/repodata_ut_v2_c5_r180_pg')
# 查询时序数据,返回DataFrame
df = db.get_repo_data(
columns=['k_device', 'k_ts', 'col1', 'col2'],
start_time='2024-08-01 00:00:00',
end_time='2024-08-01 00:00:10',
devices=['dev001'],
limit=100, desc=False)
# 查询文件数据,抽取文件到本地指定目录
db.get_repo_file(
dest_path='/tmp',
start_time='2024-08-05 00:00:00',
end_time='2024-08-05 00:00:10',
devices=['dev001'])
注意:
- 连接字符串里地址、用户名和密码都是K2Assets服务的,而非底层数据库的
- 连接字符串里的密码需进行md5编码,在K2Assets用户管理页面可以直接查看md5处理后的密码
- 默认通过
https
协议连接K2Assets,如要使用http
协议,在连接字符串末尾追加:?protocol=http
- 若要指定租户,在连接字符串末尾追加:
?tenant=xxx
- 若底层数据库端口号不是默认值,需要在构造方法里指定:
db_port=xxx
- 若需要更详细的日志输出,可在构造方法里指定:
debug=True
- 若强制要求通过REST接口获取数据,可在构造方法里指定:
rest=True
2.3 异常处理
SDK进行数据库操作时,如果中途遇到异常情况会抛出DataFrameDBException
,并自动回滚数据库到操作前的状态。用户可以使用try..except
捕获此异常进行必要的处理:
try:
db.insert('table1', df)
except DataFrameDBException as e:
print('数据插入(部分)失败,操作已回滚')
三、常见问题
3.1 性能问题
当需要通过SDK读写大量数据时,性能往往会成为项目成败的关键因素。一方面加载到内存中的DataFrame
对象过大可能导致内存溢出,另一方面若底层数据库的读写效率低可能导致操作超时。
解决第一个问题的思路是将大的DataFrame
分为小批处理,同时SDK内部实现此处理过程,以尽量对使用者保持透明,降低对使用者的要求。
解决第二个问题更复杂些,也需要针对具体场景讨论,例如底层所使用的数据库类型,数据情况以及查询需求。目前SDK提供的sql_select
方法允许用户直接优化查询语句,可以作为一个辅助方式帮助解决此问题。
综上,如遇到性能问题,请反馈你的具体场景和需求给SDK开发者。
3.2 数据库表带schema前缀
有些数据库(如SQL Server)允许将数据表按schema进行划分,如果要访问指定schema里表,在初始化DataFrameDB
对象时需要指定schema名称。例如:
db = DataFrameDB('mssql+pymssql://...', schema='schema1') # 访问schema1里的表
df = db.select('table1') # 若表名不加schema前缀,会自动使用初始化时指定的schema作为前缀
df = db.select('schema1.table1') # 使用完整表名也可以访问
df = db.select('schema2.table2') # 访问同一数据库内其他schema的表也是允许的
3.3 用户名或密码带有特殊字符
SDK通过特定格式的连接字符串识别要访问的数据库信息,若用户名或密码包含#
、@
、:
等特殊字符时,可能会导致SDK无法正确解析连接字符串。
此时需要将这些特殊字符替换为百分号编码
的形式。例如数据库密码是passw@rd
,则需要把其中的@
替换为%40
,连接字符串为mssql+pymssql://zhangsan:passw%40rd@hostname...
完整的替换规则参考:百分号编码对照表
3.4 调试选项
在开发调试阶段,初始化DataFrameDB
时可以指定debug=True
输出更详细的日志,此参数默认值是False
:
db = DataFrameDB('mssql+pymssql://...', schema='schema1', debug=True)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
File details
Details for the file k2magic-0.3.22-py3-none-any.whl
.
File metadata
- Download URL: k2magic-0.3.22-py3-none-any.whl
- Upload date:
- Size: 39.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.9.13
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 7c2225514eafc09eda5307785d0ae04ce332663cdecb07a5e67b4d4acdc48404 |
|
MD5 | 61b9031fb13c6a8264db400d515d25bf |
|
BLAKE2b-256 | e6a5b1c8d92bd4ca67bf318f26ee0c23f5285365a0ee4ae7dc2aac18a0832596 |