Skip to main content

基于Flask-SQLAlchemy, 抽离SQL语句, 使用Jinja2语法实现动态SQL, 支持上下文事务, 支持分页

Project description

示例: https://www.cnblogs.com/miaokela/articles/17571427.html

pip install flask-sql-pro

注册

在create_app中注册

def create_app():
    # 注册Flask-SQL-Pro对象,并且注册Flask-SQLAlchemy
    sqlpro = FlaskSQLPro()
    db = sqlpro.init_app(app)

为什么要在init_app()中返回一个db呢? Flask-Migrate等插件数据库操作可能用到SQLAlchemy实例对象

from flask_migrate import Migrate


Migrate(app, db)

在models.py中引入

from flask_sql_pro import DataBaseHelper
from sqlalchemy import text
from sqlalchemy.dialects.mysql import TINYINT, BIGINT, VARCHAR, DATETIME, DOUBLE, INTEGER

# 其他文件中如果要使用db对象: from project_path.models import db
db = DataBaseHelper.db

class BaseModel(db.Model):
    __abstract__ = True
    created_at = db.Column(DATETIME, comment='创建时间', server_default=text('Now()'))
    updated_at = db.Column(DATETIME, comment='更新时间', server_default=text('Now()'), onupdate=datetime.now())
    is_deleted = db.Column(TINYINT, comment='是否逻辑删除', server_default=text('0'), index=True)

    @classmethod
    def queryset(cls):
        """
        Data that is not logically deleted
        """
        return cls.query.filter(cls.is_deleted == 0)

CRUD示例

from flask_sql_pro import DataBaseHelper  # 主工具类
from app.models import db # SQLAlchemy实例对象

with db.trans():
    _id = DataBaseHelper.execute_create(
        'transit_record',  # 数据库名称
        data=data,
    )

    if not _id:
        raise AddRecordException()
with db.trans():
    rows = DataBaseHelper.execute_delete(
        'transit_record',
        where={
            'id': _id,
        },
        logic=True
    )
    if not rows:
        raise DelRecordException()
with db.trans():
    rows = DataBaseHelper.execute_update(
        'transit_record',
        data=data,
        where={
            'id': _id
        }
    )
    if not rows:
        raise ModifyRecordException()

创建存放SQL语句的文件夹 默认是Flask的instance_path路径, 即: project_path/instance/ 则默认的SQL文件夹应该创建在: project_path/instance/sql 允许自定义路径,配置参数 DB_HELPER_SQL_FILE_PATH

import os


class BaseConfig:
    BASE_DIR = os.path.dirname(os.path.realpath(__file__))
    APP_DIR = os.path.join(BASE_DIR, 'app')
    DB_HELPER_SQL_FILE_PATH = os.path.join(
        APP_DIR,
        'sql'
    )

# 在创建Flask应用时,注册配置
# __init__.py
def create_app():
    # ...
    app.config.from_object(BaseConfig())
    # ...

其他Flask-SQL-Pro的配置

DB_HELPER_PAGE_PARAM = 'page'  # 默认分页第几页
DB_HELPER_PAGE_SIZE_PARAM = 'page_size'  # 默认分页每页数量
DB_HELPER_PRINT_MSG = True  # 是否在终端打印SQL执行的语句

查询示例

文件: sql/transit/index.yml

query_map: |
    SELECT
        TRG.latitude,
        TRG.longitude,
        TRG.location,
        TRG.location_type
    FROM
        transit_record_gps AS TRG
    LEFT JOIN
        transit_record AS TR
    ON
        TRG.transit_record_id = TR.id
    WHERE
        TRG.is_deleted = 0
    AND
        TR.is_deleted = 0
    AND
        TR.id = :transit_record_id

文件: app/api/transit.py

transit_record_gps = DataBaseHelper.select_all(
    'transit.index.query_map',
    params={
        'transit_record_id': transit_record_id
    },
    return_obj=False,  # return_obj默认为True,即返回的是对象可以通过 transit_record_gps[0].transit_record_id 点的方式获取数据,如果为False,返回的是字典
)
  • 分页

