Skip to main content

Unified message provider interface for Discord, Slack, Jira, and custom platforms with distributed relay support

Project description

Omni Message Provider

A unified Python interface for building chatbots and automated systems across multiple messaging platforms (Discord, Slack, Jira) with optional distributed relay support for scalable deployments.

Features

  • Unified Interface: Single MessageProvider interface for all platforms
  • Multiple Platforms: Discord, Slack, Jira, FastAPI (HTTP/REST), and polling clients
  • Distributed Architecture: Optional WebSocket relay for microservices deployments
  • Authentication Layer: Pluggable authentication for HTTP provider
  • Request Tracking: Status updates and cancellation support
  • High Performance: MessagePack serialization, WebSocket transport
  • Production Ready: Kubernetes-ready, auto-reconnection, error handling

Installation

Basic Installation

pip install omni-message-provider

With Platform Support

# Discord only
pip install omni-message-provider[discord]

# Slack only
pip install omni-message-provider[slack]

# Jira only
pip install omni-message-provider[jira]

# All platforms
pip install omni-message-provider[all]

Quick Start

Discord Bot

import os
import discord
from message_provider import DiscordMessageProvider

# Configure Discord
intents = discord.Intents.default()
intents.message_content = True

# Create provider
provider = DiscordMessageProvider(
    bot_token=os.getenv("DISCORD_BOT_TOKEN"),
    client_id="discord:my-bot",
    intents=intents,
    trigger_mode="mention",
    command_prefixes=["!support", "!cq"]
)

# Handle messages
def message_handler(message):
    print(f"Received: {message['text']}")

    channel = message['channel']
    message_id = message['message_id']

    # Reply (threads the response)
    provider.send_message(
        message="Hello!",
        user_id=message['user_id'],
        channel=channel,
        previous_message_id=message_id
    )

    # React to the original message
    provider.send_reaction(message_id, "๐Ÿ‘", channel=channel)

provider.register_message_listener(message_handler)
provider.start()

Slack Bot

import os
from message_provider import SlackMessageProvider

provider = SlackMessageProvider(
    bot_token=os.getenv("SLACK_BOT_TOKEN"),
    app_token=os.getenv("SLACK_APP_TOKEN"),
    client_id="slack:my-workspace",
    use_socket_mode=True,
    trigger_mode="mention",
    allowed_channels=["#support", "C12345678"]
)

def message_handler(message):
    channel = message['channel']
    message_id = message['message_id']

    # Reply in thread (previous_message_id is used as thread_ts)
    provider.send_message(
        message="Got it!",
        user_id=message['user_id'],
        channel=channel,
        previous_message_id=message_id
    )

    # React to the original message
    provider.send_reaction(message_id, "eyes", channel=channel)

provider.register_message_listener(message_handler)
provider.start()

Jira Issue Monitor

import os
from message_provider import JiraMessageProvider

provider = JiraMessageProvider(
    server="https://company.atlassian.net",
    email=os.getenv("JIRA_EMAIL"),
    api_token=os.getenv("JIRA_API_TOKEN"),
    project_keys=["SUPPORT", "BUG"],
    client_id="jira:main",
    watch_labels=["bot-watching"],
    trigger_phrases=["@bot"]
)

def message_handler(message):
    if message['type'] == 'new_issue':
        # Add comment to ticket
        provider.send_message(
            message="We're on it!",
            user_id="bot",
            channel=message['channel']  # Issue key
        )
        # Add label
        provider.send_reaction(message['channel'], "bot-acknowledged")
        # Change status
        provider.update_message(message['channel'], "In Progress")

provider.register_message_listener(message_handler)
provider.start()

FastAPI/HTTP Provider

The HTTP provider allows external clients to connect via REST API. It uses a unified message endpoint where all requests (messages, status requests, cancellations) go through the same API with a type field.

import os
from message_provider import FastAPIMessageProvider

# Basic setup (no authentication)
provider = FastAPIMessageProvider(
    provider_id="http:my-service",
    api_key=os.getenv("API_KEY"),  # Optional server-level API key
    host="0.0.0.0",
    port=9547
)

# Handle incoming messages from HTTP clients
def message_handler(message):
    msg_type = message.get('type', 'message')
    channel = message['channel']

    if msg_type in ('message', 'new_message'):
        # Regular message
        print(f"Received from {message['user_id']}: {message['text']}")

        # Send reply back to the subscriber
        provider.send_message(
            message="I received your message!",
            user_id="bot",
            channel=channel,
            previous_message_id=message['message_id']
        )

    elif msg_type == 'status_request':
        # Client requesting status update
        request_id = message.get('request_id')
        print(f"Status requested for {request_id}")
        # Look up status and send reply message

    elif msg_type == 'cancellation_request':
        # Client requesting cancellation
        request_id = message.get('request_id')
        print(f"Cancellation requested for {request_id}")
        # Handle cancellation logic

