Skip to main content

AI并发处理框架(可嵌入、可扩展)

Project description

AI Processing Framework

一个轻量级、高并发、生产就绪的 AI 任务处理框架,专为批量调用大模型 API 而设计。

Python 3.10+ License: MIT Last Commit Issues

📖 完整文档💬 讨论🚀 快速开始📚 示例


🌟 为什么选择这个框架?

🚀 超高性能

处理速度可达 3,000+ 条/分钟(取决于模型和任务复杂度),基于纯异步架构,最大化 I/O 效率。

🔄 真正的多数据库支持

一行代码切换数据库,无需修改业务逻辑:

# SQLite(开发环境)
DB_TYPE=sqlite
DB_PATH=database.db

# PostgreSQL(生产环境)
DB_TYPE=postgresql
DB_HOST=localhost
DB_USER=postgres
DB_PASSWORD=secret

# MySQL(企业环境)
DB_TYPE=mysql
DB_HOST=localhost
DB_USER=root
DB_PASSWORD=secret

📦 开箱即用

内置 租约机制断路器重试策略优雅关机,无需额外开发即可投入生产。

🔌 多提供商无缝集成

支持 OpenAI、Anthropic、Azure 等主流 AI 提供商,自动负载均衡和故障转移。

📊 全方位可观测性

内置事件总线、Prometheus 指标、进度追踪,让你实时掌握系统状态。

🎯 零成本迁移

多数据库版本,只需修改配置,无需改动一行业务代码。


🌟 核心特性

  • 🚀 纯异步架构: 基于 asyncio,从 API 请求到数据库操作完全异步化,最大化 I/O 性能
  • 🔄 多数据库支持: SQLite → PostgreSQL → MySQL,一行配置切换,适配不同规模部署
  • 🔄 多提供商支持: 统一管理 OpenAI、Anthropic、Azure 等,自动负载均衡和故障转移
  • 🔗 链式任务执行: 支持基于 previous_task_id 的任务依赖,实现复杂的多步骤工作流
  • 📦 上下文数据传递: 后续任务可访问前置任务的处理结果,支持有状态的链式处理
  • ⚡ 自适应并发: AI 驱动的动态并发控制,根据成功率自动优化吞吐量
  • 🛡️ 企业级容错: 指数退避重试、断路器模式、租约式数据锁定
  • 🔄 自动重试机制: 智能重试失败任务,达到最大次数后自动标记失败并排除
  • 🔐 API 密钥加密: 内置 KeyManager 支持密钥加密存储,避免明文泄露
  • 📊 完整可观测性: 事件总线、Prometheus 指标、进度追踪、健康检查
  • 🔧 插件化架构: 处理器模式,添加新任务只需实现 4 个方法
  • 🎯 生产就绪: 优雅关机、资源清理、错误恢复、连接池管理
  • 🌍 极致可移植: 开发用 SQLite,生产用 PostgreSQL/MySQL,零代码改动
  • 📈 水平扩展: 基于 Lease 机制,支持多 Worker 分布式部署

📊 性能基准

翻译任务性能对比(1000 条文本,GPT-4o-mini)

数据库 平均处理速度 并发数 CPU 占用 内存占用 适用场景
SQLite 2,800 条/分钟 5-10 低 (12%) 低 (150MB) 开发、测试、小规模
PostgreSQL 3,200 条/分钟 10-30 中 (25%) 中 (300MB) 生产、高并发、大规模
MySQL 3,000 条/分钟 8-25 中 (22%) 中 (280MB) 企业、Web 应用、通用

测试环境: AMD Ryzen 7 5800X / 32GB RAM / NVMe SSD

并发处理能力

  • 单机 SQLite: 最多 20 个 Worker
  • 单机 PostgreSQL: 最多 50 个 Worker(推荐 30)
  • 单机 MySQL: 最多 40 个 Worker(推荐 25)

🚀 快速开始

1. 安装依赖

# 核心依赖(最小安装)
pip install aiohttp aiosqlite python-dotenv tenacity pydantic sqlalchemy

# 根据数据库选择安装
pip install asyncpg      # PostgreSQL
pip install aiomysql     # MySQL

# AI 提供商
pip install openai       # OpenAI
pip install anthropic    # Claude

2. 配置环境变量

创建 .env 文件:

# 数据库配置(SQLite 示例)
DB_TYPE=sqlite
DB_PATH=database.db

# AI 提供商配置
API_KEY_1=sk-your-openai-key
BASE_URL_1=https://api.openai.com/v1
MODEL_1=gpt-4o-mini
MODEL_TYPE_1=openai
NAME_1=openai-primary

# 系统配置
PER_TASK_NUM=5              # 初始并发数
TASK_SLEEP_TIME=2           # 批次间隔(秒)
TIMEOUT=300                 # API 超时(秒)
MAX_RETRIES=3               # 最大重试次数

3. 运行你的第一个任务

import asyncio
from ai_processing.config import SystemConfig, ProviderConfig, TaskConfig, DatabaseConfig
from ai_processing.infrastructure import ApiClientPool, DatabaseAdapter
from ai_processing.concurrency.async_executor import AsyncExecutor
from ai_processing.types import TaskContext
from ai_processing.core.base_task_processor import BaseTaskProcessor

