Skip to main content

WebSocket module for Bazis framework.

Project description

Bazis WS

PyPI version Python Versions License

Extension package for Bazis, providing WebSocket connections with authentication support, Redis pub/sub, and user online status tracking.

Quick Start

uv add bazis-ws
# Add mixin to user model
from django.contrib.auth.models import AbstractUser
from bazis.contrib.ws.models_abstract import UserWsMixin
from bazis.core.models_abstract import JsonApiMixin

class User(UserWsMixin, JsonApiMixin, AbstractUser):
    """User with WebSocket support"""
    class Meta:
        verbose_name = 'User'
        verbose_name_plural = 'Users'

# Register WebSocket route
from bazis.core.app import app
from bazis.contrib.ws.ws import ws_route

app.router.routes.append(ws_route)

Table of Contents

Description

Bazis WS is an extension package for the Bazis framework that provides a fully-featured WebSocket communication system. The package includes:

  • UserWsMixin — mixin for user model with WebSocket support
  • WsEndpoint — ready-to-use WebSocket endpoint with JWT authentication
  • Redis Pub/Sub — messaging system between servers and clients
  • Online Status Tracking — automatic detection of online/offline users
  • Personal Channels — each user has their own channel for receiving messages
  • Common Channel — for broadcasting messages to all connected users

This package requires installation of bazis and a running Redis server.

Requirements

  • Python: 3.12+
  • bazis: latest version
  • PostgreSQL: 12+
  • Redis: For pub/sub and caching
  • Additional libraries:
    • python-jose — for JWT handling
    • psycopg[binary] — for asynchronous PostgreSQL access
    • redis — for Redis operations

Installation

Using uv (recommended)

uv add bazis-ws

Using pip

pip install bazis-ws

Core Components

UserWsMixin

Mixin for user model that adds WebSocket support.

Location: bazis.contrib.ws.models_abstract.UserWsMixin

Properties:

  • user_channel — user's personal channel in Redis (format: user_ws:{user_id})
  • ws_session — WebSocket session key in Redis (format: user_ws:{user_id}:session)
  • is_online — boolean property indicating whether the user is connected to WebSocket

Methods:

  • ws_publish(data: dict) -> int — send message to user via their personal channel

Usage Example:

from django.contrib.auth.models import AbstractUser
from bazis.contrib.ws.models_abstract import UserWsMixin
from bazis.core.models_abstract import JsonApiMixin

class User(UserWsMixin, JsonApiMixin, AbstractUser):
    """User with WebSocket support"""
    class Meta:
        verbose_name = 'User'
        verbose_name_plural = 'Users'

WsEndpoint

WebSocket endpoint with authentication and session management support.

Location: bazis.contrib.ws.ws.WsEndpoint

Based on: starlette.endpoints.WebSocketEndpoint

Key Features:

  1. JWT Token Authentication:

    • On connection: ws://api.example.com/ws?token=<jwt_token>
    • During session: sending {"token": "<jwt_token>"}
  2. Automatic Online Status Tracking:

    • Status update every 5 seconds
    • Redis entry TTL: 10 seconds
  3. Channel Subscription:

    • User's personal channel
    • Common channel for all users
  4. Ping/Pong:

    • Client sends {"type": "ping"}
    • Server responds {"type": "pong"}

Connection Lifecycle:

1. on_connect()       accept connection
2. session_start()    authenticate and start background tasks
   ├─ task_online_update()     update online status
   └─ task_listen_queue()      listen to Redis channels
3. on_receive()       handle incoming messages
4. on_disconnect()    cleanup resources
5. session_stop()     stop background tasks

Architecture

┌─────────────┐                    ┌──────────────┐
│   Client    │◄──WebSocket───────►│  WsEndpoint  │
│ (Browser/   │                    │              │
│  Mobile)    │                    │  Starlette   │
└─────────────┘                    └───────┬──────┘
                                           │
                                           │ JWT Auth
                                           ▼
                                    ┌──────────────┐
                                    │  PostgreSQL  │
                                    │  (User DB)   │
                                    └──────────────┘
                                           │
                                           │
                                           ▼
