Skip to main content

面向 Agent 和 RAG 的文档处理 Pipeline 客户端

Project description

xParse Client

面向 Agent 和 RAG 的新一代文档处理 Python SDK

PyPI version Python License


⚠️ 重要提示

Pipeline 类已在 v0.3.0 版本中完全移除。

如果你的代码中使用了 Pipeline 类,请立即查看 完整迁移指南


目录


SDK 安装

[!NOTE] Python 版本要求

本 SDK 支持 Python 3.9 及以上版本。一旦某个 Python 版本达到其官方生命周期结束日期,将提供 3 个月的宽限期供用户升级。

SDK 支持多种包管理器安装。

uv(推荐)

uv 是一个快速的 Python 包管理器,推荐用于现代 Python 项目。

uv add xparse-client

pip

pip install xparse-client

poetry

poetry add xparse-client

可选依赖

根据使用的 Destination 类型安装额外依赖:

# Milvus 向量数据库支持
pip install xparse-client[milvus]

# Qdrant 向量数据库支持
pip install xparse-client[qdrant]

# 完整安装(包含所有可选依赖)
pip install xparse-client[all]

Shell 脚本使用

使用 uv 可以快速编写独立的 Python 脚本,无需创建完整项目:

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.9"
# dependencies = [
#     "xparse-client",
# ]
# ///

from xparse_client import XParseClient

# 初始化客户端
client = XParseClient.from_env()

# 处理文档
with open("document.pdf", "rb") as f:
    result = client.parse.partition(file=f, filename="document.pdf")
    print(f"解析出 {len(result.elements)} 个元素")

保存为 process.py 后,直接运行:

uv run process.py

快速开始

1. 环境配置

首先设置 TextIn 认证信息:

export TEXTIN_APP_ID="your-app-id"
export TEXTIN_SECRET_CODE="your-secret-code"

可以在 TextIn 开发者控制台 获取认证凭证。

2. 单文件处理

场景 1:解析文档

from xparse_client import XParseClient
from xparse_client.models import ParseConfig

client = XParseClient.from_env()

# 解析文档
with open("document.pdf", "rb") as f:
    result = client.parse.partition(
        file=f,
        filename="document.pdf",
        config=ParseConfig(provider="textin")
    )

print(f"解析出 {len(result.elements)} 个元素")

场景 2:提取结构化数据

from xparse_client.models import ExtractConfig

# 定义提取 schema
schema = {
    "type": "object",
    "properties": {
        "title": {"type": "string", "description": "文档标题"},
        "author": {"type": "string", "description": "作者"},
        "date": {"type": "string", "description": "日期"}
    },
    "required": ["title", "author", "date"]
}

with open("document.pdf", "rb") as f:
    result = client.extract.extract(
        file=f,
        filename="document.pdf",
        extract_config=ExtractConfig(schema=schema)
    )

# Extract API 返回的结构化数据在 result.result 中
print(result.result)

3. 本地批处理

批量处理本地文件并写入向量数据库:

from xparse_client import XParseClient
from xparse_client.connectors import LocalSource, MilvusDestination
from xparse_client.models import ParseStage, ChunkStage, EmbedStage
from xparse_client.models import ParseConfig, ChunkConfig, EmbedConfig

client = XParseClient.from_env()

result = client.local.run_workflow(
    source=LocalSource(directory="./docs", pattern=["*.pdf"]),
    destination=MilvusDestination(
        db_path="./vectors.db",
        collection_name="documents",
        dimension=1024
    ),
    stages=[
        ParseStage(config=ParseConfig(provider="textin")),
        ChunkStage(config=ChunkConfig(strategy="by_title")),
        EmbedStage(config=EmbedConfig(provider="qwen"))
    ]
)

print(f"处理完成: {result.success}/{result.total}")

4. 异步任务处理

处理大文件时使用服务端异步任务:

# 创建异步任务
with open("large_document.pdf", "rb") as f:
    job = client.parse.create_async_job(
        file=f,
        filename="large_document.pdf",
        config=ParseConfig(provider="textin")
    )

print(f"任务已创建: {job.job_id}")

# 等待任务完成
result = client.parse.wait_for_result(
    job_id=job.job_id,
    timeout=300.0,
    poll_interval=5.0
)

if result.is_completed:
    print(f"任务完成,结果 URL: {result.result_url}")
    print("💡 异步任务返回的是 result_url,需要另外下载来获取解析结果")
