Skip to main content

Multi-platform IM channel bridge with unified message abstraction for AI agents and bots

Project description

Harness IM Bridge Banner

Harness IM Bridge

Multi-platform IM channel bridge — unified message abstraction for AI agents and bots.

PyPI CI Python License: MIT

English · 中文


Highlights

  • 9 Platforms, One Processor — Write one async generator function, run your bot on Feishu, QQ, WeChat Work, DingTalk, Discord, WeChat, and more
  • Streaming-First — Delta events accumulate token-by-token; COMPLETED triggers merge, <think> cleanup, and send
  • Platform Constraints Handled — Reply timeouts, rate limits, and typing indicators managed transparently per platform
  • Proactive Push — Bot-initiated messages for notifications, webhooks, and scheduled tasks
  • LLM Integration Ready — Works with OpenAI streaming, LangGraph agents, and any async generator pattern

Overview

Harness IM Bridge lets you write one processor function and deploy your AI bot across 9 messaging platforms simultaneously. The library handles platform-specific transport (WebSocket, HTTP, Socket.IO), message format translation, media upload/download, rate limiting, and reply timeout strategies — so your bot logic stays clean and platform-agnostic.

async def my_bot(message: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Received: {message.text}")
    yield MessageEvent.completed()

Core Technology

Component Technology Purpose
Message Model Pydantic v2 Typed InboundMessage / MessageEvent / ContentPart
Transport Platform SDKs + aiohttp WebSocket, HTTP callback, Socket.IO, streaming
Orchestration asyncio + Worker pools Multi-channel queues with session-aware batching
Constraints Sliding window + timeout guards Transparent rate limiting and reply timeout handling
Media Per-platform abstraction Unified upload/download across IM platforms
Registry Decorator + filesystem discovery @channel auto-registration for custom channels

Features

  • Unified ModelInboundMessage / MessageEvent / ContentPart across all platforms
  • Async Generator Pattern — Simple processor: receive messages, yield responses
  • Multi-Channel — Run multiple platforms simultaneously with session-aware batching
  • Streaming Support — DELTA events for token-by-token LLM output, merged on COMPLETED
  • <think> Auto-Filtering — Reasoning model thinking content stripped automatically
  • Platform Constraints — Reply timeouts, rate limits, typing indicators per platform
  • Proactive Push — Bot-initiated messages for notifications and scheduled tasks
  • Media Handling — Upload/download abstraction per platform (images, files, audio, video)
  • Session Awareness — Same-session serial, cross-session parallel, prevents message reordering
  • Message Batching — Rapid consecutive messages from the same user are auto-batched
  • Extensible@channel decorator + filesystem discovery for custom channels

Supported Platforms

Platform channel_type Transport Text Image File Push
Feishu (Lark) feishu WebSocket + REST
QQ qq WebSocket + REST
WeChat Work wecom WebSocket + REST
DingTalk dingtalk Stream + REST
Discord discord discord.py
WeChat iLink weixin HTTP Long-Poll
AgentChat agentchat Socket.IO
Yuanbao yuanbao HTTP API
Dashboard dashboard Console (stdout)

Quick Start

Installation

pip install harness-im-bridge

Minimal Echo Bot

import asyncio
from collections.abc import AsyncIterator
from harness_im_bridge import ChannelManager, InboundMessage, MessageEvent
from harness_im_bridge.channels.dashboard import DashboardConfig

async def echo(message: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Echo: {message.text}")
    yield MessageEvent.completed()

async def main():
    manager = ChannelManager(processor=echo)
    await manager.start()
    channel_id = await manager.add_channel("dashboard", DashboardConfig())
    manager.enqueue(channel_id, "Hello World")
    await asyncio.sleep(0.5)
    await manager.stop()

asyncio.run(main())

Real Platform (QQ Bot)

import asyncio
from collections.abc import AsyncIterator
from harness_im_bridge import ChannelManager, InboundMessage, MessageEvent
from harness_im_bridge.channels.qq import QQConfig

async def bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Echo: {msg.text}")
    yield MessageEvent.completed()

async def main():
    manager = ChannelManager(processor=bot)
    await manager.start()
    channel_id = await manager.add_qq_channel(
        QQConfig(app_id="YOUR_APP_ID", token="YOUR_TOKEN", secret="YOUR_SECRET")
    )
    await asyncio.Event().wait()

asyncio.run(main())

Multi-Platform

import asyncio, os
from collections.abc import AsyncIterator
from harness_im_bridge import ChannelManager, InboundMessage, MessageEvent
from harness_im_bridge.channels.feishu import FeishuConfig
from harness_im_bridge.channels.qq import QQConfig
from harness_im_bridge.channels.dingtalk import DingTalkConfig

async def unified_bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"[{msg.channel_type}] {msg.text}")
    yield MessageEvent.completed()

async def main():
    manager = ChannelManager(processor=unified_bot, workers_per_channel=4)
    await manager.start()

    await manager.add_feishu_channel(
        FeishuConfig(app_id=os.environ["FEISHU_APP_ID"], app_secret=os.environ["FEISHU_APP_SECRET"])
    )
    await manager.add_qq_channel(
        QQConfig(app_id=os.environ["QQ_APP_ID"], token=os.environ["QQ_TOKEN"], secret=os.environ["QQ_SECRET"])
    )
    await manager.add_dingtalk_channel(
        DingTalkConfig(app_key=os.environ["DINGTALK_APP_KEY"], app_secret=os.environ["DINGTALK_APP_SECRET"])
    )
    await asyncio.Event().wait()

asyncio.run(main())

Streaming + LLM Integration

Event Types

Event Purpose Channel Behavior
MESSAGE Complete message Send immediately
DELTA Token-by-token LLM output Accumulate, merge on COMPLETED
TOOL_START Tool call starting Show status hint (configurable)
TOOL_END Tool call finished Silent by default
TYPING Typing indicator Platform typing status
ERROR Error occurred Send error message
COMPLETED Stream ended Flush delta buffer → clean <think> → send

OpenAI Streaming

from openai import AsyncOpenAI
from harness_im_bridge import InboundMessage, MessageEvent

client = AsyncOpenAI(api_key="sk-xxx")

async def chat_bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.typing()

    stream = await client.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": msg.text}], stream=True
    )

    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            yield MessageEvent.delta(chunk.choices[0].delta.content)

    yield MessageEvent.completed()