provider.register_message_listener(message_handler)
provider.start()

FastAPI Provider Authentication

The FastAPI provider supports a pluggable authentication layer with two callbacks:

Authentication Provider

Called during subscriber registration to validate credentials and issue session tokens:

def my_auth_provider(user_id: str, auth_details: dict) -> dict:
    """
    Validate user credentials at registration time.

    Args:
        user_id: The user ID provided during registration
        auth_details: Dict containing credentials (password, token, etc.)

    Returns:
        Dict with:
        - "allowed": bool (required) - Whether registration is allowed
        - "session_token": str (optional) - Token for subsequent requests
        - "reason": str (optional) - Reason for rejection
        - Any additional fields are stored with the subscriber
    """
    # Example: Check against your user database
    if not validate_user(user_id, auth_details.get("password")):
        return {"allowed": False, "reason": "Invalid credentials"}

    # Generate a session token
    session_token = generate_token(user_id)

    return {
        "allowed": True,
        "session_token": session_token,
        "user_role": "standard"  # Extra data stored with subscriber
    }

Session Validator

Called on each request to validate the session token:

def my_session_validator(subscriber_id: str, session_token: str) -> bool:
    """
    Validate session token on each request.

    Args:
        subscriber_id: The subscriber's UUID
        session_token: Token from Authorization header

    Returns:
        True if valid, False otherwise
    """
    # Example: Validate token against your session store
    return validate_token(session_token)

Complete Authentication Example

from message_provider import FastAPIMessageProvider

# Simple in-memory auth (use a real database in production)
users = {"alice": "password123", "bob": "secret456"}
sessions = {}

def auth_provider(user_id: str, auth_details: dict) -> dict:
    password = auth_details.get("password")
    if users.get(user_id) != password:
        return {"allowed": False, "reason": "Invalid credentials"}

    import uuid
    token = str(uuid.uuid4())
    sessions[token] = user_id
    return {"allowed": True, "session_token": token}

def session_validator(subscriber_id: str, session_token: str) -> bool:
    return session_token in sessions

provider = FastAPIMessageProvider(
    provider_id="http:authenticated-service",
    authentication_provider=auth_provider,
    session_validator=session_validator
)

Client Registration Flow

  1. Register: Client sends credentials, receives subscriber_id and session_token
  2. Send Messages: All requests go through POST /message/process with type field
  3. Receive Replies: Client polls GET /messages/{subscriber_id} for responses
  4. Re-register: Client can re-register with same subscriber_id to refresh session
# 1. Register
POST /subscriber/register
{
    "user_id": "alice",
    "auth_details": {"password": "password123"},
    "source_type": "web"
}

# Response
{
    "status": "registered",
    "subscriber_id": "uuid-here",
    "session_token": "token-here"
}

# 2. Send message
POST /message/process
Authorization: Bearer <session_token>
{
    "type": "message",
    "user_id": "alice",
    "text": "Hello!",
    "channel": "uuid-here"
}

# 3. Request status (same endpoint, different type)
POST /message/process
Authorization: Bearer <session_token>
{
    "type": "status_request",
    "user_id": "alice",
    "channel": "uuid-here",
    "request_id": "msg_abc123"
}

# 4. Poll for all responses
GET /messages/{subscriber_id}
Authorization: Bearer <session_token>

Unified Message Types

All client requests go through POST /message/process with a type field:

Type Description Required Fields
message Regular text message (default) text, user_id, channel
status_request Request status of a previous message user_id, channel, request_id
cancellation_request Request cancellation of active work user_id, channel, request_id
# Send a regular message
POST /message/process
{
    "type": "message",
    "text": "Hello!",
    "user_id": "alice",
    "channel": "subscriber-uuid"
}

# Request status update
POST /message/process
{
    "type": "status_request",
    "user_id": "alice",
    "channel": "subscriber-uuid",
    "request_id": "msg_abc123"
}

# Request cancellation
POST /message/process
{
    "type": "cancellation_request",
    "user_id": "alice",
    "channel": "subscriber-uuid",
    "request_id": "msg_abc123"
}

The orchestrator receives all message types through the same listener and handles them based on type:

# Register listeners for specific events (optional)
provider.register_request_status_update_listener(
    lambda req_id, info: print(f"Status request for {req_id}")
)

provider.register_request_cancellation_listener(
    lambda req_id, info: print(f"Cancellation for {req_id}")
)