elif result.is_failed:
    print(f"任务失败: {result.error_message}")

核心特性

📥 灵活的数据源

  • S3 兼容存储:支持 MinIO、AWS S3、阿里云 OSS、腾讯云 COS、火山引擎 TOS、华为云 OBS
  • 本地文件系统:支持本地目录批量处理
  • FTP/SMB:支持 FTP 和 SMB 协议文件系统

详细配置请查看:云厂商配置指南

📤 灵活的输出目的地

  • 向量数据库:Milvus、Zilliz Cloud、Qdrant
  • 文件存储:本地文件系统、S3 兼容存储

🔄 统一的 Pipeline API

使用 /api/xparse/pipeline 一次性完成 parse → chunk → embed 全流程:

result = client.pipeline.execute(
    file=f,
    filename="document.pdf",
    stages=[ParseStage(...), ChunkStage(...), EmbedStage(...)]
)

📊 详细的处理统计

Pipeline API 返回详细的处理统计信息:

# 使用 Pipeline API 时可以获取统计信息
result = client.pipeline.execute(
    file=f,
    filename="document.pdf",
    stages=[ParseStage(...), ChunkStage(...), EmbedStage(...)]
)

# 访问统计信息
print(f"原始元素: {result.stats.original_elements}")
print(f"分块后: {result.stats.chunked_elements}")
print(f"向量化: {result.stats.embedded_elements}")

🔧 易于扩展

基于抽象类设计,可轻松添加自定义 Source 和 Destination:

from xparse_client.connectors.sources import Source

class MyCustomSource(Source):
    def list_files(self) -> List[str]:
        # 实现文件列表逻辑
        pass

    def read_file(self, file_path: str) -> Tuple[bytes, Dict[str, Any]]:
        # 实现文件读取逻辑
        pass

错误处理

错误类层次结构

SDK 提供了完善的错误类型系统:

错误类 说明 使用场景
XParseError 基础错误类 捕获所有 SDK 错误
AuthenticationError 认证失败 app_id 或 secret_code 错误
RateLimitError 超过速率限制 需要等待或提升配额
APIError API 请求错误 服务端错误
ValidationError 参数验证错误 检查输入参数

错误处理示例

from xparse_client import XParseClient
from xparse_client.exceptions import (
    XParseError,
    RateLimitError,
    AuthenticationError,
    APIError
)

client = XParseClient.from_env()

try:
    with open("document.pdf", "rb") as f:
        result = client.parse.partition(file=f, filename="document.pdf")

except AuthenticationError as e:
    print(f"认证失败: {e.message}")
    print("请检查 TEXTIN_APP_ID 和 TEXTIN_SECRET_CODE 环境变量")

except RateLimitError as e:
    print(f"超过速率限制: {e.message}")
    print(f"建议等待 {e.retry_after} 秒后重试")
    # 自动重试
    import time
    time.sleep(e.retry_after)
    result = client.parse.partition(file=f, filename="document.pdf")

except APIError as e:
    print(f"API 错误: {e.message}")
    print(f"请求 ID: {e.request_id}")  # 用于技术支持排查
    print(f"状态码: {e.status_code}")

except XParseError as e:
    print(f"SDK 错误: {e.message}")

获取请求 ID

当遇到问题需要技术支持时,提供 request_id 可以帮助快速定位问题:

try:
    result = client.parse.partition(...)
except APIError as e:
    # 记录 request_id
    logger.error(f"请求失败,request_id: {e.request_id}")

资源管理

XParseClient 实现了上下文管理器协议,会自动管理底层 HTTP 连接和资源释放。

使用上下文管理器(推荐)

from xparse_client import XParseClient

def main():
    with XParseClient.from_env() as client:
        # 应用逻辑
        result = client.parse.partition(...)
        # 退出时自动关闭连接

异步场景

async def amain():
    async with XParseClient.from_env() as client:
        result = await client.parse.partition_async(...)

最佳实践

✅ 推荐做法:

# 长时间运行的程序,复用 client 实例
with XParseClient.from_env() as client:
    for file_path in file_list:
        result = client.parse.partition(...)

⚠️ 不推荐:

# 避免频繁创建和销毁 client
for file_path in file_list:
    client = XParseClient.from_env()  # ❌ 每次都创建新实例
    result = client.parse.partition(...)

