Skip to main content

SAGE Middleware Module

Project description

SAGE 微服务架构改造

📋 改造概述

本次改造将SAGE的集成式memory service重构为真正的服务化架构,符合您提出的要求:

  1. 使用sage-kernel的BaseServiceTask - 所有服务继承自sage.kernels.runtime.service.base_service_task.BaseServiceTask
  2. 服务作为Task运行 - 服务本质上是特殊的Task,可以在DAG中作为节点运行
  3. 移除服务启动脚本 - 不再有独立的服务启动器,服务由应用程序在构建DAG时初始化
  4. 支持Ray分布式 - 服务可以作为Ray Actor运行,支持集群部署

🏗️ 新架构

用户应用
    ↓ 注册服务
SAGE Environment
    ↓ 创建DAG
Service Tasks (BaseServiceTask)
    ├── KVService (键值存储)
    ├── VDBService (向量数据库) 
    └── MemoryOrchestratorService (记忆编排)
        ↓ 队列通信
    其他Function Tasks

🔧 核心组件

1. KVService

  • 文件: src/sage/service/kv/kv_service.py
  • 基类: BaseServiceTask
  • 后端: 内存 或 Redis
  • 方法: get(), put(), delete(), list_keys(), clear()

2. VDBService

  • 文件: src/sage/service/vdb/vdb_service.py
  • 基类: BaseServiceTask
  • 后端: ChromaDB
  • 方法: add(), search(), get(), delete(), update()

3. MemoryOrchestratorService

  • 文件: src/sage/service/memory_orchestrator/memory_service.py
  • 基类: BaseServiceTask
  • 功能: 协调KV和VDB服务,提供统一记忆管理
  • 方法: store_memory(), search_memories(), get_memory()

🚀 使用方式

第一步:注册服务

from sage.api.local_environment import LocalEnvironment
from sage.middleware.services import (
    create_kv_service_factory,
    create_vdb_service_factory,
    create_memory_service_factory
)

# 创建环境
env = LocalEnvironment("my_app", {})

# 注册服务
env.register_service("kv_service", KVService, create_kv_service_factory())
env.register_service("vdb_service", VDBService, create_vdb_service_factory()) 
env.register_service("memory_service", MemoryOrchestratorService, create_memory_service_factory())

第二步:在Function中使用

from sage.api.function.base_function import BaseFunction

class MyProcessor(BaseFunction):
    def process(self, data):
        # 直接调用服务
        self.call_service["kv_service"].put("key1", data)
        result = self.call_service["memory_service"].store_memory(
            session_id="session_1",
            content=data['content'],
            vector=data['vector']
        )
        return result

第三步:构建DAG并运行

# 创建数据流
stream = env.from_kafka_source(...)

# 应用处理函数
processed = stream.map(MyProcessor())

# 运行(服务自动启动)
env.execute()

📁 项目结构

packages/sage-middleware/
├── src/sage/service/
│   ├── __init__.py                     # 统一接口
│   ├── kv/kv_service.py               # KV服务任务
│   ├── vdb/vdb_service.py             # VDB服务任务
│   ├── memory_orchestrator/memory_service.py  # Memory编排服务
│   └── memory/                        # 旧版服务(兼容)
├── examples/
│   ├── dag_microservices_demo.py      # 完整使用演示
│   └── run_demo.sh                    # 快速启动脚本
├── MICROSERVICES_GUIDE.md             # 详细使用指南
└── README.md                          # 本文件

🎯 关键特性

1. 真正的服务化

  • 每个服务都是独立的BaseServiceTask
  • 服务可以单独运行、测试和扩展
  • 支持不同的技术栈和存储后端

2. DAG集成

  • 服务作为Task节点在DAG中运行
  • 与其他数据处理Task无缝集成
  • 统一的生命周期管理

3. 队列通信

  • 使用SAGE的高性能队列机制
  • 支持同步和异步调用
  • 自动请求/响应匹配

4. 分布式支持

  • 服务可以作为Ray Actor运行
  • 支持跨节点服务调用
  • 自动负载均衡和故障恢复

5. 应用控制

  • 应用程序控制服务的启动和配置
  • 不需要外部服务管理器
  • 灵活的服务组合和依赖关系