# Main message handler receives everything
def message_handler(message):
    msg_type = message.get('type', 'message')

    if msg_type == 'status_request':
        # Client wants status - send a message back with current status
        provider.send_message(
            message=f"Status: processing",
            user_id="bot",
            channel=message['channel']
        )
    elif msg_type == 'cancellation_request':
        # Handle cancellation
        pass
    else:
        # Regular message processing
        pass

Distributed Architecture

For scalable, Kubernetes-ready deployments:

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ SlackProvider   โ”‚โ”€โ”€โ–บ RelayClient (client_id="slack:ws1") โ”€โ”€โ”
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜                                          โ”‚
                                                             โ”‚ WS
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                                          โ–ผ
โ”‚ DiscordProvider โ”‚โ”€โ”€โ–บ RelayClient (client_id="discord:g1") โ”€โ”€โ–บ RelayHub
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜                                          โ–ฒ
                                                             โ”‚ WS
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                                          โ”‚
โ”‚ Orchestrator    โ”‚โ—„โ”€โ–บ RelayMessageProvider โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
# Message Provider Pod (Discord/Slack/Jira)
from message_provider import DiscordMessageProvider, RelayClient

discord_provider = DiscordMessageProvider(...)
relay_client = RelayClient(
    local_provider=discord_provider,
    relay_hub_url="ws://relay-hub:8765",
    client_id="discord:guild-123"
)
relay_client.start_blocking()

# RelayHub Pod (Central Router)
from message_provider import RelayHub, FastAPIMessageProvider

mp_provider = FastAPIMessageProvider(provider_id="http:hub")
hub = RelayHub(local_provider=mp_provider, port=8765)
await hub.start()

# Orchestrator Pods (Multiple instances)
from message_provider import RelayMessageProvider

provider = RelayMessageProvider(websocket_url="ws://relay-hub:8765")
provider.register_message_listener(my_handler)
provider.start()

Unified Interface

All providers implement the same interface:

class MessageProvider:
    def send_message(message: str, user_id: str, channel: str = None,
                     previous_message_id: str = None) -> dict:
        """Send a message. Use previous_message_id to reply in a thread."""

    def send_reaction(message_id: str, reaction: str, channel: str = None) -> dict:
        """Add a reaction/label. channel is required for Discord, Slack, FastAPI."""

    def update_message(message_id: str, new_text: str, channel: str = None) -> dict:
        """Update a message/status. channel is required for Discord, Slack, FastAPI."""

    def register_message_listener(callback: Callable) -> None:
        """Register callback for incoming messages"""

    def start() -> None:
        """Start the provider (blocking)"""

    def get_formatting_rules() -> str:
        """Return formatting syntax for this provider (mrkdwn, markdown, plaintext, jira)"""

    def request_status_update(request_id: str, channel: str = None) -> dict:
        """Get status of a request (FastAPI only)"""

    def register_request_status_update_listener(callback: Callable) -> None:
        """Register callback for status updates (FastAPI only)"""

    def request_cancellation(request_id: str, channel: str = None) -> dict:
        """Request cancellation of active request (FastAPI only)"""

    def register_request_cancellation_listener(callback: Callable) -> None:
        """Register callback for cancellation requests (FastAPI only)"""

Providers are stateless -- they do not cache message or channel metadata internally. The application is responsible for tracking conversation state.

Platform-Specific Mappings

Discord

  • client_id = Provider instance identifier (e.g., "discord:my-bot")
  • channel = Discord channel ID
  • send_message() โ†’ Send Discord message (uses previous_message_id as reply reference)
  • send_reaction(channel=...) โ†’ Add emoji reaction (channel required)
  • update_message(channel=...) โ†’ Edit message (channel required)
  • get_formatting_rules() โ†’ Returns "markdown"

Slack

  • client_id = Provider instance identifier (e.g., "slack:my-workspace")
  • channel = Slack channel ID
  • send_message() โ†’ Post Slack message (uses previous_message_id as thread_ts)
  • send_reaction(channel=...) โ†’ Add reaction emoji (channel required)
  • update_message(channel=...) โ†’ Update message (channel required)
  • get_formatting_rules() โ†’ Returns "mrkdwn"

Jira

  • client_id = Provider instance identifier (e.g., "jira:main")
  • channel = Jira issue key (e.g., "SUPPORT-123")
  • send_message() โ†’ Add comment to ticket
  • send_reaction() โ†’ Add label to ticket
  • update_message() โ†’ Change ticket status
  • get_formatting_rules() โ†’ Returns "jira"