何时可以不使用上下文管理器

  • 短生命周期脚本(处理单个文件后即退出)
  • Jupyter Notebook 交互式环境
  • 快速原型开发

在这些场景下,Python 的垃圾回收机制会自动清理资源:

# 短脚本中可以不使用 with
client = XParseClient.from_env()
result = client.parse.partition(...)
# 脚本退出时自动清理

配置说明

认证配置

方式 1:环境变量(推荐)

export TEXTIN_APP_ID="your-app-id"
export TEXTIN_SECRET_CODE="your-secret-code"
client = XParseClient.from_env()

方式 2:直接传参

client = XParseClient(
    app_id="your-app-id",
    secret_code="your-secret-code"
)

方式 3:配置文件

import os
from dotenv import load_dotenv

load_dotenv()  # 从 .env 文件加载
client = XParseClient.from_env()

.env 文件内容:

TEXTIN_APP_ID=your-app-id
TEXTIN_SECRET_CODE=your-secret-code

数据源配置

本地文件系统

from xparse_client.connectors import LocalSource

source = LocalSource(
    directory='./documents',
    pattern=['*.pdf', '*.docx'],  # 可选,文件类型过滤
    recursive=True                # 可选,递归子目录
)

S3 兼容存储

from xparse_client.connectors import S3Source

source = S3Source(
    endpoint='https://s3.amazonaws.com',
    access_key='your-access-key',
    secret_key='your-secret-key',
    bucket='my-bucket',
    prefix='documents/',           # 可选,指定前缀
    region='us-east-1',
    pattern=['*.pdf'],             # 可选,文件类型过滤
    recursive=True                  # 可选,递归子目录
)

支持的云厂商:

  • AWS S3
  • MinIO
  • 阿里云 OSS
  • 腾讯云 COS
  • 火山引擎 TOS
  • 华为云 OBS

详细配置请查看:云厂商配置指南

FTP 数据源

from xparse_client.connectors import FtpSource

source = FtpSource(
    host='ftp.example.com',
    port=21,
    username='user',
    password='pass',
    pattern=['*.pdf'],
    recursive=True
)

SMB 数据源

from xparse_client.connectors import SmbSource

source = SmbSource(
    host='smb.example.com',
    share_name='documents',
    username='user',
    password='pass',
    domain='WORKGROUP',
    pattern=['**/*.pdf'],
    recursive=True
)

目的地配置

本地文件系统

from xparse_client.connectors import LocalDestination

destination = LocalDestination(
    output_dir='./output'
)

输出文件格式:{filename}_{timestamp}.json

Milvus 本地数据库

from xparse_client.connectors import MilvusDestination

destination = MilvusDestination(
    db_path='./vectors.db',
    collection_name='documents',
    dimension=1024  # 必须与 embed 模型维度一致
)

Collection 必需字段:

  • element_id - 元素唯一标识
  • text - 文本内容
  • embeddings - 向量
  • record_id - 记录 ID

Zilliz Cloud

destination = MilvusDestination(
    db_path='https://your-instance.cloud.zilliz.com.cn',
    collection_name='documents',
    dimension=1024,
    api_key='your-api-key'
)

Qdrant

from xparse_client.connectors import QdrantDestination

# 本地 Qdrant
destination = QdrantDestination(
    url='http://localhost:6333',
    collection_name='documents',
    dimension=1024
)

# Qdrant Cloud
destination = QdrantDestination(
    url='https://your-cluster.cloud.qdrant.io',
    collection_name='documents',
    dimension=1024,
    api_key='your-api-key'
)

S3 兼容存储

from xparse_client.connectors import S3Destination

destination = S3Destination(
    endpoint='https://s3.amazonaws.com',
    access_key='your-access-key',
    secret_key='your-secret-key',
    bucket='my-bucket',
    prefix='output/',
    region='us-east-1'
)

高级配置

超时和重试

client = XParseClient(
    app_id="...",
    secret_code="...",
    timeout=120.0,      # 请求超时时间(秒),默认 630
    max_retries=3,      # 最大重试次数,默认 3
    retry_delay=1.0     # 重试间隔(秒),默认 1.0
)

自定义 API 地址

client = XParseClient(
    app_id="...",
    secret_code="...",
    base_url="https://custom-api.example.com/api/xparse"
)

自定义 HTTP 客户端

import httpx