<think> Auto-Filtering

Reasoning models (DeepSeek-R1, QwQ, etc.) output <think>...</think> blocks. Harness IM Bridge strips these automatically — users see only the final answer.


Platform Constraints

Different IM platforms have reply timeouts and rate limits. Harness IM Bridge handles them transparently:

Platform Reply Timeout Rate Limit Strategy
WeChat Work 10s ≤2 per 5s Placeholder on timeout
WeChat Official 5s ≤5 per min Placeholder + typing refresh
QQ None ≤20 per min Rate-limit wait
Feishu None Relaxed No constraint
Discord None ≤5 per 5s Rate-limit wait

Configure constraints per channel:

from harness_im_bridge import ChannelConstraints

constraints = ChannelConstraints(
    reply_timeout=10.0,
    send_rate_limit=(2, 5.0),
    timeout_strategy="placeholder",
    show_thinking=False,
    show_tool_hints=True,
)

Proactive Push

# channel_id is the UUID returned by add_*_channel()
await manager.push_text(channel_id, "user_open_id", "Your build completed ✅")

# Push rich content
from harness_im_bridge import TextContent, ImageContent
await manager.push_content(channel_id, "user_openid", [
    TextContent(text="📊 Daily Report:"),
    ImageContent(url="https://charts.example.com/daily.png"),
])

Multi-Tenant Support

Run multiple instances of the same platform — each with its own tenant_id for session isolation:

from harness_im_bridge.channels.feishu import FeishuConfig

# Two Feishu tenants, session keys auto-isolated as feishu-{tenant_id}-{sender_id}
t1_id = await manager.add_feishu_channel(
    FeishuConfig(app_id="cli_t1", app_secret="secret1", tenant_id="tenant1")
)
t2_id = await manager.add_feishu_channel(
    FeishuConfig(app_id="cli_t2", app_secret="secret2", tenant_id="tenant2")
)

