Skip to main content

K2data内部的数据分析工具包

Project description

K2Magic

K2Magic是K2Assets提供的数据分析开发包(以下简称SDK),用于简化Python里访问各类数据库的操作。

一、安装

1.1 安装SDK

安装SDK最新版本:

pip install -U k2magic

1.2 安装数据库驱动

SDK兼容多种数据库的方言,用户通过连接字符串参数配置目标数据库的地址等信息,同时需要确保已安装此数据库的驱动包,例如对MySQL数据库需要pip install pymysql

常见的数据库的驱动包和连接字符串如下:

数据库 驱动包 连接字符串 备注
PostgreSQL psycopg2 postgresql+psycopg2://user:password@hostname:port/db_name
MySQL pymysql mysql+pymysql://user:password@hostname:port/db_name 未测试
Oracle cx_oracle oracle+cx_oracle://user:password@hostname:port/db_name 配置说明
SQL Server pymssql mssql+pymssql://user:password@hostname:port/db_name
TDengine taospy tdengine+taospy://user:password@hostname:port/db_name 需安装同版本驱动
Repo (Native) 底层库对应驱动 k2assets+repo://user:password_md5@hostname:port/repo_name 只读
Repo (REST) 无需驱动 k2assets+repo://user:password_md5@hostname:port/repo_name 只读

表中未包含的数据库请参考:https://docs.sqlalchemy.org/en/20/dialects/

二、使用SDK

2.1 访问数据库

在python代码里使用SDK可以直接连接到指定数据库,并读写其中数据表的数据。假设数据库里有数据表table1的结构如下:

CREATE TABLE table1 (
    k_device VARCHAR(255) PRIMARY KEY,
    col1 FLOAT,
    col2 FLOAT,
    col3 FLOAT
);

对上述数据表操作的示例代码:

import pandas as pd
from k2magic.dataframe_db import DataFrameDB

db = DataFrameDB('postgresql+psycopg2://...')  # 此为连接字符串
df = pd.DataFrame({'k_device': ['a', 'b', 'c'], 'col1': [1, 2, 3], 'col2': [4, 5, 6]})

db.insert('table1', df)
db.update('table1', df, index_keys=['k_device'])
db.upsert('table1', df, index_keys=['k_device'])
df = db.select('table1', condition='col1 > 1')
df = db.select('table1', limit=3, order_by=['k_device DESC'])
df = db.sql_select('select * from table1')
db.delete('table1')

db.create_table(df, 'table2', primary_keys=['k_device'])
db.drop_table('table2')

2.2 读取K2Assets数据

除了能够直接访问各类数据库外,SDK还支持访问K2Assets平台的数据,即将K2Assets中的每个Repo视为数据库。 入口类是K2DataFrameDB,查询数据使用get_repo_data函数(不支持写操作)。

目前支持postgresqlTDengine_3作为底层存储的repo,获取数据时将直接查询底层数据库以提高效率,后续计划支持ysdb存储引擎。

同时也支持通过REST接口访问repo数据,在构造方法里指定rest=True即可,对于不支持直接获取数据的repo类型可使用此方式。

注:对于tsf(文件)类型的repo,为了降低服务器配置复杂度,目前统一通过REST接口获取文件内容。

下面是一个示例:

import pandas as pd
from k2magic.k2_dataframe_db import K2DataFrameDB

# 连接到dev环境K2Assets的指定Repo
db = K2DataFrameDB(
    'k2assets+repo://zhangsan1:4669526643b35bf8affc1761e1155b6a@192.168.132.167/repodata_ut_v2_c5_r180_pg')
    
# 查询时序数据,返回DataFrame
df = db.get_repo_data(
    columns=['k_device', 'k_ts', 'col1', 'col2'],
    start_time='2024-08-01 00:00:00',
    end_time='2024-08-01 00:00:10',
    devices=['dev001'],
    limit=100, desc=False)

