K2data内部的数据分析工具包
Project description
K2Magic
K2Magic是K2Assets提供的数据分析开发包(以下简称SDK),用于简化Python里访问各类数据库的操作。
一、安装
1.1 安装SDK
安装SDK最新版本:
pip install -U k2magic
1.2 安装数据库驱动
SDK兼容多种数据库的方言,用户通过连接字符串
参数配置目标数据库的地址等信息,同时需要确保已安装此数据库的驱动包,例如对MySQL数据库需要pip install pymysql
。
常见的数据库的驱动包和连接字符串如下:
数据库 | 驱动包 | 连接字符串 | 备注 |
---|---|---|---|
PostgreSQL | psycopg2 | postgresql+psycopg2://user:password@hostname:port/db_name | |
MySQL | pymysql | mysql+pymysql://user:password@hostname:port/db_name | 未测试 |
Oracle | cx_oracle | oracle+cx_oracle://user:password@hostname:port/db_name | 配置说明 |
SQL Server | pymssql | mssql+pymssql://user:password@hostname:port/db_name | |
TDengine | taospy | tdengine+taospy://user:password@hostname:port/db_name | 需安装同版本驱动 |
K2Assets | Rest接口 | k2assets+rest://hostname:port/k2repo | 只读 |
Repo | 数据库原生接口 | k2assets+repo://user:password_md5@hostname:port/repo_name | 只读 |
表中未包含的数据库请参考:https://docs.sqlalchemy.org/en/20/dialects/
二、使用SDK
2.1 访问数据库
在python代码里使用SDK可以直接连接到指定数据库,并读写其中数据表的数据。假设数据库里有数据表table1
的结构如下:
CREATE TABLE table1 (
k_device VARCHAR(255) PRIMARY KEY,
col1 FLOAT,
col2 FLOAT,
col3 FLOAT
);
对上述数据表操作的示例代码:
import pandas as pd
from k2magic.dataframe_db import DataFrameDB
db = DataFrameDB('postgresql+psycopg2://...') # 此为连接字符串
df = pd.DataFrame({'k_device': ['a', 'b', 'c'], 'col1': [1, 2, 3], 'col2': [4, 5, 6]})
db.insert('table1', df)
db.update('table1', df, index_keys=['k_device'])
db.upsert('table1', df, index_keys=['k_device'])
df = db.select('table1', condition='col1 > 1')
df = db.select('table1', limit=3, order_by=['k_device DESC'])
df = db.sql_select('select * from table1')
db.delete('table1')
db.create_table(df, 'table2', primary_keys=['k_device'])
db.drop_table('table2')
2.2 访问K2Assets(直接对接底层数据库)
除了能够直接访问各类数据库外,SDK还支持访问K2Assets平台的数据,即将K2Assets中的每个Repo视为数据库。
入口类是K2DataFrameDB
,查询数据使用get_repo_data
函数,下面是一个示例:
import pandas as pd
from k2magic.k2_dataframe_db import K2DataFrameDB
# 连接到dev环境K2Assets的指定Repo
# 地址和端口是K2Assets服务的,用户名和密码也是K2Assets的
# 用户密码需进行md5编码,在用户管理的用户信息页可以直接看到md5处理后的密码
db = K2DataFrameDB(
'k2assets+repo://admin:0c475ffd8960c17046b531e2384e89d8@192.168.132.167:443/kuitonggou_6')
# 从repo查询数据(带时间和设备过滤条件)
result_df = db.get_repo_data('kuitonggou_6',
columns=['k_device', 'k_ts', 'WGEN.GenActivePW', 'WGEN.GenSpd'],
start_time='2024-04-28 00:00:00',
end_time='2024-04-29 00:00:00',
devices=['#163'],
limit=100)
注意:
- Repo基本信息里,
存储引擎配置
里的地址不能用如k2a-postgresql
这样的内部地址和端口,要改为外部地址和端口;请联系K2Assets管理员确认。 - 默认使用
https
协议连接K2Assets,如要使用http
协议,在数据库连接末尾追加?protocol=http
; - 数据库连接中的端口号(例如443)不可省略;
- 如要指定租户,在数据库连接末尾追加
?tenant=tenant_name
; - 若Repo的数据结构与底层表结构不一致,目前是以底层表结构为准,未来会改为以Repo的数据结构为准;
start_time
和end_time
参数支持字符串、毫秒时间戳和datetime三种格式;- 目前只实现了
postgresql
作为底层存储的repo,后续计划兼容tsf
、ysdb
和tdengine
这三种存储引擎。
2.3 访问K2Assets(通过rest接口)
如果将连接字符串里的k2assets+repo://
换成k2assets+rest://
,则表示使用rest接口对接K2Assets的数据,这种方式的效率会比较低,但好处是兼容性强。
import pandas as pd
from k2magic.dataframe_db import DataFrameDB
# 连接到dev环境的K2Assets,每个repo视为一张表
db = DataFrameDB('k2assets+rest://192.168.132.167:8765/k2repo')
# 查询数据(默认返回最新数据)
df = db.select('repo_XiLinHaoTe_1sec', columns=['k_device', 'k_ts', 'NacWdSpdFltS', 'CnvW'])
# 查询数据(带时间和设备过滤条件)
df = db.select('repo_XiLinHaoTe_1sec', columns=['k_device', 'k_ts', 'NacWdSpdFltS', 'CnvW'],
condition='k_ts between 1656691200000 AND 1656777600000 and k_device = \'XiLinHaoTe002\'')
注意:
- 这种方式只支持读取Repo数据,未来有可能支持写数据;
- K2Assets环境需要部署
k2a-calcite-service
服务,目前此服务还没有包含在K2Assets产品里,只在dev环境部署了用于测试。
2.3 异常处理
SDK进行数据库操作时,如果中途遇到异常情况会抛出DataFrameDBException
,并自动回滚数据库到操作前的状态。用户可以使用try..except
捕获此异常进行必要的处理:
try:
db.insert('table1', df)
except DataFrameDBException as e:
print('数据插入(部分)失败,操作已回滚')
三、常见问题
3.1 性能问题
当需要通过SDK读写大量数据时,性能往往会成为项目成败的关键因素。一方面加载到内存中的DataFrame
对象过大可能导致内存溢出,另一方面若底层数据库的读写效率低可能导致操作超时。
解决第一个问题的思路是将大的DataFrame
分为小批处理,同时尽量对使用者保持透明,降低对使用者的要求。目前SDK是过渡阶段,将视实际需求情况改进这个问题。
解决第二个问题则比较复杂,需要针对具体场景讨论,例如底层所使用的数据库类型,数据情况以及查询需求。目前SDK提供的sql_select
方法允许用户直接优化查询语句,可以作为一个辅助方式帮助解决此问题。
3.2 数据库表带schema前缀
有些数据库允许将数据表按schema进行划分,如果要访问指定schema里表,在初始化DataFrameDB
对象时需要指定schema名称。例如:
db = DataFrameDB('mssql+pymssql://...', schema='schema1') # 访问schema1里的表
df = db.select('table1') # 若表名不加schema前缀,会自动使用初始化时指定的schema作为前缀
df = db.select('schema1.table1') # 使用完整表名也可以访问
df = db.select('schema2.table2') # 访问同一数据库内其他schema的表也是允许的
3.3 用户名或密码带有特殊字符
SDK通过特定格式的连接字符串识别要访问的数据库信息,若用户名或密码包含#
、@
、:
等特殊字符时,可能会导致SDK无法正确解析连接字符串。
此时需要将这些特殊字符替换为百分号编码
的形式。例如数据库密码是passw@rd
,则需要把其中的@
替换为%40
,连接字符串为mssql+pymssql://zhangsan:passw%40rd@hostname...
完整的替换规则参考:百分号编码对照表
3.4 调试选项
在开发调试阶段,初始化DataFrameDB
时可以指定debug=True
输出更详细的日志,此参数默认值是False
:
db = DataFrameDB('mssql+pymssql://...', schema='schema1', debug=True)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
File details
Details for the file k2magic-0.3.20-py3-none-any.whl
.
File metadata
- Download URL: k2magic-0.3.20-py3-none-any.whl
- Upload date:
- Size: 39.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.9.13
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 79f21f92327de61d383de103d2309104d37e09d1be91ecdd16de4e2161dc29b0 |
|
MD5 | 679b85652a230cde27e0db9db7f2c7fd |
|
BLAKE2b-256 | 736b2e7d43e5f499a0d555b6311501360aad6316b6d13ec35bf5a62bee8ae20e |