Skip to main content

Pure async Python library for multi-channel conversations

Project description

RoomKit

PyPI Python License

Pure async Python 3.12+ library for multi-channel conversations.

RoomKit gives you a single abstraction — the room — to orchestrate messages across SMS, RCS, Email, WhatsApp, Messenger, Voice, WebSocket, HTTP webhooks, and AI channels. Events flow in through any channel, get validated by hooks, and broadcast to every other channel in the room with automatic content transcoding.

Inbound ──► Hook pipeline ──► Store ──► Broadcast to all channels
                                             │
              ┌──────────┬──────────┬────────┼────────┬────────┬────────┐
              ▼          ▼          ▼        ▼        ▼        ▼        ▼
           SMS/RCS   WebSocket   Email   Messenger  Voice     AI    Webhook

Website: www.roomkit.live | Docs: www.roomkit.live/docs

Quickstart

pip install roomkit
import asyncio
from roomkit import (
    ChannelCategory, HookResult, HookTrigger,
    InboundMessage, MockAIProvider, RoomContext,
    RoomEvent, RoomKit, TextContent, WebSocketChannel,
)
from roomkit.channels.ai import AIChannel

async def main():
    kit = RoomKit()

    # Register channels
    ws = WebSocketChannel("ws-user")
    ai = AIChannel("ai-bot", provider=MockAIProvider(responses=["Hello!"]))
    kit.register_channel(ws)
    kit.register_channel(ai)

    # Create a room and attach channels
    await kit.create_room(room_id="room-1")
    await kit.attach_channel("room-1", "ws-user")
    await kit.attach_channel("room-1", "ai-bot", category=ChannelCategory.INTELLIGENCE)

    # Add a pre-inbound hook
    @kit.hook(HookTrigger.PRE_INBOUND, name="filter")
    async def block_spam(event: RoomEvent, ctx: RoomContext) -> HookResult:
        if isinstance(event.content, TextContent) and "spam" in event.content.body:
            return HookResult.block("spam detected")
        return HookResult.allow()

    # Process a message — it gets stored, broadcast, and the AI responds
    result = await kit.process_inbound(
        InboundMessage(channel_id="ws-user", sender_id="user-1", content=TextContent(body="Hi"))
    )
    print(result.blocked)  # False

    # View conversation history
    for event in await kit.store.list_events("room-1"):
        print(f"[{event.source.channel_id}] {event.content.body}")

asyncio.run(main())

More examples in examples/.

Installation

RoomKit's core has a single dependency (pydantic). Providers that call external APIs need optional extras:

pip install roomkit                    # core only
pip install roomkit[httpx]             # HTTP-based providers (SMS, RCS, Email)
pip install roomkit[websocket]         # WebSocket event source
pip install roomkit[anthropic]         # Anthropic Claude AI
pip install roomkit[openai]            # OpenAI GPT
pip install roomkit[gemini]            # Google Gemini AI
pip install roomkit[fastrtc]           # FastRTC voice backend
pip install roomkit[providers]         # all transport providers
pip install roomkit[all]               # everything

For development:

git clone https://github.com/sboily/roomkit.git
cd roomkit
uv sync --extra dev
make all                               # lint + typecheck + test

Requires Python 3.12+.

Channels

Each channel is a thin adapter between the room and an external transport. All channels implement the same interface: handle_inbound() converts a provider message into a RoomEvent, and deliver() pushes events out.

Channel Type Media Notes
SMS sms text, MMS Max 1600 chars, delivery receipts
RCS rcs text, rich, media Rich cards, carousels, suggested actions
Email email text, rich, media Threading support
WebSocket websocket text, rich, media Real-time with typing, reactions
Messenger messenger text, rich, media, template Buttons, quick replies
WhatsApp whatsapp text, rich, media, location, template Buttons, templates
Voice voice audio, text STT/TTS, barge-in, FastRTC streaming
HTTP webhook text, rich Generic webhook for any system
AI ai text, rich Intelligence layer (not transport)