┌─────────────┐                    ┌──────────────┐
│   Backend   │────publish────────►│    Redis     │
│   Service   │                    │   Pub/Sub    │
└─────────────┘                    └───────┬──────┘
                                           │
                                           │ subscribe
                                           ▼
                                    ┌──────────────┐
                                    │  WsEndpoint  │
                                    │              │
                                    └───────┬──────┘
                                           │
                                           │ send_json
                                           ▼
                                    ┌──────────────┐
                                    │   Client     │
                                    └──────────────┘

Redis Channels:

  • user_ws:{user_id} — user's personal channel
  • user_ws:common — common channel for all users
  • user_ws:{user_id}:session — active session key (TTL: 10 seconds)

Usage

Project Setup

1. Add mixin to user model:

# models.py
from django.contrib.auth.models import AbstractUser
from bazis.contrib.ws.models_abstract import UserWsMixin
from bazis.core.models_abstract import JsonApiMixin

class User(UserWsMixin, JsonApiMixin, AbstractUser):
    class Meta:
        verbose_name = 'User'
        verbose_name_plural = 'Users'

2. Add is_online field to user routes:

# routes.py
from bazis.contrib.ws.routes_abstract import UserWsRouteSet
from django.apps import apps

class UserRouteSet(UserWsRouteSet):
    model = apps.get_model('myapp.User')

3. Register WebSocket route:

# main.py or app.py
from bazis.core.app import app
from bazis.contrib.ws.ws import ws_route

app.router.routes.append(ws_route)

Connecting to WebSocket

JavaScript Client

class WebSocketClient {
  constructor(url, token) {
    this.url = url;
    this.token = token;
    this.ws = null;
    this.reconnectInterval = 5000;
    this.pingInterval = 30000;
    this.pingTimer = null;
  }

  connect() {
    this.ws = new WebSocket(`${this.url}?token=${this.token}`);

    this.ws.onopen = () => {
      console.log('WebSocket connected');
      this.startPing();
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      this.handleMessage(data);
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };

    this.ws.onclose = () => {
      console.log('WebSocket disconnected');
      this.stopPing();
      // Reconnect
      setTimeout(() => this.connect(), this.reconnectInterval);
    };
  }

  handleMessage(data) {
    switch (data.type) {
      case 'pong':
        console.log('Received pong');
        break;
      case 'data':
        console.log('Received data:', data.data);
        // Process received data
        this.onData(data.data);
        break;
      case 'error':
        console.error('Error:', data.code, data.detail);
        break;
      default:
        console.log('Unknown message type:', data);
    }
  }

  startPing() {
    this.pingTimer = setInterval(() => {
      if (this.ws.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: 'ping' }));
      }
    }, this.pingInterval);
  }

  stopPing() {
    if (this.pingTimer) {
      clearInterval(this.pingTimer);
      this.pingTimer = null;
    }
  }

  onData(data) {
    // Override this method to handle data
    console.log('Data received:', data);
  }

  disconnect() {
    this.stopPing();
    if (this.ws) {
      this.ws.close();
    }
  }
}

// Usage
const ws = new WebSocketClient('ws://api.example.com/ws', jwtToken);
ws.onData = (data) => {
  console.log('Processing data:', data);
  // Your processing logic
};
ws.connect();

Python Client

import asyncio
import json
import websockets

async def websocket_client(url, token):
    uri = f"{url}?token={token}"
    
    async with websockets.connect(uri) as websocket:
        print("WebSocket connected")
        
        # Background task for ping
        async def send_ping():
            while True:
                await asyncio.sleep(30)
                await websocket.send(json.dumps({"type": "ping"}))
        
        ping_task = asyncio.create_task(send_ping())
        
        try:
            async for message in websocket:
                data = json.loads(message)
                
                if data['type'] == 'pong':
                    print("Received pong")
                elif data['type'] == 'data':
                    print(f"Received data: {data['data']}")
                elif data['type'] == 'error':
                    print(f"Error: {data['code']} - {data['detail']}")
        finally:
            ping_task.cancel()