# InboundMessage carries tenant context
async def bot(msg: InboundMessage):
    print(msg.channel_type)  # "feishu"
    print(msg.tenant_id)     # "tenant1" or "tenant2"
    print(msg.channel_id)    # UUID instance ID
    yield MessageEvent.text(f"[{msg.tenant_id}] {msg.text}")
    yield MessageEvent.completed()

---

## Custom Channels

```python
from dataclasses import dataclass
from harness_im_bridge import BaseChannel, ChannelConfig, InboundMessage, ContentPart, MessageProcessor
from harness_im_bridge.registry import channel

@dataclass
class TelegramConfig(ChannelConfig):
    bot_token: str = ""

@channel("telegram")
class TelegramChannel(BaseChannel):
    channel_type = "telegram"

    def __init__(self, processor: MessageProcessor, config: TelegramConfig, *,
                 channel_id: str | None = None, tenant_id: str | None = None):
        super().__init__(processor, channel_id=channel_id, tenant_id=tenant_id)
        self._bot_token = config.bot_token

    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def _send_text(self, to_handle: str, text: str, meta=None) -> None: ...
    async def _send_content(self, to_handle: str, parts: list[ContentPart], meta=None) -> None: ...
    async def _send_media(self, to_handle: str, media: ContentPart, meta=None) -> None: ...
    def parse_inbound(self, raw_payload) -> InboundMessage: ...

Architecture

┌─────────────────┐
│  IM Platform    │   Feishu / QQ / WeChat Work / DingTalk / Discord / ...
└────────┬────────┘
         │ WebSocket / HTTP / Socket.IO
         ▼
┌─────────────────┐
│  BaseChannel    │   parse_inbound() → InboundMessage
│  (impl)         │   send_text() / send_content()
└────────┬────────┘
         │ enqueue
         ▼
┌─────────────────┐
│ ChannelManager  │   Multi-channel queues + Worker pool + Session lock
└────────┬────────┘
         │ InboundMessage
         ▼
┌─────────────────┐
│ Your Processor  │   async def(InboundMessage) -> AsyncIterator[MessageEvent]
└────────┬────────┘
         │ MessageEvent (MESSAGE / DELTA / TOOL_START / ...)
         ▼
┌─────────────────┐
│ BaseChannel     │   Constraints: timeout guard + rate limiter + typing
│ (send reply)    │   Delta merge → platform API delivery
└─────────────────┘

Development

make install      # Install dependencies
make test         # Run tests
make test-quick   # Unit tests only (fast)
make lint         # Lint + type check
make format       # Format code
make build        # Build wheel

Related Projects

Project Description
harness-agent Production-grade AI agent platform built on LangChain Deep Agents
harness-memory Pluggable memory system with hierarchical recall and FTS
harness-browser AI-friendly browser automation via CDP

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

harness_im_bridge-0.1.0.tar.gz (428.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

harness_im_bridge-0.1.0-py3-none-any.whl (95.1 kB view details)

Uploaded Python 3

File details

Details for the file harness_im_bridge-0.1.0.tar.gz.

File metadata

  • Download URL: harness_im_bridge-0.1.0.tar.gz
  • Upload date:
  • Size: 428.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for harness_im_bridge-0.1.0.tar.gz
Algorithm Hash digest
SHA256 f742e1aa6770c66df4ce4c74ccfaba5433ee7bd6acfb7a57483480822a277451
MD5 b47ee9a57e204c88e300926fbb4de204
BLAKE2b-256 1e077db1a815d5081f049d15b1e5a3617b00dc6e2830eaef9822815d2961792a

See more details on using hashes here.

File details

Details for the file harness_im_bridge-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for harness_im_bridge-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f247a111347ec164dc4f771ab3a9d63b2002a9633e827f65918818871cef6850
MD5 bed7e680e891cc7f6829a130a7256758
BLAKE2b-256 3dba32a3bc43328420c724f159dc87162920058fa4088689c4536b62481f400c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page