Gnosari Realtime
A high-performance, real-time WebSocket pub/sub server with optional AI processing capabilities. Built with Python, Starlette, and modern async technologies.
Features
- Real-time WebSocket Communication: High-performance WebSocket server with channel-based pub/sub
- Python SDK: Easy-to-use client library for programmatic access
- Authentication & Authorization: JWT-based authentication with team and user-level permissions
- Message Persistence: PostgreSQL storage with full message history
- Search & Analytics: OpenSearch integration for message search and analytics
- Redis Integration: Pub/sub broadcasting and caching for horizontal scaling
- Rate Limiting: Built-in rate limiting and connection management
- Extensible Architecture: Clean SOLID architecture ready for AI integrations
Quick Start
Installation
# Clone the repository
git clone <repository-url>
cd realtime-server
# Install dependencies
pip install poetry
poetry install
# Set up environment
cp .env.example .env
# Edit .env with your configuration
Start Services
Option 1: Docker Compose (Recommended)
# Start all services including the server
docker compose up -d
# Wait for services to be healthy, then run migrations
docker compose exec gnosari-server poetry run alembic upgrade head
Option 2: Local Development
# Start supporting services only
docker compose up -d postgres redis opensearch-node1
# Run database migrations
poetry run alembic upgrade head
# Start the server locally
poetry run gnosari-realtime-server
The server will start on http://localhost:8000 by default.
Docker Service Health
Check if all services are running:
# Check service status
docker compose ps
# Check service logs
docker compose logs gnosari-server
docker compose logs postgres
docker compose logs redis
# Check health status
curl http://localhost:8000/health
Usage
Python SDK
Basic Usage
import asyncio
from gnosari_realtime import RealtimeClient

async def message_handler(data):
    print(f"Received: {data}")

async def main():
    client = RealtimeClient("ws://localhost:8000/ws/v1")
    await client.connect()
    await client.subscribe("public:notifications", message_handler)
    await client.publish("public:notifications", {"message": "Hello World!"})

    # Keep listening
    await asyncio.sleep(10)
    await client.disconnect()

asyncio.run(main())
Authenticated Usage
# Create an auth token first
import httpx

async with httpx.AsyncClient() as http:
    response = await http.post("http://localhost:8000/api/v1/auth/token", json={
        "user_id": "user123",
        "teams": ["content-team"],
        "permissions": ["publish_public"]
    })
    token = response.json()["token"]

# Use the token with the WebSocket client
client = RealtimeClient("ws://localhost:8000/ws/v1", auth_token=token)
await client.connect()
await client.subscribe("team:content-team", team_handler)
HTTP API
Publish Messages
# Create authentication token
curl -X POST http://localhost:8000/api/v1/auth/token \
-H "Content-Type: application/json" \
-d '{"user_id": "user123", "teams": ["content-team"]}'
# Publish to channel
curl -X POST http://localhost:8000/api/v1/publish/public:alerts \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{"data": {"message": "System alert", "level": "warning"}}'
Search Messages
curl -X GET "http://localhost:8000/api/v1/search?q=alert&channel=public:alerts" \
-H "Authorization: Bearer <token>"
Get Analytics
curl -X GET "http://localhost:8000/api/v1/analytics?channel=public:alerts" \
-H "Authorization: Bearer <token>"
Channels
Dynamic Creation: Channels are created automatically when first used - no pre-configuration required.
Access Control Patterns:
- public:* - Public channels (require publish_public permission to publish)
- team:<team_id> - Team-specific channels (require team membership)
- user:<user_id> - User-private channels (only accessible by the specific user)
- admin:* - Admin-only channels (require admin privileges)
Permissions: Permissions are dynamic strings - you can create any permission name you need. Built-in permissions: publish_public, read_analytics.
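The access-control patterns above can be sketched as a simple check. This is an illustration only: the function and parameter names are hypothetical, not taken from the actual codebase.

```python
def can_publish(channel: str, user_id: str, teams: list[str],
                permissions: list[str], is_admin: bool = False) -> bool:
    """Illustrative access check mirroring the channel patterns above."""
    prefix, _, suffix = channel.partition(":")
    if prefix == "public":
        return "publish_public" in permissions
    if prefix == "team":
        return suffix in teams       # require team membership
    if prefix == "user":
        return suffix == user_id     # only the specific user
    if prefix == "admin":
        return is_admin              # admin privileges required
    return False

print(can_publish("team:content-team", "user123", ["content-team"], []))
```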
WebSocket API
Connection
Connect to ws://localhost:8000/ws/v1 with optional authentication:
- Header: Authorization: Bearer <token>
- Query param: ?token=<token>
Message Format
All WebSocket messages follow this schema:
{
  "id": "uuid",
  "type": "command|query|event|response|error",
  "action": "subscribe|unsubscribe|publish|ping|...",
  "version": "v1",
  "payload": {},
  "metadata": {
    "timestamp": "2024-01-15T10:30:00Z",
    "user_id": "user123",
    "session_id": "session456"
  }
}
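The envelope above can be modeled as a small Python class. The server itself uses Pydantic models for this; the stdlib dataclass below is only an illustrative stand-in, and its class and field names are assumptions.

```python
import json
import uuid
from dataclasses import dataclass, field, asdict

@dataclass
class Envelope:
    """Stdlib sketch of the message schema above (the server uses
    Pydantic models; this class name is illustrative)."""
    type: str                       # command|query|event|response|error
    action: str                     # subscribe|unsubscribe|publish|ping|...
    payload: dict = field(default_factory=dict)
    version: str = "v1"
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    metadata: dict = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps(asdict(self))

msg = Envelope(type="command", action="ping")
print(msg.to_json())
```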
Subscribe to Channel
{
  "type": "command",
  "action": "subscribe",
  "payload": {
    "channel": "public:notifications",
    "filters": {"user_id": "user123"}
  }
}
Publish Message
{
  "type": "command",
  "action": "publish",
  "payload": {
    "channel": "public:notifications",
    "data": {"message": "Hello World!"}
  }
}
Receive Messages
{
  "type": "event",
  "action": "message",
  "payload": {
    "channel": "public:notifications",
    "data": {"message": "Hello World!"}
  }
}
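Without the SDK, any WebSocket client can speak this protocol by sending frames of the shape shown above. A minimal sketch of a frame builder follows; the helper name is illustrative, and the commented send/receive example assumes the third-party websockets package and a running server.

```python
import json
import uuid

def make_frame(action: str, payload: dict, type_: str = "command") -> str:
    """Build a JSON frame matching the message schema above
    (this helper is illustrative, not part of the SDK)."""
    return json.dumps({
        "id": str(uuid.uuid4()),
        "type": type_,
        "action": action,
        "version": "v1",
        "payload": payload,
        "metadata": {},
    })

subscribe = make_frame("subscribe", {"channel": "public:notifications"})
publish = make_frame("publish", {"channel": "public:notifications",
                                 "data": {"message": "Hello World!"}})

# With a running server, these frames can be sent over any WebSocket client,
# e.g. the third-party websockets package:
#   async with websockets.connect("ws://localhost:8000/ws/v1") as ws:
#       await ws.send(subscribe)
#       await ws.send(publish)
#       event = await ws.recv()  # an event frame like the one shown above
```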
Configuration
Environment variables (see .env.example):
# Database
DATABASE_URL=postgresql+asyncpg://gnosari:gnosari123@localhost:5432/gnosari_realtime
# Redis
REDIS_URL=redis://localhost:6379/0
# OpenSearch
OPENSEARCH_URL=http://localhost:9200
# JWT Authentication
JWT_SECRET_KEY=your-secret-key
JWT_ALGORITHM=HS256
JWT_EXPIRE_MINUTES=1440
# Server
HOST=0.0.0.0
PORT=8000
DEBUG=true
LOG_LEVEL=INFO
# Rate Limiting
RATE_LIMIT_PER_MINUTE=100
MAX_CONNECTIONS_PER_CLIENT=5
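A settings loader for the variables above might look like the sketch below. This is an assumption about structure, not the server's actual configuration code; the defaults mirror the values shown.

```python
import os

def load_settings(env=os.environ) -> dict:
    """Illustrative loader for the environment variables above
    (the real server's config mechanism may differ)."""
    return {
        "database_url": env.get(
            "DATABASE_URL",
            "postgresql+asyncpg://gnosari:gnosari123@localhost:5432/gnosari_realtime"),
        "redis_url": env.get("REDIS_URL", "redis://localhost:6379/0"),
        "opensearch_url": env.get("OPENSEARCH_URL", "http://localhost:9200"),
        "host": env.get("HOST", "0.0.0.0"),
        "port": int(env.get("PORT", "8000")),
        "debug": env.get("DEBUG", "true").lower() == "true",
        "rate_limit_per_minute": int(env.get("RATE_LIMIT_PER_MINUTE", "100")),
    }

settings = load_settings()
```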
Development
Running Tests
The project includes a comprehensive test suite with unit, integration, and end-to-end tests.
# Run all tests
./scripts/test.sh
# Quick test runner for development
./scripts/run_quick_tests.py
# Run specific test categories
poetry run pytest tests/unit/ # Unit tests only
poetry run pytest tests/integration/ # Integration tests only
# Run with coverage
poetry run pytest --cov=src/gnosari_realtime --cov-report=html
# Run specific test file
poetry run pytest tests/unit/test_schemas.py -v
Test Categories
- Unit Tests: Core component testing with mocked dependencies
  - Message schema validation
  - Authentication and authorization logic
  - Channel management and subscription handling
  - Python SDK client functionality
- Integration Tests: Service integration testing
  - WebSocket communication protocols
  - HTTP API endpoint functionality
  - Database and external service integration
- End-to-End Tests: Complete workflow testing
  - Multi-client pub/sub scenarios
  - Authentication flows
  - Error recovery and resilience
Code Quality
# Format code
poetry run black .
# Lint code
poetry run ruff check .
# Type checking
poetry run mypy .
# Run all quality checks
./scripts/test.sh # Includes quality checks
Database Migrations
# Create new migration
poetry run alembic revision --autogenerate -m "description"
# Apply migrations
poetry run alembic upgrade head
# Rollback migration
poetry run alembic downgrade -1
Architecture
┌─────────────────────┐
│ Transport Layer │ (Starlette WebSocket)
├─────────────────────┤
│ Application Layer │ (Connection Management, Message Routing)
├─────────────────────┤
│ Integration Layer │ (Optional AI Processing Hooks)
├─────────────────────┤
│ Domain Layer │ (Channel Management, Message Schemas)
├─────────────────────┤
│ Infrastructure Layer│ (Database, Redis, OpenSearch)
└─────────────────────┘
Key Components
- Connection Manager: Handles WebSocket lifecycle and routing
- Channel Manager: Manages channel subscriptions and message routing
- Message Schemas: Pydantic models for type-safe message handling
- Authentication: JWT-based authentication with role-based access control
- Persistence: PostgreSQL for message history and connection tracking
- Search: OpenSearch for full-text search and analytics
- Caching: Redis for pub/sub and high-performance caching
Scaling
Horizontal Scaling
The server supports horizontal scaling through Redis pub/sub:
- Run multiple server instances
- All instances connect to the same Redis
- Messages are automatically distributed across instances
- WebSocket connections are load-balanced
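The fan-out described above can be sketched as follows. The wire format, channel pattern, and deliver_local hook are all assumptions for illustration; the async part requires the third-party redis package and a running Redis, so it is imported lazily.

```python
import json

def encode_broadcast(channel: str, data: dict) -> str:
    """Serialize a cross-instance broadcast (wire format is illustrative)."""
    return json.dumps({"channel": channel, "data": data})

async def relay(redis_url: str = "redis://localhost:6379/0") -> None:
    """Sketch of one instance forwarding Redis pub/sub messages to its own
    local WebSocket subscribers. Assumes the third-party redis package;
    deliver_local is a hypothetical local-fanout hook."""
    import redis.asyncio as aioredis  # lazy import: only needed at runtime

    client = aioredis.from_url(redis_url)
    pubsub = client.pubsub()
    await pubsub.psubscribe("channel:*")  # subscription pattern is an assumption
    async for msg in pubsub.listen():
        if msg["type"] == "pmessage":
            frame = json.loads(msg["data"])
            # deliver_local(frame["channel"], frame["data"])  # hypothetical
```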
Performance Tuning
- Adjust PostgreSQL connection pool size
- Configure Redis memory limits
- Set appropriate OpenSearch sharding
- Tune rate limiting parameters
Security
- Authentication: JWT tokens with configurable expiration
- Authorization: Channel-based access control with team/user permissions
- Rate Limiting: Per-client message and connection limits
- Input Validation: Comprehensive message validation and sanitization
- CORS: Configurable CORS policies for web applications
Monitoring
Health Check
curl http://localhost:8000/health
Server Statistics
curl http://localhost:8000/api/v1/stats
OpenSearch Dashboards
Access analytics at http://localhost:5601 when using Docker Compose.
Examples & Demos
Quick Demo
To see the server in action with real-time messaging:
# Start the server first
docker compose up -d
# Run the simple demo (1 publisher, 1 subscriber)
./run_demo.sh simple
# Run the full live demo (3 publishers, 2 subscribers, 60 seconds)
./run_demo.sh live
Demo Features
- Simple Demo (examples/simple_demo.py):
  - 1 publisher sending 5 different message types
  - 1 subscriber receiving and displaying messages
  - Perfect for quick testing and understanding basic functionality
- Live Demo (examples/live_demo.py):
  - 3 publishers with different sending intervals (fast, medium, slow)
  - 2 subscribers listening to multiple channels
  - Real-time statistics and colored output
  - Demonstrates concurrent messaging and channel routing
  - Generates realistic message types: alerts, metrics, notifications, status updates
Other Examples
See the examples/ directory for complete working examples:
- basic_client.py - Simple pub/sub client
- authenticated_client.py - Client with authentication and team channels
- http_publisher.py - Publishing via HTTP API
- simple_demo.py - Quick demonstration script
- live_demo.py - Full-featured real-time demo
- run_demo.sh - Demo runner script with multiple options
Demo Runner Usage
# Available demo options
./run_demo.sh help
# Quick functionality test
./run_demo.sh simple
# Full real-time demonstration
./run_demo.sh live
# Check server status
./run_demo.sh check
# Run specific examples
./run_demo.sh basic # Basic client
./run_demo.sh auth # Authenticated client
./run_demo.sh http # HTTP publisher
Documentation
Comprehensive documentation is available in the docs/ directory:
- LLM Quick Reference - Concise guide for AI/LLM agents and developers
- API Reference - Complete WebSocket and HTTP API documentation
- System Architecture - Technical architecture and design patterns
- Kubernetes Deployment - Production deployment guide with Helm charts
Contributing
- Fork the repository
- Create a feature branch
- Make changes with tests
- Run quality checks (black, ruff, mypy, pytest)
- Submit a pull request
License
[License information here]