
Communication Layer for sageLLM distributed inference


sagellm-comm


Communication hardware abstraction layer - provides distributed communication capabilities (NCCL/HCCL/Gloo) for sageLLM

Overview

sagellm-comm is the communication hardware abstraction layer of sageLLM. It sits parallel to sagellm-backend (the compute hardware abstraction layer) and focuses on the communication needs of distributed inference.

⚠️ Architecture note: this package does not depend on sagellm-backend; the two are parallel L1 abstractions. For compute-related operations, use sagellm-backend.

Feature Overview

Feature                          Task          Description
Topology discovery               Task1.1       Automatically discover nodes, GPUs, and interconnect topology
Collective operations            Task1.2       AllReduce, AllGather, ReduceScatter, etc.
Compute/communication overlap    Task1.4, 1.8  Multi-stream overlap, pipelining
Domestic interconnect adapters   Task1.5, 1.6  CXL/UB/RDMA adapters
Cross-node communication         Task1.7       Optimized cross-node collective operations

Note: Task1.3 (KV Transfer) has moved to the sagellm-kv-cache repository; this package provides the underlying CommBackend that it uses.

Architecture Position

┌─────────────────────────────────────────────────────────────────────────────┐
│                           sagellm-core (L2)                                  │
│        Engine layer: LLMEngine / Scheduler / Executor / ModelRunner          │
│                                                                              │
│          ⬇️ compute calls                     ⬇️ communication calls          │
├─────────────────────────────────┬────────────────────────────────────────────┤
│     sagellm-backend (L1)        │     sagellm-comm (L1) ← this repository    │
│  Compute hardware abstraction   │  Communication hardware abstraction        │
│                                 │                                            │
│  • Device / Stream / Event      │  • CommBackend backend abstraction         │
│  • Memory Allocator             │  • Topology discovery                      │
│  • Kernel Registry              │  • Collective Ops (all_reduce, etc.)       │
│  • Attention Backend            │  • P2P Ops (send/recv)                     │
│  • KV Block primitives          │  • CommGroup management                    │
│                                 │  • Compute/communication overlap           │
│  Providers:                     │                                            │
│  CUDA│Ascend│Kunlun│DCU│CPU     │  Backends: NCCL│HCCL│RCCL│Gloo             │
├─────────────────────────────────┴────────────────────────────────────────────┤
│                         sagellm-protocol (L0)                                │
│              Protocol definitions: Schema / Errors / Types                   │
└──────────────────────────────────────────────────────────────────────────────┘

Responsibility Boundaries

Responsibility                    sagellm-comm   sagellm-backend
Communication ops (all_reduce)    ✓
Topology discovery                ✓
P2P communication (send/recv)     ✓
Communication group management    ✓
Compute/communication overlap     ✓
Device/Stream/Event                              ✓
Memory allocation & management                   ✓
Kernel registration/selection                    ✓

Key Constraints

  • This repository owns: communication backend abstraction, topology discovery, collective operations, P2P communication, communication group management
  • Does not depend on: sagellm-backend (both are parallel L1 layers)
  • Not responsible for: device memory management (owned by the backend)
  • 🔗 Used by: sagellm-core (distributed inference), sagellm-kv-cache (KV transfer)

Collaboration with sagellm-backend

In distributed inference scenarios, the core layer uses backend and comm together:

from sagellm_backend import get_provider, DType  # DType assumed to be exported here
from sagellm_comm import CommBackend, ReduceOp

# backend: compute side
backend = get_provider("cuda")
tensor = backend.allocate(1024, DType.FP16)

# comm: communication side
comm = CommBackend.create("nccl")
comm.init_process_group(world_size=4, rank=0)
comm.all_reduce(tensor, op=ReduceOp.SUM)

📦 Responsibility Boundary Diagram

┌─────────────────────────────────────────────────────────────────────┐
│                         sagellm-core                                 │
│              (distributed inference: TP/PP parallelism)              │
└────────────────────────────┬────────────────────────────────────────┘
                             │ uses CommBackend for tensor communication
                             ▼
┌─────────────────────────────────────────────────────────────────────┐
│                   sagellm-comm (this repository)                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │  Topology   │  │ Collective  │  │   Overlap   │  │  Domestic   │ │
│  │  (Task1.1)  │  │  (Task1.2)  │  │ (Task1.4/8) │  │  (Task1.5)  │ │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘ │
│                      CommBackend Interface                           │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐            │
│  │   NCCL   │  │   HCCL   │  │   RCCL   │  │   Gloo   │            │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘            │
└─────────────────────────────────────────────────────────────────────┘
                             ▲
                             │ KV Transfer uses CommBackend
┌────────────────────────────┴────────────────────────────────────────┐
│                      sagellm-kv-cache                                │
│        (KV Transfer uses this package's networking capabilities)     │
└─────────────────────────────────────────────────────────────────────┘

🔍 Research Context

sagellm-comm is conceptually similar to the Transfer Engine in Mooncake:

Aspect         Mooncake Transfer Engine   sagellm-comm
Core Function  KV cache data movement     Network communication layer
Scope          Cross-node KV transfer     Topology + collectives + overlap
Focus          RDMA/NVLink optimization   Hardware-agnostic abstraction
KV Transfer    Integrated                 Provided to sagellm-kv-cache

Key differences:

  • sagellm-comm provides a unified communication layer that integrates with sageLLM's backend abstraction, supporting NCCL, HCCL, and domestic interconnects (CXL/UB/RDMA)
  • Compute/communication overlap (Task1.4/1.8) is a first-class design goal
  • Adapter pattern ensures zero vendor lock-in: swappable backends without core logic changes
  • KV Transfer (Task1.3) is implemented in sagellm-kv-cache, using this package's CommBackend for data-aware optimization
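
The adapter pattern mentioned above can be sketched as follows. This is a minimal illustration only; `BaseCommAdapter`, `create_adapter`, and the registry are hypothetical names, not the package's actual API:

```python
from abc import ABC, abstractmethod

class BaseCommAdapter(ABC):
    """Uniform interface hiding vendor-specific collectives."""

    @abstractmethod
    def all_reduce(self, tensor, op="sum"): ...

class GlooAdapter(BaseCommAdapter):
    def all_reduce(self, tensor, op="sum"):
        # Placeholder: a real adapter would invoke the Gloo runtime here.
        return f"gloo:{op}"

class NcclAdapter(BaseCommAdapter):
    def all_reduce(self, tensor, op="sum"):
        # Placeholder: a real adapter would invoke NCCL here.
        return f"nccl:{op}"

# Registry maps backend names to adapter classes; adding a new
# interconnect means registering one more adapter, with no core changes.
_REGISTRY = {"gloo": GlooAdapter, "nccl": NcclAdapter}

def create_adapter(kind: str) -> BaseCommAdapter:
    return _REGISTRY[kind]()
```

Swapping backends is then a one-line change for callers, which is what "zero vendor lock-in" means in practice.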

Installation

# Install from PyPI (dependencies are installed automatically)
pip install isagellm-comm

🚀 Developer Quick Start

git clone git@github.com:intellistream/sagellm-comm.git
cd sagellm-comm
./quickstart.sh --standard   # standard mode: install stable packages from PyPI
./quickstart.sh --dev        # dev mode: PyPI first, then local editable override (--no-deps)

# Show full help
./quickstart.sh --help

# Or install manually
pip install -e ".[dev]"

Mode notes:

  • --standard: stable/release-oriented; dependencies are installed from PyPI.
  • --dev: builds on standard with a local editable override, avoiding repeated dependency resolution.
  • Before each install, previously installed packages with the same prefix (default isagellm*) are cleaned up automatically.

Run tests:

pytest tests/ -v

⚠️ Local CI fallback when GitHub Actions is blocked by billing

When Actions cannot start due to billing/quota issues, run from the repository root:

bash scripts/local_ci_fallback.sh

The script follows the core paths of ci.yml/version-source-guard.yml: pre-commit, version source guard, pytest+coverage, build+twine check.

💡 isagellm-protocol is installed automatically from PyPI as a dependency; sagellm-comm and sagellm-backend are parallel layers with no direct dependency on each other.

Quick Start

from sagellm_comm import CommGroup, Topology, CollectiveOps

# Discover topology
topology = Topology.discover()

# Create communication group
group = CommGroup.create(world_size=4, rank=0)

# Collective operations (for distributed inference)
CollectiveOps.all_reduce(tensor, group=group)
CollectiveOps.all_gather(tensor, group=group)

Note: For KV block transfer, use the KVTransferEngine in sagellm-kv-cache, which uses this package's CommBackend internally.

Benchmarking

Quick Benchmark

Run all-reduce micro-benchmark with default settings:

# Single process (world_size=1)
python examples/benchmark_all_reduce.py

# Multi-process with torchrun (2 processes)
torchrun --nproc_per_node=2 examples/benchmark_all_reduce.py

CLI Tool

Use the CLI for more control:

# Run with custom message sizes
torchrun --nproc_per_node=2 -m benchmark.run_bench --message-sizes 1024 4096 16384

# Specify backend (default: gloo)
torchrun --nproc_per_node=4 -m benchmark.run_bench --backend gloo

# Adaptive selection (default): backend + algorithm auto policy
torchrun --nproc_per_node=4 -m benchmark.run_bench --backend auto --algo auto

# Explicit algorithm
torchrun --nproc_per_node=4 -m benchmark.run_bench --algo tree

# Custom selector policy config
python -m benchmark.run_bench --selector-config ./selector_policy.json

# Custom warmup and benchmark iterations
python -m benchmark.run_bench --warmup 20 --iters 200

# Specify output directory
python -m benchmark.run_bench --output-dir ./my_results
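
The `--selector-config` file is a JSON policy for the adaptive selector. Its actual schema is not documented here; the fragment below is purely a hypothetical illustration, borrowing the `rule_id` naming from the `selection_rule_id` field in the benchmark output:

```json
{
  "rules": [
    {"rule_id": "small-msg-gloo", "max_bytes": 65536, "backend": "gloo", "algo": "ring"},
    {"rule_id": "large-msg-nccl", "backend": "nccl", "algo": "tree"}
  ],
  "default": {"backend": "gloo", "algo": "ring"}
}
```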

Output Format

Benchmark results are exported in both JSON and CSV formats:

.benchmarks/collective/
├── all_reduce_ws2_20260217_120000.json  # JSON format
└── all_reduce_ws2_20260217_120000.csv   # CSV format

Output fields:

  • bytes: Message size in bytes
  • latency_ms: Latency in milliseconds
  • bandwidth_gbps: Bandwidth in GB/s
  • algo_id: Algorithm identifier (e.g., "ring")
  • backend_kind: Backend type (e.g., "gloo", "nccl")
  • topology_source: Topology source (e.g., "env", "cuda")
  • world_size: Number of processes
  • op_type: Operation type (e.g., "all_reduce")
  • data_type: Tensor data type (e.g., "float32")
  • warmup_iters: Number of warmup iterations
  • benchmark_iters: Number of benchmark iterations
  • selection_rule_id: Rule ID used by adaptive selector
  • selection_reason: Human-readable selection reason for traceability
  • timestamp: Benchmark timestamp
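
Because the CSV export uses the field names listed above, downstream analysis needs nothing beyond the standard library. A minimal sketch (the two data rows are inlined stand-ins for a real `.benchmarks/collective/*.csv` file):

```python
import csv
import io

# Inlined sample using the documented output fields; in practice,
# open the CSV file produced under .benchmarks/collective/ instead.
sample = """bytes,latency_ms,bandwidth_gbps,algo_id,backend_kind
1024,0.10,0.01,ring,gloo
4096,0.12,0.03,ring,gloo
"""

rows = list(csv.DictReader(io.StringIO(sample)))
mean_bw = sum(float(r["bandwidth_gbps"]) for r in rows) / len(rows)
print(f"{len(rows)} runs, mean bandwidth {mean_bw:.3f} GB/s")
```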

Python API

from sagellm_comm import get_comm_backend
from sagellm_comm.benchmark import run_all_reduce_benchmark

# Initialize backend
backend = get_comm_backend("gloo")
backend.init(rank=0, world_size=1, master_addr="localhost", master_port=29500)

# Run benchmark
results = run_all_reduce_benchmark(
    backend=backend,
    message_sizes=[1024, 4096, 16384],  # Element counts
    output_dir=".benchmarks/collective",
    warmup_iters=10,
    benchmark_iters=100,
)

# Results are automatically exported to JSON + CSV
print(f"Completed {len(results)} benchmark runs")

Overlap Benchmark

Measure communication-computation overlap efficiency:

# Run overlap benchmark with default settings
python -m sagellm_comm.benchmark.run_overlap_bench

# Custom communication sizes and compute durations
python -m sagellm_comm.benchmark.run_overlap_bench \
    --comm-bytes 1024 4096 16384 \
    --compute-ms 10 50 100

# Custom iterations
python -m sagellm_comm.benchmark.run_overlap_bench --warmup 10 --iters 20

# Specify output directory
python -m sagellm_comm.benchmark.run_overlap_bench --output-dir ./.benchmarks/overlap

Output fields:

  • comm_bytes: Communication payload size in bytes
  • compute_duration_ms: Configured computation duration in milliseconds
  • comm_submit_ms: Time to submit communication operation
  • comm_wait_ms: Time waiting for communication to complete
  • compute_ms: Time for computation
  • total_ms: Total elapsed time
  • overlap_ratio: Percentage of overlap (0-1), higher is better
  • overlap_efficiency: Actual overlap efficiency vs theoretical (0-1)
  • theoretical_sequential_ms: Expected time without overlap
  • theoretical_overlap_ms: Expected time with perfect overlap
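
The relationship between these fields can be sketched as follows. The formulas are inferred from the field descriptions above (sequential time is comm + compute, perfect overlap hides the shorter of the two); the benchmark's exact definitions may differ:

```python
def overlap_metrics(comm_ms: float, compute_ms: float, total_ms: float):
    """Derive overlap metrics from measured times (assumed definitions)."""
    sequential = comm_ms + compute_ms      # theoretical_sequential_ms
    perfect = max(comm_ms, compute_ms)     # theoretical_overlap_ms
    # Fraction of the sequential time that overlap actually saved.
    ratio = 1.0 - total_ms / sequential
    # Achieved saving vs the best possible saving (sequential - perfect).
    if sequential > perfect:
        efficiency = (sequential - total_ms) / (sequential - perfect)
    else:
        efficiency = 1.0
    return ratio, efficiency

# With perfect overlap, total time equals max(comm, compute):
r, e = overlap_metrics(10.0, 50.0, 50.0)
```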

Python API:

from sagellm_comm.benchmark import run_overlap_benchmark

# Run overlap benchmark
results = run_overlap_benchmark(
    comm_bytes=[1024, 4096, 16384],  # Communication sizes
    compute_durations_ms=[10, 50, 100],  # Compute durations
    output_dir=".benchmarks/overlap",
    warmup_iters=5,
    benchmark_iters=10,
)

# Results are automatically exported to JSON + CSV
for r in results:
    print(f"Overlap ratio: {r.overlap_ratio:.2%}, Efficiency: {r.overlap_efficiency:.2%}")

Supported Backends

  • NCCL (NVIDIA)
  • HCCL (Huawei Ascend)
  • RCCL (AMD ROCm)
  • Gloo (CPU fallback)

Dependencies

  • isagellm-protocol>=0.1.0 - Protocol definitions

Development

Setup

# Install dev dependencies
pip install -e ".[dev]"

# Install pre-commit hooks
pip install pre-commit
pre-commit install

Pre-commit Hooks

This project uses pre-commit to ensure code quality:

# Run on all files
pre-commit run --all-files

# Run on staged files (automatic on git commit)
git commit

Configured hooks:

  • Ruff linter and formatter
  • MyPy type checking
  • Trailing whitespace, end-of-file fixer
  • YAML/TOML/JSON validation

Testing

# Run all tests
pytest tests/ -v

# Run with coverage
pytest tests/ -v --cov=sagellm_comm --cov-report=html

# Run specific test file
pytest tests/test_imports.py -v

Code Quality

# Format code
ruff format .

# Lint code
ruff check . --fix

# Type check
mypy src/

Contribution workflow: open an Issue first, develop on a branch and pass tests and lint, then open a PR.

Version Information

Migrating from sagellm-backend

If you previously called communication APIs from sagellm-backend, migrate as follows:

Before (v0.3.x)

# ❌ Old: communication ops lived in the backend (deprecated)
from sagellm_backend import get_provider
backend = get_provider("cuda")
backend.all_reduce(tensor, op="sum")

After (v0.4.0+)

# ✅ New: use sagellm-comm for communication ops
from sagellm_comm import CommBackend, ReduceOp

comm = CommBackend.create("nccl")  # or "hccl"/"gloo"
comm.init_process_group(world_size=4, rank=0)
comm.all_reduce(tensor, op=ReduceOp.SUM)

API Mapping

Old API (backend)      New API (comm)        Notes
backend.all_reduce()   comm.all_reduce()     Collective reduction
backend.all_gather()   comm.all_gather()     Collective gather
backend.broadcast()    comm.broadcast()      Broadcast
backend.send()         comm.send()           P2P send
backend.recv()         comm.recv()           P2P receive
N/A                    Topology.discover()   Topology discovery (new)
N/A                    CommGroup.create()    Communication group management (new)

Detailed Migration Guide

For the complete migration guide, see:

License

Private - IntelliStream Research Project