http_client = httpx.Client(
    headers={"x-custom-header": "value"},
    proxies="http://proxy.example.com:8080"
)

client = XParseClient(
    app_id="...",
    secret_code="...",
    client=http_client
)

使用示例

完整示例代码请查看 example/ 目录。

示例 1:基础 API 使用

解析文档、提取结构化数据、Pipeline 完整流程。

from xparse_client import XParseClient
from xparse_client.models import ParseConfig

client = XParseClient.from_env()

# 解析单个文档
with open("document.pdf", "rb") as f:
    result = client.parse.partition(
        file=f,
        filename="document.pdf",
        config=ParseConfig(provider="textin")
    )

print(f"解析出 {len(result.elements)} 个元素")

完整示例:example/1_basic_api_usage.py

示例 2:服务端异步任务

处理大文件时使用服务端异步任务,支持轮询和自动等待。

# 创建异步任务
job = client.parse.create_async_job(file=f, filename="large.pdf")

# 方式 1:自动等待完成
result = client.parse.wait_for_result(job_id=job.job_id, timeout=300)
if result.is_completed:
    print(f"任务完成,结果 URL: {result.result_url}")
    # 注意:异步任务返回的是 result_url,需要另外下载来获取解析结果

# 方式 2:手动轮询
while True:
    status = client.parse.get_result(job_id=job.job_id)
    if status.is_completed:
        print(f"结果 URL: {status.result_url}")
        break
    time.sleep(5)

完整示例:example/2_async_job.py

示例 3:本地批处理工作流

批量处理本地文件,支持进度回调和错误处理。

result = client.local.run_workflow(
    source=LocalSource(directory="./docs", pattern=["*.pdf"]),
    destination=MilvusDestination(db_path="./vectors.db", ...),
    stages=[ParseStage(...), ChunkStage(...), EmbedStage(...)],
    progress_callback=lambda c, t, m: print(f"[{c}/{t}] {m}"),
    on_error="continue",  # 遇到错误继续处理
    max_retries=3
)

print(f"成功: {result.success}, 失败: {result.failed}")

完整示例:example/3_local_workflow.py

示例 4:生产环境最佳实践

包含错误处理、日志记录、进度回调、自定义 Source 的完整工作流。

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 自定义 Source(按文件大小过滤)
class FilteredLocalSource(Source):
    def __init__(self, directory, min_size=0, max_size=10*1024*1024):
        self.directory = Path(directory)
        self.min_size = min_size
        self.max_size = max_size

    def list_files(self) -> List[str]:
        files = []
        for f in self.directory.glob("**/*.pdf"):
            size = f.stat().st_size
            if self.min_size <= size <= self.max_size:
                files.append(str(f))
        return files

    # ... read_file 实现

完整示例:example/4_advanced_workflow.py


调试与日志

启用调试日志

from xparse_client import XParseClient
import logging

# 配置日志级别
logging.basicConfig(level=logging.DEBUG)

# 传入自定义 logger
client = XParseClient(
    app_id="...",
    secret_code="...",
    debug_logger=logging.getLogger("xparse_client")
)

日志级别说明

级别 用途 输出内容
DEBUG 开发调试 详细的请求/响应日志
INFO 正常运行 关键操作日志(默认)
WARNING 警告信息 潜在问题提示
ERROR 错误信息 错误详情和堆栈

日志示例

import logging

# 配置文件日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('xparse.log'),
        logging.StreamHandler()
    ]
)

client = XParseClient.from_env()

# 日志会自动记录关键操作
result = client.parse.partition(...)
# 输出: 2024-01-15 10:30:00 - xparse_client - INFO - Parsing document.pdf

本地开发

环境准备

推荐使用 uv 管理开发环境:

# 克隆仓库
git clone https://github.com/intsig-textin/xparse-python-client.git
cd xparse-python-client

# 安装开发依赖
uv sync --dev

# 运行测试
make test

# 代码格式化
make format

# 运行示例
python example/1_basic_api_usage.py

项目结构

xparse-client/
├── xparse_client/          # 主包
│   ├── __init__.py
│   ├── client.py           # XParseClient 主类
│   ├── models/             # 数据模型
│   └── connectors/         # Source/Destination
├── tests/                  # 测试
│   ├── unit/               # 单元测试
│   └── integration/        # 集成测试
├── example/                # 示例代码
├── docs/                   # 文档
└── Makefile                # 开发命令

