Skip to main content

K2data内部的数据分析工具包

Reason this release was yanked:

bug

Project description

K2Magic

K2Magic是K2Assets提供的数据分析开发包(以下简称SDK),用于简化Python里访问各类数据库的操作。

一、安装

1.1 安装SDK

安装SDK最新版本:

pip install -U k2magic

1.2 安装数据库驱动

SDK兼容多种数据库的方言,用户通过连接字符串参数配置目标数据库的地址等信息,同时需要确保已安装此数据库的驱动包,例如对MySQL数据库需要pip install pymysql

常见的数据库的驱动包和连接字符串如下:

数据库 驱动包 连接字符串 备注
PostgreSQL psycopg2 postgresql+psycopg2://user:password@hostname:port/db_name
MySQL pymysql mysql+pymysql://user:password@hostname:port/db_name 未测试
Oracle cx_oracle oracle+cx_oracle://user:password@hostname:port/db_name 配置说明
SQL Server pymssql mssql+pymssql://user:password@hostname:port/db_name
K2Assets k2assets://hostname:port/k2repo 只读
Repo repo://user:password_md5@hostname:port/repo_name 只读

表中未包含的数据库请参考:https://docs.sqlalchemy.org/en/20/dialects/

二、使用SDK

2.1 访问数据库

在python代码里使用SDK可以直接连接到指定数据库,并读写其中数据表的数据。假设数据库里有数据表table1的结构如下:

CREATE TABLE table1 (
    k_device VARCHAR(255) PRIMARY KEY,
    col1 FLOAT,
    col2 FLOAT,
    col3 FLOAT
);

对上述数据表操作的示例代码:

import pandas as pd
from k2magic.dataframe_db import DataFrameDB

db = DataFrameDB('postgresql+psycopg2://...')  # 此为连接字符串
df = pd.DataFrame({'k_device': ['a', 'b', 'c'], 'col1': [1, 2, 3], 'col2': [4, 5, 6]})

db.insert('table1', df)
db.update('table1', df, index_keys=['k_device'])
db.upsert('table1', df, index_keys=['k_device'])
df = db.select('table1', condition='col1 > 1')
df = db.select('table1', limit=3, order_by=['k_device DESC'])
df = db.sql_select('select * from table1')
db.delete('table1')

db.create_table(df, 'table2', primary_keys=['k_device'])
db.drop_table('table2')

2.2 访问K2Assets(直接对接底层数据库)

除了能够直接访问各类数据库外,SDK还支持访问K2Assets平台的数据,即可以将K2Assets当作数据库,其中的Repo就是数据库里的表。 使用方法与其他数据库是相同的,只要更换连接字符串即可,下面是一个示例,其中用户名和密码是K2Assets的用户和密码,地址和端口也是K2Assets服务的:

import pandas as pd
from k2magic.dataframe_db import K2DataFrameDB

# 连接到dev环境的K2Assets,每个repo视为一张表
db = K2DataFrameDB('k2assets+repo://admin:0c475ffd8960c17046b531e2384e89d8@192.168.132.167:443/kuitonggou_6')

# 查询数据(带时间和设备过滤条件)
result_df = db.select('kuitonggou_6', columns=['k_device', 'k_ts', 'WGEN.GenActivePW', 'WGEN.GenSpd'],
                          condition='k_ts >= 1712505600000 AND k_ts < 1712592000000 and k_device = \'#163\'', limit=100)

注意:

  • Repo基本信息里,存储引擎配置里的地址不能用如k2a-postgresql这样的内部地址和端口,要改为外部地址和端口;
  • 支持对Repo读写数据,前提是底层存储引擎也同时支持读写;
  • 只支持https部署的K2Assets,不支持http的部署情况,并且端口号443不可省略;
  • 数据库连接中的密码需进行md5编码(小写32字节),例如admin:0c475ffd8960c17046b531e2384e89d8@myhost:443
  • 目前只实现了postgresql作为底层存储的repo,很快将兼容tsfysdbtdengine这三种存储引擎。
  • 若Repo的数据结构与底层表结构不一致,目前是以底层表结构为准,未来会改为以Repo的数据结构为准;

2.3 访问K2Assets(通过rest接口)

如果将连接字符串里的repo://换成k2assets://,则表示使用rest接口对接K2Assets的数据,这种方式的效率会比较低,但好处是兼容性强。

import pandas as pd
from k2magic.dataframe_db import DataFrameDB