FastAPI/HTTP

  • provider_id = Provider instance identifier (e.g., "http:my-service")
  • channel = Subscriber UUID (returned at registration)
  • send_message(channel=...) โ†’ Send to specific subscriber (channel required)
  • send_reaction(channel=...) โ†’ Send reaction to subscriber (channel required)
  • update_message(channel=...) โ†’ Send update to subscriber (channel required)
  • get_formatting_rules() โ†’ Returns "plaintext"

Message Format

All providers use the same message structure for incoming and outgoing messages:

{
    "type": "message",              # See message types below
    "message_id": "msg_abc123",
    "text": "message content",      # Optional for non-message types
    "user_id": "user-identifier",
    "channel": "channel-identifier",
    "thread_id": "thread-identifier",  # Optional
    "request_id": "msg_xyz789",     # For status/cancellation requests
    "metadata": {
        "provider_id": "http:my-service",  # or "client_id" for Discord/Slack/Jira
        # Platform-specific fields
    },
    "timestamp": "2026-02-11T10:30:00Z"
}

Message Types

Type Description Key Fields
message Regular text message text (required)
status_request Client requesting status update request_id
cancellation_request Client requesting cancellation request_id
reaction Emoji/label reaction reaction, message_id
update Message edit/update text, message_id

For Discord/Slack/Jira, incoming messages use types like new_message, new_issue, new_comment.


## Configuration

All providers accept explicit parameters (no environment variables in library):

```python
# Discord
provider = DiscordMessageProvider(
    bot_token=os.getenv("MY_DISCORD_TOKEN"),
    client_id="discord:my-bot",
    trigger_mode="both",  # "mention", "chat", "command", "both"
    command_prefixes=["!support", "!cq"]
)

# Slack
provider = SlackMessageProvider(
    bot_token=os.getenv("SLACK_BOT_TOKEN"),
    app_token=os.getenv("SLACK_APP_TOKEN"),
    client_id="slack:my-workspace",
    use_socket_mode=True,
    trigger_mode="mention",
    allowed_channels=["#support"]
)

# FastAPI/HTTP
provider = FastAPIMessageProvider(
    provider_id="http:my-service",
    api_key=os.getenv("API_KEY"),  # Optional server-level key
    authentication_provider=my_auth_func,  # Optional
    session_validator=my_validator_func,   # Optional
    request_context_ttl=3600,              # Cleanup tracked requests after 1 hour (default)
    max_request_contexts=10000,            # Force cleanup if exceeds this count (default)
    session_validator=my_validator_func,   # Optional
    host="0.0.0.0",
    port=9547
)

Examples

See the src/message_provider/examples/ directory for complete working examples:

  • discord_example.py - Discord bot with reactions
  • slack_example.py - Slack bot with Socket Mode
  • jira_example.py - Jira issue monitor
  • relay_example.py - Distributed relay setup
  • polling_client_example.py - FastAPI polling client with status tracking

Development

# Clone repository
git clone https://github.com/AgentSanchez/omni-message-provider
cd omni-message-provider

# Install with dev dependencies
pip install -e ".[dev,all]"

# Run tests
pytest

# Format code
black src/message_provider/
ruff check src/message_provider/

Requirements

  • Python 3.9+
  • Core: fastapi, uvicorn, websockets, msgpack
  • Optional: discord.py, slack-bolt, jira

License

MIT License - see LICENSE file

Support

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

omni_message_provider-0.2.6.tar.gz (65.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

omni_message_provider-0.2.6-py3-none-any.whl (72.8 kB view details)

Uploaded Python 3

File details

Details for the file omni_message_provider-0.2.6.tar.gz.

File metadata

  • Download URL: omni_message_provider-0.2.6.tar.gz
  • Upload date:
  • Size: 65.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for omni_message_provider-0.2.6.tar.gz
Algorithm Hash digest
SHA256 fa083c853624e86182cc5e653892a498ebfdf1e65312bff8810afa936007a26c
MD5 b24cd76df3bce8a1286a66ad8c0f7deb
BLAKE2b-256 50799a7c711fa3042562a37b2a18b79ed13964ee3854b9a56bac1128cbaef918

See more details on using hashes here.

File details

Details for the file omni_message_provider-0.2.6-py3-none-any.whl.

File metadata

File hashes

Hashes for omni_message_provider-0.2.6-py3-none-any.whl
Algorithm Hash digest
SHA256 13ed36d7761bf824a0bed030d2cb913a1431acf67a5d21e50f3378a736dd41ca
MD5 66edfffe98ed45564d32248cef375754
BLAKE2b-256 19bf5d8c677d912ec400ecc30272dd6c13b852194a6626f17b03ec87142b4262

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page