grab SQL data to csv file.
Project description
DB Grab SDK
DB Grab SDK 是一个用于内部数据源 的 SQL 数据获取/采集的工具,提供了便捷的数据库数据导出pandas csv的功能。
功能特性
- 基于SQLAlchemy,支持多种数据库,添加OceanBase支持
- 提供统一的数据获取接口
- 支持大数据量的分批读取和导出
- 内置日志记录功能
- 灵活的配置方式(支持.yml和.env配置文件)
- 支持从上次中断处继续采集数据
- 提供引擎管理器,统一管理多数据库连接
- 支持T+1模式、指定日期范围和迭代抽取模式
安装
# 从源码安装
pip install -e .
tips: 安装java jdk,设置JAVA_HOME, 以便采集 oceanbase 数据库
快速开始
1. 使用 ConfigGenerator 生成SQL配置的模板
from dbgrab import ConfigGenerator
ENV_FILE_PATH = "/YOUR_PATH/.env"
SQL_FILE_PATH = "/YOUR_PATH/sql.yml"
# 初始化配置生成器
generator = ConfigGenerator()
# 生成env配置的模板
generator.gen_env_template(ENV_FILE_PATH)
# 生成SQL配置的模板
generator.gen_sql_template(SQL_FILE_PATH)
4. 使用抓取器 DataBaseFetcher 采集数据
from dbgrab import log
from dbgrab.dt_fetcher import DataBaseFetcher
env_file= ".env" # generator 生成模板
sql_file= "sql.yml" # generator 生成模板
# 设置日志(可选) - 数据量较大时建议配置,可根据日志断点,继续配置采集
log_file = "logs/your_fetcher.log"
log.to_file(log_file)
# 初始化数据库抓取器并加载配置
dbf = DataBaseFetcher(env_file, sql_file)
# Output Example:
# xxxx: SQL Tables List: dict_keys(['credible_db.fetcher_task_record', 'user.info'])
# xxxx: Engines List: dict_keys(['CREDIBLE', 'USER'])
# 方法1:使用 T+1 模式采集导出
print(dbf.with_engine("CREDIBLE").to_csv(table_name='credible_db.fetcher_task_record', T_1=True))
# 方法2:指定日期范围导出 - start_date, end_date参数,会自动替换tables.yml配置的sql
print(dbf.with_engine("CREDIBLE").to_csv(table_name='credible_db.fetcher_task_record', start_date="20250818", end_date="20251026"))
# 方法3:使用迭代方式导出大数据 - 可在时间段内,按天、月、年的模式导出,建议数据量级在千万级以上时使用
print(dbf.with_engine("CREDIBLE").to_csv_iter(table_name='credible_db.fetcher_task_record', start_date="20250818", end_date="20251026", mode="day"))
3. 使用提取器 DataExtractor 采集数据
from dbgrab import DataExtractor, check_data, get_engine_manager
env_file= ".env" # generator 生成模板
# 初始化引擎管理器
manager = get_engine_manager(env_file)
# 查看所有引擎
manager.list_engines()
# 获取指定引擎
engine = manager.get_engine("ENGINE_NAME")
# 配置SQL查询语句
sql_query = "select * from YOUR_DB.DB limit 1000" # Mysql SQL
# sql_query = "SELECT * FROM YOUR_DB.DB t where ROWNUM <= 1000" # OceanBase SQL
# 检查数据量
sql_query_count = sql_query.lower().replace('select * from', "select count(*) from")
max_info = check_data(engine, sql_query_count)
print("Count总数: ", max_info)
# 创建抽取器对象, 设置工作路径和引擎
dte = DataExtractor(engine, manager._config.WORK_PATH) # manager._config 即 get_db_config返回的 DynamicDBConfig 类的实例
# 配置抽取器执行时的,输出文件文件名及SQL
dte.extractor.set_file("credible_test").to_csv(sql_query, chunk_size=20000)
配置文件
项目使用 .env 配置文件管理数据库连接信息:
from dbgrab.configs import get_db_config
# 读取.env配置
config = get_db_config(".env")
print(f"数据模型配置: {config.model_config}")
print(f"数据库配置: {config.DATABASES.keys()}")
print(f"OceanBase配置: {config.OCEANBASES.keys()}")
引擎管理器
from dbgrab.jaydebe import get_engine_manager
# 获取引擎管理器
manager = get_engine_manager(".env")
# 列出所有引擎信息
engines = manager.list_engines()
print("已配置的数据库引擎:")
for name, info in engines.items():
print(f" 🔹 {name}: {info}")
# 获取指定的引擎
engine = manager.get_engine(f"{name}")
# 重新加载引擎配置
manager.reload_engines()
# 关闭所有引擎
manager.close_all_engines()
核心组件
DataBaseFetcher
- 提供统一的数据获取接口
- 支持普通模式和迭代模式
- 自动处理数据分批和导出
- 支持 T+1 模式的数据获取
- 支持通过
with_config方法加载配置 - 支持从上次中断处继续采集数据
DataExtractor
- 提供基本的数据提取功能
- 支持通过引擎实例直接执行SQL
- 内置文件命名和路径管理
- 支持自定义 chunk_size 进行分批读取
ConfigGenerator
- 基于模板生成配置文件
- 支持获取模板到当前目录
- 提供灵活的配置生成选项
引擎管理器
get_engine_manager:获取引擎管理器实例- 支持从 .env 文件加载配置
- 统一管理多数据库连接
- 提供
list_engines、get_engine、get_session等方法 - 支持自动关闭所有引擎
配置管理
get_db_config:从 .env 文件获取配置- 支持管理多个数据库配置
- 支持管理 OceanBase 数据库配置
工具函数
check_data:检查数据量get_month_start_end_dates:获取月份起止日期get_year_start_end_dates:获取年份起止日期get_day_dates:获取每天的日期
日志功能
log:日志工具- 支持输出到文件
- 提供统一的日志接口
配置说明
SDK 提供了灵活的配置方式,支持使用 .yml 文件和 .env 文件进行管理:
1. 使用 .env 配置文件
- ENV配置文件,使用大写字母进行配置
示例 .env 文件:
# 数据库配置
DB_ENGINE_DATABASES__MAIN__NAME=main
DB_ENGINE_DATABASES__MAIN__URL=mysql+pymysql://user:password@localhost:3306/main_db
# 日志数据库 (SQLite)
DB_ENGINE_DATABASES__LOG__NAME=log
DB_ENGINE_DATABASES__LOG__URL=sqlite:///./logs.db
DB_ENGINE_DATABASES__LOG__ECHO=false
# Ocean Base oracle模式
DB_ENGINE_OCEANBASES__USER__JDBC=jdbc:oceanbase://localhost:2883/OCEANBASE_DB?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true
DB_ENGINE_OCEANBASES__USER__USERNAME=user
DB_ENGINE_OCEANBASES__USER__PASSWORD=password
DB_ENGINE_OCEANBASES__USER__DRIVER=com.oceanbase.jdbc.Driver
DB_ENGINE_OCEANBASES__USER__CLIENT_JAR=/soft/oceanbase/oceanbase-client-2.4.9.jar
# 配置工作路径
DB_ENGINE_WORK_PATH=/opt/db_grab
DB_ENGINE_LOG_PATH=/opt/db_grab/logs
DB_ENGINE_FILE_PATH=/opt/db_grab/files
DB_ENGINE_DEBUG=false
- 配置使用前缀
DB_ENGINE_用于区分不同的配置项,避免与其他环境变量冲突。
如:DB_ENGINE_DATABASES__MAIN__* 为嵌套结构,会被解析为 MAIN数据库的配置
配置文件,示例结构:
DATABASES: {
'MAIN': DatabaseConfig(NAME='main', URL='mysql+pymysql://user:password@localhost:3306/main_db', ...),
'LOG': DatabaseConfig(NAME='log', URL='sqlite:///./logs.db', DRIVER=None, POOL_SIZE=5, MAX_OVERFLOW=10, POOL_RECYCLE=3600, ECHO=False)
},
OCEANBASES: {
'USER': OceanbaseConfig(JDBC='jdbc:oceanbase://localhost:2883/OCEANBASE_DB?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true', ...),
},
'DEBUG': False,
'FILE_PATH': '/opt/db_grab/files',
'LOG_PATH': '/opt/db_grab/logs',
...
2. 使用 .yml 配置文件
sql_file 配置文件结构:
# 表配置
tables: # 固定的字段
SCHEMA.TABLE_NAME: # 业务表名
db_alias: "oracle_engine" # 字段已弃用
desc: "表描述" # 业务表的描述
filter: "过滤字段" # 字段已弃用
sql: | # SQL语句 - {start_date} 和 {end_date} 固定格式, DataBaseFetcher会自动替换
SELECT column1 AS column1
,column2 AS column2
FROM SCHEMA.TABLE_NAME
WHERE {filter} >= '{start_date}'
AND {filter} <= '{end_date}'
order by column1
3. 使用 JayDeBeConfig 加载配置
from dbgrab.cons import JayDeBeConfig
j_conf = JayDeBeConfig(env_file=env_file, sql_file=sql_file)
print(f"SQL配置: {j_conf.sql_config}")
print(f"引擎管理器: {j_conf.manager}")
print(f"ENV配置: {[i for i in dir(j_conf.db_config) if i.isupper()]}")
注意事项
- 配置文件准备:使用前请确保已正确配置,可以使用ConfigGenerator 生成 env 和 yaml 文件配置模板
- 环境变量设置:使用 OceanBase 时,需要确保已正确配置 JAVA_HOME 环境变量
- 大数据量处理:对于大数据量的查询,建议使用迭代模式(to_csv_iter)和适当的 chunk_size 参数
- 日志配置:建议在使用前配置日志文件,以便于排查问题
- OceanBase 配置:使用 OceanBase 时,需要确保已正确配置 JDBC 驱动路径和相关参数
- 路径设置:确保工作目录(如 temp 目录)存在,以便存储日志和导出的 CSV 文件
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
rtsf_dbgrab-0.1.0.tar.gz
(20.6 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rtsf_dbgrab-0.1.0.tar.gz.
File metadata
- Download URL: rtsf_dbgrab-0.1.0.tar.gz
- Upload date:
- Size: 20.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.29 {"installer":{"name":"uv","version":"0.9.29","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f253efa309aa4f264981745e45216505514c07bc7e423cdcbc8c602d27e3c7b6
|
|
| MD5 |
1b5c31feb6f731ef6cf71bee0bfc432e
|
|
| BLAKE2b-256 |
903e6e50a84556e6a1345e4a4746f6e9073cfbace3b1b68d448d93efd630885f
|
File details
Details for the file rtsf_dbgrab-0.1.0-py3-none-any.whl.
File metadata
- Download URL: rtsf_dbgrab-0.1.0-py3-none-any.whl
- Upload date:
- Size: 20.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.29 {"installer":{"name":"uv","version":"0.9.29","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b6f2ca878b1eb513b9885432d4ad120b120e0f444b1381dfbe25a3df9e8faa3b
|
|
| MD5 |
8f3b6c21feef2922b9c4a6224aad4888
|
|
| BLAKE2b-256 |
fcfa1e6bc7682aaf2103de0228833b94e9ca6a949033c127543d541de251eccc
|