常用命令

# 运行所有测试
make test

# 运行单元测试
make test-unit

# 代码覆盖率
make test-cov

# 代码格式化
make format

# 代码检查
make lint

# 清理缓存
make clean

贡献流程

  1. Fork 本仓库
  2. 创建特性分支:git checkout -b feature/amazing-feature
  3. 编写代码和测试
  4. 确保测试通过:make test
  5. 提交更改:git commit -m 'Add amazing feature'
  6. 推送到分支:git push origin feature/amazing-feature
  7. 提交 Pull Request

测试要求

  • 所有新功能必须包含单元测试
  • 确保所有测试通过:make test
  • 代码覆盖率不低于 80%:make test-cov
  • 遵循代码风格规范:make format && make lint

版本成熟度

当前版本:v0.3.0

  • ✅ 核心 API 已稳定
  • ✅ 生产环境可用
  • ⚠️ 破坏性变更请查看 CHANGELOG.md

我们遵循语义化版本规范(SemVer),主版本号变更可能包含破坏性变更。建议锁定版本号:

# 锁定主版本
pip install "xparse-client>=0.3,<1.0"

# 锁定次版本
pip install "xparse-client==0.3.*"

相关资源

📖 文档

🔗 链接

💰 计费

Pipeline 接口按页计费,具体价格请查看:通用文档解析


许可证

MIT License


故障排查

常见问题

1. 认证失败

错误AuthenticationError: Invalid app_id or secret_code

解决方案

  1. 检查环境变量:echo $TEXTIN_APP_ID
  2. 登录 TextIn 控制台 确认凭证
  3. 确保没有多余的空格或引号

2. 文件过大

错误ValidationError: File size exceeds maximum limit of 100MB

解决方案

  1. 压缩文件
  2. 分割成多个文件
  3. 联系技术支持提升限制

3. 向量维度不匹配

错误MilvusException: dimension mismatch

解决方案: 确保 Destination 的 dimension 参数与 embed 模型一致(当前所有模型都是 1024 维):

destination = MilvusDestination(
    dimension=1024  # 必须与 embed 模型维度一致
)

4. 连接超时

错误TimeoutException: Request timeout

解决方案

  1. 检查网络连接
  2. 增加超时时间:client = XParseClient(..., timeout=300.0)
  3. 使用异步任务处理大文件

5. Rate Limit

错误RateLimitError: Rate limit exceeded

解决方案

  1. 等待 retry_after 秒后重试
  2. 联系客服提升配额
  3. 使用批量处理降低请求频率

获取帮助

如果遇到问题,可以通过以下方式获取帮助:

  1. 查看文档https://docs.textin.com/pipeline/overview
  2. 提交 IssueGitHub Issues
  3. 联系技术支持:提供 request_id 可以加快问题定位
try:
    result = client.parse.partition(...)
except APIError as e:
    print(f"请联系技术支持,request_id: {e.request_id}")

感谢使用 xParse Client!

⭐ Star on GitHub | 📖 Read the Docs | 💬 Discussions

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

xparse_client-0.3.0b11.tar.gz (92.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

xparse_client-0.3.0b11-py3-none-any.whl (120.8 kB view details)

Uploaded Python 3

File details

Details for the file xparse_client-0.3.0b11.tar.gz.

File metadata

  • Download URL: xparse_client-0.3.0b11.tar.gz
  • Upload date:
  • Size: 92.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.0

File hashes

Hashes for xparse_client-0.3.0b11.tar.gz
Algorithm Hash digest
SHA256 cc1b856cf60d05b84aabe6ea62d6816666627ac307c28d415b6eeb01bcf4d194
MD5 eda9215851334e73711e3cea6af9a97e
BLAKE2b-256 493981eb739ee9abf435993af9493a7e8f4b6b511a7e365a614e48e8cf7aeb28

See more details on using hashes here.

File details

Details for the file xparse_client-0.3.0b11-py3-none-any.whl.

File metadata

File hashes

Hashes for xparse_client-0.3.0b11-py3-none-any.whl
Algorithm Hash digest
SHA256 0c78016470c096a163d4b9e697f7828db9882efefe8da045f835ecaf0a8ba439
MD5 9fca42ac607ed60cee5ce5df949d0f88
BLAKE2b-256 83da3e4195c22baece1d234c4e662635e14838c8fba17c09c6f8f0ab732b42c3

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page