Skip to main content

Multi-platform IM channel bridge with unified message abstraction for AI agents and bots

Project description

Harness Gateway Banner

Harness Gateway

Multi-platform IM channel bridge — unified message abstraction for AI agents and bots.

PyPI CI Python License: MIT

English · 中文


Highlights

  • 9 Platforms, One Processor — Write one async generator, run your bot on Feishu, QQ, WeCom, DingTalk, WeChat iLink, Yuanbao, Xiaoyi, MQTT, and Telegram
  • Streaming-FirstDELTA events accumulate token-by-token; COMPLETED triggers merge, <think> cleanup, and send
  • Platform Constraints Handled — Reply timeouts, rate limits, and typing indicators managed per platform
  • Proactive Push — Bot-initiated messages via ChannelSubject routing for notifications and scheduled tasks
  • LLM Integration Ready — OpenAI streaming, LangGraph agents, and any async-generator processor pattern

Overview

Harness Gateway lets you write one processor function and deploy your AI bot across multiple messaging platforms. The library handles platform transport (WebSocket, HTTP, MQTT, long-poll), message normalization, media persistence, rate limiting, and reply-timeout strategies — so your bot logic stays platform-agnostic.

async def my_bot(message: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Received: {message.text}")
    yield MessageEvent.completed()

Core Technology

Component Technology Purpose
Message Model Pydantic v2 Typed InboundMessage / MessageEvent / ContentPart
Routing ChannelSubject Unified outbound handle (subject_id + platform metadata)
Transport Platform SDKs + aiohttp WebSocket, Stream, HTTP, MQTT, long-poll
Orchestration asyncio + worker pools Multi-channel queues with session-aware batching
Constraints Sliding window + timeout guards Rate limits, reply timeouts, typing keepalive
Media MediaBackend Pluggable storage; per-channel fetch_remote_media for auth

Features

  • Unified ModelInboundMessage / MessageEvent / ContentPart across all platforms
  • Async Generator Pattern — Receive messages, yield response events
  • Multi-Channel — Run several platforms simultaneously with session-aware batching
  • Streaming SupportDELTA for token-by-token LLM output, merged on COMPLETED
  • <think> Filtering — Reasoning-model thinking blocks stripped by default
  • Localized Tool Hints — Pass tool_hint_text in event metadata, or fall back to templates
  • Platform Constraints — Per-channel reply timeouts, rate limits, typing indicators
  • Proactive Pushpush_text / push_content / push_to_all via ChannelSubject
  • Media Handling — Inbound auto-persist + outbound load_media_bytes through MediaBackend
  • Multi-Tenant — Optional tenant_id per channel instance for session isolation
  • Extensible — Subclass BaseChannel and register via add_channel(instance)

Supported Platforms

Platform channel_type Transport Text Image File Push
Feishu (Lark) feishu WebSocket + REST
QQ qq WebSocket + REST
WeCom (Enterprise WeChat) wecom WebSocket + stream reply ⚠️ ⚠️
DingTalk dingtalk Stream + REST
WeChat iLink weixin HTTP long-poll ⚠️ ⚠️
Yuanbao (腾讯元宝) yuanbao HTTP REST
Xiaoyi (小艺) xiaoyi WebSocket (primary + backup)
MQTT mqtt pub/sub broker ⚠️ ⚠️
Telegram telegram long-polling

✅ = supported · ⚠️ = URL or text fallback (no native bytes on some platforms)

See README_CN.md for a detailed capability matrix (group chat, audio/video, Markdown, etc.).

Examples

Platform Example script Key env vars
Telegram examples/telegram_bot.py TELEGRAM_BOT_TOKEN
MQTT examples/mqtt_bot.py MQTT_HOST, MQTT_USERNAME, MQTT_PASSWORD
Xiaoyi examples/xiaoyi_bot.py XIAOYI_AK, XIAOYI_SK, XIAOYI_AGENT_ID
QQ examples/qq_bot.py QQ_APP_ID, QQ_TOKEN, QQ_SECRET
Feishu examples/feishu_bot.py FEISHU_APP_ID, FEISHU_APP_SECRET
WeCom examples/wecom_bot.py WECOM_BOT_ID, WECOM_SECRET
Weixin examples/weixin_bot.py WEIXIN_ACCOUNT_ID, WEIXIN_TOKEN
Multi-channel examples/all_channels.py Combined credentials

Copy .env.example to .env and fill in credentials before running examples.


Quick Start

Installation

pip install harness-gateway

# With example / agent integration deps
pip install "harness-gateway[examples]"

Minimal Echo Bot (Telegram)

import asyncio
import os
from collections.abc import AsyncIterator

from harness_gateway import ChannelManager, InboundMessage, MessageEvent
from harness_gateway.channels.telegram import TelegramConfig

async def echo(message: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"Echo: {message.text}")
    yield MessageEvent.completed()

async def main():
    manager = ChannelManager(processor=echo)
    await manager.start()
    await manager.add_telegram_channel(
        TelegramConfig(bot_token=os.environ["TELEGRAM_BOT_TOKEN"])
    )
    await asyncio.Event().wait()

asyncio.run(main())

Multi-Platform

import asyncio
import os
from collections.abc import AsyncIterator

from harness_gateway import ChannelManager, InboundMessage, MessageEvent
from harness_gateway.channels.dingtalk import DingTalkConfig
from harness_gateway.channels.feishu import FeishuConfig
from harness_gateway.channels.qq import QQConfig

async def unified_bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.text(f"[{msg.channel_type}] {msg.text}")
    yield MessageEvent.completed()

async def main():
    manager = ChannelManager(processor=unified_bot, workers_per_channel=4)
    await manager.start()

    await manager.add_feishu_channel(
        FeishuConfig(app_id=os.environ["FEISHU_APP_ID"], app_secret=os.environ["FEISHU_APP_SECRET"])
    )
    await manager.add_qq_channel(
        QQConfig(app_id=os.environ["QQ_APP_ID"], token=os.environ["QQ_TOKEN"], secret=os.environ["QQ_SECRET"])
    )
    await manager.add_dingtalk_channel(
        DingTalkConfig(app_key=os.environ["DINGTALK_APP_KEY"], app_secret=os.environ["DINGTALK_APP_SECRET"])
    )
    await asyncio.Event().wait()

asyncio.run(main())

Streaming + LLM Integration

Event Types

Event Purpose Channel Behavior
MESSAGE Complete message Send immediately
DELTA Token-by-token LLM output Accumulate, merge on COMPLETED
TOOL_START Tool call starting Status hint (configurable)
TOOL_END Tool call finished Optional completion hint
TYPING Typing indicator Platform typing status
ERROR Error occurred Send error message
COMPLETED Stream ended Flush delta buffer → strip thinking → send

OpenAI Streaming

from openai import AsyncOpenAI
from harness_gateway import InboundMessage, MessageEvent

client = AsyncOpenAI(api_key="sk-xxx")

async def chat_bot(msg: InboundMessage) -> AsyncIterator[MessageEvent]:
    yield MessageEvent.typing()

    stream = await client.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": msg.text}], stream=True
    )

    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            yield MessageEvent.delta(chunk.choices[0].delta.content)

    yield MessageEvent.completed()