# 查询文件数据,抽取文件到本地指定目录
db.get_repo_file(
    dest_path='/tmp',
    start_time='2024-08-05 00:00:00',
    end_time='2024-08-05 00:00:10',
    devices=['dev001'])

注意:

  • 连接字符串里地址、用户名和密码都是K2Assets服务的,而非底层数据库的
  • 连接字符串里的密码需进行md5编码,在K2Assets用户管理页面可以直接查看md5处理后的密码
  • 默认通过https协议连接K2Assets,如要使用http协议,在连接字符串末尾追加:?protocol=http
  • 若要指定租户,在连接字符串末尾追加:?tenant=xxx
  • 若底层数据库端口号不是默认值,需要在构造方法里指定: db_port=xxx
  • 若需要更详细的日志输出,可在构造方法里指定:debug=True
  • 若强制要求通过REST接口获取数据,可在构造方法里指定:rest=True

2.3 异常处理

SDK进行数据库操作时,如果中途遇到异常情况会抛出DataFrameDBException,并自动回滚数据库到操作前的状态。用户可以使用try..except捕获此异常进行必要的处理:

try:
    db.insert('table1', df)
except DataFrameDBException as e:
    print('数据插入(部分)失败,操作已回滚')

三、常见问题

3.1 性能问题

当需要通过SDK读写大量数据时,性能往往会成为项目成败的关键因素。一方面加载到内存中的DataFrame对象过大可能导致内存溢出,另一方面若底层数据库的读写效率低可能导致操作超时。

解决第一个问题的思路是将大的DataFrame分为小批处理,同时SDK内部实现此处理过程,以尽量对使用者保持透明,降低对使用者的要求。

解决第二个问题更复杂些,也需要针对具体场景讨论,例如底层所使用的数据库类型,数据情况以及查询需求。目前SDK提供的sql_select方法允许用户直接优化查询语句,可以作为一个辅助方式帮助解决此问题。

综上,如遇到性能问题,请反馈你的具体场景和需求给SDK开发者。

3.2 数据库表带schema前缀

有些数据库(如SQL Server)允许将数据表按schema进行划分,如果要访问指定schema里表,在初始化DataFrameDB对象时需要指定schema名称。例如:

db = DataFrameDB('mssql+pymssql://...', schema='schema1')   # 访问schema1里的表
df = db.select('table1')   # 若表名不加schema前缀,会自动使用初始化时指定的schema作为前缀
df = db.select('schema1.table1')  # 使用完整表名也可以访问
df = db.select('schema2.table2')  # 访问同一数据库内其他schema的表也是允许的

3.3 用户名或密码带有特殊字符

SDK通过特定格式的连接字符串识别要访问的数据库信息,若用户名或密码包含#@:等特殊字符时,可能会导致SDK无法正确解析连接字符串。 此时需要将这些特殊字符替换为百分号编码的形式。例如数据库密码是passw@rd,则需要把其中的@替换为%40,连接字符串为mssql+pymssql://zhangsan:passw%40rd@hostname...

完整的替换规则参考:百分号编码对照表

3.4 调试选项

在开发调试阶段,初始化DataFrameDB时可以指定debug=True输出更详细的日志,此参数默认值是False

db = DataFrameDB('mssql+pymssql://...', schema='schema1', debug=True)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

k2magic-0.3.22-py3-none-any.whl (39.8 kB view details)

Uploaded Python 3

File details

Details for the file k2magic-0.3.22-py3-none-any.whl.

File metadata

  • Download URL: k2magic-0.3.22-py3-none-any.whl
  • Upload date:
  • Size: 39.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.9.13

File hashes

Hashes for k2magic-0.3.22-py3-none-any.whl
Algorithm Hash digest
SHA256 7c2225514eafc09eda5307785d0ae04ce332663cdecb07a5e67b4d4acdc48404
MD5 61b9031fb13c6a8264db400d515d25bf
BLAKE2b-256 e6a5b1c8d92bd4ca67bf318f26ee0c23f5285365a0ee4ae7dc2aac18a0832596

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page