class SimpleTranslationProcessor(BaseTaskProcessor):
    """简单翻译处理器示例"""

    def get_filter_condition(self, task_type: str):
        from sqlalchemy import text
        return text("translated_content IS NULL")

    async def process_item(self, item, task_type: str = "") -> bool:
        # 构建 prompt
        prompt = f"请将以下文本翻译成英文:\n\n{item.content}"

        # 调用 LLM
        messages = [{"role": "user", "content": prompt}]
        response, need_retry = await self.api_client.call_with_retry(messages)

        if need_retry:
            return False

        # 提取结果并保存
        translated = self.api_client.extract_content(response)
        await self.db_adapter.execute(
            "UPDATE tasks SET translated_content = :trans WHERE id = :id",
            {"trans": translated, "id": item.id}
        )

        self.context.increment_success()
        return True

# 运行
asyncio.run(main())

完整示例和详细说明: 请查看 快速开始指南


📖 文档导航

入门指南

核心功能

部署与运维

高级主题

  • 扩展开发 - 自定义处理器、AI 提供商和数据库后端
  • 高级特性 - 钩子、事件总线、调度器、分布式部署

🏗️ 核心架构

┌─────────────────────────────────────────────────────────────┐
│                      应用层 (Your Code)                     │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │
│  │   main.py    │  │ Processor A  │  │ Processor B  │       │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘       │
└─────────┼──────────────────┼──────────────────┼─────────────┘
          │                  │                  │
          ▼                  ▼                  ▼
┌─────────────────────────────────────────────────────────────┐
│                    框架核心层 (Framework)                    │
│  ┌──────────────────────────────────────────────────────┐   │
│  │              AsyncExecutor (执行引擎)                 │   │
│  │  • 批次管理  • 并发控制  • 进度追踪  • 错误恢复        │   │
│  └────┬─────────────────────────────────────────────┬───┘   │
│       │                                             │       │
│  ┌────▼─────────┐  ┌──────────────┐  ┌─────────────▼────┐   │
│  │ TaskContext  │  │ ProgressTrack │  │ MaintenanceCoord │   │
│  │ (上下文)     │  │ (进度追踪)   │  │ (维护协调)        │   │
│  └──────────────┘  └──────────────┘  └──────────────────┘   │
└─────────────────────────────────────────────────────────────┘
          │                  │                  │
          ▼                  ▼                  ▼
┌─────────────────────────────────────────────────────────────┐
│                     基础设施层 (Infrastructure)              │
│  ┌──────────────┐  ┌─────────────────┐  ┌──────────────┐    │
│  │ApiClientPool │  │  DatabaseAdapter│  │Persistence   │    │
│  │(API池)       │  │ (数据库适配)    │  │Manager       │     │
│  └──────┬───────┘  └──────┬──────────┘  └──────────────┘    │
│         │                  │                                │
│  ┌──────▼───────┐  ┌──────▼──────────────┐                  │
│  │ LLMClient    │  │ DatabaseBackend     │                  │
│  │ (HTTP客户端) │  │ ├─ SQLiteBackend    │                  │
│  └──────────────┘  │ ├─ PostgreSQLBackend│                  │
│                    │ └─ MySQLBackend     │                  │
│                    └─────────────────────┘                  │
└─────────────────────────────────────────────────────────────┘
          │                  │
          ▼                  ▼
┌─────────────────────────────────────────────────────────────┐
│                      外部服务 (External)                    │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │
│  │   OpenAI     │  │  Anthropic   │  │   Database   │       │
│  └──────────────┘  └──────────────┘  └──────────────┘       │
└─────────────────────────────────────────────────────────────┘

详细架构说明: 请查看 框架架构文档


🤝 贡献指南

我们欢迎所有形式的贡献!请查看 CONTRIBUTING.md 了解如何参与项目。

贡献方式

  • 🐛 报告 Bug
  • 💡 提出新功能建议
  • 📝 改进文档
  • 🔧 提交代码

📄 许可证

本项目采用 MIT License 开源协议。


🔗 相关链接


如果这个项目对你有帮助,请给我们一个 ⭐️ Star!

Made with ❤️ by the AI Processing Framework Team

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ai_processing-0.9.0.tar.gz (89.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ai_processing-0.9.0-py3-none-any.whl (103.2 kB view details)

Uploaded Python 3

File details

Details for the file ai_processing-0.9.0.tar.gz.

File metadata

  • Download URL: ai_processing-0.9.0.tar.gz
  • Upload date:
  • Size: 89.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.11

File hashes

Hashes for ai_processing-0.9.0.tar.gz
Algorithm Hash digest
SHA256 a9f68cdb253f138d090eb64505f9ec875b67c870468f1234eab4b8554a97fd5c
MD5 7837989b46ffe3a0ef8ec17e660a98d7
BLAKE2b-256 aa10f3d284c5452551fd9317beafa03875f7b97cec61c7ed674d11e47fd89cbe

See more details on using hashes here.

File details

Details for the file ai_processing-0.9.0-py3-none-any.whl.

File metadata

  • Download URL: ai_processing-0.9.0-py3-none-any.whl
  • Upload date:
  • Size: 103.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.11

File hashes

Hashes for ai_processing-0.9.0-py3-none-any.whl
Algorithm Hash digest
SHA256 006973cbae3b20c55bc33fdc4d9a5d65f2feb5b25cd0cefa71e8b1b6a6ad250b
MD5 a48a1833495e35b1b6c38d839e18c220
BLAKE2b-256 f9005f225db4d90d74ca843df764ab82c6b7503dd55cd4adcff9e65b68b3754c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page