RoomKit
Pure async Python 3.12+ library for multi-channel conversations.
RoomKit gives you a single abstraction — the room — to orchestrate messages across SMS, RCS, Email, WhatsApp, Messenger, WebSocket, HTTP webhooks, and AI channels. Events flow in through any channel, get validated by hooks, and broadcast to every other channel in the room with automatic content transcoding.
Inbound ──► Hook pipeline ──► Store ──► Broadcast to all channels
                                            │
             ┌──────────┬──────────┬────────┼────────┬────────┐
             ▼          ▼          ▼        ▼        ▼        ▼
          SMS/RCS   WebSocket    Email  Messenger   AI     Webhook
Website: www.roomkit.live | Docs: www.roomkit.live/docs
Quickstart
pip install roomkit
import asyncio

from roomkit import (
    ChannelCategory, HookResult, HookTrigger,
    InboundMessage, MockAIProvider, RoomContext,
    RoomEvent, RoomKit, TextContent, WebSocketChannel,
)
from roomkit.channels.ai import AIChannel

async def main():
    kit = RoomKit()

    # Register channels
    ws = WebSocketChannel("ws-user")
    ai = AIChannel("ai-bot", provider=MockAIProvider(responses=["Hello!"]))
    kit.register_channel(ws)
    kit.register_channel(ai)

    # Create a room and attach channels
    await kit.create_room(room_id="room-1")
    await kit.attach_channel("room-1", "ws-user")
    await kit.attach_channel("room-1", "ai-bot", category=ChannelCategory.INTELLIGENCE)

    # Add a pre-inbound hook
    @kit.hook(HookTrigger.PRE_INBOUND, name="filter")
    async def block_spam(event: RoomEvent, ctx: RoomContext) -> HookResult:
        if isinstance(event.content, TextContent) and "spam" in event.content.body:
            return HookResult.block("spam detected")
        return HookResult.allow()

    # Process a message: it gets stored, broadcast, and the AI responds
    result = await kit.process_inbound(
        InboundMessage(channel_id="ws-user", sender_id="user-1", content=TextContent(body="Hi"))
    )
    print(result.blocked)  # False

    # View conversation history
    for event in await kit.store.list_events("room-1"):
        print(f"[{event.source.channel_id}] {event.content.body}")

asyncio.run(main())
More examples in examples/.
Installation
RoomKit's core has a single dependency (pydantic). Providers that call external APIs need optional extras:
pip install roomkit # core only
pip install roomkit[httpx] # HTTP-based providers (SMS, RCS, Email)
pip install roomkit[websocket] # WebSocket event source
pip install roomkit[anthropic] # Anthropic Claude AI
pip install roomkit[openai] # OpenAI GPT
pip install roomkit[gemini] # Google Gemini AI
pip install roomkit[providers] # all transport providers
pip install roomkit[all] # everything
For development:
git clone https://github.com/sboily/roomkit.git
cd roomkit
uv sync --extra dev
make all # lint + typecheck + test
Requires Python 3.12+.
Channels
Each channel is a thin adapter between the room and an external transport. All channels implement the same interface: handle_inbound() converts a provider message into a RoomEvent, and deliver() pushes events out.
| Channel | Type | Media | Notes |
|---|---|---|---|
| SMS | sms | text, MMS | Max 1600 chars, delivery receipts |
| RCS | rcs | text, rich, media | Rich cards, carousels, suggested actions |
| Email | email | text, rich, media | Threading support |
| WebSocket | websocket | text, rich, media | Real-time with typing, reactions |
| Messenger | messenger | text, rich, media, template | Buttons, quick replies |
| WhatsApp | whatsapp | text, rich, media, location, template | Buttons, templates |
| HTTP | webhook | text, rich | Generic webhook for any system |
| AI | ai | text, rich | Intelligence layer (not transport) |
Channels have two categories: transport (delivers to external systems) and intelligence (generates content, like AI).
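To make the two-method contract concrete, here is a minimal sketch of an adapter. It does not use RoomKit's real base classes: `SketchEvent`, `EchoChannel`, the payload shape, and the synchronous method signatures are all assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class SketchEvent:
    """Hypothetical stand-in for a RoomEvent; not RoomKit's model."""
    channel_id: str
    body: str


@dataclass
class EchoChannel:
    """Toy adapter: normalizes provider payloads and records deliveries."""
    channel_id: str
    delivered: list = field(default_factory=list)

    def handle_inbound(self, payload: dict) -> SketchEvent:
        # Convert a provider-specific payload into the room's event shape.
        return SketchEvent(channel_id=self.channel_id, body=payload["text"].strip())

    def deliver(self, event: SketchEvent) -> None:
        # Push an event out; a real adapter would call its provider here.
        self.delivered.append(event.body)


sms = EchoChannel("sms-1")
ws = EchoChannel("ws-1")
event = sms.handle_inbound({"text": "  hello  "})
ws.deliver(event)  # the other channel in the "room" receives the event
```

The point of the shape is that the room only ever sees normalized events, so adding a transport means writing one adapter rather than touching the routing logic.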
Providers
Providers handle the actual API calls. Every provider has a mock counterpart for testing.
SMS Providers
| Provider | Features | Dependency |
|---|---|---|
| TwilioSMSProvider | SMS, MMS, delivery status | roomkit[httpx] |
| TelnyxSMSProvider | SMS, MMS, delivery status | roomkit[httpx] |
| SinchSMSProvider | SMS, delivery status | roomkit[httpx] |
| VoiceMeUpSMSProvider | SMS, MMS aggregation | roomkit[httpx] |
RCS Providers
| Provider | Features | Dependency |
|---|---|---|
| TwilioRCSProvider | Rich cards, carousels, actions | roomkit[httpx] |
| TelnyxRCSProvider | Rich cards, carousels, actions | roomkit[httpx] |
AI Providers
| Provider | Features | Dependency |
|---|---|---|
| AnthropicAIProvider | Claude, vision, tools | roomkit[anthropic] |
| OpenAIAIProvider | GPT-4, vision, tools | roomkit[openai] |
| GeminiAIProvider | Gemini, vision, tools | roomkit[gemini] |
Other Providers
| Provider | Channel | Dependency |
|---|---|---|
| ElasticEmailProvider | Email | roomkit[httpx] |
| FacebookMessengerProvider | Messenger | roomkit[httpx] |
| WebhookHTTPProvider | HTTP | roomkit[httpx] |
Each HTTP-based provider lazy-imports httpx so the core library stays lightweight.
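The lazy-import pattern can be sketched as follows. This is not RoomKit's code: `require` and `SketchProvider` are hypothetical names, and the standard-library `json` module stands in for httpx so the example runs without extra packages.

```python
import importlib


def require(module_name: str, extra: str):
    """Import a module on first use, or explain which extra provides it."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name} is required; install it with: pip install roomkit[{extra}]"
        ) from exc


class SketchProvider:
    """Hypothetical provider: nothing optional is imported at class definition."""

    def send(self, payload: dict) -> str:
        # The import happens only when a message is actually sent.
        json = require("json", "httpx")  # json stands in for httpx here
        return json.dumps(payload)


encoded = SketchProvider().send({"to": "+15551234567"})
```

Deferring the import to the call site means users who never instantiate an HTTP-based provider never pay for, or need to install, the optional dependency.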
Hooks
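Hooks intercept events at specific points in the pipeline. Sync hooks run inline and can block or modify events; async hooks run after the fact, for logging or side effects. Note that the blocking hooks in the examples here are still declared with async def.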
@kit.hook(HookTrigger.PRE_INBOUND, name="compliance_check")
async def check(event: RoomEvent, ctx: RoomContext) -> HookResult:
    # Block, allow, or modify the event
    return HookResult.allow()
Triggers: PRE_INBOUND, POST_INBOUND, PRE_DELIVERY, POST_DELIVERY, BEFORE_BROADCAST, AFTER_BROADCAST, ON_ROOM_CREATED, ON_ROOM_PAUSED, ON_ROOM_CLOSED, ON_CHANNEL_ATTACHED, ON_CHANNEL_DETACHED, ON_IDENTITY_AMBIGUOUS, ON_IDENTITY_UNKNOWN, ON_DELIVERY_STATUS, ON_ERROR.
Hooks support filtering by channel type, channel ID, and direction:
@kit.hook(
    HookTrigger.PRE_INBOUND,
    channel_types={ChannelType.SMS},
    directions={ChannelDirection.INBOUND},
)
async def sms_only_hook(event, ctx):
    return HookResult.allow()
Hooks can also inject side-effect events, create tasks, and record observations.
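The block/allow/modify behavior described above can be pictured with a small standalone sketch. It is one plausible way to chain hooks, not RoomKit's implementation: `Verdict` is a hypothetical stand-in for HookResult, and the stop-at-first-block ordering is an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class Verdict:
    """Hypothetical stand-in for HookResult."""
    allowed: bool
    reason: str = ""
    text: Optional[str] = None  # replacement text when a hook modifies the event


async def run_hooks(text, hooks):
    """Run hooks in order; stop at the first block, carry modifications forward."""
    for hook in hooks:
        verdict = await hook(text)
        if not verdict.allowed:
            return verdict
        if verdict.text is not None:
            text = verdict.text
    return Verdict(allowed=True, text=text)


async def block_spam(text):
    if "spam" in text:
        return Verdict(False, "spam detected")
    return Verdict(True)


async def redact_digits(text):
    # Modify the event: replace every digit with "#".
    return Verdict(True, text="".join("#" if c.isdigit() else c for c in text))


ok = asyncio.run(run_hooks("call 555", [block_spam, redact_digits]))
blocked = asyncio.run(run_hooks("buy spam", [block_spam, redact_digits]))
```

In this sketch a blocked event never reaches later hooks, which keeps expensive checks from running on messages that an earlier filter has already rejected.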
AI Integration
Per-Room AI Configuration
Configure AI behavior per room with custom system prompts, temperature, and tools:
from roomkit import AIConfig, AITool

room = await kit.create_room(
    room_id="support-room",
    ai_config=AIConfig(
        system_prompt="You are a helpful support agent.",
        temperature=0.7,
        tools=[
            AITool(
                name="lookup_order",
                description="Look up order status",
                parameters={"type": "object", "properties": {"order_id": {"type": "string"}}},
            )
        ],
    ),
)
Function Calling
AI providers support function calling with automatic tool result handling:
response = await ai_provider.generate(context)
if response.tool_calls:
    for call in response.tool_calls:
        result = await execute_tool(call.name, call.arguments)
        # Feed result back to AI
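The snippet above leaves `execute_tool` undefined. A minimal sketch of what such a dispatcher might look like follows; `ToolCall`, the `TOOLS` registry, and the JSON-string return convention are assumptions for illustration, not part of RoomKit.

```python
import asyncio
import json
from dataclasses import dataclass


@dataclass
class ToolCall:
    """Hypothetical shape of one entry in response.tool_calls."""
    name: str
    arguments: dict


async def lookup_order(order_id: str) -> dict:
    # Placeholder data; a real tool would query a database or an API.
    return {"order_id": order_id, "status": "shipped"}


TOOLS = {"lookup_order": lookup_order}


async def execute_tool(name: str, arguments: dict) -> str:
    """Dispatch by tool name and return a JSON string for the model."""
    handler = TOOLS.get(name)
    if handler is None:
        return json.dumps({"error": f"unknown tool: {name}"})
    return json.dumps(await handler(**arguments))


async def run_all(calls):
    return [await execute_tool(c.name, c.arguments) for c in calls]


calls = [ToolCall("lookup_order", {"order_id": "A1"}), ToolCall("nope", {})]
results = asyncio.run(run_all(calls))
```

Returning an error payload for unknown tools, rather than raising, lets the model see the failure and recover in its next turn.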
Realtime Events
Handle ephemeral events like typing indicators, presence, and read receipts:
from roomkit import EphemeralEvent, EphemeralEventType

# Subscribe to realtime events
async def handle_realtime(event: EphemeralEvent):
    if event.type == EphemeralEventType.TYPING_START:
        print(f"{event.user_id} is typing...")

sub_id = await kit.subscribe_room("room-1", handle_realtime)

# Publish typing indicator
await kit.publish_typing("room-1", "user-1")

# Publish presence
await kit.publish_presence("room-1", "user-1", "online")

# Publish read receipt
await kit.publish_read_receipt("room-1", "user-1", "event-123")
For distributed deployments, implement a custom RealtimeBackend (e.g., Redis pub/sub).
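The source does not show the RealtimeBackend interface, so the following is only a sketch of the fan-out a backend has to provide, using hypothetical method names (`subscribe`, `unsubscribe`, `publish`). A Redis-backed version would publish to a per-room pub/sub channel instead of calling local callbacks.

```python
import asyncio
from collections import defaultdict
from itertools import count


class InMemoryPubSub:
    """Toy per-room fan-out; not RoomKit's RealtimeBackend."""

    def __init__(self):
        self._subs = defaultdict(dict)  # room_id -> {sub_id: callback}
        self._ids = count(1)

    def subscribe(self, room_id, callback):
        sub_id = f"sub-{next(self._ids)}"
        self._subs[room_id][sub_id] = callback
        return sub_id

    def unsubscribe(self, room_id, sub_id):
        self._subs[room_id].pop(sub_id, None)

    async def publish(self, room_id, event):
        # Deliver to subscribers of this room only.
        for callback in list(self._subs[room_id].values()):
            await callback(event)


async def demo():
    bus, seen = InMemoryPubSub(), []

    async def on_event(event):
        seen.append(event)

    sub_id = bus.subscribe("room-1", on_event)
    await bus.publish("room-1", {"type": "typing_start", "user_id": "user-1"})
    await bus.publish("room-2", {"type": "typing_start", "user_id": "user-2"})
    bus.unsubscribe("room-1", sub_id)
    await bus.publish("room-1", {"type": "typing_stop", "user_id": "user-1"})
    return seen


seen = asyncio.run(demo())
```

Ephemeral events are not stored, so a subscriber that joins late or unsubscribes simply misses them, as the last publish in the demo shows.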
Identity Resolution
Resolve unknown senders to known identities with a pluggable pipeline:
from roomkit import IdentityResolver, IdentityResult, IdentificationStatus, Identity

class MyResolver(IdentityResolver):
    async def resolve(self, message, context):
        user = await lookup(message.sender_id)
        if user:
            return IdentityResult(
                status=IdentificationStatus.IDENTIFIED,
                identity=Identity(id=user.id, display_name=user.name),
            )
        return IdentityResult(status=IdentificationStatus.UNKNOWN)

kit = RoomKit(identity_resolver=MyResolver())
Channel Type Filtering
Restrict identity resolution to specific channel types:
kit = RoomKit(
    identity_resolver=MyResolver(),
    identity_channel_types={ChannelType.SMS},  # Only resolve for SMS
)
Supports identified, pending, ambiguous, challenge, and rejected outcomes with hook-based customization.
Webhook Processing
Process provider webhooks with automatic parsing and delivery status tracking:
# Generic webhook processing
result = await kit.process_webhook(
    channel_id="sms-channel",
    raw_payload=request_body,
    headers=request_headers,
)

# Handle delivery status updates
@kit.on_delivery_status
async def handle_status(status: DeliveryStatus):
    print(f"Message {status.provider_message_id}: {status.status}")
Event-Driven Sources
For persistent connections (WebSocket, NATS, SSE), use SourceProviders instead of webhooks:
from roomkit import RoomKit, BaseSourceProvider, SourceStatus
from roomkit.sources import WebSocketSource
kit = RoomKit()
# Built-in WebSocket source
source = WebSocketSource(
    url="wss://chat.example.com/events",
    channel_id="websocket-chat",
)

# Attach with resilience options
await kit.attach_source(
    "websocket-chat",
    source,
    auto_restart=True,          # Restart on failure
    max_restart_attempts=10,    # Give up after 10 failures
    max_concurrent_emits=20,    # Backpressure control
)
# Monitor health
health = await kit.source_health("websocket-chat")
print(f"Status: {health.status}, Messages: {health.messages_received}")
# Detach when done
await kit.detach_source("websocket-chat")
Webhook vs Event-Driven:
| Aspect | Webhooks | Event Sources |
|---|---|---|
| Connection | Stateless HTTP | Persistent (WS, TCP, etc.) |
| Initiative | External system pushes | RoomKit subscribes |
| Use cases | Twilio, SendGrid | WebSocket, NATS, SSE |
Create custom sources by extending BaseSourceProvider:
class NATSSource(BaseSourceProvider):
    @property
    def name(self) -> str:
        return "nats:events"

    async def start(self, emit) -> None:
        self._set_status(SourceStatus.CONNECTED)
        async for msg in self.subscribe():
            await emit(parse_message(msg))
            self._record_message()
Features: exponential backoff, max restart attempts, backpressure control, health monitoring.
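How restart with exponential backoff and a failure limit fits together can be sketched in plain asyncio. This is not RoomKit's supervisor: `backoff_delay`, `supervise`, and the default delays are hypothetical, and the exact way RoomKit counts attempts may differ.

```python
import asyncio


def backoff_delay(attempt: int, base: float, factor: float, cap: float) -> float:
    """Delay before restart number `attempt` (0-based): base * factor**attempt, capped."""
    return min(cap, base * factor**attempt)


async def supervise(start, max_restart_attempts: int, base: float = 0.01):
    """Run start(); on failure wait with backoff and retry, giving up at the limit."""
    failures = 0
    while True:
        try:
            return await start()
        except ConnectionError:
            failures += 1
            if failures >= max_restart_attempts:
                raise  # give up after max_restart_attempts failures
            await asyncio.sleep(backoff_delay(failures - 1, base, 2.0, 1.0))


attempts = {"n": 0}


async def flaky_start():
    # Fails twice, then connects.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("dropped")
    return "connected"


status = asyncio.run(supervise(flaky_start, max_restart_attempts=10))
```

Capping the delay keeps a long outage from pushing the next retry arbitrarily far into the future.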
Resilience
Built-in patterns for production reliability:
- Retry with backoff — configurable per-channel retry policy with exponential backoff
- Circuit breaker — isolates failing providers so one broken channel doesn't bring down the room
- Rate limiting — token bucket limiter with per-second/minute/hour limits per channel
- Content transcoding — automatic conversion between channel capabilities (e.g. rich to text fallback)
- Chain depth tracking — prevents infinite event loops between channels
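For readers unfamiliar with the token bucket mentioned in the list above, here is a minimal standalone sketch. It is a generic version of the technique with an injected clock for determinism, not RoomKit's limiter; the class and parameter names are assumptions.

```python
class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: float, now: float = 0.0):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.updated = now

    def try_acquire(self, now: float) -> bool:
        # Refill according to elapsed time, never above capacity.
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


bucket = TokenBucket(rate=5.0, capacity=5.0)
burst = [bucket.try_acquire(now=0.0) for _ in range(6)]  # sixth call is rejected
later = bucket.try_acquire(now=0.5)  # 0.5 s * 5 tokens/s = 2.5 tokens refilled
```

Per-minute or per-hour limits follow the same logic with a lower rate and a larger capacity.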
Configure per binding:
from roomkit import RetryPolicy, RateLimit
await kit.attach_channel(
    "room-1", "sms-out",
    metadata={"phone_number": "+15551234567"},
    retry_policy=RetryPolicy(max_retries=3, base_delay_seconds=1.0),
    rate_limit=RateLimit(max_per_second=5.0),
)
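The circuit breaker has no configuration shown in the source, so the sketch below illustrates only the general pattern. `CircuitBreaker`, `failure_threshold`, and `reset_after` are hypothetical names, and the clock is passed in to keep the example deterministic.

```python
class CircuitBreaker:
    """Open after N consecutive failures; allow a trial call after a cooldown."""

    def __init__(self, failure_threshold: int, reset_after: float):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def allow(self, now: float) -> bool:
        if self.opened_at is None:
            return True
        # Half-open: permit a trial call once the cooldown has elapsed.
        return now - self.opened_at >= self.reset_after

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self, now: float) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = now


breaker = CircuitBreaker(failure_threshold=3, reset_after=30.0)
for t in (0.0, 1.0, 2.0):
    breaker.record_failure(now=t)      # third failure opens the circuit
rejected = not breaker.allow(now=10.0)  # still cooling down
trial = breaker.allow(now=32.0)         # cooldown elapsed, trial call allowed
breaker.record_success()                # trial succeeded, circuit closes
closed = breaker.allow(now=33.0)
```

While the circuit is open, deliveries to that provider fail fast instead of tying up the room with timeouts.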
Scaling
Per-room locking
RoomKit serializes event processing per room using a RoomLockManager. The default InMemoryLockManager works for single-process deployments. For multi-process or distributed setups, subclass RoomLockManager with a distributed lock (Redis, Postgres advisory locks, etc.):
from roomkit import RoomKit, RoomLockManager
# Single process (default)
kit = RoomKit()
# Distributed
kit = RoomKit(lock_manager=MyRedisLockManager())
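The source does not list the methods a RoomLockManager subclass must implement, so `MyRedisLockManager` above is left undefined. The sketch below shows only the behavior any implementation has to provide, serialization within a room and concurrency across rooms, using in-process asyncio locks. `PerRoomLocks` and `locked` are hypothetical names, not RoomKit's interface.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager


class PerRoomLocks:
    """Toy in-process lock manager: one asyncio.Lock per room_id."""

    def __init__(self):
        self._locks = defaultdict(asyncio.Lock)

    @asynccontextmanager
    async def locked(self, room_id: str):
        async with self._locks[room_id]:
            yield


async def demo():
    locks, log = PerRoomLocks(), []

    async def handle(room_id, label):
        async with locks.locked(room_id):
            log.append(f"{label}:start")
            await asyncio.sleep(0.01)  # simulated processing
            log.append(f"{label}:end")

    # "a" and "b" share a room and must not overlap; "c" runs concurrently.
    await asyncio.gather(
        handle("room-1", "a"), handle("room-1", "b"), handle("room-2", "c")
    )
    return log


log = asyncio.run(demo())
```

A distributed version would replace the asyncio.Lock with a lock held in shared infrastructure, keyed by the same room ID.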
Backpressure
RoomKit does not enforce a global concurrency limit on process_inbound calls. Each call acquires a per-room lock internally, but across different rooms, processing is fully concurrent.
To prevent resource exhaustion, add concurrency control upstream:
import asyncio
from roomkit import RoomKit, InboundMessage, InboundResult
kit = RoomKit()
semaphore = asyncio.Semaphore(100)  # at most 100 process_inbound calls in flight

async def handle_webhook(message: InboundMessage) -> InboundResult:
    async with semaphore:
        return await kit.process_inbound(message)
Room Lifecycle
Rooms transition through states automatically based on activity timers:
ACTIVE ──(inactive timeout)──► PAUSED ──(closed timeout)──► CLOSED
from roomkit import RoomTimers
await kit.create_room(
    room_id="support-123",
    timers=RoomTimers(inactive_after_seconds=300, closed_after_seconds=3600),
)
# Check and apply timer transitions
transitioned = await kit.sweep_room_timers()
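The timer-driven transitions can be expressed as a pure function of idle time. This sketch assumes both thresholds are measured from the last activity; the source does not say whether closed_after_seconds counts from the last activity or from the moment the room was paused, so treat that choice, along with the `State` and `next_state` names, as assumptions.

```python
from enum import Enum


class State(str, Enum):
    ACTIVE = "active"
    PAUSED = "paused"
    CLOSED = "closed"


def next_state(idle_seconds: float, inactive_after: float, closed_after: float) -> State:
    """Map time since last activity to a room state (assumed semantics)."""
    if idle_seconds >= closed_after:
        return State.CLOSED
    if idle_seconds >= inactive_after:
        return State.PAUSED
    return State.ACTIVE


# Thresholds from the RoomTimers example above: 300 s and 3600 s.
states = [next_state(idle, 300, 3600) for idle in (10, 300, 3599, 3600)]
```

A periodic sweep would apply a function like this to each open room and persist any state that changed.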
Storage
The ConversationStore ABC defines the persistence interface. InMemoryStore is included for development and testing. Implement the ABC to use any database.
kit = RoomKit() # uses InMemoryStore by default
kit = RoomKit(store=MyPostgresStore()) # plug in your own
The store handles rooms, events, bindings, participants, identities, tasks, and observations.
AI Assistant Support
RoomKit includes files to help AI coding assistants understand the library:
- llms.txt — Structured documentation for LLM context windows
- AGENTS.md — Coding guidelines and patterns for AI assistants
- MCP Integration — Model Context Protocol support
Access programmatically:
from roomkit import get_llms_txt, get_agents_md
llms_content = get_llms_txt()
agents_content = get_agents_md()
Project Structure
src/roomkit/
  channels/    Channel implementations (sms, rcs, email, websocket, ai, ...)
  core/        Framework, hooks, routing, retry, circuit breaker
  identity/    Identity resolution pipeline
  models/      Pydantic data models and enums
  providers/   Provider implementations grouped by vendor
  realtime/    Ephemeral events (typing, presence, read receipts)
  sources/     Event-driven sources (WebSocket, custom)
  store/       Storage abstraction and in-memory implementation
Documentation
- Website — Landing page and overview
- Documentation — Full documentation
- API Reference — Complete API docs
- RFC — Design document
Contributing
See CONTRIBUTING.md and CODE_OF_CONDUCT.md. Quick version:
uv sync --extra dev
make all # ruff check + mypy --strict + pytest
All new code needs tests. Aim for >90% coverage.
License