A Python package for querying Hive data and processing with AIGC applications
Project description
pypabhiveagent
一个用于查询 Hive 数据并通过 AIGC 应用处理的 Python 包。
pypabhiveagent 提供了一个简单易用的接口,用于从 Hive 表查询数据,将数据发送到大模型应用(Prompt 或 Agent 类型)进行处理,解析结果并将处理后的数据存储回 Hive 表或本地 Excel 文件。
功能特性
- ✅ Hive 数据查询:使用标准 SQL 查询 Hive 表数据
- ✅ DataFrame 输入:支持直接传入 pandas DataFrame 作为数据源
- ✅ DataFrame 输出:查询结果包含 DataFrame 对象,便于后续处理
- ✅ 大小写不敏感:列名匹配支持大小写不敏感,提高易用性
- ✅ AIGC 应用集成:支持 Prompt 和 Agent 两种应用类型
- ✅ 智能结果解析:自动解析 JSON 或字符串格式的结果
- ✅ 思维链支持:可选择获取 AIGC 的推理过程
- ✅ 多种存储方式:支持保存到 Hive 表或 Excel 文件
- ✅ 并发处理:使用多线程提高大数据集处理效率
- ✅ 完善的错误处理:详细的错误日志和重试机制
- ✅ 灵活的配置:支持链式调用和丰富的配置选项
安装
基础安装
pip install pypabhiveagent
完整安装(包含 Hive 支持)
如果需要使用 Hive 查询功能,需要额外安装 Hive 支持:
# 安装包含 PySpark 支持
pip install pypabhiveagent[hive]
从源码安装
git clone https://github.com/Nevernamed/pypabhiveagent.git
cd pypabhiveagent
pip install -e . # 基础安装
# 或
pip install -e .[hive] # 包含 Hive 支持
依赖说明
- 基础安装:只包含核心功能(pandas, requests, openpyxl),支持 DataFrame 输入输出
- Hive 支持:需要额外安装
pypabhiveagent[hive]来获得 PySpark 支持,用于 Hive 查询和存储 - 注意:如果只使用 DataFrame 作为数据源,不需要安装 Hive 支持
快速开始
基本使用示例
from pypabhiveagent import HiveAgent
# 1. 创建 HiveAgent 实例
agent = HiveAgent()
# 2. 配置连接参数
agent.set_config(
# AIGC 服务配置
url="https://aigc-api.example.com/prompt",
app_id="your_app_id",
token="your_token",
aigc_app_id="prompt_app_12345"
)
# 3. 执行查询和处理
result = agent.query(
# SQL 查询语句
sql="SELECT id, question FROM question_table WHERE dt='20231201' LIMIT 10",
# 应用类型:'prompt' 或 'agent'
app_type='prompt',
# 字段映射:将 Hive 字段映射到 AIGC 参数
field_mapping={'question': None},
# 是否包含思维链结果
include_reasoning=False,
# 存储选项
cache_to_excel=True,
save_to_hive=True,
result_table_name='question_answer_result'
)
# 4. 查看结果
print(f"处理状态: {result['success']}")
print(f"处理消息: {result['message']}")
print(f"处理行数: {result['rows_processed']}")
# 获取结果 DataFrame 用于后续处理
if result.get('df') is not None:
result_df = result['df']
print(f"结果 DataFrame: {len(result_df)} 行 x {len(result_df.columns)} 列")
if result.get('excel_path'):
print(f"Excel 文件: {result['excel_path']}")
if result.get('hive_table'):
print(f"Hive 表: {result['hive_table']}")
链式调用示例
from pypabhiveagent import HiveAgent
result = (
HiveAgent()
.set_config(
url="https://aigc-api.example.com/prompt",
app_id="your_app_id",
token="your_token",
aigc_app_id="prompt_app_12345"
)
.query(
sql="SELECT id, text FROM my_table WHERE dt='20231201'",
app_type='prompt',
field_mapping={'text': None},
cache_to_excel=True,
save_to_hive=False
)
)
print(result['message'])
# 获取结果 DataFrame
result_df = result['df']
配置说明
必需配置参数
使用 set_config() 方法配置以下必需参数:
| 参数 | 类型 | 说明 |
|---|---|---|
url |
str | AIGC 服务 URL |
app_id |
str | 应用 ID |
token |
str | 认证 token |
aigc_app_id |
str | AIGC 应用 ID |
可选配置参数
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
timeout |
int | 60 | AIGC 请求超时时间(秒) |
max_retries |
int | 3 | 最大重试次数 |
sleep |
float | 2 | 请求间隔时间(秒),用于避免 API 限流 |
配置示例
agent = HiveAgent()
agent.set_config(
# 必需参数
url="https://aigc-api.example.com/prompt",
app_id="your_app_id",
token="your_token",
aigc_app_id="prompt_app_12345",
# 可选参数
timeout=60, # 增加超时时间到 60 秒
max_retries=5, # 增加重试次数到 5 次
sleep=0.5 # 每个请求间隔 0.5 秒
)
使用指南
应用类型
pypabhiveagent 支持两种 AIGC 应用类型:
1. Prompt 应用 (app_type="prompt")
基于提示词的应用,适合模板化的查询场景。
单参数输入示例:
result = agent.query(
sql="SELECT id, question FROM qa_table WHERE dt='20231201'",
app_type='prompt',
field_mapping={'question': None}, # 单参数
cache_to_excel=True,
save_to_hive=True,
result_table_name='qa_result'
)
多参数输入示例:
result = agent.query(
sql="""
SELECT user_name, product_name, purchase_amount
FROM purchase_table
WHERE dt='20231201'
""",
app_type='prompt',
field_mapping={
'user_name': 'userName',
'product_name': 'productName',
'purchase_amount': 'amount'
}, # 多参数
cache_to_excel=True,
save_to_hive=True,
result_table_name='purchase_analysis'
)
2. Agent 应用 (app_type="agent")
基于智能体的应用,适合复杂推理任务。
单参数输入示例:
result = agent.query(
sql="SELECT id, content FROM content_table WHERE dt='20231201'",
app_type='agent',
field_mapping={'content': None}, # 单参数
include_reasoning=True, # 包含思维链
cache_to_excel=True,
save_to_hive=True,
result_table_name='content_analysis'
)
多参数输入示例:
result = agent.query(
sql="""
SELECT customer_name, order_status, order_amount
FROM order_table
WHERE dt='20231201'
""",
app_type='agent',
field_mapping={
'customer_name': 'customerName',
'order_status': 'status',
'order_amount': 'amount'
}, # 多参数
include_reasoning=True,
cache_to_excel=True,
save_to_hive=True,
result_table_name='order_analysis'
)
字段映射
字段映射用于指定 Hive 查询结果字段与 AIGC 应用参数的对应关系。
单参数映射
当只有一个输入参数时:
field_mapping = {'question': None}
# 或
field_mapping = {'question': 'query'}
- Prompt 应用:值会放入请求体的
query字段 - Agent 应用:值会放入请求体的
message字段(格式:{'content': value, 'content_type': 'text'})
多参数映射
当有多个输入参数时:
field_mapping = {
'field1': 'param1',
'field2': 'param2',
'field3': 'param3'
}
- Prompt 应用:键值对会放入请求体的
dynamicColumMap字段 - Agent 应用:键值对会放入请求体的
args字段
映射规则
- 键:Hive 查询结果中的字段名
- 值:发送到 AIGC 应用的参数名(可以为
None,表示使用字段名)
结果处理
pypabhiveagent 会自动解析 AIGC 返回的结果并合并到原始数据中。
JSON 结果
如果 AIGC 返回 JSON 格式:
{"answer": "这是答案", "confidence": 0.95}
会自动解析并添加为新列:
| id | question | answer | confidence |
|---|---|---|---|
| 1 | 问题1 | 这是答案 | 0.95 |
字符串结果
如果 AIGC 返回非 JSON 字符串,会创建 result 列:
| id | question | result |
|---|---|---|
| 1 | 问题1 | 这是答案 |
思维链结果
当 include_reasoning=True 时,会添加 reasoning_content 列:
| id | question | answer | reasoning_content |
|---|---|---|---|
| 1 | 问题1 | 这是答案 | 推理过程... |
存储选项
Excel 缓存
设置 cache_to_excel=True 将结果保存到本地 Excel 文件:
result = agent.query(
sql="SELECT * FROM my_table",
app_type='prompt',
field_mapping={'text': None},
cache_to_excel=True # 保存到 Excel
)
print(f"Excel 文件: {result['excel_path']}")
# 输出: Excel 文件: prompt_app_12345-20231201143025.xlsx
文件命名格式:{aigcAppId}-{年月日时分秒}.xlsx
Hive 存储
设置 save_to_hive=True 将结果保存到 Hive 表:
result = agent.query(
sql="SELECT * FROM my_table WHERE dt='20231201'",
app_type='prompt',
field_mapping={'text': None},
save_to_hive=True,
result_table_name='my_result_table', # 必需
write_mode='append' # 'append' 或 'overwrite'
)
print(f"Hive 表: {result['hive_table']}")
# 输出: Hive 表: my_result_table
自动表管理:
- 如果结果表不存在,会自动创建
- 如果源表有
dt字段,会创建分区表(按dt分区) - 自动推断字段类型
写入模式:
append(默认):追加数据到表中overwrite:覆盖表中的数据
并发处理
使用 max_workers 参数控制并发线程数:
result = agent.query(
sql="SELECT * FROM large_table WHERE dt='20231201'",
app_type='prompt',
field_mapping={'text': None},
max_workers=10 # 使用 10 个并发线程
)
性能建议:
- 默认值为 1,确保稳定性
- 如果 API 支持高并发,可以适当增加
max_workers(如 5-10) - 如果遇到 API 限流,降低
max_workers或增加sleep参数 - 建议每次处理 100-1000 行数据
DataFrame 输入输出
使用 DataFrame 作为数据源
除了使用 SQL 查询,pypabhiveagent 还支持直接传入 pandas DataFrame 作为数据源:
import pandas as pd
from pypabhiveagent import HiveAgent
# 创建或加载 DataFrame
data = {
'id': [1, 2, 3, 4, 5],
'Question': ['问题1', '问题2', '问题3', '问题4', '问题5'], # 注意大小写
'dt': ['20231201'] * 5
}
df = pd.DataFrame(data)
# 使用 DataFrame 作为数据源
agent = HiveAgent()
agent.set_config(
url="https://aigc-api.example.com/prompt",
app_id="your_app_id",
token="your_token",
aigc_app_id="prompt_app_12345"
)
result = agent.query(
df=df, # 传入 DataFrame 而不是 SQL
app_type='prompt',
field_mapping={'question': None}, # 大小写不敏感,会匹配到 'Question'
cache_to_excel=True,
save_to_hive=True,
result_table_name='result_table'
)
使用场景:
- 从 CSV、Excel 等文件加载数据
- 从 API 获取的数据
- 已经在内存中处理过的数据
- 需要预处理或筛选的数据
获取结果 DataFrame
查询结果中包含处理后的 DataFrame,可以直接用于后续处理:
result = agent.query(
sql="SELECT id, question FROM qa_table WHERE dt='20231201'",
app_type='prompt',
field_mapping={'question': None},
cache_to_excel=False,
save_to_hive=False
)
# 获取结果 DataFrame
if result['success'] and result['df'] is not None:
result_df = result['df']
# 进行后续处理
print(f"处理了 {len(result_df)} 行数据")
print(f"列名: {list(result_df.columns)}")
# 数据分析
print(result_df.describe())
# 保存到其他格式
result_df.to_csv('output.csv', index=False)
result_df.to_json('output.json', orient='records')
# 继续处理
filtered_df = result_df[result_df['score'] > 0.8]
使用场景:
- 需要对结果进行进一步分析
- 保存到多种格式(CSV、JSON、Parquet 等)
- 与其他数据处理流程集成
- 数据质量检查和验证
大小写不敏感的列名匹配
field_mapping 中的列名匹配是大小写不敏感的,提高了易用性:
# DataFrame 的列名可能是各种大小写
df = pd.DataFrame({
'ID': [1, 2, 3],
'UserName': ['张三', '李四', '王五'],
'ProductName': ['产品A', '产品B', '产品C'],
'DT': ['20231201'] * 3
})
# field_mapping 中可以使用任意大小写
result = agent.query(
df=df,
app_type='prompt',
field_mapping={
'username': 'userName', # 会匹配到 'UserName'
'productname': 'productName' # 会匹配到 'ProductName'
}
)
优势:
- 不需要担心列名的大小写
- 提高代码的健壮性
- 减少因大小写不匹配导致的错误
错误处理
pypabhiveagent 提供完善的错误处理机制。当部分行处理失败时:
处理策略:
- ✅ 保留原始数据
- ✅ 结果字段填充 None 值
- ✅ 保持与成功数据的结构一致
- ✅ 打印失败统计和提醒信息
result = agent.query(
sql="SELECT * FROM my_table",
app_type='prompt',
field_mapping={'text': None}
)
# 查看处理结果
print(f"处理状态: {result['success']}")
print(f"处理消息: {result['message']}")
# 获取结果 DataFrame
if result['df'] is not None:
result_df = result['df']
# 识别失败的行(结果列为 None)
if 'answer' in result_df.columns:
failed_rows = result_df[result_df['answer'].isna()]
success_rows = result_df[result_df['answer'].notna()]
print(f"成功: {len(success_rows)} 行")
print(f"失败: {len(failed_rows)} 行")
# 查看失败详情
if result.get('errors'):
for error in result['errors'][:5]:
print(f"行 {error['row_index']}: {error['error']}")
失败时的输出示例:
================================================================================
⚠️ PROCESSING COMPLETED WITH ERRORS
================================================================================
✓ Successfully processed: 8 rows
✗ Failed to process: 2 rows
📌 Failed rows are included in the result with:
- Original data preserved
- Result columns filled with None values
- Result columns: answer, confidence
💡 Tip: Filter failed rows using: df[df['result_column'].isna()]
================================================================================
数据结构示例:
# 成功的行
id | question | answer | confidence
1 | 问题1 | 答案1 | 0.95
# 失败的行(结果列为 None)
id | question | answer | confidence
2 | 问题2 | None | None
错误类型:
ConfigurationError:配置错误HiveQueryError:Hive 查询错误AIGCRequestError:AIGC 请求错误ResultParseError:结果解析错误StorageError:存储错误
详细的错误处理指南请参考 docs/ERROR_HANDLING_GUIDE.md
日志配置
pypabhiveagent 使用 Python 标准 logging 模块:
from pypabhiveagent import configure_logging, set_log_level
import logging
# 配置日志
configure_logging(
level=logging.INFO,
log_file='pypabhiveagent.log'
)
# 或者只设置日志级别
set_log_level(logging.DEBUG)
完整示例
查看 examples/ 目录获取更多示例:
basic_usage.py:基本使用示例prompt_app_example.py:Prompt 应用完整示例agent_app_example.py:Agent 应用完整示例dataframe_usage_example.py:DataFrame 输入输出示例error_handling_example.py:错误处理示例
或者使用 demo() 方法查看示例代码:
from pypabhiveagent import HiveAgent
# 直接打印完整的使用指南
HiveAgent.demo()
开发
安装开发依赖
pip install -r requirements-dev.txt
运行测试
pytest
代码格式化
black pypabhiveagent/
类型检查
mypy pypabhiveagent/
代码检查
flake8 pypabhiveagent/
常见问题
1. 如何避免 API 限流?
使用 sleep 参数设置请求间隔:
agent.set_config(
url="...",
app_id="...",
token="...",
aigc_app_id="...",
sleep=0.5 # 每个请求间隔 0.5 秒
)
或者降低并发数:
result = agent.query(
sql="...",
app_type='prompt',
field_mapping={'text': None},
max_workers=3 # 降低并发数
)
2. 如何处理大数据集?
建议分批处理:
# 按日期分批处理
dates = ['20231201', '20231202', '20231203']
for date in dates:
result = agent.query(
sql=f"SELECT * FROM my_table WHERE dt='{date}'",
app_type='prompt',
field_mapping={'text': None},
result_table_name='my_result_table',
write_mode='append' # 追加模式
)
print(f"处理 {date}: {result['message']}")
3. 如何自定义超时时间?
使用 timeout 参数:
agent.set_config(
url="...",
app_id="...",
token="...",
aigc_app_id="...",
timeout=60 # 超时时间 60 秒
)
4. 如何只保存到 Excel 不保存到 Hive?
result = agent.query(
sql="...",
app_type='prompt',
field_mapping={'text': None},
cache_to_excel=True,
save_to_hive=False # 不保存到 Hive
)
5. 如何使用 DataFrame 作为数据源?
import pandas as pd
# 准备 DataFrame
df = pd.DataFrame({
'id': [1, 2, 3],
'question': ['问题1', '问题2', '问题3'],
'dt': ['20231201'] * 3
})
# 使用 DataFrame 作为数据源
result = agent.query(
df=df, # 传入 DataFrame
app_type='prompt',
field_mapping={'question': None}
)
6. 如何获取结果 DataFrame 进行后续处理?
result = agent.query(
sql="SELECT * FROM my_table",
app_type='prompt',
field_mapping={'text': None}
)
# 获取结果 DataFrame
if result['success'] and result['df'] is not None:
result_df = result['df']
# 进行后续处理
result_df.to_csv('output.csv', index=False)
filtered_df = result_df[result_df['score'] > 0.8]
7. 列名大小写不匹配怎么办?
不用担心!pypabhiveagent 支持大小写不敏感的列名匹配:
# DataFrame 列名是 'UserName',field_mapping 中可以使用 'username'
df = pd.DataFrame({'UserName': ['张三', '李四']})
result = agent.query(
df=df,
app_type='prompt',
field_mapping={'username': None} # 自动匹配到 'UserName'
)
版本信息
当前版本:1.0.1
主要特性:
- 支持 Hive SQL 查询和 DataFrame 输入
- 支持 Prompt 和 Agent 两种 AIGC 应用类型
- 大小写不敏感的列名匹配
- 完善的错误处理机制
- 灵活的存储选项(Excel 和 Hive)
许可证
MIT License - 详见 LICENSE 文件。
贡献
欢迎贡献!请随时提交 Pull Request。
支持
如有问题或建议,请提交 Issue。
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pypabhiveagent-1.0.3.tar.gz.
File metadata
- Download URL: pypabhiveagent-1.0.3.tar.gz
- Upload date:
- Size: 40.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bb5b3aba6faa32537365f9ef4f62ad7cc534d399eb2a6620e612351ca53d9c37
|
|
| MD5 |
7b05ea0fa044e09e2726ec6820ebefa0
|
|
| BLAKE2b-256 |
bda24e05a6ee57ff931ca32caf7781dd3ceb6174274d977ecf701b1e6f1a3184
|
File details
Details for the file pypabhiveagent-1.0.3-py3-none-any.whl.
File metadata
- Download URL: pypabhiveagent-1.0.3-py3-none-any.whl
- Upload date:
- Size: 36.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
98e23a620110c0d0ada1f68b9ad5274a3eb90d430bc3c25004c49f11988c6bba
|
|
| MD5 |
7869b614514d7be31ae665f6f739c50d
|
|
| BLAKE2b-256 |
52f773c2ddaba1cef1e927f5b16338548b6f8d03bc020402e899db4a4ae43276
|