SAGE Platform Services - Queue, Storage, and Service Abstractions
Project description
SAGE Platform
平台服务层 (L2) - SAGE 基础设施抽象
📋 Overview
SAGE Platform 提供核心基础设施抽象,位于基础层(sage-common)和执行引擎(sage-kernel)之间。这个第二层平台服务提供:
- 队列抽象:Python 和 RPC 队列的统一接口(Flownet 运行时对齐)
- 存储抽象:可插拔的键值存储后端
- 服务基类:构建 SAGE 服务的基础
- 平台接口:分布式系统的通用模式
该包使应用程序代码能够在本地和分布式执行模式之间无缝切换。
🧭 Governance / 团队协作制度
本仓库协作规范与质量门槛请参考:
.github/copilot-instructions.mdquickstart.shhooks/pre-commithooks/pre-push
✨ Features
- 多态队列:Python Queue 和 RPC Queue 的单一 API
- 可插拔存储:内存、Redis 和自定义存储后端
- 服务框架:构建平台服务的基类
- 类型安全:完整的类型提示和运行时验证
- 零开销:本地执行的最小抽象成本
🚀 Quick Start
使用队列
from sage.platform.queue import PythonQueueDescriptor
# 创建队列
queue_desc = PythonQueueDescriptor(maxsize=100)
queue_desc.put("message")
item = queue_desc.get()
print(item) # "message"
使用存储
from sage.platform.storage.kv_backend import DictKVBackend
# 创建存储
backend = DictKVBackend()
backend.set("user:123", {"name": "Alice", "age": 30})
user = backend.get("user:123")
print(user["name"]) # "Alice"
创建服务
from sage.platform.service import BaseService
class MyService(BaseService):
def setup(self):
self.logger.info("Service starting...")
def process(self, data):
return f"Processed: {data}"
def teardown(self):
self.logger.info("Service stopped")
service = MyService()
result = service.call("input data")
🚀 Installation
# Basic installation
pip install isage-platform
# Development installation
cd packages/sage-platform
pip install -e .
📦 Package Structure
sage-platform/
├── src/
│ └── sage/
│ └── platform/
│ ├── queue/ # Queue abstractions
│ ├── storage/ # Storage backends
│ └── service/ # Service base classes
├── tests/
├── pyproject.toml
└── README.md
组件
🔄 队列 (sage.platform.queue)
支持多种后端的多态队列描述符:
from sage.platform.queue import (
BaseQueueDescriptor,
PythonQueueDescriptor,
RPCQueueDescriptor,
)
# 创建 RPC 队列
queue_desc = RPCQueueDescriptor(queue_id="my_queue")
queue = queue_desc.queue_instance
# 使用队列操作
queue_desc.put(item)
item = queue_desc.get()
特性:
- 延迟初始化
- 序列化支持
- 跨进程通信
- 后端无关的 API
💾 存储 (sage.platform.storage)
键值存储抽象:
from sage.platform.storage.kv_backend import BaseKVBackend, DictKVBackend
# 使用内存后端
backend = DictKVBackend()
backend.set("key", "value")
value = backend.get("key")
# 使用自定义后端扩展
class RedisKVBackend(BaseKVBackend):
# 实现抽象方法
...
支持的操作:
get(key),set(key, value),delete(key)has(key),clear(),get_all_keys()- 磁盘持久化:
store_data_to_disk(),load_data_to_memory()
🔌 服务 (sage.platform.service)
SAGE 服务的基类:
from sage.platform.service import BaseService
class MyService(BaseService):
def __init__(self, config):
super().__init__(name="my_service")
self.config = config
def process(self, request):
# 服务逻辑
return response
📦 包结构
sage-platform/
├── src/
│ └── sage/
│ └── platform/
│ ├── __init__.py
│ ├── queue/ # 队列抽象
│ │ ├── base.py
│ │ ├── python_queue_descriptor.py
│ │ └── rpc_queue_descriptor.py
│ ├── storage/ # 存储后端
│ │ └── kv_backend.py
│ └── service/ # 服务基类
│ └── base.py
├── tests/
├── pyproject.toml
└── README.md
🚀 安装
基础安装
pip install isage-platform
开发安装
cd packages/sage-platform
pip install -e .
安装可选依赖
# 安装 Redis 支持(分布式存储)
pip install isage-platform[redis]
# 完整安装
pip install isage-platform[all]
📖 快速开始
使用队列
from sage.platform.queue import RPCQueueDescriptor
# 创建分布式队列
queue_desc = RPCQueueDescriptor(queue_id="my_distributed_queue")
# 生产者
queue_desc.put({"task": "process_data", "data": [1, 2, 3]})
# 消费者
task = queue_desc.get()
print(f"处理中: {task}")
# 检查队列状态
print(f"队列大小: {queue_desc.qsize()}")
print(f"是否为空: {queue_desc.empty()}")
使用存储
from sage.platform.storage.kv_backend import DictKVBackend
# 创建存储后端
storage = DictKVBackend()
# 存储数据
storage.set("user:1", {"name": "Alice", "age": 30})
storage.set("user:2", {"name": "Bob", "age": 25})
# 检索数据
user = storage.get("user:1")
print(f"用户: {user}")
# 列出所有键
keys = storage.get_all_keys()
print(f"所有键: {keys}")
# 持久化到磁盘
storage.store_data_to_disk("storage.pkl")
创建服务
from sage.platform.service import BaseService
class DataProcessingService(BaseService):
def __init__(self, config):
super().__init__(name="data_processing")
self.config = config
self.initialize()
def initialize(self):
"""初始化服务资源"""
self.logger.info(f"初始化 {self.name}")
def process(self, request):
"""处理传入请求"""
self.logger.debug(f"处理请求: {request}")
result = self._transform_data(request["data"])
return {"status": "success", "result": result}
def _transform_data(self, data):
# 服务逻辑
return [x * 2 for x in data]
# 使用服务
service = DataProcessingService({"param": "value"})
result = service.process({"data": [1, 2, 3]})
print(result) # {"status": "success", "result": [2, 4, 6]}
🔧 Configuration
服务可以通过环境变量或配置文件进行配置:
# platform_config.yaml
platform:
queue:
backend: rpc # 或 python
maxsize: 1000
storage:
backend: dict # 或 redis
persist: true
save_path: ./storage
架构位置
L1: sage-common ← 基础层
L2: sage-platform ← 当前层
L3: sage-kernel ← 执行引擎
sage-libs
L4: sage-middleware ← 领域组件
L5: sage-cli ← 命令行接口
sage-tools ← 开发工具
独立仓库 (不在 SAGE 核心架构中):
- sage-benchmark - 基准测试
- sage-examples - 应用示例
- sage-studio - Web UI
- sageLLM - LLM 推理引擎
设计原则
- 通用基础设施:平台服务不是 SAGE 特定的
- 后端无关:支持多种实现(Python、RPC、Redis 等)
- 最小依赖:仅依赖
sage-common - 可扩展性:易于添加新后端
为什么需要 L2 层?
原本这些抽象被分散在:
- Queue Descriptor 在
sage-kernel(L3) ✖️ - KV Backend 在
sage-middleware(L4) ✖️ - BaseService 在
sage-kernel(L3) ✖️
这造成了:
- 架构混乱(基础设施与业务逻辑混合)
- 依赖违规(L1 → L3)
- 有限的可重用性
通过创建 L2:
- ✅ 清晰的关注点分离
- ✅ 正确的依赖方向
- ✅ 更好的组件间可重用性
🧪 Testing
# 运行单元测试
pytest tests/unit
# 运行集成测试
pytest tests/integration
# 运行覆盖率测试
pytest --cov=sage.platform --cov-report=html
📚 Documentation
- 用户指南:查看 docs-public
- API 参考:查看包的文档字符串和类型提示
- 架构:查看 平台层设计
🤝 Contributing
欢迎贡献!请查看 CONTRIBUTING.md 了解指南。
📄 License
该项目采用 MIT 许可证 - 详情请查看 LICENSE 文件。
🔗 相关包
- sage-common:基础层 (L1) - 提供基本工具
- sage-kernel:执行引擎 (L3) - 使用平台抽象
- sage-middleware:服务层 (L4) - 使用存储和队列
- sage-libs:库层 (L5) - 使用所有平台服务
📮 支持
- 文档:https://intellistream.github.io/SAGE-Pub/
- 问题反馈:https://github.com/intellistream/SAGE/issues
- 讨论:https://github.com/intellistream/SAGE/discussions
SAGE 框架的一部分 | 主仓库
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file isage_platform-0.2.4.28.tar.gz.
File metadata
- Download URL: isage_platform-0.2.4.28.tar.gz
- Upload date:
- Size: 33.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.14
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
61cab4bc8fb0cfb98d183842e6169c31da19216faea02d42d0360e7e673d05f8
|
|
| MD5 |
77db58c3637db0c7e2f8a5006358027a
|
|
| BLAKE2b-256 |
1644e9321bfa2d19a12229027f7bc6f80d38e75970fac8c6d422cd9a1f2dbdb4
|
File details
Details for the file isage_platform-0.2.4.28-py2.py3-none-any.whl.
File metadata
- Download URL: isage_platform-0.2.4.28-py2.py3-none-any.whl
- Upload date:
- Size: 40.7 kB
- Tags: Python 2, Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.14
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
25f6add11b685c61e440cd1706b334acb4df5231a1869088a7ba16a776fefe37
|
|
| MD5 |
c82df3cd39c66ff3907932fb57cf0f6b
|
|
| BLAKE2b-256 |
d510a7148701d07dd007fce3c3ba1c7969be293f05f6a35f91658305c68c81f0
|