grab SQL data to csv file.
Project description
DB Grab SDK
DB Grab SDK 是一个用于内部数据源 的 SQL 数据获取/采集的工具,提供了便捷的数据库数据导出pandas csv的功能。
功能特性
- 基于SQLAlchemy,支持多种数据库,添加OceanBase支持
- 提供统一的数据获取接口
- 支持大数据量的分批读取和导出
- 内置日志记录功能
- 灵活的配置方式(支持.yml和.env配置文件)
- 支持从上次中断处继续采集数据
- 提供引擎管理器,统一管理多数据库连接
- 支持T+1模式、指定日期范围和迭代抽取模式
安装
# 从源码安装
pip install -e .
tips: 安装java jdk,设置JAVA_HOME, 以便采集 oceanbase 数据库
快速开始
1. 使用 ConfigGenerator 生成SQL配置的模板
from dbgrab import ConfigGenerator
ENV_FILE_PATH = "/YOUR_PATH/.env"
SQL_FILE_PATH = "/YOUR_PATH/sql.yml"
# 初始化配置生成器
generator = ConfigGenerator()
# 生成env配置的模板
generator.gen_env_template(ENV_FILE_PATH)
# 生成SQL配置的模板
generator.gen_sql_template(SQL_FILE_PATH)
4. 使用抓取器 DataBaseFetcher 采集数据
from dbgrab import log
from dbgrab.dt_fetcher import DataBaseFetcher
env_file= ".env" # generator 生成模板
sql_file= "sql.yml" # generator 生成模板
# 设置日志(可选) - 数据量较大时建议配置,可根据日志断点,继续配置采集
log_file = "logs/your_fetcher.log"
log.to_file(log_file)
# 初始化数据库抓取器并加载配置
dbf = DataBaseFetcher(env_file, sql_file)
# Output Example:
# xxxx: SQL Tables List: dict_keys(['credible_db.fetcher_task_record', 'user.info'])
# xxxx: Engines List: dict_keys(['CREDIBLE', 'USER'])
# 方法1:使用 T+1 模式采集导出
print(dbf.with_engine("CREDIBLE").to_csv(table_name='credible_db.fetcher_task_record', T_1=True))
# 方法2:指定日期范围导出 - start_date, end_date参数,会自动替换tables.yml配置的sql
print(dbf.with_engine("CREDIBLE").to_csv(table_name='credible_db.fetcher_task_record', start_date="20250818", end_date="20251026"))
# 方法3:使用迭代方式导出大数据 - 可在时间段内,按天、月、年的模式导出,建议数据量级在千万级以上时使用
print(dbf.with_engine("CREDIBLE").to_csv_iter(table_name='credible_db.fetcher_task_record', start_date="20250818", end_date="20251026", mode="day"))
3. 使用提取器 DataExtractor 采集数据
from dbgrab import DataExtractor, check_data, get_engine_manager
env_file= ".env" # generator 生成模板
# 初始化引擎管理器
manager = get_engine_manager(env_file)
# 查看所有引擎
manager.list_engines()
# 获取指定引擎
engine = manager.get_engine("ENGINE_NAME")
# 配置SQL查询语句
sql_query = "select * from YOUR_DB.DB limit 1000" # Mysql SQL
# sql_query = "SELECT * FROM YOUR_DB.DB t where ROWNUM <= 1000" # OceanBase SQL
# 检查数据量
sql_query_count = sql_query.lower().replace('select * from', "select count(*) from")
max_info = check_data(engine, sql_query_count)
print("Count总数: ", max_info)
# 创建抽取器对象, 设置工作路径和引擎
dte = DataExtractor(engine, manager._config.WORK_PATH) # manager._config 即 get_db_config返回的 DynamicDBConfig 类的实例
# 配置抽取器执行时的,输出文件文件名及SQL
dte.extractor.set_file("credible_test").to_csv(sql_query, chunk_size=20000)
配置文件
项目使用 .env 配置文件管理数据库连接信息:
from dbgrab.configs import get_db_config
# 读取.env配置
config = get_db_config(".env")
print(f"数据模型配置: {config.model_config}")
print(f"数据库配置: {config.DATABASES.keys()}")
print(f"OceanBase配置: {config.OCEANBASES.keys()}")
引擎管理器
from dbgrab.jaydebe import get_engine_manager
# 获取引擎管理器
manager = get_engine_manager(".env")
# 列出所有引擎信息
engines = manager.list_engines()
print("已配置的数据库引擎:")
for name, info in engines.items():
print(f" 🔹 {name}: {info}")
# 获取指定的引擎
engine = manager.get_engine(f"{name}")
# 重新加载引擎配置
manager.reload_engines()
# 关闭所有引擎
manager.close_all_engines()
核心组件
DataBaseFetcher
- 提供统一的数据获取接口
- 支持普通模式和迭代模式
- 自动处理数据分批和导出
- 支持 T+1 模式的数据获取
- 支持通过
with_config方法加载配置 - 支持从上次中断处继续采集数据
DataExtractor
- 提供基本的数据提取功能
- 支持通过引擎实例直接执行SQL
- 内置文件命名和路径管理
- 支持自定义 chunk_size 进行分批读取
ConfigGenerator
- 基于模板生成配置文件
- 支持获取模板到当前目录
- 提供灵活的配置生成选项
引擎管理器
get_engine_manager:获取引擎管理器实例- 支持从 .env 文件加载配置
- 统一管理多数据库连接
- 提供
list_engines、get_engine、get_session等方法 - 支持自动关闭所有引擎
配置管理
get_db_config:从 .env 文件获取配置- 支持管理多个数据库配置
- 支持管理 OceanBase 数据库配置
工具函数
check_data:检查数据量get_month_start_end_dates:获取月份起止日期get_year_start_end_dates:获取年份起止日期get_day_dates:获取每天的日期
日志功能
log:日志工具- 支持输出到文件
- 提供统一的日志接口
配置说明
SDK 提供了灵活的配置方式,支持使用 .yml 文件和 .env 文件进行管理:
1. 使用 .env 配置文件
- ENV配置文件,使用大写字母进行配置
示例 .env 文件:
# 数据库配置
DB_ENGINE_DATABASES__MAIN__NAME=main
DB_ENGINE_DATABASES__MAIN__URL=mysql+pymysql://user:password@localhost:3306/main_db
# 日志数据库 (SQLite)
DB_ENGINE_DATABASES__LOG__NAME=log
DB_ENGINE_DATABASES__LOG__URL=sqlite:///./logs.db
DB_ENGINE_DATABASES__LOG__ECHO=false
# Ocean Base oracle模式
DB_ENGINE_OCEANBASES__USER__JDBC=jdbc:oceanbase://localhost:2883/OCEANBASE_DB?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true
DB_ENGINE_OCEANBASES__USER__USERNAME=user
DB_ENGINE_OCEANBASES__USER__PASSWORD=password
DB_ENGINE_OCEANBASES__USER__DRIVER=com.oceanbase.jdbc.Driver
DB_ENGINE_OCEANBASES__USER__CLIENT_JAR=/soft/oceanbase/oceanbase-client-2.4.9.jar
# 配置工作路径
DB_ENGINE_WORK_PATH=/opt/db_grab
DB_ENGINE_LOG_PATH=/opt/db_grab/logs
DB_ENGINE_FILE_PATH=/opt/db_grab/files
DB_ENGINE_DEBUG=false
- 配置使用前缀
DB_ENGINE_用于区分不同的配置项,避免与其他环境变量冲突。
如:DB_ENGINE_DATABASES__MAIN__* 为嵌套结构,会被解析为 MAIN数据库的配置
配置文件,示例结构:
DATABASES: {
'MAIN': DatabaseConfig(NAME='main', URL='mysql+pymysql://user:password@localhost:3306/main_db', ...),
'LOG': DatabaseConfig(NAME='log', URL='sqlite:///./logs.db', DRIVER=None, POOL_SIZE=5, MAX_OVERFLOW=10, POOL_RECYCLE=3600, ECHO=False)
},
OCEANBASES: {
'USER': OceanbaseConfig(JDBC='jdbc:oceanbase://localhost:2883/OCEANBASE_DB?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true', ...),
},
'DEBUG': False,
'FILE_PATH': '/opt/db_grab/files',
'LOG_PATH': '/opt/db_grab/logs',
...
2. 使用 .yml 配置文件
sql_file 配置文件结构:
# 表配置
tables: # 固定的字段
SCHEMA.TABLE_NAME: # 业务表名
db_alias: "oracle_engine" # 字段已弃用
desc: "表描述" # 业务表的描述
filter: "过滤字段" # 字段已弃用
sql: | # SQL语句 - {start_date} 和 {end_date} 固定格式, DataBaseFetcher会自动替换
SELECT column1 AS column1
,column2 AS column2
FROM SCHEMA.TABLE_NAME
WHERE {filter} >= '{start_date}'
AND {filter} <= '{end_date}'
order by column1
3. 使用 JayDeBeConfig 加载配置
from dbgrab.cons import JayDeBeConfig
j_conf = JayDeBeConfig(env_file=env_file, sql_file=sql_file)
print(f"SQL配置: {j_conf.sql_config}")
print(f"引擎管理器: {j_conf.manager}")
print(f"ENV配置: {[i for i in dir(j_conf.db_config) if i.isupper()]}")
注意事项
- 配置文件准备:使用前请确保已正确配置,可以使用ConfigGenerator 生成 env 和 yaml 文件配置模板
- 环境变量设置:使用 OceanBase 时,需要确保已正确配置 JAVA_HOME 环境变量
- 大数据量处理:对于大数据量的查询,建议使用迭代模式(to_csv_iter)和适当的 chunk_size 参数
- 日志配置:建议在使用前配置日志文件,以便于排查问题
- OceanBase 配置:使用 OceanBase 时,需要确保已正确配置 JDBC 驱动路径和相关参数
- 路径设置:确保工作目录(如 temp 目录)存在,以便存储日志和导出的 CSV 文件
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rtsf_dbgrab-0.1.0.tar.gz.
File metadata
- Download URL: rtsf_dbgrab-0.1.0.tar.gz
- Upload date:
- Size: 20.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.29 {"installer":{"name":"uv","version":"0.9.29","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f253efa309aa4f264981745e45216505514c07bc7e423cdcbc8c602d27e3c7b6
|
|
| MD5 |
1b5c31feb6f731ef6cf71bee0bfc432e
|
|
| BLAKE2b-256 |
903e6e50a84556e6a1345e4a4746f6e9073cfbace3b1b68d448d93efd630885f
|
File details
Details for the file rtsf_dbgrab-0.1.0-py3-none-any.whl.
File metadata
- Download URL: rtsf_dbgrab-0.1.0-py3-none-any.whl
- Upload date:
- Size: 20.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.29 {"installer":{"name":"uv","version":"0.9.29","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b6f2ca878b1eb513b9885432d4ad120b120e0f444b1381dfbe25a3df9e8faa3b
|
|
| MD5 |
8f3b6c21feef2922b9c4a6224aad4888
|
|
| BLAKE2b-256 |
fcfa1e6bc7682aaf2103de0228833b94e9ca6a949033c127543d541de251eccc
|