Channels have two categories: transport (delivers to external systems) and intelligence (generates content, like AI). The Voice channel bridges real-time audio with the room-based conversation model.

Providers

Providers handle the actual API calls. Every provider has a mock counterpart for testing.

SMS Providers

Provider Features Dependency
TwilioSMSProvider SMS, MMS, delivery status roomkit[httpx]
TelnyxSMSProvider SMS, MMS, delivery status roomkit[httpx]
SinchSMSProvider SMS, delivery status roomkit[httpx]
VoiceMeUpSMSProvider SMS, MMS aggregation roomkit[httpx]

RCS Providers

Provider Features Dependency
TwilioRCSProvider Rich cards, carousels, actions roomkit[httpx]
TelnyxRCSProvider Rich cards, carousels, actions roomkit[httpx]

AI Providers

Provider Features Dependency
AnthropicAIProvider Claude, vision, tools roomkit[anthropic]
OpenAIAIProvider GPT-4, vision, tools roomkit[openai]
GeminiAIProvider Gemini, vision, tools roomkit[gemini]

Voice Providers

Provider Role Dependency
DeepgramSTTProvider Speech-to-text roomkit[httpx]
ElevenLabsTTSProvider Text-to-speech roomkit[httpx]
FastRTCVoiceBackend WebRTC audio transport roomkit[fastrtc]

Other Providers

Provider Channel Dependency
ElasticEmailProvider Email roomkit[httpx]
FacebookMessengerProvider Messenger roomkit[httpx]
WebhookHTTPProvider HTTP roomkit[httpx]

Each HTTP-based provider lazy-imports httpx so the core library stays lightweight.

Hooks

Hooks intercept events at specific points in the pipeline. Sync hooks can block or modify events; async hooks run after the fact for logging or side effects.

@kit.hook(HookTrigger.PRE_INBOUND, name="compliance_check")
async def check(event: RoomEvent, ctx: RoomContext) -> HookResult:
    # Block, allow, or modify the event
    return HookResult.allow()

Triggers: PRE_INBOUND, POST_INBOUND, PRE_DELIVERY, POST_DELIVERY, BEFORE_BROADCAST, AFTER_BROADCAST, ON_ROOM_CREATED, ON_ROOM_PAUSED, ON_ROOM_CLOSED, ON_CHANNEL_ATTACHED, ON_CHANNEL_DETACHED, ON_IDENTITY_AMBIGUOUS, ON_IDENTITY_UNKNOWN, ON_DELIVERY_STATUS, ON_ERROR.

Hooks support filtering by channel type, channel ID, and direction:

@kit.hook(
    HookTrigger.PRE_INBOUND,
    channel_types={ChannelType.SMS},
    directions={ChannelDirection.INBOUND},
)
async def sms_only_hook(event, ctx):
    return HookResult.allow()

Hooks can also inject side-effect events, create tasks, and record observations.

AI Integration

Per-Room AI Configuration

Configure AI behavior per room with custom system prompts, temperature, and tools:

from roomkit import AIConfig, AITool

room = await kit.create_room(
    room_id="support-room",
    ai_config=AIConfig(
        system_prompt="You are a helpful support agent.",
        temperature=0.7,
        tools=[
            AITool(
                name="lookup_order",
                description="Look up order status",
                parameters={"type": "object", "properties": {"order_id": {"type": "string"}}}
            )
        ],
    ),
)

Function Calling

AI providers support function calling with automatic tool result handling:

response = await ai_provider.generate(context)
if response.tool_calls:
    for call in response.tool_calls:
        result = await execute_tool(call.name, call.arguments)
        # Feed result back to AI

Realtime Events

Handle ephemeral events like typing indicators, presence, and read receipts:

from roomkit import EphemeralEvent, EphemeralEventType

# Subscribe to realtime events
async def handle_realtime(event: EphemeralEvent):
    if event.type == EphemeralEventType.TYPING_START:
        print(f"{event.user_id} is typing...")

sub_id = await kit.subscribe_room("room-1", handle_realtime)

