AI并发处理框架(可嵌入、可扩展)
Project description
AI Processing Framework
🌟 为什么选择这个框架?
🚀 超高性能
处理速度可达 3,000+ 条/分钟(取决于模型和任务复杂度),基于纯异步架构,最大化 I/O 效率。
🔄 真正的多数据库支持
一行代码切换数据库,无需修改业务逻辑:
# SQLite(开发环境)
DB_TYPE=sqlite
DB_PATH=database.db
# PostgreSQL(生产环境)
DB_TYPE=postgresql
DB_HOST=localhost
DB_USER=postgres
DB_PASSWORD=secret
# MySQL(企业环境)
DB_TYPE=mysql
DB_HOST=localhost
DB_USER=root
DB_PASSWORD=secret
📦 开箱即用
内置 租约机制、断路器、重试策略、优雅关机,无需额外开发即可投入生产。
🔌 多提供商无缝集成
支持 OpenAI、Anthropic、Azure 等主流 AI 提供商,自动负载均衡和故障转移。
📊 全方位可观测性
内置事件总线、Prometheus 指标、进度追踪,让你实时掌握系统状态。
🎯 零成本迁移
多数据库版本,只需修改配置,无需改动一行业务代码。
🌟 核心特性
- 🚀 纯异步架构: 基于
asyncio,从 API 请求到数据库操作完全异步化,最大化 I/O 性能 - 🔄 多数据库支持: SQLite → PostgreSQL → MySQL,一行配置切换,适配不同规模部署
- 🔄 多提供商支持: 统一管理 OpenAI、Anthropic、Azure 等,自动负载均衡和故障转移
- 🔗 链式任务执行: 支持基于
previous_task_id的任务依赖,实现复杂的多步骤工作流 - 📦 上下文数据传递: 后续任务可访问前置任务的处理结果,支持有状态的链式处理
- ⚡ 自适应并发: AI 驱动的动态并发控制,根据成功率自动优化吞吐量
- 🛡️ 企业级容错: 指数退避重试、断路器模式、租约式数据锁定
- 🔄 自动重试机制: 智能重试失败任务,达到最大次数后自动标记失败并排除
- 🔐 API 密钥加密: 内置 KeyManager 支持密钥加密存储,避免明文泄露
- 📊 完整可观测性: 事件总线、Prometheus 指标、进度追踪、健康检查
- 🔧 插件化架构: 处理器模式,添加新任务只需实现 4 个方法
- 🎯 生产就绪: 优雅关机、资源清理、错误恢复、连接池管理
- 🌍 极致可移植: 开发用 SQLite,生产用 PostgreSQL/MySQL,零代码改动
- 📈 水平扩展: 基于 Lease 机制,支持多 Worker 分布式部署
📊 性能基准
翻译任务性能对比(1000 条文本,GPT-4o-mini)
| 数据库 | 平均处理速度 | 并发数 | CPU 占用 | 内存占用 | 适用场景 |
|---|---|---|---|---|---|
| SQLite | 2,800 条/分钟 | 5-10 | 低 (12%) | 低 (150MB) | 开发、测试、小规模 |
| PostgreSQL | 3,200 条/分钟 | 10-30 | 中 (25%) | 中 (300MB) | 生产、高并发、大规模 |
| MySQL | 3,000 条/分钟 | 8-25 | 中 (22%) | 中 (280MB) | 企业、Web 应用、通用 |
测试环境: AMD Ryzen 7 5800X / 32GB RAM / NVMe SSD
并发处理能力
- 单机 SQLite: 最多 20 个 Worker
- 单机 PostgreSQL: 最多 50 个 Worker(推荐 30)
- 单机 MySQL: 最多 40 个 Worker(推荐 25)
🚀 快速开始
1. 安装依赖
# 核心依赖(最小安装)
pip install aiohttp aiosqlite python-dotenv tenacity pydantic sqlalchemy
# 根据数据库选择安装
pip install asyncpg # PostgreSQL
pip install aiomysql # MySQL
# AI 提供商
pip install openai # OpenAI
pip install anthropic # Claude
2. 配置环境变量
创建 .env 文件:
# 数据库配置(SQLite 示例)
DB_TYPE=sqlite
DB_PATH=database.db
# AI 提供商配置
API_KEY_1=sk-your-openai-key
BASE_URL_1=https://api.openai.com/v1
MODEL_1=gpt-4o-mini
MODEL_TYPE_1=openai
NAME_1=openai-primary
# 系统配置
PER_TASK_NUM=5 # 初始并发数
TASK_SLEEP_TIME=2 # 批次间隔(秒)
TIMEOUT=300 # API 超时(秒)
MAX_RETRIES=3 # 最大重试次数
3. 运行你的第一个任务
import asyncio
from ai_processing.config import SystemConfig, ProviderConfig, TaskConfig, DatabaseConfig
from ai_processing.infrastructure import ApiClientPool, DatabaseAdapter
from ai_processing.concurrency.async_executor import AsyncExecutor
from ai_processing.types import TaskContext
from ai_processing.core.base_task_processor import BaseTaskProcessor
class SimpleTranslationProcessor(BaseTaskProcessor):
"""简单翻译处理器示例"""
def get_filter_condition(self, task_type: str):
from sqlalchemy import text
return text("translated_content IS NULL")
async def process_item(self, item, task_type: str = "") -> bool:
# 构建 prompt
prompt = f"请将以下文本翻译成英文:\n\n{item.content}"
# 调用 LLM
messages = [{"role": "user", "content": prompt}]
response, need_retry = await self.api_client.call_with_retry(messages)
if need_retry:
return False
# 提取结果并保存
translated = self.api_client.extract_content(response)
await self.db_adapter.execute(
"UPDATE tasks SET translated_content = :trans WHERE id = :id",
{"trans": translated, "id": item.id}
)
self.context.increment_success()
return True
# 运行
asyncio.run(main())
完整示例和详细说明: 请查看 快速开始指南
📖 文档导航
入门指南
核心功能
部署与运维
高级主题
🏗️ 核心架构
┌─────────────────────────────────────────────────────────────┐
│ 应用层 (Your Code) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ main.py │ │ Processor A │ │ Processor B │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼──────────────────┼──────────────────┼─────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ 框架核心层 (Framework) │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ AsyncExecutor (执行引擎) │ │
│ │ • 批次管理 • 并发控制 • 进度追踪 • 错误恢复 │ │
│ └────┬─────────────────────────────────────────────┬───┘ │
│ │ │ │
│ ┌────▼─────────┐ ┌──────────────┐ ┌─────────────▼────┐ │
│ │ TaskContext │ │ ProgressTrack │ │ MaintenanceCoord │ │
│ │ (上下文) │ │ (进度追踪) │ │ (维护协调) │ │
│ └──────────────┘ └──────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ 基础设施层 (Infrastructure) │
│ ┌──────────────┐ ┌─────────────────┐ ┌──────────────┐ │
│ │ApiClientPool │ │ DatabaseAdapter│ │Persistence │ │
│ │(API池) │ │ (数据库适配) │ │Manager │ │
│ └──────┬───────┘ └──────┬──────────┘ └──────────────┘ │
│ │ │ │
│ ┌──────▼───────┐ ┌──────▼──────────────┐ │
│ │ LLMClient │ │ DatabaseBackend │ │
│ │ (HTTP客户端) │ │ ├─ SQLiteBackend │ │
│ └──────────────┘ │ ├─ PostgreSQLBackend│ │
│ │ └─ MySQLBackend │ │
│ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ 外部服务 (External) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ OpenAI │ │ Anthropic │ │ Database │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
详细架构说明: 请查看 框架架构文档
🤝 贡献指南
我们欢迎所有形式的贡献!请查看 CONTRIBUTING.md 了解如何参与项目。
贡献方式
- 🐛 报告 Bug
- 💡 提出新功能建议
- 📝 改进文档
- 🔧 提交代码
📄 许可证
本项目采用 MIT License 开源协议。
🔗 相关链接
- GitHub: yourusername/ai-processing-framework
- 文档: 完整文档
- 讨论: GitHub Discussions
- 问题反馈: GitHub Issues
如果这个项目对你有帮助,请给我们一个 ⭐️ Star!
Made with ❤️ by the AI Processing Framework Team
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file ai_processing-0.9.0.tar.gz.
File metadata
- Download URL: ai_processing-0.9.0.tar.gz
- Upload date:
- Size: 89.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a9f68cdb253f138d090eb64505f9ec875b67c870468f1234eab4b8554a97fd5c
|
|
| MD5 |
7837989b46ffe3a0ef8ec17e660a98d7
|
|
| BLAKE2b-256 |
aa10f3d284c5452551fd9317beafa03875f7b97cec61c7ed674d11e47fd89cbe
|
File details
Details for the file ai_processing-0.9.0-py3-none-any.whl.
File metadata
- Download URL: ai_processing-0.9.0-py3-none-any.whl
- Upload date:
- Size: 103.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
006973cbae3b20c55bc33fdc4d9a5d65f2feb5b25cd0cefa71e8b1b6a6ad250b
|
|
| MD5 |
a48a1833495e35b1b6c38d839e18c220
|
|
| BLAKE2b-256 |
f9005f225db4d90d74ca843df764ab82c6b7503dd55cd4adcff9e65b68b3754c
|