# 连接到dev环境的K2Assets,每个repo视为一张表
db = DataFrameDB('k2assets://192.168.132.167:8765/k2repo')

# 查询数据(默认返回最新数据)
df = db.select('repo_XiLinHaoTe_1sec', columns=['k_device', 'k_ts', 'NacWdSpdFltS', 'CnvW'])

# 查询数据(带时间和设备过滤条件)
df = db.select('repo_XiLinHaoTe_1sec', columns=['k_device', 'k_ts', 'NacWdSpdFltS', 'CnvW'],
              condition='k_ts between 1656691200000 AND 1656777600000 and k_device = \'XiLinHaoTe002\'')

注意:

  • 这种方式只支持读取Repo数据,未来有可能支持写数据;
  • K2Assets环境需要部署k2a-calcite-service服务,目前此服务还没有包含在K2Assets产品里,只在dev环境部署了用于测试。

2.3 异常处理

SDK进行数据库操作时,如果中途遇到异常情况会抛出DataFrameDBException,并自动回滚数据库到操作前的状态。用户可以使用try..except捕获此异常进行必要的处理:

try:
    db.insert('table1', df)
except DataFrameDBException as e:
    print('数据插入(部分)失败,操作已回滚')

三、常见问题

3.1 性能问题

当需要通过SDK读写大量数据时,性能往往会成为项目成败的关键因素。一方面加载到内存中的DataFrame对象过大可能导致内存溢出,另一方面若底层数据库的读写效率低可能导致操作超时。

解决第一个问题的思路是将大的DataFrame分为小批处理,同时尽量对使用者保持透明,降低对使用者的要求。目前SDK是过渡阶段,将视实际需求情况改进这个问题。

解决第二个问题则比较复杂,需要针对具体场景讨论,例如底层所使用的数据库类型,数据情况以及查询需求。目前SDK提供的sql_select方法允许用户直接优化查询语句,可以作为一个辅助方式帮助解决此问题。

3.2 数据库表带schema前缀

有些数据库允许将数据表按schema进行划分,如果要访问指定schema里表,在初始化DataFrameDB对象时需要指定schema名称。例如:

db = DataFrameDB('mssql+pymssql://...', schema='schema1')   # 访问schema1里的表
df = db.select('table1')   # 若表名不加schema前缀,会自动使用初始化时指定的schema作为前缀
df = db.select('schema1.table1')  # 使用完整表名也可以访问
df = db.select('schema2.table2')  # 访问同一数据库内其他schema的表也是允许的

3.3 用户名或密码带有特殊字符

SDK通过特定格式的连接字符串识别要访问的数据库信息,若用户名或密码包含#@:等特殊字符时,可能会导致SDK无法正确解析连接字符串。 此时需要将这些特殊字符替换为百分号编码的形式。例如数据库密码是passw@rd,则需要把其中的@替换为%40,连接字符串为mssql+pymssql://zhangsan:passw%40rd@hostname...

完整的替换规则参考:百分号编码对照表

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

k2magic-0.3.2.tar.gz (36.7 kB view details)

Uploaded Source

Built Distribution

k2magic-0.3.2-py3-none-any.whl (36.2 kB view details)

Uploaded Python 3

File details

Details for the file k2magic-0.3.2.tar.gz.

File metadata

  • Download URL: k2magic-0.3.2.tar.gz
  • Upload date:
  • Size: 36.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.9.13

File hashes

Hashes for k2magic-0.3.2.tar.gz
Algorithm Hash digest
SHA256 e50139d8a59af6ea6bcae832628986a75738af84e82a1dfb773fe4a9ef4deb77
MD5 ae469983d6f414d66aa0175dbe6cae93
BLAKE2b-256 377c7272cba71fb1c63b324b24fec31abc38fb63cfdf5bd538b2c15a36a4f4f1

See more details on using hashes here.

File details

Details for the file k2magic-0.3.2-py3-none-any.whl.

File metadata

  • Download URL: k2magic-0.3.2-py3-none-any.whl
  • Upload date:
  • Size: 36.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.9.13

File hashes

Hashes for k2magic-0.3.2-py3-none-any.whl
Algorithm Hash digest
SHA256 c7fe3b3773930b438e433585117ffab09aaad6cee1c6b32b9a787e2ee963751f
MD5 ecfea61a5bd0ee888bd0db8b4026ed88
BLAKE2b-256 a278c15684a8926af74e2c0631fe6de46052c12e4455c3ec5ec0f20269f723ec

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page