# Usage
asyncio.run(websocket_client('ws://api.example.com/ws', jwt_token))

Sending Messages to Users

From Django View or API Endpoint

from django.contrib.auth import get_user_model

User = get_user_model()

def send_notification_to_user(user_id, message):
    """Send notification to specific user"""
    user = User.objects.get(id=user_id)
    
    if user.is_online:
        user.ws_publish({
            'type': 'notification',
            'title': 'New Notification',
            'message': message,
            'timestamp': datetime.now().isoformat()
        })
        return True
    return False

From Celery Task

from celery import shared_task
from django.contrib.auth import get_user_model

User = get_user_model()

@shared_task
def notify_user_async(user_id, notification_data):
    """Asynchronously send notification to user"""
    try:
        user = User.objects.get(id=user_id)
        user.ws_publish({
            'type': 'task_completed',
            'data': notification_data
        })
    except User.DoesNotExist:
        pass

Broadcasting to All Online Users

from redis import Redis
from django.conf import settings
import json

redis = Redis.from_url(settings.CACHES['default']['LOCATION'])

def broadcast_message(message):
    """Send message to all connected users"""
    from bazis.contrib.ws import COMMON_CHANNEL
    
    redis.publish(COMMON_CHANNEL, json.dumps({
        'type': 'broadcast',
        'message': message
    }))

Checking Online Status

In Django Template

from django.contrib.auth import get_user_model

User = get_user_model()

def user_list_view(request):
    users = User.objects.all()
    
    online_users = [user for user in users if user.is_online]
    offline_users = [user for user in users if not user.is_online]
    
    return render(request, 'users.html', {
        'online_users': online_users,
        'offline_users': offline_users
    })

Via API (using UserWsRouteSet)

GET /api/v1/<app>/<resource>/
Authorization: Bearer <token>

Response:

{
  "data": [
    {
      "type": "app.user",
      "id": "123",
      "attributes": {
        "username": "john_doe",
        "email": "john@example.com",
        "is_online": true
      }
    }
  ]
}

WebSocket Protocol

Messages from Client

Ping

{
  "type": "ping"
}

Authentication During Session

{
  "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
}

Messages from Server

Pong

{
  "type": "pong"
}

Data

{
  "type": "data",
  "data": {
    "type": "notification",
    "title": "New Message",
    "message": "You have a new message from admin"
  }
}

Error

{
  "type": "error",
  "code": "expired_token",
  "detail": "Token expired"
}

Error Codes:

  • expired_token — JWT token has expired
  • user_not_found — user not found in database

Examples

Real-time Chat Example

Backend (sending message):

from django.contrib.auth import get_user_model
from django.http import JsonResponse
from django.views.decorators.http import require_POST
import json

User = get_user_model()

@require_POST
def send_message(request):
    data = json.loads(request.body)
    recipient_id = data.get('recipient_id')
    message = data.get('message')
    
    try:
        recipient = User.objects.get(id=recipient_id)
        
        # Send message via WebSocket
        if recipient.is_online:
            recipient.ws_publish({
                'type': 'chat_message',
                'sender': {
                    'id': str(request.user.id),
                    'username': request.user.username
                },
                'message': message,
                'timestamp': datetime.now().isoformat()
            })
            return JsonResponse({'status': 'sent'})
        else:
            # Save offline message
            return JsonResponse({'status': 'saved_offline'})
            
    except User.DoesNotExist:
        return JsonResponse({'error': 'User not found'}, status=404)

Frontend (receiving message):

class ChatClient extends WebSocketClient {
  onData(data) {
    if (data.type === 'chat_message') {
      this.displayMessage(data.sender, data.message, data.timestamp);
    }
  }