📊 性能优势

  1. 高效通信: 队列机制比HTTP调用更高效
  2. 内存优化: 服务和数据处理共享内存空间
  3. 批处理: 支持批量服务调用
  4. 并发处理: 服务可以并发处理多个请求

🔄 迁移路径

从HTTP微服务迁移

  • 移除HTTP客户端代码
  • 使用self.call_service[service_name].method()替代HTTP调用
  • 服务注册到SAGE环境而不是独立启动

从集成式服务迁移

  • 将大型服务拆分为独立的Service Task
  • 使用服务调用替代直接方法调用
  • 保持相同的业务逻辑和API

🧪 运行演示

# 快速启动
cd packages/sage-middleware
./examples/run_demo.sh

# 或直接运行Python
python examples/dag_microservices_demo.py

📚 参考文档


✅ 改造成果

这次改造成功实现了您要求的所有目标:

  1. 继承sage-kernel的BaseServiceTask - 所有服务都继承正确的基类
  2. 服务即Task - 服务本质上是在DAG中运行的特殊Task
  3. 应用控制 - 用户在构建DAG时初始化服务,而非独立启动脚本
  4. 支持Ray - 服务可以作为Ray Actor分布式运行
  5. 队列通信 - 使用SAGE统一的高性能队列机制
  6. 向后兼容 - 保留原有API兼容性

现在SAGE拥有了真正的服务化架构,每个服务都是独立的Task,可以灵活组合、分布式部署,完全符合现代微服务架构的设计理念!

SAGE Middleware - 中间件组件

SAGE Middleware提供中间件服务,包含LLM中间件、API服务、任务队列等企业级功能。

主要功能

LLM中间件服务

  • 多模型支持: OpenAI、Ollama、智谱AI、Cohere等
  • 统一API: 标准化的LLM调用接口
  • 高性能推理: 基于vLLM的优化推理服务
  • 模型管理: 动态模型加载和卸载

API服务

  • RESTful API: 基于FastAPI的高性能API服务
  • 认证授权: JWT令牌和密码加密支持
  • 服务发现: 自动服务注册和发现

任务队列

  • 异步处理: 基于Celery的分布式任务队列
  • 监控界面: Flower监控和管理界面
  • 容错机制: 任务重试和错误处理

向量检索

  • FAISS集成: 高性能向量相似度搜索
  • BM25搜索: 传统文本检索算法
  • 混合检索: 向量和关键词混合检索

安装

pip install intellistream-sage-middleware

基本使用

启动LLM服务

from sage.middleware.llm import LLMService

# 创建LLM服务
service = LLMService()

# 注册模型
service.register_model("gpt-3.5-turbo", provider="openai")
service.register_model("llama2", provider="ollama")

# 启动服务
service.start()

API调用

import requests

# 文本生成
response = requests.post("http://localhost:8000/generate", json={
    "model": "gpt-3.5-turbo",
    "prompt": "Hello, how are you?",
    "max_tokens": 100
})

result = response.json()
print(result["text"])

向量检索

from sage.middleware.retrieval import VectorStore

# 创建向量存储
store = VectorStore()

# 添加文档
store.add_documents([
    "This is document 1",
    "This is document 2"
])

# 搜索
results = store.search("document", top_k=5)
for result in results:
    print(f"Score: {result.score}, Text: {result.text}")

配置

中间件服务可以通过环境变量或配置文件进行配置:

# config.yaml
llm:
  providers:
    openai:
      api_key: "your-api-key"
    ollama:
      base_url: "http://localhost:11434"

api:
  host: "0.0.0.0"
  port: 8000
  
queue:
  broker: "redis://localhost:6379"
  backend: "redis://localhost:6379"

许可证

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

isage_middleware-0.1.1-py3-none-any.whl (446.7 kB view details)

Uploaded Python 3

File details

Details for the file isage_middleware-0.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for isage_middleware-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 bdfb6f83ed8ad2b3dccaf467a4886bd0932980be982b98485ad8c32798a84672
MD5 b31fdcc7419d33b19bbc045b4cc84bc6
BLAKE2b-256 9cc55994bdf420df172e71f1f892ca1816d2e16ec3848cb9993412e65119f85a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page