ThinkSQL like think-orm (ThinkPHP)
Project description
ThinkSQL 类似 ThinkPHP 的数据库引擎
Drivers
- MySQL
- 达梦(DM8)
Install
- use mysql
pip install think_sql[mysql]
- use 达梦(DM8)
pip install think_sql[dm]
How to use
1. simple demo
Database:
test
Table:user
- example dict params
from think_sql import DB
config = {
'type': 'mysql',
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'password': 'root',
'database': 'test',
}
with DB(config) as db:
data = db.table('user').where('id',1).find()
print(data)
- example dsn str
from think_sql import DB
with DB("root:'root'@127.0.0.1:3306/test") as db:
data = db.table('user').where('id',1).find()
print(data)
- example DBConfig
from think_sql import DB
from think_sql.tool.util import DBConfig
config = DBConfig(
type='mysql',
host='127.0.0.1',
port=3306,
user='root',
password='root',
database='test',
)
with DB(config) as db:
data = db.table('user').where('id',1).find()
print(data)
result
{
"id": 1,
"username": "hbh112233abc",
"age": "36",
"address": "FUJIAN.XIAMEN"
}
- use
db
function
from think_sql import db
config = {
'type': 'mysql',
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'password': 'root',
'database': 'test',
}
conn = db(config)
data = conn.table('user').where('id',1).find()
print(data)
conn.close()
2. Introduction
DB
-
__init__(config:Union[str,dict,DBConfig],params={})
init database, return DB instance
- config:Union[str,dict,DBConfig]
- str:
type://user:'password'@host:port/database
- dict:
{'type':'mysql','host':'127.0.0.1','port':3306,'user':'root','password':'root','database':'test'}
- DBConfig:
DBConfig(type='mysql',host='127.0.0.1',port=3306,user='root',password='root',database='test')
- str:
- params:dict pymysql connect other params
- config:Union[str,dict,DBConfig]
-
connect() connect database use init params
-
table(table_name):Table return class Table <think_sql.tool.interface.TableInterface>
-
check_connected():bool check connected, try reconnect database
-
query(sql,params=()) query sql return cursor.fetchall List[dict]
-
execute(sql,params=()) execute a write-operation sql (e.g. insert, update, delete, ...)
Table
-
__init__(connector: Connection,cursor: Cursor,table_name: str,debug: bool = True)
-
init() initialize query condition
-
debug(flag=True) set debug flag
-
set_cache_storage(storage: CacheStorage) set cache storage ex: Redis
-
cache(key: str = None, expire: int = 3600) use cache at query
-
cursor(sql: str, params: list = []) -> Cursor return cursor object
-
get_last_sql() -> str return last sql string
-
get_lastid() -> str return last row id
-
get_rowcount() -> int return affect rows count
-
fetch_sql(flag: bool = True) set the fetch-sql flag; if flag = True then query and execute will only return the sql
-
build_sql(operation: str, params: list = []) -> str return build sql
-
query(sql: str, params: list = []) -> list execute a read-operation sql and return cursor.fetchall(); when fetch_sql=True, return the sql without executing it
-
execute(sql: str, params: list = []) -> int execute a write-operation sql and return the affected row count; when fetch_sql=True, return the sql without executing it
-
where(field: Union[str, list, tuple], symbol: str = '', value: Any = None) set query conditions; can be called multiple times
where(field,value)
where field = value
where(field,symbol,value)
where field symbol value
where( [ [field1,symbol1,value1], [field2,symbol2,value2] ] )
where field1 symbol1 value1 and field2 symbol2 value2
where(field1,symbol1,value1).where(field2,symbol2,value2)
where field1 symbol1 value1 and field2 symbol2 value2
- symbol
symbol | another | demo |
---|---|---|
= | eq, = | where('id','=',1) |
<> | neq, !=, <> | where('id','<>',1) |
> | gt, > | where('id','>',1) |
>= | egt, >= | where('id','>=',1) |
< | lt, < | where('id','<',1) |
<= | elt, <= | where('id','<=',1) |
in | in, not in | where('id','in',[1,2,3]) |
between | between, not between | where('id','between',[1,5]) where('id','between','1,5') where('id','between','1 and 5') |
like | like, not like | where('name','like','%hbh%') |
null | is null, null | where('remark','is null') |
not null | is not null, not null | where('remark','is not null') |
exists | exists, not exists | where('remark','exists') |
exp | exp | where('id','exp','in (1,2,3)') |
-
where_or(field: Union[str, list], symbol: str = '', value: Any = None)
where('id',1).where_or('id',5)
where id = 1 or id = 5
-
limit(start: int, step: int = None) LIMIT start,step
-
page(index: int = 1, size: int = 20) LIMIT (index-1)*size,size
-
order(field: str, sort: str = 'asc') ORDER BY field sort
-
group(field:str) GROUP BY field
-
distinct(field:str) SELECT DISTINCT field
-
field(fields: Any, exclude: bool = False) SELECT fields; if exclude=True then select all fields of the table except fields
-
select(build_sql: bool = False) -> list return the select query result; if build_sql=True then return the sql
-
find() return select ... limit 1
-
value(field: str) return the field of first row
-
column(field: str,key: str = '')
column('name')
return ['hbh','mondy']
column('name,score')
return [{'hbh':80},{'mondy':88}]
column('score','name')
return {'hbh':80, 'mondy':88}
column('id,score','name')
return { 'hbh':{'id':1,'score':80}, 'mondy':{'id':2,'score':88} }
-
alias(short_name: str = '') set alias table_name
-
join(table_name: str, as_name: str = '', on: str = '', join: str = 'inner', and_str: str = '')
- table_name: the table to join
- as_name: alias (short table name) for table_name
- on: join condition
- join: join type, one of 'INNER', 'LEFT', 'RIGHT', 'FULL OUTER'
- and_str: additional AND condition
demo
db.table('table1').alias('a').join( 'table2','b','a.id=b.a_id','left' ).join( 'table3','c','c.a_id=a.id' ).field( 'a.id,a.name,b.id as b_id,b.score,c.id as c_id,c.remark' ).where( 'a.id',1 ).find()
sql
SELECT a.id, a.name, b.id AS b_id, b.score, c.id AS c_id, c.remark FROM table1 AS a LEFT JOIN table2 AS b ON a.id = b.a_id INNER JOIN table3 AS c ON c.a_id = a.id WHERE a.id = 1 LIMIT 1
-
union(sql1: str, sql2: str, union_all: bool = False) union sql1 and sql2
- union_all: if union_all is True then use UNION ALL
demo
sql1 = db.table('table1').field('name,score').where('status',1).select(build_sql=True) sql2 = db.table('table2').field('name,score').where('status',1).select(build_sql=True) result = db.table().union(sql1,sql2).where('score','>',60).select()
sql
SELECT * FROM ( SELECT `name`, `score` FROM table1 WHERE `status` = 1 ) UNION ( SELECT `name`, `score` FROM table2 WHERE `status` = 1 ) WHERE score > 60
- union_all if union_all is True then
-
insert(data: Union[dict, List[dict]], replace: bool = False) -> int insert data to database
- data: dict: insert one record; list: insert multiple records
- replace: bool, if replace is True then use REPLACE INTO
demo insert one record
db.table('table1').insert({'name':'test','score':100})
INSERT INTO table1 (`name`, `score`) VALUES ('test', 100)
insert multiple records
db.table('table1').insert([{'name':'test','score':100},{'name':'test2','score':101}])
INSERT INTO table1 (`name`, `score`) VALUES ('test', 100), ('test2', 101)
replace mode
db.table('table1').insert({'id':1,'name':'test','score':100},replace=True)
REPLACE INTO table1 (`id`, `name`, `score`) VALUES (1,'test', 100)
-
update(data: dict, all_record: bool = False) -> int update data
- data: dict, the data you want to update
- all_record: bool, if all_record is False then you must set an update condition; if you want to update all records then you need to set all_record = True
-
delete(all_record: bool = False) -> int delete record
- all_record: bool, if all_record is False then you must set a delete condition; if you want to delete all records then you need to set all_record = True
-
inc(field: str, step: Union[str, int, float] = 1) -> int
increase
field
+step
-
dec(field: str, step: int = 1) -> int
decrease
field
-step
-
max(field: str) -> Union[int, float]
get the max value of
field
-
sum(field: str) -> Union[int, float, Decimal]
get the sum value of
field
-
avg(field: str) -> Union[int, float, Decimal]
get the avg value of
field
-
count(field: str = '*') -> int
get the count of records
-
copy_to(new_table: str = None, create_blank_table: bool = False) -> int
copy data to new_table
- new_table: if new_table is None then new_table will be set automatically, like {table_name}_copy
- create_blank_table: bool, if create_blank_table is True then only create a blank table like the current table.
demo
db.table('user').field( 'name,score' ).where( 'score','>',60 ).copy_to('good_boy')
sql
SELECT `name`, `score` INTO `good_boy` FROM `user` WHERE score > 60
-
insert_to(new_table: str, fields: Union[str, list, tuple] = None) -> int
INSERT INTO {new_table} SELECT {select_fields} FROM {table} {join} WHERE {where}{group}{order}{limit}
-
exists(self) -> bool
check whether a record exists under the current query conditions; it uses
SELECT 1 FROM {table} {join} WHERE {where} LIMIT 1
-
batch_update(data:List[dict],key:str) -> int
batch update multiple records
demo
data = [ {'id':1,'score':66}, {'id':2,'score':59}, {'id':3,'score':86}, {'id':4,'score':90}, ] db.table('user').batch_update(data,key='id')
sql
update `user` set score = 66 where id = 1; update `user` set score = 59 where id = 2; update `user` set score = 86 where id = 3; update `user` set score = 90 where id = 4;
support transaction
from think_sql import DB
db_dsn = "root:'password'@127.0.0.1:3306/database"
with DB(db_dsn) as db:
# result: insert two records into database
with db.start_trans():
db.table('user').insert({'name':'think_sql1','score':98})
db.table('user').insert({'name':'think_sql2','score':99})
# result: nothing inserted
with db.start_trans():
db.table('user').insert({'name':'think_sql1','score':98})
db.table('user').insert({'name':'think_sql2','score':99})
raise Exception('error')
# The above operation does not affect subsequent operations.
db.table('user').insert({'name':'think_sql3','score':100})
sql_helper for mysql
from think_sql import DB
from think_sql.mysql.sql_helper import help
db_dsn = "root:'password'@127.0.0.1:3306/database"
with DB(db_dsn) as db:
sql = "slow query sql"
help(db, sql)
result
1) 输入的SQL语句是:
----------------------------------------------------------------------------------------------------
SELECT *
FROM hy_cabrecs
WHERE finished_count > 0
----------------------------------------------------------------------------------------------------
2) EXPLAIN执行计划:
+------+---------------+------------+--------------+--------+-----------------+-------+-----------+-------+--------+------------+-------------+
| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra |
+======+===============+============+==============+========+=================+=======+===========+=======+========+============+=============+
| 1 | SIMPLE | hy_cabrecs | None | ALL | None | None | None | None | 14422 | 33.33 | Using where |
+------+---------------+------------+--------------+--------+-----------------+-------+-----------+-------+--------+------------+-------------+
3) 索引优化建议:
----------------------------------------------------------------------------------------------------
取出表 【hy_cabrecs】 where条件字段 【finished_count】 100000 条记录,重复的数据有:【16093】 条,没有必要为该字段创建索引。
【hy_cabrecs】 表,无需添加任何索引。
parse for mysql
-
parse alter sql
from think_sql.mysql import parse sql = """ alter table slow_log_test add iii int not null default 0 comment 'a'; """ print(parse.alter_sql(sql))
result:
{ "table": "slow_log_test", "field": "iii", "field_type": "int", "is_null": "NOT NULL", "default": "0", "comment": "'a'", "after": "" }
Development
poetry 包管理器
# 配置虚拟环境在项目目录下
poetry config virtualenvs.in-project true
# 安装依赖
poetry install
# 进入虚拟环境
poetry shell
poetry command
名称 | 功能 |
---|---|
new | 创建一个项目脚手架,包含基本结构、pyproject.toml 文件 |
init | 基于已有的项目代码创建 pyproject.toml 文件,支持交互式填写 |
install | 安装依赖库 |
update | 更新依赖库 |
add | 添加依赖库 |
remove | 移除依赖库 |
show | 查看具体依赖库信息,支持显示树形依赖链 |
build | 构建 tar.gz 或 wheel 包 |
publish | 发布到 PyPI |
run | 运行脚本和代码 |
unit test
pytest --cov --cov-report=html
publish
poetry build
poetry config pypi-token.pypi "your pypi.org api token"
poetry publish -n
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file think_sql-0.7.7.tar.gz
.
File metadata
- Download URL: think_sql-0.7.7.tar.gz
- Upload date:
- Size: 36.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.5.1 CPython/3.7.9 Windows/10
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 328a01728d5e0529d4790ac252c105494f12ec55f82e0135cfec97128eb28aba |
|
MD5 | 207f9f5375c000e929cc65a45b13e8a6 |
|
BLAKE2b-256 | d570d690f8774fa866884952314f56cd53ae2c64532f190bf1f2fd5fe613ba83 |
File details
Details for the file think_sql-0.7.7-py3-none-any.whl
.
File metadata
- Download URL: think_sql-0.7.7-py3-none-any.whl
- Upload date:
- Size: 38.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.5.1 CPython/3.7.9 Windows/10
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | c03fe6259c143d8ee3b20417f0bb68c31f4d358935ecf4bba3d4d3738d076759 |
|
MD5 | d9ed8f704fc3d972969e0ea4df0a6fac |
|
BLAKE2b-256 | 1920e4810ced4fa6e56e6445f1ede2f38484abde568e58b7f894f62763cd5bb0 |