Skip to main content

A simple database API

Project description

pydbapi

Installation

pip install pydbapi

支持的数据库类型

  • sqlite
from pydbapi.api import SqliteDB
db = SqliteDB(database=None)  # 或者传入路径
sql = 'select * from [table];'
cur, row, action, result = db.execute(sql)
  • Amazon Redshift
from pydbapi.api import RedshiftDB
db = RedshiftDB(host, user, password, database, port='5439', safe_rule=True)
sql = 'select * from [schema].[table];'
cur, row, action, result = db.execute(sql)
  • Mysql
from pydbapi.api import MysqlDB
db = MysqlDB(host, user, password, database, port=3306, safe_rule=True, isdoris=False)
sql = 'select * from [table];'
cur, row, action, result = db.execute(sql)
  • Trino
from pydbapi.api import TrinoDB
db = TrinoDB(host, user, password, database, catalog, port=8443, safe_rule=True)
sql = 'select * from [table];'
cur, row, action, result = db.execute(sql)
  • Snowflake(删除)
from pydbapi.api import SnowflakeDB
db = SnowflakeDB(user, password, account, warehouse, database, schema, safe_rule=True)
sql = 'select * from [table];'
cur, row, action, result = db.execute(sql)
  • instance模式
from pydbapi.api import SqliteDB
db = SqliteDB.get_instance(database=None)  # 或者传入路径
sql = 'select * from [table];'
cur, row, action, result = db.execute(sql)

Result

  • 转换成dataframe
from pydbapi.api import TrinoDB
db = TrinoDB(host, user, password, database, catalog, port=8443, safe_rule=True)
sql = 'select * from [table];'
cur, row, action, result = db.execute(sql)

df = result.to_dataframe()
df
  • 输出到csv
from pydbapi.api import TrinoDB
db = TrinoDB(host, user, password, database, catalog, port=8443, safe_rule=True)
sql = 'select * from [table];'
cur, row, action, result = db.execute(sql)

result.to_csv(outfile)

Column

from pydbapi.model import ColumnModel

  • ColumnModel

    • 代码 col = ColumnModel(newname, coltype='varchar', sqlexpr=None, func=None, order=0)
    • params
      • newname: 新命名;
      • coltype: 类型
      • sqlexpr: 查询sql表达式
      • func: 查询函数,暂时支持'min', 'max', 'sum', 'count'
      • order: 排序
  • ColumnsModel

    • 代码 cols = ColumnsModel(ColumnModel, ColumnModel, ……)
    • property
      • func_cols: 返回col列表
      • nonfunc_cols: 返回col列表
      • new_cols: 返回拼接字符串
      • create_cols: 返回拼接字符串
      • select_cols: 返回拼接字符串
      • group_cols: 返回拼接字符串
      • order_cols: 返回拼接字符串
    • mothed
      • get_column_by_name
        • cols.get_column_by_name(name)
        • 返回ColumnModel

SqlParse

from pydbapi.sql import SqlParse
-- purpose
-- comment
sql = 'select * from tablename where part_date >= $part_date;'
sqlparser = SqlParse(sql)
  • 属性

    • sql
    • split_sqls
    • purpose
    • action
    • comment
    • tablename
    • parameters
    • split_withsqls
    • combination_sqls
  • 方法

    • substitute_parameters
    sql = sqlparser.substitute_parameters(part_date='2024-01-01')
    