Localized Tool Hints

By default, TOOL_START / TOOL_END use English templates from ChannelConstraints. Pass a pre-localized string in event metadata to override:

yield MessageEvent.tool_start(
    "read_file",
    tool_hint_text="🔧 正在调用工具:读取文件",
)

Disable hints per channel: ChannelConstraints(show_tool_hints=False).

<think> Auto-Filtering

Reasoning models (DeepSeek-R1, QwQ, etc.) emit <think>...</think> blocks. Harness Gateway strips them before delivery unless show_thinking=True.


Platform Constraints

Default constraints vary by platform. Override via add_*_channel(..., constraints=...) or channel.constraints at runtime.

Platform Reply Timeout Rate Limit Typing Keepalive Notes
WeCom 10s 2 / 5s Stream reply + placeholder on timeout
WeChat iLink 5s 5 / 60s 5s sendtyping API
Telegram 20 / 60s 4s When show_typing=True
MQTT 30 / 60s URL-based media payload
Xiaoyi 20 / 60s WebSocket dual connection
Feishu / QQ / DingTalk / Yuanbao Use custom constraints as needed
from harness_gateway import ChannelConstraints

constraints = ChannelConstraints(
    reply_timeout=10.0,
    send_rate_limit=(2, 5.0),
    timeout_strategy="placeholder",
    show_thinking=False,
    show_tool_hints=True,
    tool_hint_template="🔧 Calling tool: {tool_name}",
)

Media Backend

Inbound media is downloaded (with per-platform auth when needed), saved through MediaBackend, and stamped on ContentPart.local_path. Outbound sends read bytes via load_media_bytes (datalocal_pathurl).

from harness_gateway import ChannelManager, FileSystemMediaBackend

media = FileSystemMediaBackend("/tmp/harness-gateway/media")
manager = ChannelManager(processor=my_bot, media_backend=media)

Override fetch_remote_media() on your channel subclass when platform URLs require auth headers.


Proactive Push

Outbound routing uses ChannelSubject — auto-tracked on every inbound message (channel_subject.subject_id identifies the user).

# Single user (subject must have interacted before, or supply routing metadata)
subject = channel.get_subject("user_open_id")
if subject:
    await manager.push_text(channel_id, subject, "Your build completed ✅")

# Rich content
from harness_gateway import ImageContent, TextContent

await manager.push_content(channel_id, subject, [
    TextContent(text="📊 Daily Report:"),
    ImageContent(url="https://charts.example.com/daily.png"),
])

