sagellm-comm
Communication layer for sageLLM distributed inference
Protocol Compliance (Mandatory)
- MUST follow Protocol v0.1: https://github.com/intellistream/sagellm-docs/blob/main/docs/specs/protocol_v0.1.md
- Any globally shared definitions (fields, error codes, metrics, IDs, schemas) MUST be added to Protocol first.
Communication hardware abstraction layer: provides distributed communication capabilities (NCCL/HCCL/Gloo) for sageLLM.
Overview
sagellm-comm is the communication hardware abstraction layer of sageLLM. It sits parallel to sagellm-backend (the compute hardware abstraction layer) and focuses on the communication needs of distributed inference.
⚠️ Architecture note: this package does not depend on sagellm-backend; the two are parallel L1 abstractions. For compute-related operations, use sagellm-backend.
Feature Overview
| Feature | Task | Description |
|---|---|---|
| Topology discovery | Task1.1 | Automatically discovers nodes, GPUs, and interconnect topology |
| Collective operations | Task1.2 | AllReduce, AllGather, ReduceScatter, etc. |
| Compute/communication overlap | Task1.4, 1.8 | Multi-stream overlap, pipelining |
| Domestic interconnect adapters | Task1.5, 1.6 | CXL/UB/RDMA adapters |
| Cross-node communication | Task1.7 | Cross-node collective operation optimization |
Note: Task1.3 (KV Transfer) has moved to the sagellm-kv-cache repository; this package provides the underlying CommBackend for it to use.
Architecture Positioning
┌──────────────────────────────────────────────────────────────────────────┐
│                            sagellm-core (L2)                             │
│       Engine layer: LLMEngine / Scheduler / Executor / ModelRunner       │
│                                                                          │
│     ⬇️ compute-related calls             ⬇️ communication-related calls    │
├───────────────────────────────────┬──────────────────────────────────────┤
│  sagellm-backend (L1)             │  sagellm-comm (L1)  ← this repo      │
│  Compute hardware abstraction     │  Communication hardware abstraction  │
│                                   │                                      │
│  • Device / Stream / Event        │  • CommBackend abstraction           │
│  • Memory Allocator               │  • Topology discovery                │
│  • Kernel Registry                │  • Collective Ops (all_reduce, etc.) │
│  • Attention Backend              │  • P2P Ops (send/recv)               │
│  • KV Block primitives            │  • CommGroup management              │
│                                   │  • Compute/comm overlap              │
│  Providers:                       │                                      │
│  CUDA│Ascend│Kunlun│DCU│CPU       │  Backends: NCCL│HCCL│RCCL│Gloo       │
├───────────────────────────────────┴──────────────────────────────────────┤
│                           sagellm-protocol (L0)                          │
│              Protocol definitions: Schema / Errors / Types               │
└──────────────────────────────────────────────────────────────────────────┘
Responsibility Boundaries
| Responsibility | sagellm-comm | sagellm-backend |
|---|---|---|
| Communication ops (all_reduce) | ✅ | ❌ |
| Topology discovery | ✅ | ❌ |
| P2P communication (send/recv) | ✅ | ❌ |
| Communication group management | ✅ | ❌ |
| Compute/communication overlap | ✅ | ❌ |
| Device/Stream/Event | ❌ | ✅ |
| Memory allocation and management | ❌ | ✅ |
| Kernel registration/selection | ❌ | ✅ |
Key constraints:
- ✅ This repo owns: communication backend abstraction, topology discovery, collective operations, P2P communication, communication group management
- ❌ Does not depend on: sagellm-backend (both are parallel L1 layers)
- ❌ Not responsible for: device memory management (owned by backend)
- 🔗 Used by: sagellm-core (distributed inference), sagellm-kv-cache (KV transfer)
Working Alongside sagellm-backend
In distributed inference, the core layer uses backend and comm together:
from sagellm_backend import get_provider, DType
from sagellm_comm import CommBackend, ReduceOp
# backend: compute-related
backend = get_provider("cuda")
tensor = backend.allocate(1024, DType.FP16)
# comm: communication-related
comm = CommBackend.create("nccl")
comm.init_process_group(world_size=4, rank=0)
comm.all_reduce(tensor, op=ReduceOp.SUM)
📦 Responsibility Boundary Diagram
┌─────────────────────────────────────────────────────────────────────┐
│                            sagellm-core                             │
│              (distributed inference: TP/PP parallelism)             │
└────────────────────────────┬────────────────────────────────────────┘
                             │ uses CommBackend for tensor communication
                             ▼
┌─────────────────────────────────────────────────────────────────────┐
│                       sagellm-comm (this repo)                      │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐     │
│ │  Topology   │ │ Collective  │ │  Overlap    │ │  Domestic   │     │
│ │ (Task1.1)   │ │ (Task1.2)   │ │ (Task1.4/8) │ │ (Task1.5)   │     │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘     │
│                        CommBackend Interface                        │
│   ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐           │
│   │   NCCL   │  │   HCCL   │  │   RCCL   │  │   Gloo   │           │
│   └──────────┘  └──────────┘  └──────────┘  └──────────┘           │
└─────────────────────────────────────────────────────────────────────┘
                             ▲
                             │ KV Transfer uses CommBackend
┌────────────────────────────┴────────────────────────────────────────┐
│                          sagellm-kv-cache                           │
│         (KV Transfer uses this package's networking layer)          │
└─────────────────────────────────────────────────────────────────────┘
🔍 Research Context
sagellm-comm is conceptually similar to the Transfer Engine in Mooncake:
| Aspect | Mooncake Transfer Engine | sagellm-comm |
|---|---|---|
| Core Function | KV cache data movement | Network communication layer |
| Scope | Cross-node KV transfer | Topology + collectives + overlap |
| Focus | RDMA/NVLink optimization | Hardware-agnostic abstraction |
| KV Transfer | Integrated | Provided to sagellm-kv-cache |
Key differences:
- sagellm-comm provides a unified communication layer that integrates with sageLLM's backend abstraction, supporting NCCL, HCCL, and domestic interconnects (CXL/UB/RDMA)
- Compute/communication overlap (Task1.4/1.8) is a first-class design goal
- Adapter pattern ensures zero vendor lock-in: swappable backends without core logic changes
- KV Transfer (Task1.3) is implemented in sagellm-kv-cache, using this package's CommBackend for data-aware optimization
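The adapter pattern mentioned above can be sketched as follows. This is a minimal illustration, not the package's actual code: the registry, the `GlooBackend` stub, and the `create` signature are assumptions for demonstration only.

```python
from abc import ABC, abstractmethod


class CommBackend(ABC):
    """Abstract communication backend (illustrative sketch)."""

    _registry: dict = {}

    @classmethod
    def register(cls, name: str):
        """Decorator that maps a backend name to a concrete class."""
        def deco(backend_cls):
            cls._registry[name] = backend_cls
            return backend_cls
        return deco

    @classmethod
    def create(cls, name: str) -> "CommBackend":
        # Swappable backends: callers never name a concrete vendor class.
        return cls._registry[name]()

    @abstractmethod
    def all_reduce(self, tensor, op="sum"):
        ...


@CommBackend.register("gloo")
class GlooBackend(CommBackend):
    def all_reduce(self, tensor, op="sum"):
        # A real implementation would dispatch to gloo; this stub is a no-op.
        return tensor


backend = CommBackend.create("gloo")
print(type(backend).__name__)  # GlooBackend
```

Swapping "gloo" for another registered name is the only change a caller needs, which is what keeps core logic free of vendor-specific branches.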
Installation
# Install from PyPI (dependencies installed automatically)
pip install isagellm-comm
🚀 Developer Quick Start
git clone git@github.com:intellistream/sagellm-comm.git
cd sagellm-comm
./quickstart.sh --standard # standard mode: install stable packages from PyPI
./quickstart.sh --dev # dev mode: PyPI first, then a local editable override (--no-deps)
# Show the full help
./quickstart.sh --help
# Or install manually
pip install -e ".[dev]"
Mode notes:
- --standard: stable/release oriented; dependencies are installed from PyPI first.
- --dev: applies a local editable override on top of standard, avoiding repeated dependency resolution.
- Before each install, previously installed packages with the same prefix (default isagellm*) are cleaned up automatically.
Run the tests:
pytest tests/ -v
⚠️ Local CI Fallback When GitHub Actions Is Blocked by Billing
When Actions cannot start due to billing/quota limits, run the following from the repository root:
bash scripts/local_ci_fallback.sh
The script follows the core paths of ci.yml/version-source-guard.yml: pre-commit, version source guard, pytest+coverage, build+twine check.
💡 isagellm-protocol is installed automatically from PyPI as a dependency; sagellm-comm and sagellm-backend are parallel layers with no direct dependency.
Quick Start
from sagellm_comm import CommGroup, Topology, CollectiveOps
# Discover topology
topology = Topology.discover()
# Create communication group
group = CommGroup.create(world_size=4, rank=0)
# Collective operations (for distributed inference)
CollectiveOps.all_reduce(tensor, group=group)
CollectiveOps.all_gather(tensor, group=group)
Note: For KV block transfer, use sagellm-kv-cache's KVTransferEngine, which uses this package's CommBackend internally.
Benchmarking
Quick Benchmark
Run all-reduce micro-benchmark with default settings:
# Single process (world_size=1)
python examples/benchmark_all_reduce.py
# Multi-process with torchrun (2 processes)
torchrun --nproc_per_node=2 examples/benchmark_all_reduce.py
CLI Tool
Use the CLI for more control:
# Run with custom message sizes
torchrun --nproc_per_node=2 -m benchmark.run_bench --message-sizes 1024 4096 16384
# Specify backend (default: gloo)
torchrun --nproc_per_node=4 -m benchmark.run_bench --backend gloo
# Adaptive selection (default): backend + algorithm auto policy
torchrun --nproc_per_node=4 -m benchmark.run_bench --backend auto --algo auto
# Explicit algorithm
torchrun --nproc_per_node=4 -m benchmark.run_bench --algo tree
# Custom selector policy config
python -m benchmark.run_bench --selector-config ./selector_policy.json
# Custom warmup and benchmark iterations
python -m benchmark.run_bench --warmup 20 --iters 200
# Specify output directory
python -m benchmark.run_bench --output-dir ./my_results
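A selector policy file passed via --selector-config might look like the following. The schema shown here (rules matched by message size, with a rule_id and reason that feed the selection_rule_id/selection_reason output fields) is a hypothetical illustration, not the tool's documented format; consult the repository for the actual schema.

```json
{
  "rules": [
    {
      "rule_id": "small-msg-tree",
      "when": {"max_bytes": 4096},
      "backend": "gloo",
      "algo": "tree",
      "reason": "tree minimizes latency for small messages"
    },
    {
      "rule_id": "default-ring",
      "when": {},
      "backend": "auto",
      "algo": "ring",
      "reason": "ring maximizes bandwidth for large messages"
    }
  ]
}
```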
Output Format
Benchmark results are exported in both JSON and CSV formats:
.benchmarks/collective/
├── all_reduce_ws2_20260217_120000.json # JSON format
└── all_reduce_ws2_20260217_120000.csv # CSV format
Output fields:
- `bytes`: Message size in bytes
- `latency_ms`: Latency in milliseconds
- `bandwidth_gbps`: Bandwidth in GB/s
- `algo_id`: Algorithm identifier (e.g., "ring")
- `backend_kind`: Backend type (e.g., "gloo", "nccl")
- `topology_source`: Topology source (e.g., "env", "cuda")
- `world_size`: Number of processes
- `op_type`: Operation type (e.g., "all_reduce")
- `data_type`: Tensor data type (e.g., "float32")
- `warmup_iters`: Number of warmup iterations
- `benchmark_iters`: Number of benchmark iterations
- `selection_rule_id`: Rule ID used by the adaptive selector
- `selection_reason`: Human-readable selection reason, for traceability
- `timestamp`: Benchmark timestamp
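The exported records can be consumed programmatically. A minimal sketch, assuming the JSON file holds a list of records with the fields above and that `bandwidth_gbps` is derived as bytes per second divided by 1e9 (both assumptions, not verified against the exporter):

```python
import json


def bandwidth_gbps(num_bytes: int, latency_ms: float) -> float:
    """GB/s from message size and latency: bytes / seconds / 1e9 (assumed formula)."""
    return num_bytes / (latency_ms / 1e3) / 1e9


def load_results(path: str) -> list:
    """Load benchmark records and sanity-check each bandwidth against size/latency."""
    with open(path) as f:
        records = json.load(f)
    for r in records:
        expected = bandwidth_gbps(r["bytes"], r["latency_ms"])
        assert abs(expected - r["bandwidth_gbps"]) < 1e-6 * max(1.0, expected)
    return records


print(f"{bandwidth_gbps(4096, 0.05):.5f} GB/s")  # 0.08192 GB/s
```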
Python API
from sagellm_comm import get_comm_backend
from sagellm_comm.benchmark import run_all_reduce_benchmark
# Initialize backend
backend = get_comm_backend("gloo")
backend.init(rank=0, world_size=1, master_addr="localhost", master_port=29500)
# Run benchmark
results = run_all_reduce_benchmark(
backend=backend,
message_sizes=[1024, 4096, 16384], # Element counts
output_dir=".benchmarks/collective",
warmup_iters=10,
benchmark_iters=100,
)
# Results are automatically exported to JSON + CSV
print(f"Completed {len(results)} benchmark runs")
Overlap Benchmark
Measure communication-computation overlap efficiency:
# Run overlap benchmark with default settings
python -m sagellm_comm.benchmark.run_overlap_bench
# Custom communication sizes and compute durations
python -m sagellm_comm.benchmark.run_overlap_bench \
--comm-bytes 1024 4096 16384 \
--compute-ms 10 50 100
# Custom iterations
python -m sagellm_comm.benchmark.run_overlap_bench --warmup 10 --iters 20
# Specify output directory
python -m sagellm_comm.benchmark.run_overlap_bench --output-dir ./.benchmarks/overlap
Output fields:
- `comm_bytes`: Communication payload size in bytes
- `compute_duration_ms`: Configured computation duration in milliseconds
- `comm_submit_ms`: Time to submit the communication operation
- `comm_wait_ms`: Time waiting for communication to complete
- `compute_ms`: Time spent on computation
- `total_ms`: Total elapsed time
- `overlap_ratio`: Fraction of overlap (0-1); higher is better
- `overlap_efficiency`: Actual overlap efficiency vs. theoretical (0-1)
- `theoretical_sequential_ms`: Expected time without overlap
- `theoretical_overlap_ms`: Expected time with perfect overlap
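How these metrics relate can be shown with a small calculation. The formulas below are a plausible reading of the fields (sequential = comm + compute; perfect overlap = max(comm, compute); efficiency = hidden time over hideable time), not the package's verified definitions:

```python
def overlap_metrics(comm_ms: float, compute_ms: float, total_ms: float) -> dict:
    # Without overlap, communication and computation run back to back.
    sequential = comm_ms + compute_ms
    # With perfect overlap, the longer of the two hides the shorter.
    perfect = max(comm_ms, compute_ms)
    saved = sequential - total_ms       # time actually hidden by overlap
    max_savable = sequential - perfect  # time hideable in theory
    return {
        "theoretical_sequential_ms": sequential,
        "theoretical_overlap_ms": perfect,
        "overlap_efficiency": saved / max_savable if max_savable > 0 else 0.0,
    }


# 20 ms comm, 50 ms compute, measured total 55 ms:
m = overlap_metrics(20.0, 50.0, 55.0)
print(m["overlap_efficiency"])  # 0.75: 15 of the 20 hideable ms were hidden
```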
Python API:
from sagellm_comm.benchmark import run_overlap_benchmark
# Run overlap benchmark
results = run_overlap_benchmark(
comm_bytes=[1024, 4096, 16384], # Communication sizes
compute_durations_ms=[10, 50, 100], # Compute durations
output_dir=".benchmarks/overlap",
warmup_iters=5,
benchmark_iters=10,
)
# Results are automatically exported to JSON + CSV
for r in results:
print(f"Overlap ratio: {r.overlap_ratio:.2%}, Efficiency: {r.overlap_efficiency:.2%}")
Supported Backends
- NCCL (NVIDIA)
- HCCL (Huawei Ascend)
- RCCL (AMD ROCm)
- Gloo (CPU fallback)
Dependencies
- `isagellm-protocol>=0.1.0`: Protocol definitions
- `isagellm-backend>=0.1.0`: Backend abstraction
Development
Setup
# Install dev dependencies
pip install -e ".[dev]"
# Install pre-commit hooks
pip install pre-commit
pre-commit install
Pre-commit Hooks
This project uses pre-commit to ensure code quality:
# Run on all files
pre-commit run --all-files
# Run on staged files (automatic on git commit)
git commit
Configured hooks:
- Ruff linter and formatter
- MyPy type checking
- Trailing whitespace, end-of-file fixer
- YAML/TOML/JSON validation
Testing
# Run all tests
pytest tests/ -v
# Run with coverage
pytest tests/ -v --cov=sagellm_comm --cov-report=html
# Run specific test file
pytest tests/test_imports.py -v
Code Quality
# Format code
ruff format .
# Lint code
ruff check . --fix
# Type check
mypy src/
Contribution workflow: open an Issue first, develop on a branch, pass tests and lint, then open a PR.
Version Info
- Current version: 0.4.0.6
- Changelog: see CHANGELOG.md
Migrating from sagellm-backend
If you previously called communication APIs from sagellm-backend, migrate as follows:
Before (v0.3.x)
# ❌ Old: communication ops lived in backend (deprecated)
from sagellm_backend import get_provider
backend = get_provider("cuda")
backend.all_reduce(tensor, op="sum")
After (v0.4.0+)
# ✅ New: communication ops use sagellm-comm
from sagellm_comm import CommBackend, ReduceOp
comm = CommBackend.create("nccl")  # or "hccl"/"gloo"
comm.init_process_group(world_size=4, rank=0)
comm.all_reduce(tensor, op=ReduceOp.SUM)
API Mapping
| Old API (backend) | New API (comm) | Description |
|---|---|---|
| `backend.all_reduce()` | `comm.all_reduce()` | Collective reduction |
| `backend.all_gather()` | `comm.all_gather()` | Collective gather |
| `backend.broadcast()` | `comm.broadcast()` | Broadcast |
| `backend.send()` | `comm.send()` | P2P send |
| `backend.recv()` | `comm.recv()` | P2P receive |
| N/A | `Topology.discover()` | Topology discovery (new) |
| N/A | `CommGroup.create()` | Communication group management (new) |
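During a gradual migration, a thin shim can keep old call sites working while they are updated one by one. A hypothetical sketch: the `LegacyCommShim` and `FakeComm` names and the delegation shape are illustrative, not part of either package.

```python
import warnings


class LegacyCommShim:
    """Forwards deprecated backend.* communication calls to a comm backend."""

    def __init__(self, comm):
        self._comm = comm

    def all_reduce(self, tensor, op="sum"):
        warnings.warn(
            "backend.all_reduce is deprecated; call sagellm_comm directly",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._comm.all_reduce(tensor, op=op)


# Usage with any object exposing the new comm API:
class FakeComm:
    def all_reduce(self, tensor, op="sum"):
        return [sum(tensor)] * len(tensor)  # stand-in for a real collective


shim = LegacyCommShim(FakeComm())
print(shim.all_reduce([1, 2, 3]))  # [6, 6, 6]
```

The DeprecationWarning points each remaining old call site at its replacement, so the shim can be deleted once migration is complete.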
Detailed Migration Guide
For the complete migration guide, see:
License
Private - IntelliStream Research Project