默认需要传递的参数是 page/page_size, 两个参数都传递才会分页

文件: sql/history/index.yml

select_user_experiments: |
    SELECT
        experiment_id,
        experiment_name,
        date_format(update_datetime,"%Y-%m-%d") update_time
    FROM
        data_experiment_record
    WHERE
        delete_flag = 0
experiments = DataBaseHelper.select_all(
    'history.index.select_user_experiments',
    params={
        'account_id': account_id,
    },
    options={
        'page': 1,
        'page_size': 20,
    }
)
  • 动态SQL

配合jinja2,实现条件语句,动态生成SQL

文件: sql/experiment/index.yml

select_history_data_by_id_and_time: |
    SELECT
        daedd.daq_data_id daqDataId,
        daedd.vel_rms_value rmsVelocityValue,
        daedd.peak_value peakValue,
        daedd.peak_to_peak_value peaToPeakValue,
        daedd.skewness_value skewnessValue,
        daedd.mean_value meanValue,
        daedd.kurtosis_value kurtosisValue,
        daedd.rms_value rmsRawValue,
        daedd.rpm_value rpmValue,
        DATE_FORMAT(daedd.collection_datetime, '%Y-%m-%d %H:%i:%S') collectionDatetime
    FROM
        data_acquisition_equipment_daq_data daedd
    LEFT JOIN
        data_acquisition_equipment_daq_data_config daeddc
    ON
        daedd.data_config_id = daeddc.config_id
    WHERE
        daedd.sensor_id = :sensor_id
    {% if query_start_time and query_end_time %}
    AND
        daedd.collection_datetime BETWEEN :query_start_time AND :query_end_time
    {% endif %}
    {% if experiment_id %}
    AND
        daedd.experiment_id = :experiment_id
    {% endif %}
    ORDER BY daedd.collection_datetime ASC

文件: app/api/experiment.py

daq_data_list = DataBaseHelper.select_all(
    "experiment.index.select_history_data_by_id_and_time",
    params={
        "sensor_id": query.sensorId,
        "query_start_time": query.queryStartTime,
        "query_end_time": query.queryEndTime,
        "experiment_id": experiment_id,
    },
    options={
        "query_start_time": query.queryStartTime,
        "query_end_time": query.queryEndTime,
        "experiment_id": experiment_id,
    },
)
  • 多数据库操作

除了系统当前配置的 SQLALCHEMY_DATABASE_URI 对应的数据库之外,想操作其他数据库

配置参数

class BaseConfig:
    SQLALCHEMY_BINDS = {
        'cloud': 'mysql+pymysql://root:123456@127.0.0.1:3306/cloud_db?charset=utf8'
    }

示例

add = DataBaseHelper.execute_create(
    'daq_data',
    data=online_data,
    app=cp,  # from flask import current_app as cp
    bind='cloud'  # 指定Bind的数据库
)
if not add:
    raise Exception('推送线上数据失败')

DataBaseHelper.commit()
  • 事务

默认不提交,使用DataBaseHelper.commit()来提交,或者 通过db.trans()上下文事务

from app.models import db

with db.trans():
    add = DataBaseHelper.execute_create(
        'daq_data',
        data=online_data,
        app=cp,  # from flask import current_app as cp
        bind='cloud'  # 指定Bind的数据库
    )
    if not add:
        raise Exception('推送线上数据失败')

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

Flask-SQL-Pro-3.0.tar.gz (8.2 kB view details)

Uploaded Source

File details

Details for the file Flask-SQL-Pro-3.0.tar.gz.

File metadata

  • Download URL: Flask-SQL-Pro-3.0.tar.gz
  • Upload date:
  • Size: 8.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.8.16

File hashes

Hashes for Flask-SQL-Pro-3.0.tar.gz
Algorithm Hash digest
SHA256 816bbed324b6f7499f1a4d6963e04138256e60bb82f6cd29a1ee9785fc454e8d
MD5 baceb8ee378fa62a3cfba1efb757c8db
BLAKE2b-256 f0794e89aa40e399f1bd13abd6246fc7252f00d0e3eee79252cf18fb92c33c92

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page