# Publish typing indicator
await kit.publish_typing("room-1", "user-1")

# Publish presence
await kit.publish_presence("room-1", "user-1", "online")

# Publish read receipt
await kit.publish_read_receipt("room-1", "user-1", "event-123")

For distributed deployments, implement a custom RealtimeBackend (e.g., Redis pub/sub).

Identity Resolution

Resolve unknown senders to known identities with a pluggable pipeline:

from roomkit import IdentityResolver, IdentityResult, IdentificationStatus, Identity

class MyResolver(IdentityResolver):
    async def resolve(self, message, context):
        user = await lookup(message.sender_id)
        if user:
            return IdentityResult(
                status=IdentificationStatus.IDENTIFIED,
                identity=Identity(id=user.id, display_name=user.name),
            )
        return IdentityResult(status=IdentificationStatus.UNKNOWN)

kit = RoomKit(identity_resolver=MyResolver())

Channel Type Filtering

Restrict identity resolution to specific channel types:

kit = RoomKit(
    identity_resolver=MyResolver(),
    identity_channel_types={ChannelType.SMS},  # Only resolve for SMS
)

Supports identified, pending, ambiguous, challenge, and rejected outcomes with hook-based customization.

Webhook Processing

Process provider webhooks with automatic parsing and delivery status tracking:

# Generic webhook processing
result = await kit.process_webhook(
    channel_id="sms-channel",
    raw_payload=request_body,
    headers=request_headers,
)

# Handle delivery status updates
@kit.on_delivery_status
async def handle_status(status: DeliveryStatus):
    print(f"Message {status.provider_message_id}: {status.status}")

Event-Driven Sources

For persistent connections (WebSocket, NATS, SSE), use SourceProviders instead of webhooks:

from roomkit import RoomKit, BaseSourceProvider, SourceStatus
from roomkit.sources import WebSocketSource

kit = RoomKit()

# Built-in WebSocket source
source = WebSocketSource(
    url="wss://chat.example.com/events",
    channel_id="websocket-chat",
)

# Attach with resilience options
await kit.attach_source(
    "websocket-chat",
    source,
    auto_restart=True,           # Restart on failure
    max_restart_attempts=10,     # Give up after 10 failures
    max_concurrent_emits=20,     # Backpressure control
)

# Monitor health
health = await kit.source_health("websocket-chat")
print(f"Status: {health.status}, Messages: {health.messages_received}")

# Detach when done
await kit.detach_source("websocket-chat")

Webhook vs Event-Driven:

Aspect Webhooks Event Sources
Connection Stateless HTTP Persistent (WS, TCP, etc.)
Initiative External system pushes RoomKit subscribes
Use cases Twilio, SendGrid WebSocket, NATS, SSE

Create custom sources by extending BaseSourceProvider:

class NATSSource(BaseSourceProvider):
    @property
    def name(self) -> str:
        return "nats:events"

    async def start(self, emit) -> None:
        self._set_status(SourceStatus.CONNECTED)
        async for msg in self.subscribe():
            await emit(parse_message(msg))
            self._record_message()

Features: exponential backoff, max restart attempts, backpressure control, health monitoring.

Resilience

Built-in patterns for production reliability:

  • Retry with backoff — configurable per-channel retry policy with exponential backoff
  • Circuit breaker — isolates failing providers so one broken channel doesn't bring down the room
  • Rate limiting — token bucket limiter with per-second/minute/hour limits per channel
  • Content transcoding — automatic conversion between channel capabilities (e.g. rich to text fallback)
  • Chain depth tracking — prevents infinite event loops between channels

Configure per binding:

from roomkit import RetryPolicy, RateLimit

await kit.attach_channel("room-1", "sms-out",
    metadata={"phone_number": "+15551234567"},
    retry_policy=RetryPolicy(max_retries=3, base_delay_seconds=1.0),
    rate_limit=RateLimit(max_per_second=5.0),
)

Scaling

Per-room locking

