Harness IM Bridge
Multi-platform IM channel bridge — unified message abstraction for AI agents and bots.
Highlights
- 9 Platforms, One Processor: Write one async generator function and run your bot on Feishu, QQ, WeChat Work, DingTalk, Discord, WeChat, and more
- Streaming-First: Delta events accumulate token by token; `COMPLETED` triggers merge, `<think>` cleanup, and send
- Platform Constraints Handled: Reply timeouts, rate limits, and typing indicators managed transparently per platform
- Proactive Push: Bot-initiated messages for notifications, webhooks, and scheduled tasks
- LLM Integration Ready: Works with OpenAI streaming, LangGraph agents, and any async generator pattern
Overview
Harness IM Bridge lets you write one processor function and deploy your AI bot across 9 messaging platforms simultaneously. The library handles platform-specific transport (WebSocket, HTTP, Socket.IO), message format translation, media upload/download, rate limiting, and reply timeout strategies — so your bot logic stays clean and platform-agnostic.
```python
async def my_bot(message: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Received: {message.text}")
    yield MessageEvent.completed()
```
Core Technology
| Component | Technology | Purpose |
|---|---|---|
| Message Model | Pydantic v2 | Typed InboundMessage / MessageEvent / ContentPart |
| Transport | Platform SDKs + aiohttp | WebSocket, HTTP callback, Socket.IO, streaming |
| Orchestration | asyncio + Worker pools | Multi-channel queues with session-aware batching |
| Constraints | Sliding window + timeout guards | Transparent rate limiting and reply timeout handling |
| Media | Per-platform abstraction | Unified upload/download across IM platforms |
| Registry | Decorator + filesystem discovery | @channel auto-registration for custom channels |
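The Orchestration row mentions session-aware processing. As a rough illustration of the general idea (same session serial, different sessions in parallel), here is a minimal sketch using one `asyncio.Lock` per session key. `SessionSerializer` and `demo` are hypothetical names for this example and are not part of the library's API.

```python
import asyncio
from collections import defaultdict


class SessionSerializer:
    """Run jobs for the same session key one at a time; different keys may overlap."""

    def __init__(self) -> None:
        # One lock per session key, created lazily on first use.
        self._locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def run(self, session_key: str, job):
        # Holding the per-session lock keeps replies for one user in order.
        async with self._locks[session_key]:
            return await job()


async def demo() -> list[str]:
    serializer = SessionSerializer()
    log: list[str] = []

    def make_job(name: str, delay: float):
        async def job() -> None:
            log.append(f"start {name}")
            await asyncio.sleep(delay)
            log.append(f"end {name}")
        return job

    await asyncio.gather(
        serializer.run("alice", make_job("a1", 0.02)),  # slow job for alice
        serializer.run("alice", make_job("a2", 0.0)),   # must wait for a1
        serializer.run("bob", make_job("b1", 0.0)),     # free to run alongside a1
    )
    return log
```

In this sketch the second job for `alice` starts only after the first one ends, while the job for `bob` is not blocked by either.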
Features
- Unified Model: `InboundMessage` / `MessageEvent` / `ContentPart` across all platforms
- Async Generator Pattern: Simple processor that receives messages and yields responses
- Multi-Channel: Run multiple platforms simultaneously with session-aware batching
- Streaming Support: DELTA events for token-by-token LLM output, merged on COMPLETED
- `<think>` Auto-Filtering: Reasoning-model thinking content is stripped automatically
- Platform Constraints: Reply timeouts, rate limits, typing indicators per platform
- Proactive Push: Bot-initiated messages for notifications and scheduled tasks
- Media Handling: Upload/download abstraction per platform (images, files, audio, video)
- Session Awareness: Same-session serial, cross-session parallel, which prevents message reordering
- Message Batching: Rapid consecutive messages from the same user are auto-batched
- Extensible: `@channel` decorator + filesystem discovery for custom channels
Supported Platforms
| Platform | channel_type | Transport | Text | Image | File | Push |
|---|---|---|---|---|---|---|
| Feishu (Lark) | `feishu` | WebSocket + REST | ✅ | ✅ | ✅ | ✅ |
| QQ | `qq` | WebSocket + REST | ✅ | ✅ | ✅ | ✅ |
| WeChat Work | `wecom` | WebSocket + REST | ✅ | ✅ | ✅ | ✅ |
| DingTalk | `dingtalk` | Stream + REST | ✅ | ✅ | ✅ | ✅ |
| Discord | `discord` | discord.py | ✅ | ✅ | ✅ | ✅ |
| WeChat iLink | `weixin` | HTTP Long-Poll | ✅ | ✅ | ✅ | ✅ |
| AgentChat | `agentchat` | Socket.IO | ✅ | ✅ | ✅ | ✅ |
| Yuanbao | `yuanbao` | HTTP API | ✅ | ✅ | ✅ | ✅ |
| Dashboard | `dashboard` | Console (stdout) | ✅ | - | - | - |
Quick Start
Installation
```bash
pip install harness-im-bridge
```
Minimal Echo Bot
```python
import asyncio
from collections.abc import AsyncIterator

from harness_im_bridge import ChannelManager, InboundMessage, MessageEvent
from harness_im_bridge.channels.dashboard import DashboardConfig


async def echo(message: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Echo: {message.text}")
    yield MessageEvent.completed()


async def main():
    manager = ChannelManager(processor=echo)
    await manager.start()
    # The dashboard channel prints replies to stdout, so no credentials are needed.
    channel_id = await manager.add_channel("dashboard", DashboardConfig())
    manager.enqueue(channel_id, "Hello World")
    await asyncio.sleep(0.5)  # give the worker time to process the message
    await manager.stop()


asyncio.run(main())
```
Real Platform (QQ Bot)
```python
import asyncio
from collections.abc import AsyncIterator

from harness_im_bridge import ChannelManager, InboundMessage, MessageEvent
from harness_im_bridge.channels.qq import QQConfig


async def bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Echo: {msg.text}")
    yield MessageEvent.completed()


async def main():
    manager = ChannelManager(processor=bot)
    await manager.start()
    channel_id = await manager.add_qq_channel(
        QQConfig(app_id="YOUR_APP_ID", token="YOUR_TOKEN", secret="YOUR_SECRET")
    )
    await asyncio.Event().wait()  # run until interrupted


asyncio.run(main())
```
Multi-Platform
```python
import asyncio
import os
from collections.abc import AsyncIterator

from harness_im_bridge import ChannelManager, InboundMessage, MessageEvent
from harness_im_bridge.channels.feishu import FeishuConfig
from harness_im_bridge.channels.qq import QQConfig
from harness_im_bridge.channels.dingtalk import DingTalkConfig


async def unified_bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"[{msg.channel_type}] {msg.text}")
    yield MessageEvent.completed()


async def main():
    manager = ChannelManager(processor=unified_bot, workers_per_channel=4)
    await manager.start()
    await manager.add_feishu_channel(
        FeishuConfig(app_id=os.environ["FEISHU_APP_ID"], app_secret=os.environ["FEISHU_APP_SECRET"])
    )
    await manager.add_qq_channel(
        QQConfig(app_id=os.environ["QQ_APP_ID"], token=os.environ["QQ_TOKEN"], secret=os.environ["QQ_SECRET"])
    )
    await manager.add_dingtalk_channel(
        DingTalkConfig(app_key=os.environ["DINGTALK_APP_KEY"], app_secret=os.environ["DINGTALK_APP_SECRET"])
    )
    await asyncio.Event().wait()  # run until interrupted


asyncio.run(main())
```
Streaming + LLM Integration
Event Types
| Event | Purpose | Channel Behavior |
|---|---|---|
| `MESSAGE` | Complete message | Send immediately |
| `DELTA` | Token-by-token LLM output | Accumulate, merge on COMPLETED |
| `TOOL_START` | Tool call starting | Show status hint (configurable) |
| `TOOL_END` | Tool call finished | Silent by default |
| `TYPING` | Typing indicator | Platform typing status |
| `ERROR` | Error occurred | Send error message |
| `COMPLETED` | Stream ended | Flush delta buffer → clean `<think>` → send |
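The DELTA and COMPLETED rows describe an accumulate-then-flush pattern. The sketch below shows one plausible shape for it; `DeltaBuffer` and its method names are hypothetical, and the cleanup step is passed in as a plain function (defaulting to `str.strip`) rather than reproducing whatever the library does internally.

```python
from typing import Callable, Optional


class DeltaBuffer:
    """Collect streamed text fragments and emit one merged message at the end."""

    def __init__(self, clean: Callable[[str], str] = str.strip) -> None:
        self._parts: list[str] = []
        self._clean = clean  # assumption: cleanup is a simple str -> str hook

    def on_delta(self, text: str) -> None:
        # DELTA: nothing is sent yet, the fragment is only stored.
        self._parts.append(text)

    def on_completed(self) -> Optional[str]:
        # COMPLETED: merge fragments, clean the result, and reset the buffer.
        merged = "".join(self._parts)
        self._parts.clear()
        cleaned = self._clean(merged)
        return cleaned or None  # None means there is nothing worth sending


buffer = DeltaBuffer()
for token in ["Hel", "lo, ", "world "]:
    buffer.on_delta(token)
reply = buffer.on_completed()
```

Sending a single merged message suits platforms that cannot edit a message in place, at the cost of the user seeing nothing until the stream ends.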
OpenAI Streaming
```python
from collections.abc import AsyncIterator

from openai import AsyncOpenAI

from harness_im_bridge import InboundMessage, MessageEvent

client = AsyncOpenAI(api_key="sk-xxx")


async def chat_bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.typing()
    stream = await client.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": msg.text}], stream=True
    )
    async for chunk in stream:
        # Some chunks carry no choices or an empty delta; skip those.
        if chunk.choices and chunk.choices[0].delta.content:
            yield MessageEvent.delta(chunk.choices[0].delta.content)
    yield MessageEvent.completed()
```
<think> Auto-Filtering
Reasoning models (DeepSeek-R1, QwQ, etc.) output <think>...</think> blocks. Harness IM Bridge strips these automatically — users see only the final answer.
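For readers curious what such filtering can look like, here is a small regex-based sketch. It is an illustration only, not the library's code: `strip_think` is a hypothetical helper, and dropping an unclosed `<think>` block through to the end of the text is an assumption made for this example.

```python
import re

# A complete <think>...</think> block, possibly spanning several lines.
_CLOSED = re.compile(r"<think>.*?</think>", re.DOTALL)
# An opening tag that was never closed, e.g. because the output was cut off.
_UNCLOSED = re.compile(r"<think>.*\Z", re.DOTALL)


def strip_think(text: str) -> str:
    """Remove reasoning blocks and return only the user-facing answer."""
    text = _CLOSED.sub("", text)
    text = _UNCLOSED.sub("", text)
    return text.strip()


answer = strip_think("<think>2 + 2...\nthat is 4</think>\nThe answer is 4.")
```

Running the filter on the merged text, rather than on each delta, avoids having to handle tags that are split across chunks.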
Platform Constraints
Different IM platforms have reply timeouts and rate limits. Harness IM Bridge handles them transparently:
| Platform | Reply Timeout | Rate Limit | Strategy |
|---|---|---|---|
| WeChat Work | 10s | ≤2 per 5s | Placeholder on timeout |
| WeChat Official | 5s | ≤5 per min | Placeholder + typing refresh |
| QQ | None | ≤20 per min | Rate-limit wait |
| Feishu | None | Relaxed | No constraint |
| Discord | None | ≤5 per 5s | Rate-limit wait |
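The "Placeholder on timeout" strategy in the table can be pictured as follows: if the real reply is not ready within the platform's reply window, send a short holding message first and deliver the real answer once it arrives. This is a sketch under stated assumptions; `reply_with_placeholder`, its parameters, and the placeholder text are hypothetical and do not describe the library's internals.

```python
import asyncio
from typing import Awaitable, Callable


async def reply_with_placeholder(
    produce: Callable[[], Awaitable[str]],
    send: Callable[[str], Awaitable[None]],
    reply_timeout: float,
    placeholder: str = "Still working on it...",
) -> None:
    # Start producing the answer without blocking on it.
    task = asyncio.ensure_future(produce())
    # asyncio.wait does not cancel the task when the timeout expires.
    done, _pending = await asyncio.wait({task}, timeout=reply_timeout)
    if not done:
        # The reply window is about to close: acknowledge the user first.
        await send(placeholder)
    # Deliver the real answer, waiting for it if it is not finished yet.
    await send(await task)


async def demo() -> list[str]:
    sent: list[str] = []

    async def send(text: str) -> None:
        sent.append(text)

    async def slow_answer() -> str:
        await asyncio.sleep(0.05)
        return "final answer"

    await reply_with_placeholder(slow_answer, send, reply_timeout=0.01)
    return sent
```

When the answer arrives within the window, no placeholder is sent and the user sees only the final reply.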
Configure constraints per channel:
```python
from harness_im_bridge import ChannelConstraints

constraints = ChannelConstraints(
    reply_timeout=10.0,
    send_rate_limit=(2, 5.0),
    timeout_strategy="placeholder",
    show_thinking=False,
    show_tool_hints=True,
)
```
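A value like `send_rate_limit=(2, 5.0)` reads naturally as "at most 2 sends per 5-second window". The sketch below shows a generic sliding-window limiter with that meaning; `SlidingWindowLimiter` is a hypothetical class written for illustration, and the injectable `clock` parameter exists only so the behavior can be checked without real waiting.

```python
import asyncio
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most max_sends sends within any window_seconds interval."""

    def __init__(self, max_sends: int, window_seconds: float, clock=time.monotonic) -> None:
        self._max = max_sends
        self._window = window_seconds
        self._clock = clock
        self._sent_at: deque[float] = deque()  # timestamps of recent sends

    def delay_needed(self) -> float:
        """Seconds to wait before the next send is allowed (0.0 if allowed now)."""
        now = self._clock()
        # Forget sends that have slid out of the window.
        while self._sent_at and now - self._sent_at[0] >= self._window:
            self._sent_at.popleft()
        if len(self._sent_at) < self._max:
            return 0.0
        # Wait until the oldest send in the window expires.
        return self._window - (now - self._sent_at[0])

    async def acquire(self) -> None:
        """Wait until a send is allowed, then record it."""
        while (delay := self.delay_needed()) > 0:
            await asyncio.sleep(delay)
        self._sent_at.append(self._clock())


fake_now = [0.0]
limiter = SlidingWindowLimiter(2, 5.0, clock=lambda: fake_now[0])
asyncio.run(limiter.acquire())
asyncio.run(limiter.acquire())
fake_now[0] = 1.0
wait_for_third = limiter.delay_needed()
```

A caller would `await limiter.acquire()` before each outgoing message, which corresponds to the "Rate-limit wait" strategy: messages are delayed rather than dropped.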
Proactive Push
```python
# channel_id is the UUID returned by add_*_channel()
await manager.push_text(channel_id, "user_open_id", "Your build completed ✅")

# Push rich content
from harness_im_bridge import TextContent, ImageContent

await manager.push_content(channel_id, "user_openid", [
    TextContent(text="📊 Daily Report:"),
    ImageContent(url="https://charts.example.com/daily.png"),
])
```
Multi-Tenant Support
Run multiple instances of the same platform — each with its own tenant_id for session isolation:
```python
from harness_im_bridge.channels.feishu import FeishuConfig

# Two Feishu tenants, session keys auto-isolated as feishu-{tenant_id}-{sender_id}
t1_id = await manager.add_feishu_channel(
    FeishuConfig(app_id="cli_t1", app_secret="secret1", tenant_id="tenant1")
)
t2_id = await manager.add_feishu_channel(
    FeishuConfig(app_id="cli_t2", app_secret="secret2", tenant_id="tenant2")
)


# InboundMessage carries tenant context
async def bot(msg: InboundMessage):
    print(msg.channel_type)  # "feishu"
    print(msg.tenant_id)     # "tenant1" or "tenant2"
    print(msg.channel_id)    # UUID instance ID
    yield MessageEvent.text(f"[{msg.tenant_id}] {msg.text}")
    yield MessageEvent.completed()
```
---
## Custom Channels
```python
from dataclasses import dataclass

from harness_im_bridge import BaseChannel, ChannelConfig, InboundMessage, ContentPart, MessageProcessor
from harness_im_bridge.registry import channel


@dataclass
class TelegramConfig(ChannelConfig):
    bot_token: str = ""


@channel("telegram")
class TelegramChannel(BaseChannel):
    channel_type = "telegram"

    def __init__(self, processor: MessageProcessor, config: TelegramConfig, *,
                 channel_id: str | None = None, tenant_id: str | None = None):
        super().__init__(processor, channel_id=channel_id, tenant_id=tenant_id)
        self._bot_token = config.bot_token

    # Lifecycle hooks: connect to and disconnect from the platform.
    async def start(self) -> None: ...
    async def stop(self) -> None: ...

    # Outbound hooks: deliver text, mixed content, and media.
    async def _send_text(self, to_handle: str, text: str, meta=None) -> None: ...
    async def _send_content(self, to_handle: str, parts: list[ContentPart], meta=None) -> None: ...
    async def _send_media(self, to_handle: str, media: ContentPart, meta=None) -> None: ...

    # Inbound hook: convert a raw platform payload into an InboundMessage.
    def parse_inbound(self, raw_payload) -> InboundMessage: ...
```
Architecture
```text
┌─────────────────┐
│   IM Platform   │  Feishu / QQ / WeChat Work / DingTalk / Discord / ...
└────────┬────────┘
         │ WebSocket / HTTP / Socket.IO
         ▼
┌─────────────────┐
│   BaseChannel   │  parse_inbound() → InboundMessage
│     (impl)      │  send_text() / send_content()
└────────┬────────┘
         │ enqueue
         ▼
┌─────────────────┐
│ ChannelManager  │  Multi-channel queues + Worker pool + Session lock
└────────┬────────┘
         │ InboundMessage
         ▼
┌─────────────────┐
│ Your Processor  │  async def(InboundMessage) -> AsyncIterator[MessageEvent]
└────────┬────────┘
         │ MessageEvent (MESSAGE / DELTA / TOOL_START / ...)
         ▼
┌─────────────────┐
│   BaseChannel   │  Constraints: timeout guard + rate limiter + typing
│  (send reply)   │  Delta merge → platform API delivery
└─────────────────┘
```
Development
```bash
make install     # Install dependencies
make test        # Run tests
make test-quick  # Unit tests only (fast)
make lint        # Lint + type check
make format      # Format code
make build       # Build wheel
```
Related Projects
| Project | Description |
|---|---|
| harness-agent | Production-grade AI agent platform built on LangChain Deep Agents |
| harness-memory | Pluggable memory system with hierarchical recall and FTS |
| harness-browser | AI-friendly browser automation via CDP |
License