RoomKit
Pure async Python 3.12+ library for multi-channel conversations.
RoomKit gives you a single abstraction, the room, to orchestrate messages across SMS, RCS, Email, WhatsApp, Messenger, Teams, Voice, WebSocket, HTTP webhooks, and AI channels. Events flow in through any channel, are validated by hooks, and are then broadcast to every other channel in the room with automatic content transcoding.
Inbound ──► Hook pipeline ──► Store ──► Broadcast to all channels
                                        │
   ┌──────────┬──────────┬────────┬─────┼─────┬────────┬────────┬────────┐
   ▼          ▼          ▼        ▼     ▼     ▼        ▼        ▼        ▼
SMS/RCS   WhatsApp     Email    Teams Msgr  Voice      WS       AI    Webhook
Website: www.roomkit.live | Docs: www.roomkit.live/docs
Quickstart
pip install roomkit
import asyncio

from roomkit import (
    ChannelCategory, HookResult, HookTrigger,
    InboundMessage, MockAIProvider, RoomContext,
    RoomEvent, RoomKit, TextContent, WebSocketChannel,
)
from roomkit.channels.ai import AIChannel


async def main():
    kit = RoomKit()

    # Register channels
    ws = WebSocketChannel("ws-user")
    ai = AIChannel("ai-bot", provider=MockAIProvider(responses=["Hello!"]))
    kit.register_channel(ws)
    kit.register_channel(ai)

    # Create a room and attach channels
    await kit.create_room(room_id="room-1")
    await kit.attach_channel("room-1", "ws-user")
    await kit.attach_channel("room-1", "ai-bot", category=ChannelCategory.INTELLIGENCE)

    # Add a broadcast hook
    @kit.hook(HookTrigger.BEFORE_BROADCAST, name="filter")
    async def block_spam(event: RoomEvent, ctx: RoomContext) -> HookResult:
        if isinstance(event.content, TextContent) and "spam" in event.content.body:
            return HookResult.block("spam detected")
        return HookResult.allow()

    # Process a message: it gets stored, broadcast, and the AI responds
    result = await kit.process_inbound(
        InboundMessage(channel_id="ws-user", sender_id="user-1", content=TextContent(body="Hi"))
    )
    print(result.blocked)  # False

    # View conversation history
    for event in await kit.store.list_events("room-1"):
        print(f"[{event.source.channel_id}] {event.content.body}")


asyncio.run(main())
More examples in examples/.
Installation
RoomKit's core has a single dependency (pydantic). Providers that call external APIs need optional extras:
pip install roomkit # core only
pip install roomkit[httpx] # HTTP-based providers (SMS, RCS, Email)
pip install roomkit[websocket] # WebSocket event source
pip install roomkit[anthropic] # Anthropic Claude AI
pip install roomkit[openai] # OpenAI GPT
pip install roomkit[gemini] # Google Gemini AI
pip install roomkit[teams] # Microsoft Teams (Bot Framework)
pip install roomkit[neonize] # WhatsApp Personal (neonize)
pip install roomkit[gradium] # Gradium STT + TTS
pip install roomkit[fastrtc] # FastRTC voice backend
pip install roomkit[realtime-gemini] # Gemini Live speech-to-speech
pip install roomkit[realtime-openai] # OpenAI Realtime speech-to-speech
pip install roomkit[providers] # all transport providers
pip install roomkit[all] # everything
For development:
git clone https://github.com/sboily/roomkit.git
cd roomkit
uv sync --extra dev
make all # lint + typecheck + test
Requires Python 3.12+.
Channels
Each channel is a thin adapter between the room and an external transport. All channels implement the same interface: handle_inbound() converts a provider message into a RoomEvent, and deliver() pushes events out.
| Channel | Type | Media | Notes |
|---|---|---|---|
| SMS | sms | text, MMS | Max 1600 chars, delivery receipts |
| RCS | rcs | text, rich, media | Rich cards, carousels, suggested actions |
| Email | email | text, rich, media | Threading support |
| WebSocket | websocket | text, rich, media | Real-time with typing, reactions |
| Messenger | messenger | text, rich, media, template | Buttons, quick replies |
| Teams | teams | text, rich | Bot Framework SDK, proactive messaging, 28K chars |
| WhatsApp | whatsapp | text, rich, media, location, template | Buttons, templates |
| WhatsApp Personal | whatsapp_personal | text, media, audio, location | Typing indicators, read receipts, neonize |
| Voice | voice | audio, text | STT/TTS, audio pipeline, barge-in, FastRTC streaming |
| Realtime Voice | realtime_voice | audio, text | Speech-to-speech AI (Gemini Live, OpenAI Realtime) |
| HTTP | webhook | text, rich | Generic webhook for any system |
| AI | ai | text, rich | Intelligence layer (not transport) |
Channels have two categories: transport (delivers to external systems) and intelligence (generates content, like AI). The Voice channel bridges real-time audio with the room-based conversation model.
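The two-method contract described at the start of this section can be sketched in a few lines. This is a minimal illustration, not RoomKit's implementation: `ToyEvent`, `EchoChannel`, and the method signatures shown here are assumptions made for the example.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ToyEvent:
    """Hypothetical stand-in for a room event (not RoomKit's RoomEvent)."""
    channel_id: str
    body: str


@dataclass
class EchoChannel:
    """Toy adapter: converts provider payloads to events and records deliveries."""
    channel_id: str
    delivered: list[str] = field(default_factory=list)

    async def handle_inbound(self, payload: dict) -> ToyEvent:
        # Convert a provider-specific payload into the room's event shape.
        return ToyEvent(channel_id=self.channel_id, body=payload["text"])

    async def deliver(self, event: ToyEvent) -> None:
        # Push an event out to the external transport (here: an in-memory list).
        self.delivered.append(event.body)


async def demo() -> list[str]:
    sms, ws = EchoChannel("sms"), EchoChannel("ws")
    event = await sms.handle_inbound({"text": "hello"})
    # Broadcast to every channel except the one the event came from.
    for channel in (sms, ws):
        if channel.channel_id != event.channel_id:
            await channel.deliver(event)
    return ws.delivered
```

Calling `asyncio.run(demo())` delivers the SMS-originated event to the WebSocket adapter only, which is the fan-out behaviour the room provides.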
Providers
Providers handle the actual API calls. Every provider has a mock counterpart for testing.
SMS Providers
| Provider | Features | Dependency |
|---|---|---|
| TwilioSMSProvider | SMS, MMS, delivery status | roomkit[httpx] |
| TelnyxSMSProvider | SMS, MMS, delivery status | roomkit[httpx] |
| SinchSMSProvider | SMS, delivery status | roomkit[httpx] |
| VoiceMeUpSMSProvider | SMS, MMS aggregation | roomkit[httpx] |
RCS Providers
| Provider | Features | Dependency |
|---|---|---|
| TwilioRCSProvider | Rich cards, carousels, actions | roomkit[httpx] |
| TelnyxRCSProvider | Rich cards, carousels, actions | roomkit[httpx] |
AI Providers
| Provider | Features | Dependency |
|---|---|---|
| AnthropicAIProvider | Claude, vision, tools | roomkit[anthropic] |
| OpenAIAIProvider | GPT-4, vision, tools | roomkit[openai] |
| GeminiAIProvider | Gemini, vision, tools | roomkit[gemini] |
| create_vllm_provider | Local LLM, OpenAI-compatible | roomkit[vllm] |
Voice Providers
| Provider | Role | Dependency |
|---|---|---|
| DeepgramSTTProvider | Speech-to-text | roomkit[httpx] |
| ElevenLabsTTSProvider | Text-to-speech | roomkit[httpx] |
| GradiumSTTProvider | Speech-to-text | roomkit[gradium] |
| GradiumTTSProvider | Text-to-speech | roomkit[gradium] |
| SherpaOnnxSTTProvider | Local STT (transducer/Whisper) | roomkit[sherpa-onnx] |
| SherpaOnnxTTSProvider | Local TTS (VITS/Piper) | roomkit[sherpa-onnx] |
| FastRTCVoiceBackend | WebRTC audio transport | roomkit[fastrtc] |
Audio Pipeline
The audio pipeline sits between the voice backend (transport) and STT, processing raw audio through pluggable inbound and outbound chains:
Inbound: Backend → [Resampler] → [Recorder] → [DTMF] → [AEC] → [AGC] → [Denoiser] → VAD → [Diarization] → STT
Outbound: TTS → [PostProcessors] → [Recorder] → AEC.feed_reference → [Resampler] → Backend
AEC and AGC stages are automatically skipped when the backend declares NATIVE_AEC / NATIVE_AGC capabilities.
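As a rough sketch of that skipping rule, the function below filters a configured stage list against the capabilities a backend declares. The `Capability` enum, the lowercase stage names, and `build_inbound_chain` are hypothetical; only the stage order and the NATIVE_AEC / NATIVE_AGC names come from the text above.

```python
from enum import Flag, auto


class Capability(Flag):
    """Hypothetical capability flags a backend might declare."""
    NONE = 0
    NATIVE_AEC = auto()
    NATIVE_AGC = auto()


# Inbound order taken from the pipeline diagram above.
INBOUND_ORDER = ["resampler", "recorder", "dtmf", "aec", "agc",
                 "denoiser", "vad", "diarization", "stt"]

# Stages made redundant by a native backend capability.
SKIPPED_BY = {"aec": Capability.NATIVE_AEC, "agc": Capability.NATIVE_AGC}


def build_inbound_chain(configured: set[str], caps: Capability) -> list[str]:
    """Return configured stages in pipeline order, dropping natively handled ones."""
    chain = []
    for stage in INBOUND_ORDER:
        if stage not in configured:
            continue  # stage was never configured
        native = SKIPPED_BY.get(stage)
        if native is not None and native in caps:
            continue  # the backend already handles this natively
        chain.append(stage)
    return chain
```

For example, a pipeline configured with AEC, AGC, VAD, and STT on a backend that declares only NATIVE_AEC would keep AGC but drop the software AEC stage.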
| Component | Role | Required |
|---|---|---|
| VADProvider | Voice activity detection (speech start/end) | No |
| DenoiserProvider | Noise reduction before VAD | No |
| DiarizationProvider | Speaker identification | No |
| AECProvider | Acoustic Echo Cancellation | No |
| AGCProvider | Automatic Gain Control | No |
| DTMFDetector | DTMF tone detection (parallel with main chain) | No |
| AudioRecorder | Record inbound/outbound audio | No |
| AudioPostProcessor | Custom outbound frame transforms | No |
| TurnDetector | Post-STT turn completion detection | No |
| BackchannelDetector | Distinguish interruptions from acknowledgements | No |
All stages are optional — configure what you need:
from roomkit import VoiceChannel
from roomkit.voice.pipeline import AudioPipelineConfig, VADConfig
from roomkit.voice.interruption import InterruptionConfig, InterruptionStrategy
pipeline = AudioPipelineConfig(
    vad=my_vad,
    denoiser=my_denoiser,
    diarization=my_diarizer,
    aec=my_aec,
    agc=my_agc,
    dtmf=my_dtmf_detector,
    recorder=my_recorder,
    recording_config=my_recording_config,
    turn_detector=my_turn_detector,
    vad_config=VADConfig(silence_threshold_ms=500),
)
voice = VoiceChannel("voice", stt=stt, tts=tts, backend=backend, pipeline=pipeline)
# Realtime voice: denoiser + diarization only (AI provider handles VAD)
preprocess = AudioPipelineConfig(denoiser=my_denoiser, diarization=my_diarizer)
Interruption strategies control how user speech during TTS playback is handled:
voice = VoiceChannel(
    "voice", stt=stt, tts=tts, backend=backend, pipeline=pipeline,
    interruption=InterruptionConfig(strategy=InterruptionStrategy.CONFIRMED, min_speech_ms=300),
)
Four strategies: IMMEDIATE (interrupt on any speech), CONFIRMED (wait for sustained speech), SEMANTIC (use backchannel detection to ignore "uh-huh"), DISABLED (ignore speech during playback).
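The difference between the four strategies can be illustrated with a small decision function. This is a sketch under assumptions, not RoomKit's logic: the `Strategy` enum, `should_interrupt`, and its parameters are hypothetical, and the real implementation may weigh these signals differently.

```python
from enum import Enum


class Strategy(Enum):
    """Mirrors the four strategy names above; the logic below is illustrative."""
    IMMEDIATE = "immediate"
    CONFIRMED = "confirmed"
    SEMANTIC = "semantic"
    DISABLED = "disabled"


def should_interrupt(strategy: Strategy, speech_ms: int,
                     min_speech_ms: int = 300, is_backchannel: bool = False) -> bool:
    """Decide whether user speech during playback should stop TTS."""
    if strategy is Strategy.DISABLED:
        return False
    if strategy is Strategy.IMMEDIATE:
        return speech_ms > 0
    if strategy is Strategy.CONFIRMED:
        return speech_ms >= min_speech_ms
    # SEMANTIC: ignore acknowledgements such as "uh-huh".
    return speech_ms > 0 and not is_backchannel
```

With `min_speech_ms=300`, a 120 ms cough does not interrupt under CONFIRMED, while 350 ms of sustained speech does.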
The pipeline fires events that the hook system can intercept: ON_SPEECH_START, ON_SPEECH_END, ON_VAD_SILENCE, ON_VAD_AUDIO_LEVEL, ON_SPEAKER_CHANGE, ON_DTMF, ON_TURN_COMPLETE, ON_TURN_INCOMPLETE, ON_BACKCHANNEL, ON_RECORDING_STARTED, ON_RECORDING_STOPPED. See examples/voice_pipeline.py for a complete example.
Realtime Voice (Speech-to-Speech)
| Component | Role | Dependency |
|---|---|---|
| GeminiLiveProvider | Speech-to-speech AI provider | roomkit[realtime-gemini] |
| OpenAIRealtimeProvider | Speech-to-speech AI provider | roomkit[realtime-openai] |
| WebSocketRealtimeTransport | Browser-to-server audio (WebSocket) | roomkit[websocket] |
| FastRTCRealtimeTransport | Browser-to-server audio (WebRTC) | roomkit[fastrtc] |
Teams Providers
| Provider | Features | Dependency |
|---|---|---|
| BotFrameworkTeamsProvider | Proactive messaging, conversation references, bot mention detection | roomkit[teams] |
WhatsApp Personal Providers
| Provider | Features | Dependency |
|---|---|---|
| NeonizeWhatsAppProvider | Multidevice, typing indicators, read receipts, media | roomkit[neonize] |
Other Providers
| Provider | Channel | Dependency |
|---|---|---|
| ElasticEmailProvider | Email | roomkit[httpx] |
| FacebookMessengerProvider | Messenger | roomkit[httpx] |
| WebhookHTTPProvider | HTTP | roomkit[httpx] |
Each HTTP-based provider lazy-imports httpx so the core library stays lightweight.
Hooks
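Hooks intercept events at specific points in the pipeline. Sync hooks run inline and can block or modify events; async hooks run after the fact, for logging or other side effects. Here "sync" and "async" describe when a hook runs relative to the broadcast, not how it is declared: the blocking hook in the example that follows is itself an async def function.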
@kit.hook(HookTrigger.BEFORE_BROADCAST, name="compliance_check")
async def check(event: RoomEvent, ctx: RoomContext) -> HookResult:
    # Block, allow, or modify the event
    return HookResult.allow()
Triggers: BEFORE_BROADCAST, AFTER_BROADCAST, ON_ROOM_CREATED, ON_ROOM_PAUSED, ON_ROOM_CLOSED, ON_CHANNEL_ATTACHED, ON_CHANNEL_DETACHED, ON_CHANNEL_MUTED, ON_CHANNEL_UNMUTED, ON_IDENTITY_AMBIGUOUS, ON_IDENTITY_UNKNOWN, ON_PARTICIPANT_IDENTIFIED, ON_TASK_CREATED, ON_DELIVERY_STATUS, ON_ERROR, ON_SPEECH_START, ON_SPEECH_END, ON_TRANSCRIPTION, BEFORE_TTS, AFTER_TTS, ON_TTS_CANCELLED, ON_BARGE_IN, ON_VAD_SILENCE, ON_VAD_AUDIO_LEVEL, ON_SPEAKER_CHANGE, ON_DTMF, ON_TURN_COMPLETE, ON_TURN_INCOMPLETE, ON_BACKCHANNEL, ON_RECORDING_STARTED, ON_RECORDING_STOPPED, ON_REALTIME_TOOL_CALL, ON_REALTIME_TEXT_INJECTED, ON_OBSERVATION.
Hooks support filtering by channel type, channel ID, and direction:
@kit.hook(
    HookTrigger.BEFORE_BROADCAST,
    channel_types={ChannelType.SMS},
    directions={ChannelDirection.INBOUND},
)
async def sms_only_hook(event, ctx):
    return HookResult.allow()
Hooks can also inject side-effect events, create tasks, and record observations.
AI Integration
Per-Room AI Configuration
Configure AI behavior per room with custom system prompts, temperature, and tools:
from roomkit import AIConfig, AITool
room = await kit.create_room(
    room_id="support-room",
    ai_config=AIConfig(
        system_prompt="You are a helpful support agent.",
        temperature=0.7,
        tools=[
            AITool(
                name="lookup_order",
                description="Look up order status",
                parameters={"type": "object", "properties": {"order_id": {"type": "string"}}},
            )
        ],
    ),
)
Function Calling
AI providers support function calling with automatic tool result handling:
response = await ai_provider.generate(context)
if response.tool_calls:
    for call in response.tool_calls:
        result = await execute_tool(call.name, call.arguments)
        # Feed result back to AI
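The `execute_tool` helper in the snippet above is not defined there. One possible shape for it, sketched here with only the standard library, is a registry that maps tool names to coroutines and returns a JSON string. The `tool` decorator, the `TOOLS` registry, and the placeholder order data are all assumptions for illustration, not part of RoomKit.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable

ToolFn = Callable[..., Awaitable[Any]]
TOOLS: dict[str, ToolFn] = {}


def tool(name: str) -> Callable[[ToolFn], ToolFn]:
    """Register a coroutine under the name the model will call it by."""
    def register(fn: ToolFn) -> ToolFn:
        TOOLS[name] = fn
        return fn
    return register


@tool("lookup_order")
async def lookup_order(order_id: str) -> dict:
    # Placeholder data; a real tool would query an order system.
    return {"order_id": order_id, "status": "shipped"}


async def execute_tool(name: str, arguments: dict[str, Any]) -> str:
    """Dispatch one tool call and return a JSON string to hand back to the model."""
    fn = TOOLS.get(name)
    if fn is None:
        return json.dumps({"error": f"unknown tool: {name}"})
    try:
        return json.dumps(await fn(**arguments))
    except Exception as exc:  # report tool failures to the model instead of raising
        return json.dumps({"error": str(exc)})
```

Returning errors as data rather than raising lets the model see that a call failed and decide how to respond.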
Realtime Events
Handle ephemeral events like typing indicators, presence, and read receipts:
from roomkit import EphemeralEvent, EphemeralEventType
# Subscribe to realtime events
async def handle_realtime(event: EphemeralEvent):
    if event.type == EphemeralEventType.TYPING_START:
        print(f"{event.user_id} is typing...")
sub_id = await kit.subscribe_room("room-1", handle_realtime)
# Publish typing indicator
await kit.publish_typing("room-1", "user-1")
# Publish presence
await kit.publish_presence("room-1", "user-1", "online")
# Publish read receipt
await kit.publish_read_receipt("room-1", "user-1", "event-123")
For distributed deployments, implement a custom RealtimeBackend (e.g., Redis pub/sub).
Identity Resolution
Resolve unknown senders to known identities with a pluggable pipeline:
from roomkit import IdentityResolver, IdentityResult, IdentificationStatus, Identity
class MyResolver(IdentityResolver):
    async def resolve(self, message, context):
        user = await lookup(message.sender_id)
        if user:
            return IdentityResult(
                status=IdentificationStatus.IDENTIFIED,
                identity=Identity(id=user.id, display_name=user.name),
            )
        return IdentityResult(status=IdentificationStatus.UNKNOWN)
kit = RoomKit(identity_resolver=MyResolver())
Channel Type Filtering
Restrict identity resolution to specific channel types:
kit = RoomKit(
    identity_resolver=MyResolver(),
    identity_channel_types={ChannelType.SMS},  # Only resolve for SMS
)
Supports identified, pending, ambiguous, challenge, and rejected outcomes with hook-based customization.
Webhook Processing
Process provider webhooks with automatic parsing and delivery status tracking:
# Generic webhook processing
result = await kit.process_webhook(
    channel_id="sms-channel",
    raw_payload=request_body,
    headers=request_headers,
)
# Handle delivery status updates
@kit.on_delivery_status
async def handle_status(status: DeliveryStatus):
    print(f"Message {status.provider_message_id}: {status.status}")
Event-Driven Sources
For persistent connections (WebSocket, NATS, SSE), use SourceProviders instead of webhooks:
from roomkit import RoomKit, BaseSourceProvider, SourceStatus
from roomkit.sources import WebSocketSource
kit = RoomKit()
# Built-in WebSocket source
source = WebSocketSource(
    url="wss://chat.example.com/events",
    channel_id="websocket-chat",
)
# Attach with resilience options
await kit.attach_source(
    "websocket-chat",
    source,
    auto_restart=True,  # Restart on failure
    max_restart_attempts=10,  # Give up after 10 failures
    max_concurrent_emits=20,  # Backpressure control
)
# Monitor health
health = await kit.source_health("websocket-chat")
print(f"Status: {health.status}, Messages: {health.messages_received}")
# Detach when done
await kit.detach_source("websocket-chat")
Webhook vs Event-Driven:
| Aspect | Webhooks | Event Sources |
|---|---|---|
| Connection | Stateless HTTP | Persistent (WS, TCP, etc.) |
| Initiative | External system pushes | RoomKit subscribes |
| Use cases | Twilio, SendGrid | WebSocket, NATS, SSE |
Create custom sources by extending BaseSourceProvider:
class NATSSource(BaseSourceProvider):
    @property
    def name(self) -> str:
        return "nats:events"

    async def start(self, emit) -> None:
        self._set_status(SourceStatus.CONNECTED)
        async for msg in self.subscribe():
            await emit(parse_message(msg))
            self._record_message()
Features: exponential backoff, max restart attempts, backpressure control, health monitoring.
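The restart behaviour listed above can be approximated with a short supervisor loop. This is a sketch of the general technique, not RoomKit's code: `backoff_delay`, `run_with_restart`, the doubling schedule, the 60-second cap, and the exact way attempts are counted are all assumptions.

```python
import asyncio
from typing import Awaitable, Callable


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Delay before restart number `attempt` (1-based): base * 2**(attempt-1), capped."""
    return min(cap, base * 2 ** (attempt - 1))


async def run_with_restart(start: Callable[[], Awaitable[None]],
                           max_restart_attempts: int = 10,
                           base: float = 1.0,
                           sleep: Callable[[float], Awaitable[None]] = asyncio.sleep) -> int:
    """Run `start` until it returns cleanly; return the number of restarts used."""
    restarts = 0
    while True:
        try:
            await start()
            return restarts
        except Exception:
            if restarts >= max_restart_attempts:
                raise  # restart budget exhausted: surface the last error
            restarts += 1
            await sleep(backoff_delay(restarts, base))
```

Passing `sleep` as a parameter keeps the loop testable, since a test can record the requested delays instead of actually waiting.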
Resilience
Built-in patterns for production reliability:
- Retry with backoff — configurable per-channel retry policy with exponential backoff
- Circuit breaker — isolates failing providers so one broken channel doesn't bring down the room
- Rate limiting — token bucket limiter with per-second/minute/hour limits per channel
- Content transcoding — automatic conversion between channel capabilities (e.g. rich to text fallback)
- Chain depth tracking — prevents infinite event loops between channels
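Of the patterns listed above, the circuit breaker is perhaps the least self-explanatory, so here is a minimal sketch of the idea. The `CircuitBreaker` class, its thresholds, and its method names are hypothetical and are not taken from RoomKit.

```python
import time


class CircuitBreaker:
    """Opens after `threshold` consecutive failures; probes again after `reset_after` seconds."""

    def __init__(self, threshold: int = 3, reset_after: float = 30.0,
                 clock=time.monotonic) -> None:
        self.threshold = threshold
        self.reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at: float | None = None

    def allow(self) -> bool:
        """True if a call may proceed (closed, or open long enough to probe again)."""
        if self._opened_at is None:
            return True
        return self._clock() - self._opened_at >= self.reset_after

    def record_success(self) -> None:
        # A successful call closes the circuit and clears the failure count.
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        # Once the threshold is reached, (re)open the circuit from now.
        self._failures += 1
        if self._failures >= self.threshold:
            self._opened_at = self._clock()
```

A delivery loop would check `allow()` before calling a provider and report the outcome afterwards, so that a failing channel is skipped quickly instead of slowing down the rest of the room.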
Configure per binding:
from roomkit import RetryPolicy, RateLimit
await kit.attach_channel(
    "room-1", "sms-out",
    metadata={"phone_number": "+15551234567"},
    retry_policy=RetryPolicy(max_retries=3, base_delay_seconds=1.0),
    rate_limit=RateLimit(max_per_second=5.0),
)
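For readers unfamiliar with token buckets, the following sketch shows how a limit such as `max_per_second=5.0` could be enforced. It illustrates the general algorithm only; the `TokenBucket` class and its parameters are assumptions, not RoomKit's limiter.

```python
import time


class TokenBucket:
    """Token bucket: refills at `rate` tokens/second up to `capacity`."""

    def __init__(self, rate: float, capacity: float, clock=time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity
        self._last = clock()

    def try_acquire(self, cost: float = 1.0) -> bool:
        """Take `cost` tokens if available; return False when the caller should wait."""
        now = self._clock()
        # Refill in proportion to elapsed time, never above capacity.
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False
```

The capacity sets how large a burst is tolerated, while the rate sets the sustained throughput once the burst is spent.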
Scaling
Per-room locking
RoomKit serializes event processing per room using a RoomLockManager. The default InMemoryLockManager works for single-process deployments. For multi-process or distributed setups, subclass RoomLockManager with a distributed lock (Redis, Postgres advisory locks, etc.):
from roomkit import RoomKit, RoomLockManager
# Single process (default)
kit = RoomKit()
# Distributed
kit = RoomKit(lock_manager=MyRedisLockManager())
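To illustrate what per-room serialization means in a single process, here is a small stand-alone sketch. `ToyRoomLocks` is hypothetical and does not implement the RoomLockManager interface, whose abstract methods are not shown in this document; it only demonstrates the one-lock-per-room idea with `asyncio.Lock`.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager


class ToyRoomLocks:
    """Hypothetical in-process lock table: one asyncio.Lock per room ID."""

    def __init__(self) -> None:
        self._locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    @asynccontextmanager
    async def locked(self, room_id: str):
        # Hold the room's lock for the duration of the `async with` body.
        async with self._locks[room_id]:
            yield


async def demo() -> list[str]:
    locks, log = ToyRoomLocks(), []

    async def process(room_id: str, label: str) -> None:
        async with locks.locked(room_id):
            log.append(f"{label}:start")
            await asyncio.sleep(0.01)  # simulate work while holding the lock
            log.append(f"{label}:end")

    # Two events for the same room never interleave.
    await asyncio.gather(process("room-1", "a"), process("room-1", "b"))
    return log
```

A distributed variant would replace the in-memory dictionary with a shared lock service, which is where Redis or Postgres advisory locks come in.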
Backpressure
RoomKit does not enforce a global concurrency limit on process_inbound calls. Each call acquires a per-room lock internally, but across different rooms, processing is fully concurrent.
To prevent resource exhaustion, add concurrency control upstream:
import asyncio
from roomkit import RoomKit, InboundMessage, InboundResult
kit = RoomKit()
semaphore = asyncio.Semaphore(100)  # at most 100 process_inbound calls in flight
async def handle_webhook(message: InboundMessage) -> InboundResult:
    async with semaphore:
        return await kit.process_inbound(message)
Room Lifecycle
Rooms transition through states automatically based on activity timers:
ACTIVE ──(inactive timeout)──► PAUSED ──(closed timeout)──► CLOSED
from roomkit import RoomTimers
await kit.create_room(
    room_id="support-123",
    timers=RoomTimers(inactive_after_seconds=300, closed_after_seconds=3600),
)
# Check and apply timer transitions
transitioned = await kit.sweep_room_timers()
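The decision a timer sweep has to make for each room can be sketched as a pure function. This is an illustration under assumptions: `RoomState` and `next_state` are hypothetical, and the sketch assumes both timers are measured from the room's last activity, which may not match how RoomKit counts them.

```python
from enum import Enum


class RoomState(Enum):
    ACTIVE = "active"
    PAUSED = "paused"
    CLOSED = "closed"


def next_state(state: RoomState, idle_seconds: float,
               inactive_after_seconds: float, closed_after_seconds: float) -> RoomState:
    """State a sweep would move the room to, given time since last activity."""
    if state is RoomState.CLOSED:
        return state  # closed is terminal in this sketch
    if idle_seconds >= closed_after_seconds:
        return RoomState.CLOSED
    if state is RoomState.ACTIVE and idle_seconds >= inactive_after_seconds:
        return RoomState.PAUSED
    return state
```

With the timers from the example above (300 and 3600 seconds), a room idle for 301 seconds would be paused, and one idle for a full hour would be closed.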
Storage
The ConversationStore ABC defines the persistence interface. InMemoryStore is included for development and testing. Implement the ABC to use any database.
kit = RoomKit() # uses InMemoryStore by default
kit = RoomKit(store=MyPostgresStore()) # plug in your own
The store handles rooms, events, bindings, participants, identities, tasks, and observations.
AI Assistant Support
RoomKit includes files to help AI coding assistants understand the library:
- llms.txt — Structured documentation for LLM context windows
- AGENTS.md — Coding guidelines and patterns for AI assistants
- MCP Integration — Model Context Protocol support
Access programmatically:
from roomkit import get_llms_txt, get_agents_md
llms_content = get_llms_txt()
agents_content = get_agents_md()
Project Structure
src/roomkit/
  channels/    Channel implementations (sms, rcs, email, websocket, ai, ...)
  core/        Framework, hooks, routing, retry, circuit breaker
  identity/    Identity resolution pipeline
  models/      Pydantic data models and enums
  providers/   Provider implementations (sms/, rcs/, email/, teams/, messenger/, ...)
  realtime/    Ephemeral events (typing, presence, read receipts)
  sources/     Event-driven sources (WebSocket, neonize, custom)
  store/       Storage abstraction and in-memory implementation
  voice/       Voice subsystem (stt/, tts/, backends/, pipeline/, realtime/)
Documentation
- Website — Landing page and overview
- Documentation — Full documentation
- API Reference — Complete API docs
- RFC — Design document
Contributing
See CONTRIBUTING.md and CODE_OF_CONDUCT.md. Quick version:
uv sync --extra dev
make all # ruff check + mypy --strict + pytest
All new code needs tests. Aim for >90% coverage.
License