Gnosari Realtime

A high-performance, real-time WebSocket pub/sub server with optional AI processing capabilities. Built with Python, Starlette, and modern async technologies.

Features

  • Real-time WebSocket Communication: High-performance WebSocket server with channel-based pub/sub
  • Python SDK: Easy-to-use client library for programmatic access
  • Authentication & Authorization: JWT-based authentication with team and user-level permissions
  • Message Persistence: PostgreSQL storage with full message history
  • Search & Analytics: OpenSearch integration for message search and analytics
  • Redis Integration: Pub/sub broadcasting and caching for horizontal scaling
  • Rate Limiting: Built-in rate limiting and connection management
  • Extensible Architecture: Clean SOLID architecture ready for AI integrations

Quick Start

Installation

# Clone the repository
git clone <repository-url>
cd realtime-server

# Install dependencies
pip install poetry
poetry install

# Set up environment
cp .env.example .env
# Edit .env with your configuration

Start Services

Option 1: Docker Compose (Recommended)

# Start all services including the server
docker compose up -d

# Wait for services to be healthy, then run migrations
docker compose exec gnosari-server poetry run alembic upgrade head

Option 2: Local Development

# Start supporting services only
docker compose up -d postgres redis opensearch-node1

# Run database migrations
poetry run alembic upgrade head

# Start the server locally
poetry run gnosari-realtime-server

The server will start on http://localhost:8000 by default.

Docker Service Health

Check if all services are running:

# Check service status
docker compose ps

# Check service logs
docker compose logs gnosari-server
docker compose logs postgres
docker compose logs redis

# Check health status
curl http://localhost:8000/health

Usage

Python SDK

Basic Usage

import asyncio
from gnosari_realtime import RealtimeClient

async def message_handler(data):
    print(f"Received: {data}")

async def main():
    client = RealtimeClient("ws://localhost:8000/ws/v1")
    
    await client.connect()
    await client.subscribe("public:notifications", message_handler)
    await client.publish("public:notifications", {"message": "Hello World!"})
    
    # Keep listening
    await asyncio.sleep(10)
    await client.disconnect()

asyncio.run(main())

Authenticated Usage

import asyncio
import httpx
from gnosari_realtime import RealtimeClient

async def team_handler(data):
    print(f"Team message: {data}")

async def main():
    # Create an auth token first
    async with httpx.AsyncClient() as http:
        response = await http.post("http://localhost:8000/api/v1/auth/token", json={
            "user_id": "user123",
            "teams": ["content-team"],
            "permissions": ["publish_public"]
        })
        token = response.json()["token"]

    # Use the token with the WebSocket client
    client = RealtimeClient("ws://localhost:8000/ws/v1", auth_token=token)
    await client.connect()
    await client.subscribe("team:content-team", team_handler)

asyncio.run(main())

HTTP API

Publish Messages

# Create authentication token
curl -X POST http://localhost:8000/api/v1/auth/token \
  -H "Content-Type: application/json" \
  -d '{"user_id": "user123", "teams": ["content-team"]}'

# Publish to channel
curl -X POST http://localhost:8000/api/v1/publish/public:alerts \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{"data": {"message": "System alert", "level": "warning"}}'

Search Messages

curl -X GET "http://localhost:8000/api/v1/search?q=alert&channel=public:alerts" \
  -H "Authorization: Bearer <token>"

Get Analytics

curl -X GET "http://localhost:8000/api/v1/analytics?channel=public:alerts" \
  -H "Authorization: Bearer <token>"

Channels

Dynamic Creation: Channels are created automatically when first used - no pre-configuration required.

Access Control Patterns:

  • public:* - Public channels (require publish_public permission to publish)
  • team:<team_id> - Team-specific channels (require team membership)
  • user:<user_id> - User-private channels (only accessible by the specific user)
  • admin:* - Admin-only channels (require admin privileges)

Permissions: Permissions are plain strings, so you can define any permission name you need. Built-in permissions: publish_public and read_analytics.
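The access-control patterns above can be sketched as a small resolver. This is an illustration of the rules as described, not the server's actual implementation; the Principal fields and the can_publish function are assumptions made for the example:

```python
from dataclasses import dataclass, field

@dataclass
class Principal:
    """Hypothetical view of an authenticated client (names are illustrative)."""
    user_id: str
    teams: set = field(default_factory=set)
    permissions: set = field(default_factory=set)
    is_admin: bool = False

def can_publish(principal: Principal, channel: str) -> bool:
    """Apply the channel access-control patterns described above."""
    if channel.startswith("public:"):
        return "publish_public" in principal.permissions
    if channel.startswith("team:"):
        return channel.split(":", 1)[1] in principal.teams
    if channel.startswith("user:"):
        return channel.split(":", 1)[1] == principal.user_id
    if channel.startswith("admin:"):
        return principal.is_admin
    return False  # unknown prefix: deny by default

alice = Principal("alice", teams={"content-team"}, permissions={"publish_public"})
print(can_publish(alice, "team:content-team"))  # True
print(can_publish(alice, "user:bob"))           # False
```

Denying unknown prefixes by default keeps newly introduced channel namespaces closed until a rule is written for them.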

WebSocket API

Connection

Connect to ws://localhost:8000/ws/v1 with optional authentication:

  • Header: Authorization: Bearer <token>
  • Query param: ?token=<token>

Message Format

All WebSocket messages follow this schema:

{
  "id": "uuid",
  "type": "command|query|event|response|error",
  "action": "subscribe|unsubscribe|publish|ping|...",
  "version": "v1",
  "payload": {},
  "metadata": {
    "timestamp": "2024-01-15T10:30:00Z",
    "user_id": "user123",
    "session_id": "session456"
  }
}

Subscribe to Channel

{
  "type": "command",
  "action": "subscribe",
  "payload": {
    "channel": "public:notifications",
    "filters": {"user_id": "user123"}
  }
}
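The filters field above implies server-side filtering: an event is delivered only when the published data matches every key in the subscriber's filter. A minimal sketch of that matching rule, under assumed equality semantics (the docs do not specify the exact behavior):

```python
def matches_filters(data: dict, filters: dict) -> bool:
    """True when every filter key appears in the message data with an equal value."""
    return all(data.get(key) == value for key, value in filters.items())

event = {"user_id": "user123", "message": "Hello"}
print(matches_filters(event, {"user_id": "user123"}))  # True
print(matches_filters(event, {"user_id": "other"}))    # False
```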

Publish Message

{
  "type": "command",
  "action": "publish",
  "payload": {
    "channel": "public:notifications",
    "data": {"message": "Hello World!"}
  }
}

Receive Messages

{
  "type": "event",
  "action": "message",
  "payload": {
    "channel": "public:notifications",
    "data": {"message": "Hello World!"}
  }
}

Configuration

Environment variables (see .env.example):

# Database
DATABASE_URL=postgresql+asyncpg://gnosari:gnosari123@localhost:5432/gnosari_realtime

# Redis
REDIS_URL=redis://localhost:6379/0

# OpenSearch
OPENSEARCH_URL=http://localhost:9200

# JWT Authentication
JWT_SECRET_KEY=your-secret-key
JWT_ALGORITHM=HS256
JWT_EXPIRE_MINUTES=1440

# Server
HOST=0.0.0.0
PORT=8000
DEBUG=true
LOG_LEVEL=INFO

# Rate Limiting
RATE_LIMIT_PER_MINUTE=100
MAX_CONNECTIONS_PER_CLIENT=5

Development

Running Tests

The project includes a comprehensive test suite with unit, integration, and end-to-end tests.

# Run all tests
./scripts/test.sh

# Quick test runner for development
./scripts/run_quick_tests.py

# Run specific test categories
poetry run pytest tests/unit/          # Unit tests only
poetry run pytest tests/integration/   # Integration tests only

# Run with coverage
poetry run pytest --cov=src/gnosari_realtime --cov-report=html

# Run specific test file
poetry run pytest tests/unit/test_schemas.py -v

Test Categories

  • Unit Tests: Core component testing with mocked dependencies
    • Message schema validation
    • Authentication and authorization logic
    • Channel management and subscription handling
    • Python SDK client functionality
  • Integration Tests: Service integration testing
    • WebSocket communication protocols
    • HTTP API endpoint functionality
    • Database and external service integration
  • End-to-End Tests: Complete workflow testing
    • Multi-client pub/sub scenarios
    • Authentication flows
    • Error recovery and resilience

Code Quality

# Format code
poetry run black .

# Lint code
poetry run ruff check .

# Type checking
poetry run mypy .

# Run all quality checks
./scripts/test.sh  # Includes quality checks

Database Migrations

# Create new migration
poetry run alembic revision --autogenerate -m "description"

# Apply migrations
poetry run alembic upgrade head

# Rollback migration
poetry run alembic downgrade -1

Architecture

┌─────────────────────┐
│   Transport Layer   │  (Starlette WebSocket)
├─────────────────────┤
│ Application Layer   │  (Connection Management, Message Routing)
├─────────────────────┤
│  Integration Layer  │  (Optional AI Processing Hooks)
├─────────────────────┤
│    Domain Layer     │  (Channel Management, Message Schemas)
├─────────────────────┤
│ Infrastructure Layer│  (Database, Redis, OpenSearch)
└─────────────────────┘

Key Components

  • Connection Manager: Handles WebSocket lifecycle and routing
  • Channel Manager: Manages channel subscriptions and message routing
  • Message Schemas: Pydantic models for type-safe message handling
  • Authentication: JWT-based authentication with role-based access control
  • Persistence: PostgreSQL for message history and connection tracking
  • Search: OpenSearch for full-text search and analytics
  • Caching: Redis for pub/sub and high-performance caching

Scaling

Horizontal Scaling

The server supports horizontal scaling through Redis pub/sub:

  1. Run multiple server instances
  2. All instances connect to the same Redis
  3. Messages are automatically distributed across instances
  4. WebSocket connections are load-balanced
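The fan-out in steps 2-3 can be illustrated with an in-memory stand-in for Redis pub/sub. A toy model, not the real wiring: Broker plays the role of Redis, and ServerInstance a server process with its own local WebSocket subscribers:

```python
from collections import defaultdict

class Broker:
    """In-memory stand-in for the shared Redis pub/sub."""
    def __init__(self):
        self.instances = []

    def publish(self, channel: str, data: dict):
        for instance in self.instances:      # Redis delivers to every subscribed instance
            instance.deliver(channel, data)

class ServerInstance:
    """One server process; its local clients subscribe here."""
    def __init__(self, broker: Broker):
        self.broker = broker
        self.local_subs = defaultdict(list)  # channel -> local client callbacks
        broker.instances.append(self)

    def subscribe(self, channel: str, callback):
        self.local_subs[channel].append(callback)

    def publish(self, channel: str, data: dict):
        self.broker.publish(channel, data)   # always go through the broker, never direct

    def deliver(self, channel: str, data: dict):
        for callback in self.local_subs[channel]:
            callback(data)

broker = Broker()
a, b = ServerInstance(broker), ServerInstance(broker)
received = []
b.subscribe("public:alerts", received.append)
a.publish("public:alerts", {"message": "hi"})  # published on A, received on B
print(received)
```

Because every publish goes through the broker, a client connected to any instance sees messages published on any other, which is what makes the load balancing in step 4 transparent.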

Performance Tuning

  • Adjust PostgreSQL connection pool size
  • Configure Redis memory limits
  • Set appropriate OpenSearch sharding
  • Tune rate limiting parameters

Security

  • Authentication: JWT tokens with configurable expiration
  • Authorization: Channel-based access control with team/user permissions
  • Rate Limiting: Per-client message and connection limits
  • Input Validation: Comprehensive message validation and sanitization
  • CORS: Configurable CORS policies for web applications
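HS256 signing and verification (the JWT_ALGORITHM configured above) can be sketched with the standard library. A real deployment should use a maintained library such as PyJWT; the claim names here are illustrative:

```python
import base64
import hashlib
import hmac
import json
import time

def _b64url(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()

def sign_hs256(claims: dict, secret: str) -> str:
    """Produce a compact JWT: base64url(header).base64url(claims).base64url(signature)."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url(json.dumps(claims).encode())
    signing_input = f"{header}.{body}".encode()
    sig = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{body}.{_b64url(sig)}"

def verify_hs256(token: str, secret: str) -> dict:
    """Check the signature and expiry, then return the claims."""
    header, body, sig = token.split(".")
    signing_input = f"{header}.{body}".encode()
    expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    if not hmac.compare_digest(_b64url(expected), sig):
        raise ValueError("bad signature")
    claims = json.loads(base64.urlsafe_b64decode(body + "=" * (-len(body) % 4)))
    if claims.get("exp", float("inf")) < time.time():
        raise ValueError("token expired")
    return claims

token = sign_hs256({"user_id": "user123", "exp": time.time() + 1440 * 60}, "your-secret-key")
print(verify_hs256(token, "your-secret-key")["user_id"])  # user123
```

Note the use of hmac.compare_digest for the signature check: a plain == comparison can leak timing information.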

Monitoring

Health Check

curl http://localhost:8000/health

Server Statistics

curl http://localhost:8000/api/v1/stats

OpenSearch Dashboards

Access analytics at http://localhost:5601 when using Docker Compose.

Examples & Demos

Quick Demo

To see the server in action with real-time messaging:

# Start the server first
docker compose up -d

# Run the simple demo (1 publisher, 1 subscriber)
./run_demo.sh simple

# Run the full live demo (3 publishers, 2 subscribers, 60 seconds)
./run_demo.sh live

Demo Features

  • Simple Demo (examples/simple_demo.py):
    • 1 publisher sending 5 different message types
    • 1 subscriber receiving and displaying messages
    • Perfect for quick testing and understanding basic functionality
  • Live Demo (examples/live_demo.py):
    • 3 publishers with different sending intervals (fast, medium, slow)
    • 2 subscribers listening to multiple channels
    • Real-time statistics and colored output
    • Demonstrates concurrent messaging and channel routing
    • Generates realistic message types: alerts, metrics, notifications, status updates

Other Examples

See the examples/ directory for complete working examples:

  • basic_client.py - Simple pub/sub client
  • authenticated_client.py - Client with authentication and team channels
  • http_publisher.py - Publishing via HTTP API
  • simple_demo.py - Quick demonstration script
  • live_demo.py - Full-featured real-time demo
  • run_demo.sh - Demo runner script with multiple options

Demo Runner Usage

# Available demo options
./run_demo.sh help

# Quick functionality test
./run_demo.sh simple

# Full real-time demonstration  
./run_demo.sh live

# Check server status
./run_demo.sh check

# Run specific examples
./run_demo.sh basic    # Basic client
./run_demo.sh auth     # Authenticated client
./run_demo.sh http     # HTTP publisher

Documentation

Comprehensive documentation is available in the docs/ directory.

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make changes with tests
  4. Run quality checks (black, ruff, mypy, pytest)
  5. Submit a pull request

License

[License information here]