支持的操作

  • execute【db/base.py】
    • 代码
      db.execute(sql, count=None, ehandling=None, verbose=0)
    • params
      • count: 返回结果的数量;
      • ehandling: sql执行出错的时候处理方式, default: None
      • verbose: 执行的进度展示方式(0:不打印, 1:文字进度, 2:进度条)
  • select
    • 代码
      db.select(tablename, columns, condition=None, verbose=0)
    • params
      • tablename: 表名;
      • columns: 列内容;
      • condition: sql where 中的条件
  • create
    • sqlite/redshift
      • 代码
        db.create(tablename, columns, indexes=None, verbose=0)
      • params
        • tablename: 表名;
        • columns: 列内容;
        • indexes: 索引,sqlite暂不支持索引
        • verbose: 是否打印执行进度。
    • mysql
      • 代码
        db.create(tablename, columns, indexes=None, index_part=128, ismultiple_index=True, partition=None, verbose=0)
      • params
        • tablename: 表名;
        • columns: 列内容;
        • indexes: 索引
        • index_part: 索引part
        • ismultiple_index: 多重索引
        • partition: 分区
        • verbose: 是否打印执行进度。
    • trino
      • 代码
        db.create(tablename, columns, partition=None, verbose=0)
      • params
        • tablename: 表名;
        • columns: 列内容;
        • partition: 分区
        • verbose: 是否打印执行进度。
  • insert【db/base.py】
    • 代码
      db.insert(tablename, columns, inserttype='value', values=None, chunksize=1000, fromtable=None, condition=None)
    • params
      • tablename: 表名;
      • columns: 列内容;
      • inserttype: 插入数据类型,支持value、select
      • values: inserttype='value',插入的数值;
      • chunksize: inserttype='value', 每个批次插入的量级;
      • fromtable: inserttype='select',数据来源表;
      • condition: inserttype='select',数据来源条件;
  • drop【db/base.py】
    • 代码
      db.drop(tablename)
    • params
      • tablename: 表名;
  • delete【db/base.py】
    • 代码
      db.delete(tablename, condition)
    • params
      • tablename: 表名;
      • condition: 插入的数值;
  • get_columns
    • 代码
      db.get_columns(tablename)
    • params
      • tablename: 表名;
  • add_columns
    • 代码
      db.add_columns(tablename, columns)
    • params
      • tablename: 表名;
      • columns: 列内容;
  • get_filesqls【db/fileexec.py】
    • 代码
      db.get_filesqls(filepath, **kw)
    • params
      • filepath: sql文件路径;
      • kw: sql文件中需要替换的参数,会替换sqlfile中的arguments;
  • file_exec【db/fileexec.py】
    • 代码
      db.file_exec(filepath, ehandling=None, verbose=0, **kw)
    • params
      • filepath: sql文件路径; 文件名以test开始或者结尾会打印sql执行的步骤;
      • ehandling: sql执行出错的时候处理方式, default: None
      • verbose: 执行的进度展示方式(0:不打印, 1:文字进度, 2:进度条)
      • kw: sql文件中需要替换的参数 在sql文件中用$param, 会替换sqlfile中的arguments;
    • sql文件格式(在desc中增加verbose会打印sql执行的步骤;)
      #arguments#
      ts = '2020-06-28'
      date = today
      date_max = date + timedelta(days=10)
      #arguments#
      ###
      --【desc1 [verbose]】 #sql描述
      --step1
      sql1;
      --step2
      sql2 where name = $name;
      ###
      ###
      --【desc2 [verbose]】 #sql描述
      --step1
      sql1;
      --step2
      sql2;
      ###
      
    • arguments
      • 支持python表达式(datetime、date、timedelta)
      • 支持全局变量和当前sqlfile设置过的变量
      • now:获取执行的时间
      • today: 获取执行的日期

魔法命令

  • 注册方法
    命令行中执行pydbapimagic

  • 参数

    • 帮助
      %dbconfig
    • 配置
      %dbconfig DBTYPE = 'mysql'
      %dbconfig HOST = 'localhost'
      %dbconfig USER = 'longfengpili'
      %dbconfig PASSWORD = '123456abc'
      %dbconfig DATABASE = 'test'
      %dbconfig PORT = 3306
      
    • 查看
      %dbconfig DBTYPE

支持的的settings【conf/settings.py】

  • AUTO_RULES
    可以自动执行表名(表名包含即可)

调用日志格式

  1. 查看每步sql可以使用如下日志格式(如果还出错,同时加上上面的内容)
import logging
dblogger = logging.getLogger('pydbapi.db.base')
dblogger.setLevel(logging.DEBUG)

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pydbapi-0.0.136.tar.gz (29.3 kB view hashes)

Uploaded Source

Built Distribution

pydbapi-0.0.136-py3-none-any.whl (37.7 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page