# Broadcast to all known subjects on a channel
await manager.push_to_all(channel_id, "👋 Scheduled greeting!")

Multi-Tenant Support

Run multiple instances of the same platform with isolated tenant_id:

from harness_gateway.channels.feishu import FeishuConfig

t1_id = await manager.add_feishu_channel(
    FeishuConfig(app_id="cli_t1", app_secret="secret1", tenant_id="tenant1")
)
t2_id = await manager.add_feishu_channel(
    FeishuConfig(app_id="cli_t2", app_secret="secret2", tenant_id="tenant2")
)

async def bot(msg: InboundMessage):
    # msg.tenant_id → "tenant1" or "tenant2"
    # msg.channel_id → UUID instance key
    yield MessageEvent.text(f"[{msg.tenant_id}] {msg.text}")
    yield MessageEvent.completed()

Custom Channels

Subclass BaseChannel, implement start / stop / _send_* / parse_inbound, then register:

from dataclasses import dataclass

from harness_gateway import BaseChannel, ChannelConfig, ChannelSubject, ContentPart, InboundMessage

@dataclass
class MyPlatformConfig(ChannelConfig):
    api_key: str = ""

class MyPlatformChannel(BaseChannel):
    channel_type = "myplatform"

    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def _send_text(self, subject: ChannelSubject, text: str) -> None: ...
    async def _send_content(self, subject: ChannelSubject, parts: list[ContentPart]) -> None: ...
    async def _send_media(self, subject: ChannelSubject, media: ContentPart) -> None: ...
    def parse_inbound(self, raw_payload) -> InboundMessage: ...

# Register an instance
channel = MyPlatformChannel(processor, MyPlatformConfig(api_key="..."))
channel_id = await manager.add_channel(channel)

See AGENTS.md for the full channel implementation specification.


Architecture

┌─────────────────┐
│  IM Platform    │   Feishu / QQ / WeCom / DingTalk / Telegram / MQTT / ...
└────────┬────────┘
         │ WebSocket / HTTP / MQTT
         ▼
┌─────────────────┐
│  BaseChannel    │   parse_inbound() → InboundMessage
│  (impl)         │   _send_text() / _send_content() / _send_media()
└────────┬────────┘
         │ enqueue
         ▼
┌─────────────────┐
│ ChannelManager  │   Queues + workers + session batching + MediaBackend
└────────┬────────┘
         │ InboundMessage
         ▼
┌─────────────────┐
│ Your Processor  │   async def(InboundMessage) → AsyncIterator[MessageEvent]
└────────┬────────┘
         │ MessageEvent (MESSAGE / DELTA / TOOL_START / ...)
         ▼
┌─────────────────┐
│ BaseChannel     │   Constraints + delta merge → platform API
└─────────────────┘

Development

Prerequisites: Python 3.11+, uv

git clone https://github.com/orcakit/harness-gateway.git
cd harness-gateway
make install           # uv sync --extra dev
make all               # lint + typecheck + test (CI ship bar)
Command Description
make lint Ruff check + format check
make format Ruff auto-fix + format
make typecheck mypy src
make test pytest -m "not integration"
make test-integration pytest -m integration (requires network)
make build wheel + sdist → dist/
make clean remove build artifacts and caches

Contributing

Contributions are welcome! Please read CONTRIBUTING.md and run make all before opening a PR.

Security issues: see SECURITY.md.


Related Projects

Project Description
Octop Self-hosted multi-user AI control plane
harness-agent Production-grade AI agent platform built on LangChain Deep Agents
harness-memory Pluggable memory system with hierarchical recall and FTS
harness-browser AI-friendly browser automation via CDP

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

harness_gateway-0.8.2.tar.gz (448.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

harness_gateway-0.8.2-py3-none-any.whl (108.9 kB view details)

Uploaded Python 3

File details

Details for the file harness_gateway-0.8.2.tar.gz.

File metadata

  • Download URL: harness_gateway-0.8.2.tar.gz
  • Upload date:
  • Size: 448.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for harness_gateway-0.8.2.tar.gz
Algorithm Hash digest
SHA256 55bfc9b82805c72c62198d751e2ec668878b5e537310f832fc1276d14e602ea6
MD5 36078099394b6c2900e880f9443c053f
BLAKE2b-256 ad5d6e1e310cc66af75ee55f7e5d445b20ee09b30a15de004e677b3de0eaa0f5

See more details on using hashes here.

File details

Details for the file harness_gateway-0.8.2-py3-none-any.whl.

File metadata

File hashes

Hashes for harness_gateway-0.8.2-py3-none-any.whl
Algorithm Hash digest
SHA256 3d07b040da31fc05b8898c511053cb1fcf1332704320727a1a84a84d297a67c8
MD5 f0cd6b655878ea579c902ca982aea003
BLAKE2b-256 450b41f7700f6a508435ef6624c486044e5eb02cb8571dbf9ae935233a08fc67

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page