Unified message provider interface for Discord, Slack, Jira, and custom platforms with distributed relay support
Omni Message Provider
A unified Python interface for building chatbots and automated systems across multiple messaging platforms (Discord, Slack, Jira) with optional distributed relay support for scalable deployments.
Features
- Unified Interface: Single MessageProvider interface for all platforms
- Multiple Platforms: Discord, Slack, Jira, FastAPI (HTTP/REST), and polling clients
- Distributed Architecture: Optional WebSocket relay for microservices deployments
- Authentication Layer: Pluggable authentication for HTTP provider
- Request Tracking: Status updates and cancellation support
- High Performance: MessagePack serialization, WebSocket transport
- Production Ready: Kubernetes-ready, auto-reconnection, error handling
Installation
Basic Installation
pip install omni-message-provider
With Platform Support
# Discord only
pip install omni-message-provider[discord]
# Slack only
pip install omni-message-provider[slack]
# Jira only
pip install omni-message-provider[jira]
# All platforms
pip install omni-message-provider[all]
Quick Start
Discord Bot
import os
import discord
from message_provider import DiscordMessageProvider

# Configure Discord
intents = discord.Intents.default()
intents.message_content = True

# Create provider
provider = DiscordMessageProvider(
    bot_token=os.getenv("DISCORD_BOT_TOKEN"),
    client_id="discord:my-bot",
    intents=intents,
    trigger_mode="mention",
    command_prefixes=["!support", "!cq"]
)

# Handle messages
def message_handler(message):
    print(f"Received: {message['text']}")
    channel = message['channel']
    message_id = message['message_id']
    # Reply (threads the response)
    provider.send_message(
        message="Hello!",
        user_id=message['user_id'],
        channel=channel,
        previous_message_id=message_id
    )
    # React to the original message
    provider.send_reaction(message_id, "👍", channel=channel)

provider.register_message_listener(message_handler)
provider.start()
Slack Bot
import os
from message_provider import SlackMessageProvider

provider = SlackMessageProvider(
    bot_token=os.getenv("SLACK_BOT_TOKEN"),
    app_token=os.getenv("SLACK_APP_TOKEN"),
    client_id="slack:my-workspace",
    use_socket_mode=True,
    trigger_mode="mention",
    allowed_channels=["#support", "C12345678"]
)

def message_handler(message):
    channel = message['channel']
    message_id = message['message_id']
    # Reply in thread (previous_message_id is used as thread_ts)
    provider.send_message(
        message="Got it!",
        user_id=message['user_id'],
        channel=channel,
        previous_message_id=message_id
    )
    # React to the original message
    provider.send_reaction(message_id, "eyes", channel=channel)

provider.register_message_listener(message_handler)
provider.start()
Jira Issue Monitor
import os
from message_provider import JiraMessageProvider

provider = JiraMessageProvider(
    server="https://company.atlassian.net",
    email=os.getenv("JIRA_EMAIL"),
    api_token=os.getenv("JIRA_API_TOKEN"),
    project_keys=["SUPPORT", "BUG"],
    client_id="jira:main",
    watch_labels=["bot-watching"],
    trigger_phrases=["@bot"]
)

def message_handler(message):
    if message['type'] == 'new_issue':
        # Add comment to ticket
        provider.send_message(
            message="We're on it!",
            user_id="bot",
            channel=message['channel']  # Issue key
        )
        # Add label
        provider.send_reaction(message['channel'], "bot-acknowledged")
        # Change status
        provider.update_message(message['channel'], "In Progress")

provider.register_message_listener(message_handler)
provider.start()
FastAPI/HTTP Provider
The HTTP provider lets external clients connect over a REST API. All requests (messages, status requests, cancellations) go through a single message endpoint and are distinguished by a type field.
import os
from message_provider import FastAPIMessageProvider

# Basic setup (no per-user authentication)
provider = FastAPIMessageProvider(
    provider_id="http:my-service",
    api_key=os.getenv("API_KEY"),  # Optional server-level API key
    host="0.0.0.0",
    port=9547
)

# Handle incoming messages from HTTP clients
def message_handler(message):
    msg_type = message.get('type', 'message')
    channel = message['channel']
    if msg_type in ('message', 'new_message'):
        # Regular message
        print(f"Received from {message['user_id']}: {message['text']}")
        # Send reply back to the subscriber
        provider.send_message(
            message="I received your message!",
            user_id="bot",
            channel=channel,
            previous_message_id=message['message_id']
        )
    elif msg_type == 'status_request':
        # Client requesting status update
        request_id = message.get('request_id')
        print(f"Status requested for {request_id}")
        # Look up status and send reply message
    elif msg_type == 'cancellation_request':
        # Client requesting cancellation
        request_id = message.get('request_id')
        print(f"Cancellation requested for {request_id}")
        # Handle cancellation logic

provider.register_message_listener(message_handler)
provider.start()
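The handler above leaves the status lookup and the cancellation logic to the application. One way to back both branches is a small in-memory registry keyed by request_id. The following is a minimal sketch under that assumption; `RequestTracker` and its method names are hypothetical and are not part of omni-message-provider.

```python
import threading
from typing import Dict, Optional


class RequestTracker:
    """Hypothetical in-memory registry of request states (not a library class)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._status: Dict[str, str] = {}

    def begin(self, request_id: str) -> None:
        """Record that work on a request has started."""
        with self._lock:
            self._status[request_id] = "processing"

    def finish(self, request_id: str) -> None:
        """Mark a request as done; a request cancelled mid-flight stays cancelled."""
        with self._lock:
            if self._status.get(request_id) == "processing":
                self._status[request_id] = "done"

    def cancel(self, request_id: str) -> bool:
        """Cancel an in-flight request; return False if it is unknown or finished."""
        with self._lock:
            if self._status.get(request_id) != "processing":
                return False
            self._status[request_id] = "cancelled"
            return True

    def status(self, request_id: Optional[str]) -> str:
        """Return the recorded state, or "unknown" for ids never seen."""
        with self._lock:
            return self._status.get(request_id, "unknown")
```

In the status_request branch, the application could then reply with `tracker.status(message.get('request_id'))` through `provider.send_message`, and call `tracker.cancel(...)` in the cancellation_request branch.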
FastAPI Provider Authentication
The FastAPI provider supports a pluggable authentication layer with two callbacks:
Authentication Provider
Called during subscriber registration to validate credentials and issue session tokens:
def my_auth_provider(user_id: str, auth_details: dict) -> dict:
    """
    Validate user credentials at registration time.

    Args:
        user_id: The user ID provided during registration
        auth_details: Dict containing credentials (password, token, etc.)

    Returns:
        Dict with:
        - "allowed": bool (required) - Whether registration is allowed
        - "session_token": str (optional) - Token for subsequent requests
        - "reason": str (optional) - Reason for rejection
        - Any additional fields are stored with the subscriber
    """
    # Example: Check against your user database
    # (validate_user and generate_token are placeholders for your own functions)
    if not validate_user(user_id, auth_details.get("password")):
        return {"allowed": False, "reason": "Invalid credentials"}
    # Generate a session token
    session_token = generate_token(user_id)
    return {
        "allowed": True,
        "session_token": session_token,
        "user_role": "standard"  # Extra data stored with subscriber
    }
Session Validator
Called on each request to validate the session token:
def my_session_validator(subscriber_id: str, session_token: str) -> bool:
    """
    Validate session token on each request.

    Args:
        subscriber_id: The subscriber's UUID
        session_token: Token from Authorization header

    Returns:
        True if valid, False otherwise
    """
    # Example: Validate token against your session store
    # (validate_token is a placeholder for your own function)
    return validate_token(session_token)
Complete Authentication Example
import uuid
from message_provider import FastAPIMessageProvider

# Simple in-memory auth (use a real database in production)
users = {"alice": "password123", "bob": "secret456"}
sessions = {}

def auth_provider(user_id: str, auth_details: dict) -> dict:
    password = auth_details.get("password")
    if users.get(user_id) != password:
        return {"allowed": False, "reason": "Invalid credentials"}
    token = str(uuid.uuid4())
    sessions[token] = user_id
    return {"allowed": True, "session_token": token}

def session_validator(subscriber_id: str, session_token: str) -> bool:
    return session_token in sessions

provider = FastAPIMessageProvider(
    provider_id="http:authenticated-service",
    authentication_provider=auth_provider,
    session_validator=session_validator
)
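The in-memory example above never expires a token once issued. A possible refinement, sketched here under stated assumptions, is a store that attaches an expiry time to each token and compares passwords in constant time. `SessionStore`, `USERS`, and `SESSION_TTL_SECONDS` are hypothetical names for illustration; only the two callback signatures come from the examples above.

```python
import hmac
import secrets
import time
from typing import Callable, Dict, Tuple

SESSION_TTL_SECONDS = 3600  # assumed lifetime; pick what suits your deployment

# Demo credentials only; a real service would store salted password hashes.
USERS: Dict[str, str] = {"alice": "password123"}


class SessionStore:
    """Hypothetical token store with expiry (not part of the library)."""

    def __init__(self, ttl: float = SESSION_TTL_SECONDS,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._sessions: Dict[str, Tuple[str, float]] = {}  # token -> (user_id, expires_at)

    def auth_provider(self, user_id: str, auth_details: dict) -> dict:
        expected = USERS.get(user_id)
        supplied = str(auth_details.get("password", ""))
        # compare_digest avoids leaking information through comparison timing.
        if expected is None or not hmac.compare_digest(expected.encode(), supplied.encode()):
            return {"allowed": False, "reason": "Invalid credentials"}
        token = secrets.token_urlsafe(32)
        self._sessions[token] = (user_id, self._clock() + self._ttl)
        return {"allowed": True, "session_token": token}

    def session_validator(self, subscriber_id: str, session_token: str) -> bool:
        entry = self._sessions.get(session_token)
        if entry is None:
            return False
        if self._clock() >= entry[1]:
            del self._sessions[session_token]  # drop expired tokens lazily
            return False
        return True
```

The bound methods `store.auth_provider` and `store.session_validator` have the same signatures as the callbacks above, so they could be passed as `authentication_provider` and `session_validator`.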
Client Registration Flow
- Register: Client sends credentials, receives subscriber_id and session_token
- Send Messages: All requests go through POST /message/process with a type field
- Receive Replies: Client polls GET /messages/{subscriber_id} for responses
- Re-register: Client can re-register with the same subscriber_id to refresh the session
# 1. Register
POST /subscriber/register
{
"user_id": "alice",
"auth_details": {"password": "password123"},
"source_type": "web"
}
# Response
{
"status": "registered",
"subscriber_id": "uuid-here",
"session_token": "token-here"
}
# 2. Send message
POST /message/process
Authorization: Bearer <session_token>
{
"type": "message",
"user_id": "alice",
"text": "Hello!",
"channel": "uuid-here"
}
# 3. Request status (same endpoint, different type)
POST /message/process
Authorization: Bearer <session_token>
{
"type": "status_request",
"user_id": "alice",
"channel": "uuid-here",
"request_id": "msg_abc123"
}
# 4. Poll for all responses
GET /messages/{subscriber_id}
Authorization: Bearer <session_token>
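The flow above can be driven from Python with nothing but the standard library. The sketch below only builds the three requests, so it runs without a server; the base URL and the JSON Content-Type header are assumptions, and the `build_*` helper names are hypothetical.

```python
import json
import urllib.request

# Assumption: the provider runs locally on the port used in the earlier example.
BASE_URL = "http://localhost:9547"


def build_register_request(user_id: str, password: str) -> urllib.request.Request:
    """Step 1: registration request carrying the credentials."""
    body = {"user_id": user_id, "auth_details": {"password": password}, "source_type": "web"}
    return urllib.request.Request(
        f"{BASE_URL}/subscriber/register",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_message_request(subscriber_id: str, session_token: str,
                          user_id: str, text: str) -> urllib.request.Request:
    """Step 2: a regular message; the subscriber UUID doubles as the channel."""
    body = {"type": "message", "user_id": user_id, "text": text, "channel": subscriber_id}
    return urllib.request.Request(
        f"{BASE_URL}/message/process",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json",
                 "Authorization": f"Bearer {session_token}"},
        method="POST",
    )


def build_poll_request(subscriber_id: str, session_token: str) -> urllib.request.Request:
    """Step 4: poll for replies addressed to this subscriber."""
    return urllib.request.Request(
        f"{BASE_URL}/messages/{subscriber_id}",
        headers={"Authorization": f"Bearer {session_token}"},
        method="GET",
    )
```

Each request can be sent with `urllib.request.urlopen(request)`; the subscriber_id and session_token for steps 2 and 4 come from the JSON body of the registration response.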
Unified Message Types
All client requests go through POST /message/process with a type field:
| Type | Description | Required Fields |
|---|---|---|
| message | Regular text message (default) | text, user_id, channel |
| status_request | Request status of a previous message | user_id, channel, request_id |
| cancellation_request | Request cancellation of active work | user_id, channel, request_id |
# Send a regular message
POST /message/process
{
"type": "message",
"text": "Hello!",
"user_id": "alice",
"channel": "subscriber-uuid"
}
# Request status update
POST /message/process
{
"type": "status_request",
"user_id": "alice",
"channel": "subscriber-uuid",
"request_id": "msg_abc123"
}
# Request cancellation
POST /message/process
{
"type": "cancellation_request",
"user_id": "alice",
"channel": "subscriber-uuid",
"request_id": "msg_abc123"
}
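Because every request type shares one endpoint, a client may want to check the required fields before sending anything. The following is a minimal sketch of such a check; `build_payload` and `REQUIRED_FIELDS` are hypothetical client-side helpers, with the field lists copied from the table above.

```python
from typing import Any, Dict

# Required fields per request type, copied from the table above.
REQUIRED_FIELDS = {
    "message": ("text", "user_id", "channel"),
    "status_request": ("user_id", "channel", "request_id"),
    "cancellation_request": ("user_id", "channel", "request_id"),
}


def build_payload(msg_type: str = "message", **fields: Any) -> Dict[str, Any]:
    """Build a /message/process body, or raise ValueError if a field is missing."""
    if msg_type not in REQUIRED_FIELDS:
        raise ValueError(f"unknown message type: {msg_type!r}")
    missing = [name for name in REQUIRED_FIELDS[msg_type] if not fields.get(name)]
    if missing:
        raise ValueError(f"{msg_type} is missing required fields: {', '.join(missing)}")
    return {"type": msg_type, **fields}
```

For example, `build_payload("status_request", user_id="alice", channel="subscriber-uuid")` raises a ValueError that names request_id, so the mistake surfaces before any HTTP call is made.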
The orchestrator receives all message types through the same listener and handles them based on type:
# Register listeners for specific events (optional)
provider.register_request_status_update_listener(
    lambda req_id, info: print(f"Status request for {req_id}")
)
provider.register_request_cancellation_listener(
    lambda req_id, info: print(f"Cancellation for {req_id}")
)

# Main message handler receives everything
def message_handler(message):
    msg_type = message.get('type', 'message')
    if msg_type == 'status_request':
        # Client wants status - send a message back with current status
        provider.send_message(
            message="Status: processing",
            user_id="bot",
            channel=message['channel']
        )
    elif msg_type == 'cancellation_request':
        # Handle cancellation
        pass
    else:
        # Regular message processing
        pass
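As the number of message types grows, the if/elif chain can be swapped for a lookup table of handlers. The sketch below shows that alternative; `RecordingProvider` is a stand-in so the example runs without a network, `make_dispatcher` is a hypothetical helper, and the return value of `send_message` is an assumption.

```python
from typing import Callable, Dict, List


class RecordingProvider:
    """Stand-in for a real provider; it only records what would be sent."""

    def __init__(self) -> None:
        self.sent: List[dict] = []

    def send_message(self, message, user_id, channel=None, previous_message_id=None):
        self.sent.append({"message": message, "user_id": user_id, "channel": channel})
        return {"success": True}  # return shape is an assumption


def make_dispatcher(provider) -> Callable[[dict], None]:
    """Return a listener that routes each message to a handler chosen by its type."""

    def on_status(msg: dict) -> None:
        provider.send_message(message="Status: processing", user_id="bot",
                              channel=msg["channel"])

    def on_cancel(msg: dict) -> None:
        provider.send_message(message=f"Cancelled {msg.get('request_id')}",
                              user_id="bot", channel=msg["channel"])

    def on_text(msg: dict) -> None:
        provider.send_message(message=f"Echo: {msg.get('text', '')}",
                              user_id="bot", channel=msg["channel"])

    handlers: Dict[str, Callable[[dict], None]] = {
        "status_request": on_status,
        "cancellation_request": on_cancel,
    }

    def dispatch(message: dict) -> None:
        # Anything that is not a control request is treated as a regular message.
        handlers.get(message.get("type", "message"), on_text)(message)

    return dispatch
```

With a real provider, the listener would be registered as `provider.register_message_listener(make_dispatcher(provider))`.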
Distributed Architecture
For scalable, Kubernetes-ready deployments:
┌─────────────────┐
│  SlackProvider  │──► RelayClient (client_id="slack:ws1") ────────┐
└─────────────────┘                                                │
                                                                   │ WS
┌─────────────────┐                                                ▼
│ DiscordProvider │──► RelayClient (client_id="discord:g1") ──► RelayHub
└─────────────────┘                                                ▲
                                                                   │ WS
┌─────────────────┐                                                │
│  Orchestrator   │──► RelayMessageProvider ───────────────────────┘
└─────────────────┘
# Message Provider Pod (Discord/Slack/Jira)
from message_provider import DiscordMessageProvider, RelayClient

discord_provider = DiscordMessageProvider(...)
relay_client = RelayClient(
    local_provider=discord_provider,
    relay_hub_url="ws://relay-hub:8765",
    client_id="discord:guild-123"
)
relay_client.start_blocking()

# RelayHub Pod (Central Router)
import asyncio
from message_provider import RelayHub, FastAPIMessageProvider

mp_provider = FastAPIMessageProvider(provider_id="http:hub")
hub = RelayHub(local_provider=mp_provider, port=8765)

# Top-level await is not valid in a script, so run it inside an event loop
async def main():
    await hub.start()

asyncio.run(main())

# Orchestrator Pods (Multiple instances)
from message_provider import RelayMessageProvider

provider = RelayMessageProvider(websocket_url="ws://relay-hub:8765")
provider.register_message_listener(my_handler)
provider.start()
Unified Interface
All providers implement the same interface:
from typing import Callable, Optional

class MessageProvider:
    def send_message(self, message: str, user_id: str, channel: Optional[str] = None,
                     previous_message_id: Optional[str] = None) -> dict:
        """Send a message. Use previous_message_id to reply in a thread."""

    def send_reaction(self, message_id: str, reaction: str, channel: Optional[str] = None) -> dict:
        """Add a reaction/label. channel is required for Discord, Slack, FastAPI."""

    def update_message(self, message_id: str, new_text: str, channel: Optional[str] = None) -> dict:
        """Update a message/status. channel is required for Discord, Slack, FastAPI."""

    def register_message_listener(self, callback: Callable) -> None:
        """Register callback for incoming messages"""

    def start(self) -> None:
        """Start the provider (blocking)"""

    def get_formatting_rules(self) -> str:
        """Return formatting syntax for this provider (mrkdwn, markdown, plaintext, jira)"""

    def request_status_update(self, request_id: str, channel: Optional[str] = None) -> dict:
        """Get status of a request (FastAPI only)"""

    def register_request_status_update_listener(self, callback: Callable) -> None:
        """Register callback for status updates (FastAPI only)"""

    def request_cancellation(self, request_id: str, channel: Optional[str] = None) -> dict:
        """Request cancellation of active request (FastAPI only)"""

    def register_request_cancellation_listener(self, callback: Callable) -> None:
        """Register callback for cancellation requests (FastAPI only)"""
Providers are stateless -- they do not cache message or channel metadata internally. The application is responsible for tracking conversation state.
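Since the providers keep no history, the application needs its own record of each conversation. One possible shape, sketched below, is a bounded per-thread history keyed by the `channel` and `thread_id` fields of an incoming message. `ConversationStore` is a hypothetical application-side class, and falling back to `message_id` for unthreaded messages is a design assumption.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Tuple


class ConversationStore:
    """Hypothetical application-side history, keyed by (channel, thread)."""

    def __init__(self, max_messages: int = 50) -> None:
        self._max = max_messages
        # Each thread keeps only its most recent max_messages entries.
        self._history: Dict[Tuple[str, str], Deque[dict]] = defaultdict(
            lambda: deque(maxlen=self._max)
        )

    @staticmethod
    def _key(message: dict) -> Tuple[str, str]:
        # Fall back to the message's own id so an unthreaded message starts a thread.
        thread = message.get("thread_id") or message["message_id"]
        return (message["channel"], thread)

    def record(self, message: dict) -> None:
        """Append a message to the history of its thread."""
        self._history[self._key(message)].append(message)

    def history(self, message: dict) -> List[dict]:
        """Return the stored messages for the thread this message belongs to."""
        return list(self._history.get(self._key(message), ()))
```

A message listener could call `store.record(message)` first and then pass `store.history(message)` to whatever generates the reply.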
Platform-Specific Mappings
Discord
- client_id = Provider instance identifier (e.g., "discord:my-bot")
- channel = Discord channel ID
- send_message() → Send Discord message (uses previous_message_id as reply reference)
- send_reaction(channel=...) → Add emoji reaction (channel required)
- update_message(channel=...) → Edit message (channel required)
- get_formatting_rules() → Returns "markdown"
Slack
- client_id = Provider instance identifier (e.g., "slack:my-workspace")
- channel = Slack channel ID
- send_message() → Post Slack message (uses previous_message_id as thread_ts)
- send_reaction(channel=...) → Add reaction emoji (channel required)
- update_message(channel=...) → Update message (channel required)
- get_formatting_rules() → Returns "mrkdwn"
Jira
- client_id = Provider instance identifier (e.g., "jira:main")
- channel = Jira issue key (e.g., "SUPPORT-123")
- send_message() → Add comment to ticket
- send_reaction() → Add label to ticket
- update_message() → Change ticket status
- get_formatting_rules() → Returns "jira"
FastAPI/HTTP
- provider_id = Provider instance identifier (e.g., "http:my-service")
- channel = Subscriber UUID (returned at registration)
- send_message(channel=...) → Send to specific subscriber (channel required)
- send_reaction(channel=...) → Send reaction to subscriber (channel required)
- update_message(channel=...) → Send update to subscriber (channel required)
- get_formatting_rules() → Returns "plaintext"
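The value returned by get_formatting_rules() lets shared application code emit markup that suits each platform. As a small illustration, the sketch below picks the bold marker for each of the four syntaxes listed above. `bold` and `BOLD_MARKERS` are hypothetical helpers, and the code assumes the method returns exactly the strings shown.

```python
# Bold markers per formatting syntax; None means the syntax has no bold markup.
BOLD_MARKERS = {
    "markdown": "**",   # Discord
    "mrkdwn": "*",      # Slack
    "jira": "*",        # Jira wiki markup
    "plaintext": None,  # FastAPI/HTTP
}


def bold(text: str, rules: str) -> str:
    """Wrap text in the bold markup for the given rules string, if it has one."""
    marker = BOLD_MARKERS.get(rules)
    return f"{marker}{text}{marker}" if marker else text
```

A reply could then be built as `bold("Done", provider.get_formatting_rules())` and passed to `send_message` unchanged on every platform. Unknown syntaxes fall back to the plain text.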
Message Format
All providers use the same message structure for incoming and outgoing messages:
{
    "type": "message",                 # See message types below
    "message_id": "msg_abc123",
    "text": "message content",         # Optional for non-message types
    "user_id": "user-identifier",
    "channel": "channel-identifier",
    "thread_id": "thread-identifier",  # Optional
    "request_id": "msg_xyz789",        # For status/cancellation requests
    "metadata": {
        "provider_id": "http:my-service",  # or "client_id" for Discord/Slack/Jira
        # Platform-specific fields
    },
    "timestamp": "2026-02-11T10:30:00Z"
}
Message Types
| Type | Description | Key Fields |
|---|---|---|
| message | Regular text message | text (required) |
| status_request | Client requesting status update | request_id |
| cancellation_request | Client requesting cancellation | request_id |
| reaction | Emoji/label reaction | reaction, message_id |
| update | Message edit/update | text, message_id |
For Discord/Slack/Jira, incoming messages use types like new_message, new_issue, new_comment.
Configuration
All providers accept explicit parameters; the library itself reads no environment variables:
```python
import os
from message_provider import (
    DiscordMessageProvider,
    FastAPIMessageProvider,
    SlackMessageProvider,
)

# Discord
provider = DiscordMessageProvider(
    bot_token=os.getenv("MY_DISCORD_TOKEN"),
    client_id="discord:my-bot",
    trigger_mode="both",  # "mention", "chat", "command", "both"
    command_prefixes=["!support", "!cq"]
)

# Slack
provider = SlackMessageProvider(
    bot_token=os.getenv("SLACK_BOT_TOKEN"),
    app_token=os.getenv("SLACK_APP_TOKEN"),
    client_id="slack:my-workspace",
    use_socket_mode=True,
    trigger_mode="mention",
    allowed_channels=["#support"]
)

# FastAPI/HTTP (my_auth_func and my_validator_func are your own callbacks)
provider = FastAPIMessageProvider(
    provider_id="http:my-service",
    api_key=os.getenv("API_KEY"),          # Optional server-level key
    authentication_provider=my_auth_func,  # Optional
    session_validator=my_validator_func,   # Optional
    request_context_ttl=3600,              # Cleanup tracked requests after 1 hour (default)
    max_request_contexts=10000,            # Force cleanup if exceeds this count (default)
    host="0.0.0.0",
    port=9547
)
```
Examples
See the src/message_provider/examples/ directory for complete working examples:
- discord_example.py - Discord bot with reactions
- slack_example.py - Slack bot with Socket Mode
- jira_example.py - Jira issue monitor
- relay_example.py - Distributed relay setup
- polling_client_example.py - FastAPI polling client with status tracking
Development
# Clone repository
git clone https://github.com/AgentSanchez/omni-message-provider
cd omni-message-provider
# Install with dev dependencies
pip install -e ".[dev,all]"
# Run tests
pytest
# Format code
black src/message_provider/
ruff check src/message_provider/
Requirements
- Python 3.9+
- Core: fastapi, uvicorn, websockets, msgpack
- Optional: discord.py, slack-bolt, jira
License
MIT License - see LICENSE file