Harness Gateway
Multi-platform IM channel bridge — unified message abstraction for AI agents and bots.
Highlights
- 9 Platforms, One Processor: Write one async generator, run your bot on Feishu, QQ, WeCom, DingTalk, WeChat iLink, Yuanbao, Xiaoyi, MQTT, and Telegram
- Streaming-First: DELTA events accumulate token-by-token; COMPLETED triggers merge, <think> cleanup, and send
- Platform Constraints Handled: Reply timeouts, rate limits, and typing indicators managed per platform
- Proactive Push: Bot-initiated messages via ChannelSubject routing for notifications and scheduled tasks
- LLM Integration Ready: OpenAI streaming, LangGraph agents, and any async-generator processor pattern
Overview
Harness Gateway lets you write one processor function and deploy your AI bot across multiple messaging platforms. The library handles platform transport (WebSocket, HTTP, MQTT, long-poll), message normalization, media persistence, rate limiting, and reply-timeout strategies — so your bot logic stays platform-agnostic.
from collections.abc import AsyncIterator
from harness_gateway import InboundMessage, MessageEvent

async def my_bot(message: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Received: {message.text}")
    yield MessageEvent.completed()
Core Technology
| Component | Technology | Purpose |
|---|---|---|
| Message Model | Pydantic v2 | Typed InboundMessage / MessageEvent / ContentPart |
| Routing | ChannelSubject | Unified outbound handle (subject_id + platform metadata) |
| Transport | Platform SDKs + aiohttp | WebSocket, Stream, HTTP, MQTT, long-poll |
| Orchestration | asyncio + worker pools | Multi-channel queues with session-aware batching |
| Constraints | Sliding window + timeout guards | Rate limits, reply timeouts, typing keepalive |
| Media | MediaBackend | Pluggable storage; per-channel fetch_remote_media for auth |
Features
- Unified Model: InboundMessage / MessageEvent / ContentPart across all platforms
- Async Generator Pattern: Receive messages, yield response events
- Multi-Channel: Run several platforms simultaneously with session-aware batching
- Streaming Support: DELTA for token-by-token LLM output, merged on COMPLETED
- <think> Filtering: Reasoning-model thinking blocks stripped by default
- Localized Tool Hints: Pass tool_hint_text in event metadata, or fall back to templates
- Platform Constraints: Per-channel reply timeouts, rate limits, typing indicators
- Proactive Push: push_text / push_content / push_to_all via ChannelSubject
- Media Handling: Inbound auto-persist + outbound load_media_bytes through MediaBackend
- Multi-Tenant: Optional tenant_id per channel instance for session isolation
- Extensible: Subclass BaseChannel and register via add_channel(instance)
Supported Platforms
| Platform | channel_type | Transport | Text | Image | File | Push |
|---|---|---|---|---|---|---|
| Feishu (Lark) | feishu | WebSocket + REST | ✅ | ✅ | ✅ | ✅ |
| QQ | qq | WebSocket + REST | ✅ | ✅ | ✅ | ✅ |
| WeCom (Enterprise WeChat) | wecom | WebSocket + stream reply | ✅ | ⚠️ | ⚠️ | ✅ |
| DingTalk | dingtalk | Stream + REST | ✅ | ✅ | ✅ | ✅ |
| WeChat iLink | weixin | HTTP long-poll | ✅ | ⚠️ | ⚠️ | ✅ |
| Yuanbao (腾讯元宝) | yuanbao | HTTP REST | ✅ | ✅ | ✅ | ✅ |
| Xiaoyi (小艺) | xiaoyi | WebSocket (primary + backup) | ✅ | ✅ | ✅ | ✅ |
| MQTT | mqtt | pub/sub broker | ✅ | ⚠️ | ⚠️ | ✅ |
| Telegram | telegram | long-polling | ✅ | ✅ | ✅ | ✅ |
✅ = supported · ⚠️ = URL or text fallback (no native bytes on some platforms)
See README_CN.md for a detailed capability matrix (group chat, audio/video, Markdown, etc.).
Examples
| Platform | Example script | Key env vars |
|---|---|---|
| Telegram | examples/telegram_bot.py | TELEGRAM_BOT_TOKEN |
| MQTT | examples/mqtt_bot.py | MQTT_HOST, MQTT_USERNAME, MQTT_PASSWORD |
| Xiaoyi | examples/xiaoyi_bot.py | XIAOYI_AK, XIAOYI_SK, XIAOYI_AGENT_ID |
| QQ | examples/qq_bot.py | QQ_APP_ID, QQ_TOKEN, QQ_SECRET |
| Feishu | examples/feishu_bot.py | FEISHU_APP_ID, FEISHU_APP_SECRET |
| WeCom | examples/wecom_bot.py | WECOM_BOT_ID, WECOM_SECRET |
| Weixin | examples/weixin_bot.py | WEIXIN_ACCOUNT_ID, WEIXIN_TOKEN |
| Multi-channel | examples/all_channels.py | Combined credentials |
Copy .env.example to .env and fill in credentials before running examples.
Quick Start
Installation
pip install harness-gateway
# With example / agent integration deps
pip install "harness-gateway[examples]"
Minimal Echo Bot (Telegram)
import asyncio
import os
from collections.abc import AsyncIterator

from harness_gateway import ChannelManager, InboundMessage, MessageEvent
from harness_gateway.channels.telegram import TelegramConfig

async def echo(message: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Echo: {message.text}")
    yield MessageEvent.completed()

async def main():
    manager = ChannelManager(processor=echo)
    await manager.start()
    await manager.add_telegram_channel(
        TelegramConfig(bot_token=os.environ["TELEGRAM_BOT_TOKEN"])
    )
    # Keep the process alive so the channel can keep receiving messages
    await asyncio.Event().wait()

asyncio.run(main())
Multi-Platform
import asyncio
import os
from collections.abc import AsyncIterator

from harness_gateway import ChannelManager, InboundMessage, MessageEvent
from harness_gateway.channels.dingtalk import DingTalkConfig
from harness_gateway.channels.feishu import FeishuConfig
from harness_gateway.channels.qq import QQConfig

async def unified_bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"[{msg.channel_type}] {msg.text}")
    yield MessageEvent.completed()

async def main():
    manager = ChannelManager(processor=unified_bot, workers_per_channel=4)
    await manager.start()
    await manager.add_feishu_channel(
        FeishuConfig(app_id=os.environ["FEISHU_APP_ID"], app_secret=os.environ["FEISHU_APP_SECRET"])
    )
    await manager.add_qq_channel(
        QQConfig(app_id=os.environ["QQ_APP_ID"], token=os.environ["QQ_TOKEN"], secret=os.environ["QQ_SECRET"])
    )
    await manager.add_dingtalk_channel(
        DingTalkConfig(app_key=os.environ["DINGTALK_APP_KEY"], app_secret=os.environ["DINGTALK_APP_SECRET"])
    )
    # Keep the process alive so all channels can keep receiving messages
    await asyncio.Event().wait()

asyncio.run(main())
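The workers_per_channel setting above implies several workers draining one channel's queue. One way such a pool could stay session-aware is to pin each session to a fixed worker by hashing a session key, so messages from one conversation are handled in order while different conversations run in parallel. The sketch below illustrates that idea only; it is not Harness Gateway's implementation, and SessionWorkerPool, submit, drain_and_stop, and the session_key argument are hypothetical names.

```python
import asyncio
import zlib
from collections.abc import Awaitable, Callable


class SessionWorkerPool:
    """Hypothetical pool: one queue per worker, sessions pinned by hash."""

    def __init__(self, handler: Callable[[str, str], Awaitable[None]], workers: int = 4) -> None:
        self._handler = handler
        self._queues: list[asyncio.Queue] = [asyncio.Queue() for _ in range(workers)]
        self._tasks: list[asyncio.Task] = []

    def worker_index(self, session_key: str) -> int:
        # crc32 is stable across runs, unlike the built-in hash() for str.
        return zlib.crc32(session_key.encode("utf-8")) % len(self._queues)

    async def start(self) -> None:
        self._tasks = [asyncio.create_task(self._run(q)) for q in self._queues]

    async def submit(self, session_key: str, text: str) -> None:
        # The same session always lands on the same queue, preserving order.
        await self._queues[self.worker_index(session_key)].put((session_key, text))

    async def drain_and_stop(self) -> None:
        for queue in self._queues:
            await queue.join()  # wait until every queued item was handled
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)

    async def _run(self, queue: asyncio.Queue) -> None:
        while True:
            session_key, text = await queue.get()
            try:
                await self._handler(session_key, text)
            finally:
                queue.task_done()
```

With this layout, a slow conversation only delays other sessions that hash to the same worker; a real pool would also need error handling so one failing handler call does not stop its worker.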
Streaming + LLM Integration
Event Types
| Event | Purpose | Channel Behavior |
|---|---|---|
| MESSAGE | Complete message | Send immediately |
| DELTA | Token-by-token LLM output | Accumulate, merge on COMPLETED |
| TOOL_START | Tool call starting | Status hint (configurable) |
| TOOL_END | Tool call finished | Optional completion hint |
| TYPING | Typing indicator | Platform typing status |
| ERROR | Error occurred | Send error message |
| COMPLETED | Stream ended | Flush delta buffer → strip thinking → send |
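The MESSAGE, DELTA, and COMPLETED rows in the table can be illustrated with a small stand-alone sketch. It assumes a simplified stand-in Event type rather than the library's MessageEvent, and collect_outbound is a hypothetical helper name; the real channel also handles tool hints, typing, errors, and thinking-block cleanup, which are left out here.

```python
from dataclasses import dataclass


@dataclass
class Event:
    """Stand-in for MessageEvent; `kind` mirrors the event names in the table."""
    kind: str
    text: str = ""


def collect_outbound(events: list[Event]) -> list[str]:
    """Return the texts a channel would send, in order."""
    outbound: list[str] = []
    buffer: list[str] = []
    for event in events:
        if event.kind == "MESSAGE":
            # Complete message: send immediately.
            outbound.append(event.text)
        elif event.kind == "DELTA":
            # Token fragment: hold it until the stream ends.
            buffer.append(event.text)
        elif event.kind == "COMPLETED":
            # Stream ended: flush the merged deltas as a single message.
            if buffer:
                outbound.append("".join(buffer))
                buffer.clear()
    return outbound
```

Buffering like this means a platform without message editing still receives one clean reply instead of a burst of partial messages.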
OpenAI Streaming
from collections.abc import AsyncIterator

from openai import AsyncOpenAI
from harness_gateway import InboundMessage, MessageEvent

client = AsyncOpenAI(api_key="sk-xxx")  # placeholder key; replace with your own

async def chat_bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.typing()
    stream = await client.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": msg.text}], stream=True
    )
    async for chunk in stream:
        # Some chunks carry no choices or an empty delta; skip those
        if chunk.choices and chunk.choices[0].delta.content:
            yield MessageEvent.delta(chunk.choices[0].delta.content)
    yield MessageEvent.completed()
Localized Tool Hints
By default, TOOL_START / TOOL_END use English templates from ChannelConstraints. Pass a pre-localized string in event metadata to override:
yield MessageEvent.tool_start(
    "read_file",
    tool_hint_text="🔧 正在调用工具:读取文件",
)
Disable hints per channel: ChannelConstraints(show_tool_hints=False).
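The precedence described above (a pre-localized tool_hint_text wins, otherwise a template is used, and show_tool_hints=False suppresses hints) can be sketched as a small function. This is an assumption about the behavior, not the library's code: resolve_tool_hint is a hypothetical name, and the default template string is borrowed from the ChannelConstraints example in this README.

```python
from typing import Optional


def resolve_tool_hint(
    tool_name: str,
    metadata: dict,
    template: str = "🔧 Calling tool: {tool_name}",
    show_tool_hints: bool = True,
) -> Optional[str]:
    """Pick the hint text for a tool event, or None when hints are disabled."""
    if not show_tool_hints:
        return None
    # A pre-localized string supplied by the processor takes priority.
    localized = metadata.get("tool_hint_text")
    if localized:
        return localized
    # Otherwise fall back to the channel's template.
    return template.format(tool_name=tool_name)
```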
<think> Auto-Filtering
Reasoning models (DeepSeek-R1, QwQ, etc.) emit <think>...</think> blocks. Harness Gateway strips them before delivery unless show_thinking=True.
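As a rough illustration of this filtering, a regular expression can remove closed <think>...</think> blocks from the merged text. The function name strip_thinking is hypothetical and the library's actual filter may differ, for example in how it treats an unclosed tag, which this sketch leaves untouched.

```python
import re

# Non-greedy match so several separate blocks are removed individually;
# DOTALL lets a block span multiple lines.
_THINK_BLOCK = re.compile(r"<think>.*?</think>\s*", re.DOTALL)


def strip_thinking(text: str, show_thinking: bool = False) -> str:
    """Remove reasoning blocks unless the caller asked to keep them."""
    if show_thinking:
        return text
    return _THINK_BLOCK.sub("", text).strip()
```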
Platform Constraints
Default constraints vary by platform. Override via add_*_channel(..., constraints=...) or channel.constraints at runtime.
| Platform | Reply Timeout | Rate Limit | Typing Keepalive | Notes |
|---|---|---|---|---|
| WeCom | 10s | 2 / 5s | — | Stream reply + placeholder on timeout |
| WeChat iLink | 5s | 5 / 60s | 5s | sendtyping API |
| Telegram | — | 20 / 60s | 4s | When show_typing=True |
| MQTT | — | 30 / 60s | — | URL-based media payload |
| Xiaoyi | — | 20 / 60s | — | WebSocket dual connection |
| Feishu / QQ / DingTalk / Yuanbao | — | — | — | Use custom constraints as needed |
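A rate limit written as "2 / 5s" in the table means at most two sends in any five-second window. A minimal sliding-window limiter along those lines might look like the sketch below; SlidingWindowLimiter, wait_time, and record_send are hypothetical names, and the injectable clock exists only to make the behavior easy to check.

```python
import time
from collections import deque
from collections.abc import Callable


class SlidingWindowLimiter:
    """Allow at most `max_sends` sends within any `window_seconds` span."""

    def __init__(
        self,
        max_sends: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max = max_sends
        self._window = window_seconds
        self._clock = clock
        self._sent: deque[float] = deque()  # timestamps of recent sends

    def wait_time(self) -> float:
        """Seconds until the next send is allowed (0.0 if allowed now)."""
        now = self._clock()
        # Drop timestamps that have left the window.
        while self._sent and now - self._sent[0] >= self._window:
            self._sent.popleft()
        if len(self._sent) < self._max:
            return 0.0
        # The oldest send must age out before another one fits.
        return self._window - (now - self._sent[0])

    def record_send(self) -> None:
        self._sent.append(self._clock())
```

A sender would typically sleep for wait_time() before each outbound message and call record_send() right after it goes out.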
from harness_gateway import ChannelConstraints

constraints = ChannelConstraints(
    reply_timeout=10.0,
    send_rate_limit=(2, 5.0),
    timeout_strategy="placeholder",
    show_thinking=False,
    show_tool_hints=True,
    tool_hint_template="🔧 Calling tool: {tool_name}",
)
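To show what a "placeholder" timeout strategy could mean in practice, the sketch below sends a short placeholder when the real reply is not ready within reply_timeout, then delivers the real reply once it arrives. This is an interpretation of the setting, not the library's code; reply_with_timeout, produce, send, and the placeholder text are all hypothetical.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def reply_with_timeout(
    produce: Awaitable[str],
    send: Callable[[str], Awaitable[None]],
    reply_timeout: float,
    placeholder: str = "Still working on it...",
) -> None:
    """Send a placeholder if the reply misses the deadline, then the reply."""
    task = asyncio.ensure_future(produce)
    # asyncio.wait does not cancel the task when the timeout expires.
    done, _pending = await asyncio.wait({task}, timeout=reply_timeout)
    if not done:
        await send(placeholder)
    # Deliver the real reply once it is available.
    await send(await task)
```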
Media Backend
Inbound media is downloaded (with per-platform auth when needed), saved through MediaBackend, and stamped on ContentPart.local_path. Outbound sends read bytes via load_media_bytes (data → local_path → url).
from harness_gateway import ChannelManager, FileSystemMediaBackend
media = FileSystemMediaBackend("/tmp/harness-gateway/media")
manager = ChannelManager(processor=my_bot, media_backend=media)
Override fetch_remote_media() on your channel subclass when platform URLs require auth headers.
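The data → local_path → url order mentioned above can be pictured as a simple fallback chain. The sketch uses a stand-in MediaPart dataclass and a caller-supplied fetch_url function; both are hypothetical, as is the choice to fall through to the URL when the local file is missing, and the real load_media_bytes may behave differently.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Optional


@dataclass
class MediaPart:
    """Stand-in for ContentPart with only the three fields discussed above."""
    data: Optional[bytes] = None
    local_path: Optional[str] = None
    url: Optional[str] = None


def resolve_media_bytes(part: MediaPart, fetch_url: Callable[[str], bytes]) -> bytes:
    """Return media bytes using the first source that is available."""
    if part.data is not None:
        return part.data  # in-memory bytes win
    if part.local_path and Path(part.local_path).is_file():
        return Path(part.local_path).read_bytes()  # previously persisted file
    if part.url:
        return fetch_url(part.url)  # last resort: download
    raise ValueError("media part has no data, local_path, or url")
```

Passing the downloader in as fetch_url keeps the resolution order separate from platform-specific auth, which is the role the README assigns to fetch_remote_media().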
Proactive Push
Outbound routing uses ChannelSubject — auto-tracked on every inbound message (channel_subject.subject_id identifies the user).
# Single user (subject must have interacted before, or supply routing metadata)
subject = channel.get_subject("user_open_id")
if subject:
    await manager.push_text(channel_id, subject, "Your build completed ✅")

# Rich content
from harness_gateway import ImageContent, TextContent

await manager.push_content(channel_id, subject, [
    TextContent(text="📊 Daily Report:"),
    ImageContent(url="https://charts.example.com/daily.png"),
])

# Broadcast to all known subjects on a channel
await manager.push_to_all(channel_id, "👋 Scheduled greeting!")
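For scheduled tasks, the push calls above are usually wrapped in a background loop. The helper below is a minimal sketch of such a loop; push_every, make_text, interval, and times are hypothetical names, and the push argument is any coroutine function that delivers one text.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def push_every(
    push: Callable[[str], Awaitable[None]],
    make_text: Callable[[int], str],
    interval: float,
    times: int,
) -> None:
    """Call `push` a fixed number of times, sleeping `interval` seconds between calls."""
    for i in range(times):
        await push(make_text(i))
        if i < times - 1:
            await asyncio.sleep(interval)
```

With the manager from the examples above, push could be a small wrapper such as `lambda text: manager.push_to_all(channel_id, text)`, started with asyncio.create_task so it runs alongside the channels.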
Multi-Tenant Support
Run multiple instances of the same platform with isolated tenant_id:
from harness_gateway.channels.feishu import FeishuConfig

t1_id = await manager.add_feishu_channel(
    FeishuConfig(app_id="cli_t1", app_secret="secret1", tenant_id="tenant1")
)
t2_id = await manager.add_feishu_channel(
    FeishuConfig(app_id="cli_t2", app_secret="secret2", tenant_id="tenant2")
)

async def bot(msg: InboundMessage):
    # msg.tenant_id → "tenant1" or "tenant2"
    # msg.channel_id → UUID instance key
    yield MessageEvent.text(f"[{msg.tenant_id}] {msg.text}")
    yield MessageEvent.completed()
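Inside a processor, tenant isolation usually comes down to keying any per-user state by tenant as well as by user. The sketch below keeps conversation history per (tenant_id, subject_id) pair; TenantSessionStore is a hypothetical helper, not part of Harness Gateway, and the values would come from msg.tenant_id and the subject_id mentioned under Proactive Push.

```python
from collections import defaultdict


class TenantSessionStore:
    """Hypothetical in-memory history keyed by (tenant_id, subject_id)."""

    def __init__(self) -> None:
        self._history: dict[tuple[str, str], list[str]] = defaultdict(list)

    def append(self, tenant_id: str, subject_id: str, text: str) -> None:
        self._history[(tenant_id, subject_id)].append(text)

    def history(self, tenant_id: str, subject_id: str) -> list[str]:
        # Return a copy so callers cannot mutate stored state by accident.
        return list(self._history.get((tenant_id, subject_id), []))
```

Two tenants can then reuse the same user identifier without seeing each other's messages.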
Custom Channels
Subclass BaseChannel, implement start / stop / _send_* / parse_inbound, then register:
from dataclasses import dataclass
from harness_gateway import BaseChannel, ChannelConfig, ChannelSubject, ContentPart, InboundMessage

@dataclass
class MyPlatformConfig(ChannelConfig):
    api_key: str = ""

class MyPlatformChannel(BaseChannel):
    channel_type = "myplatform"

    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def _send_text(self, subject: ChannelSubject, text: str) -> None: ...
    async def _send_content(self, subject: ChannelSubject, parts: list[ContentPart]) -> None: ...
    async def _send_media(self, subject: ChannelSubject, media: ContentPart) -> None: ...
    def parse_inbound(self, raw_payload) -> InboundMessage: ...

# Register an instance
channel = MyPlatformChannel(processor, MyPlatformConfig(api_key="..."))
channel_id = await manager.add_channel(channel)
See AGENTS.md for the full channel implementation specification.
Architecture
┌─────────────────┐
│   IM Platform   │  Feishu / QQ / WeCom / DingTalk / Telegram / MQTT / ...
└────────┬────────┘
         │ WebSocket / HTTP / MQTT
         ▼
┌─────────────────┐
│   BaseChannel   │  parse_inbound() → InboundMessage
│     (impl)      │  _send_text() / _send_content() / _send_media()
└────────┬────────┘
         │ enqueue
         ▼
┌─────────────────┐
│ ChannelManager  │  Queues + workers + session batching + MediaBackend
└────────┬────────┘
         │ InboundMessage
         ▼
┌─────────────────┐
│ Your Processor  │  async def(InboundMessage) → AsyncIterator[MessageEvent]
└────────┬────────┘
         │ MessageEvent (MESSAGE / DELTA / TOOL_START / ...)
         ▼
┌─────────────────┐
│   BaseChannel   │  Constraints + delta merge → platform API
└─────────────────┘
Development
Prerequisites: Python 3.11+, uv
git clone https://github.com/orcakit/harness-gateway.git
cd harness-gateway
make install # uv sync --extra dev
make all # lint + typecheck + test (CI ship bar)
| Command | Description |
|---|---|
| make lint | Ruff check + format check |
| make format | Ruff auto-fix + format |
| make typecheck | mypy src |
| make test | pytest -m "not integration" |
| make test-integration | pytest -m integration (requires network) |
| make build | wheel + sdist → dist/ |
| make clean | remove build artifacts and caches |
Contributing
Contributions are welcome! Please read CONTRIBUTING.md and run make all before opening a PR.
Security issues: see SECURITY.md.
Related Projects
| Project | Description |
|---|---|
| Octop | Self-hosted multi-user AI control plane |
| harness-agent | Production-grade AI agent platform built on LangChain Deep Agents |
| harness-memory | Pluggable memory system with hierarchical recall and FTS |
| harness-browser | AI-friendly browser automation via CDP |
License