  displayMessage(sender, message, timestamp) {
    const messageElement = document.createElement('div');
    messageElement.className = 'chat-message';
    messageElement.innerHTML = `
      <div class="sender">${sender.username}</div>
      <div class="message">${message}</div>
      <div class="timestamp">${new Date(timestamp).toLocaleString()}</div>
    `;
    document.getElementById('chat-messages').appendChild(messageElement);
  }
}

const chat = new ChatClient('ws://api.example.com/ws', jwtToken);
chat.connect();

Task Notification Example

Celery Task:

from celery import shared_task
from django.contrib.auth import get_user_model

User = get_user_model()

@shared_task
def process_long_running_task(user_id, task_data):
    """Long-running task with user notification"""
    user = User.objects.get(id=user_id)
    
    # Notify about start
    user.ws_publish({
        'type': 'task_started',
        'task_id': process_long_running_task.request.id,
        'message': 'Processing started...'
    })
    
    try:
        # Execute task
        result = perform_processing(task_data)
        
        # Notify about success
        user.ws_publish({
            'type': 'task_completed',
            'task_id': process_long_running_task.request.id,
            'result': result,
            'message': 'Processing completed successfully'
        })
        
    except Exception as e:
        # Notify about error
        user.ws_publish({
            'type': 'task_failed',
            'task_id': process_long_running_task.request.id,
            'error': str(e),
            'message': 'An error occurred during processing'
        })

Online Indicator Example

JavaScript Component:

class OnlineIndicator {
  constructor(userId) {
    this.userId = userId;
    this.indicator = document.getElementById(`user-${userId}-status`);
  }

  async checkStatus() {
    const response = await fetch(`/api/v1/users/user/${this.userId}/`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });
    
    const data = await response.json();
    const isOnline = data.data.attributes.is_online;
    
    this.updateIndicator(isOnline);
  }

  updateIndicator(isOnline) {
    if (isOnline) {
      this.indicator.classList.add('online');
      this.indicator.classList.remove('offline');
      this.indicator.textContent = 'Online';
    } else {
      this.indicator.classList.add('offline');
      this.indicator.classList.remove('online');
      this.indicator.textContent = 'Offline';
    }
  }
}

// Periodic status check
const indicator = new OnlineIndicator('user-123');
setInterval(() => indicator.checkStatus(), 10000);

License

Apache License 2.0

See LICENSE file for details.

Links

Support

If you have questions or issues:


Made with ❤️ by Bazis team

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

bazis_ws-2.2.1.tar.gz (74.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

bazis_ws-2.2.1-py3-none-any.whl (13.0 kB view details)

Uploaded Python 3

File details

Details for the file bazis_ws-2.2.1.tar.gz.

File metadata

  • Download URL: bazis_ws-2.2.1.tar.gz
  • Upload date:
  • Size: 74.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.27 {"installer":{"name":"uv","version":"0.9.27","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for bazis_ws-2.2.1.tar.gz
Algorithm Hash digest
SHA256 764fd26f804ba0ca70c665db3db8daaa8d7fe39298048e3ab27567bf6985c8e6
MD5 1d72ba69524cfb6fa23aa67d38ec2f16
BLAKE2b-256 d3458feee397dcc84b813bc4b67f6fc17553d3f164fd6f7b3f1ce3381df44498

See more details on using hashes here.

File details

Details for the file bazis_ws-2.2.1-py3-none-any.whl.

File metadata

  • Download URL: bazis_ws-2.2.1-py3-none-any.whl
  • Upload date:
  • Size: 13.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.27 {"installer":{"name":"uv","version":"0.9.27","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for bazis_ws-2.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 f076f1555205ca0b99ec65ea15b760f3708ab51d9c615bdb9589b5a230b81f5c
MD5 5d03880dd7494b71259962dca3477f69
BLAKE2b-256 1e74a533547c0e4e4a5bf07a38e16f3224413481f348379ea0d7bc4971632d98

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page