RoomKit serializes event processing per room using a RoomLockManager. The default InMemoryLockManager works for single-process deployments. For multi-process or distributed setups, subclass RoomLockManager with a distributed lock (Redis, Postgres advisory locks, etc.):

from roomkit import RoomKit, RoomLockManager

# Single process (default)
kit = RoomKit()

# Distributed
kit = RoomKit(lock_manager=MyRedisLockManager())

Backpressure

RoomKit does not enforce a global concurrency limit on process_inbound calls. Each call acquires a per-room lock internally, but across different rooms, processing is fully concurrent.

To prevent resource exhaustion, add concurrency control upstream:

import asyncio
from roomkit import RoomKit, InboundMessage, InboundResult

kit = RoomKit()
semaphore = asyncio.Semaphore(100)  # max 100 concurrent rooms processing

async def handle_webhook(message: InboundMessage) -> InboundResult:
    async with semaphore:
        return await kit.process_inbound(message)

Room Lifecycle

Rooms transition through states automatically based on activity timers:

ACTIVE ──(inactive timeout)──► PAUSED ──(closed timeout)──► CLOSED
from roomkit import RoomTimers

await kit.create_room(
    room_id="support-123",
    timers=RoomTimers(inactive_after_seconds=300, closed_after_seconds=3600),
)

# Check and apply timer transitions
transitioned = await kit.sweep_room_timers()

Storage

The ConversationStore ABC defines the persistence interface. InMemoryStore is included for development and testing. Implement the ABC to use any database.

kit = RoomKit()                          # uses InMemoryStore by default
kit = RoomKit(store=MyPostgresStore())   # plug in your own

The store handles rooms, events, bindings, participants, identities, tasks, and observations.

AI Assistant Support

RoomKit includes files to help AI coding assistants understand the library:

  • llms.txt — Structured documentation for LLM context windows
  • AGENTS.md — Coding guidelines and patterns for AI assistants
  • MCP Integration — Model Context Protocol support

Access programmatically:

from roomkit import get_llms_txt, get_agents_md

llms_content = get_llms_txt()
agents_content = get_agents_md()

Project Structure

src/roomkit/
  channels/        Channel implementations (sms, rcs, email, websocket, ai, ...)
  core/            Framework, hooks, routing, retry, circuit breaker
  identity/        Identity resolution pipeline
  models/          Pydantic data models and enums
  providers/       Provider implementations grouped by vendor
  realtime/        Ephemeral events (typing, presence, read receipts)
  sources/         Event-driven sources (WebSocket, custom)
  store/           Storage abstraction and in-memory implementation
  voice/           Voice subsystem (stt/, tts/, backends/)

Documentation

Contributing

See CONTRIBUTING.md and CODE_OF_CONDUCT.md. Quick version:

uv sync --extra dev
make all                # ruff check + mypy --strict + pytest

All new code needs tests. Aim for >90% coverage.

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

roomkit-0.2.1.tar.gz (309.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

roomkit-0.2.1-py3-none-any.whl (169.6 kB view details)

Uploaded Python 3

File details

Details for the file roomkit-0.2.1.tar.gz.

File metadata

  • Download URL: roomkit-0.2.1.tar.gz
  • Upload date:
  • Size: 309.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for roomkit-0.2.1.tar.gz
Algorithm Hash digest
SHA256 7fc69ca8158e3b9a87782ea2569a2ab7a01c7afd1934982b971b009824e0f975
MD5 4f354b6df0cdaa2ab7c4498d2429b98d
BLAKE2b-256 b9cf939a27fb743dab69aa79797c1cd01e71d5e855495612b1076a15957fbc3b

See more details on using hashes here.

File details

Details for the file roomkit-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: roomkit-0.2.1-py3-none-any.whl
  • Upload date:
  • Size: 169.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for roomkit-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 83608a37e14b6bab8f43f679191db8efb577c9bceae1447b83d133a034307365
MD5 2845a282c13c563bce14b52d18777258
BLAKE2b-256 f3b3c3ec7d51ff70f156f81a9b83758b7b64c5516f4d66855682d2735f96d728

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page