基于Flask-SQLAlchemy, 抽离SQL语句, 使用Jinja2语法实现动态SQL, 支持上下文事务, 支持分页
Project description
示例: https://www.cnblogs.com/miaokela/articles/17571427.html
pip install flask-sql-pro
注册
在create_app中注册
def create_app():
# 注册Flask-SQL-Pro对象,并且注册Flask-SQLAlchemy
sqlpro = FlaskSQLPro()
db = sqlpro.init_app(app)
为什么 init_app() 要返回 db 呢? 因为 Flask-Migrate 等插件在进行数据库操作时, 可能需要用到 SQLAlchemy 实例对象:
from flask_migrate import Migrate
Migrate(app, db)
在models.py中引入
from datetime import datetime
from flask_sql_pro import DataBaseHelper
from sqlalchemy import text
from sqlalchemy.dialects.mysql import TINYINT, BIGINT, VARCHAR, DATETIME, DOUBLE, INTEGER
# 其他文件中如果要使用db对象: from project_path.models import db
db = DataBaseHelper.db
class BaseModel(db.Model):
__abstract__ = True
created_at = db.Column(DATETIME, comment='创建时间', server_default=text('Now()'))
updated_at = db.Column(DATETIME, comment='更新时间', server_default=text('Now()'), onupdate=datetime.now)
is_deleted = db.Column(TINYINT, comment='是否逻辑删除', server_default=text('0'), index=True)
@classmethod
def queryset(cls):
"""
Data that is not logically deleted
"""
return cls.query.filter(cls.is_deleted == 0)
CRUD示例
增
from flask_sql_pro import DataBaseHelper # 主工具类
from app.models import db # SQLAlchemy实例对象
with db.trans():
_id = DataBaseHelper.execute_create(
'transit_record', # 数据表名称
data=data,
)
if not _id:
raise AddRecordException()
删
with db.trans():
rows = DataBaseHelper.execute_delete(
'transit_record',
where={
'id': _id,
},
logic=True
)
if not rows:
raise DelRecordException()
改
with db.trans():
rows = DataBaseHelper.execute_update(
'transit_record',
data=data,
where={
'id': _id
}
)
if not rows:
raise ModifyRecordException()
查
首先创建存放SQL语句的文件夹。默认位于Flask的instance_path路径下, 即: project_path/instance/, 因此默认的SQL文件夹应该创建在: project_path/instance/sql。也可以通过配置参数 DB_HELPER_SQL_FILE_PATH 自定义路径:
import os
class BaseConfig:
BASE_DIR = os.path.dirname(os.path.realpath(__file__))
APP_DIR = os.path.join(BASE_DIR, 'app')
DB_HELPER_SQL_FILE_PATH = os.path.join(
APP_DIR,
'sql'
)
# 在创建Flask应用时,注册配置
# __init__.py
def create_app():
# ...
app.config.from_object(BaseConfig())
# ...
其他Flask-SQL-Pro的配置
DB_HELPER_PAGE_PARAM = 'page' # 分页页码参数的名称, 默认为 'page'
DB_HELPER_PAGE_SIZE_PARAM = 'page_size' # 每页数量参数的名称, 默认为 'page_size'
DB_HELPER_PRINT_MSG = True # 是否在终端打印SQL执行的语句
查询示例
文件: sql/transit/index.yml
query_map: |
SELECT
TRG.latitude,
TRG.longitude,
TRG.location,
TRG.location_type
FROM
transit_record_gps AS TRG
LEFT JOIN
transit_record AS TR
ON
TRG.transit_record_id = TR.id
WHERE
TRG.is_deleted = 0
AND
TR.is_deleted = 0
AND
TR.id = :transit_record_id
文件: app/api/transit.py
transit_record_gps = DataBaseHelper.select_all(
'transit.index.query_map',
params={
'transit_record_id': transit_record_id
},
return_obj=False, # return_obj默认为True, 此时返回对象, 可以用点号访问字段(例如 transit_record_gps[0].latitude); 为False时返回字典
)
分页
默认需要传递的参数是 page/page_size, 两个参数都传递才会分页
文件: sql/history/index.yml
select_user_experiments: |
SELECT
experiment_id,
experiment_name,
date_format(update_datetime,"%Y-%m-%d") update_time
FROM
data_experiment_record
WHERE
delete_flag = 0
experiments = DataBaseHelper.select_all(
'history.index.select_user_experiments',
params={
'account_id': account_id,
},
options={
'page': 1,
'page_size': 20,
}
)
动态SQL
配合jinja2,实现条件语句,动态生成SQL
文件: sql/experiment/index.yml
select_history_data_by_id_and_time: |
SELECT
daedd.daq_data_id daqDataId,
daedd.vel_rms_value rmsVelocityValue,
daedd.peak_value peakValue,
daedd.peak_to_peak_value peaToPeakValue,
daedd.skewness_value skewnessValue,
daedd.mean_value meanValue,
daedd.kurtosis_value kurtosisValue,
daedd.rms_value rmsRawValue,
daedd.rpm_value rpmValue,
DATE_FORMAT(daedd.collection_datetime, '%Y-%m-%d %H:%i:%S') collectionDatetime
FROM
data_acquisition_equipment_daq_data daedd
LEFT JOIN
data_acquisition_equipment_daq_data_config daeddc
ON
daedd.data_config_id = daeddc.config_id
WHERE
daedd.sensor_id = :sensor_id
{% if query_start_time and query_end_time %}
AND
daedd.collection_datetime BETWEEN :query_start_time AND :query_end_time
{% endif %}
{% if experiment_id %}
AND
daedd.experiment_id = :experiment_id
{% endif %}
ORDER BY daedd.collection_datetime ASC
文件: app/api/experiment.py
daq_data_list = DataBaseHelper.select_all(
"experiment.index.select_history_data_by_id_and_time",
params={
"sensor_id": query.sensorId,
"query_start_time": query.queryStartTime,
"query_end_time": query.queryEndTime,
"experiment_id": experiment_id,
},
options={
"query_start_time": query.queryStartTime,
"query_end_time": query.queryEndTime,
"experiment_id": experiment_id,
},
)
多数据库操作
除了系统当前配置的 SQLALCHEMY_DATABASE_URI 对应的数据库之外,想操作其他数据库
配置参数
class BaseConfig:
SQLALCHEMY_BINDS = {
'cloud': 'mysql+pymysql://root:123456@127.0.0.1:3306/cloud_db?charset=utf8'
}
示例
add = DataBaseHelper.execute_create(
'daq_data',
data=online_data,
app=cp, # from flask import current_app as cp
bind='cloud' # 指定Bind的数据库
)
if not add:
raise Exception('推送线上数据失败')
DataBaseHelper.commit()
事务
默认不会自动提交: 可以调用 DataBaseHelper.commit() 手动提交, 或者使用 db.trans() 上下文事务
from app.models import db
with db.trans():
add = DataBaseHelper.execute_create(
'daq_data',
data=online_data,
app=cp, # from flask import current_app as cp
bind='cloud' # 指定Bind的数据库
)
if not add:
raise Exception('推送线上数据失败')
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file Flask-SQL-Pro-3.0.tar.gz.
File metadata
- Download URL: Flask-SQL-Pro-3.0.tar.gz
- Upload date:
- Size: 8.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.8.16
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
816bbed324b6f7499f1a4d6963e04138256e60bb82f6cd29a1ee9785fc454e8d
|
|
| MD5 |
baceb8ee378fa62a3cfba1efb757c8db
|
|
| BLAKE2b-256 |
f0794e89aa40e399f1bd13abd6246fc7252f00d0e3eee79252cf18fb92c33c92
|