Skip to main content

Python SDK for DigiXT Real-Time Data Delivery Infrastructure

Project description

DigiXT Real-Time SDK for Python

A Python SDK for connecting to DigiXT real-time data delivery infrastructure powered by Centrifugo and Kafka.

Features

  • 🔌 WebSocket Connection: Async WebSocket connection to Centrifugo
  • 🔐 Authentication: JWT token-based authentication via auth service
  • 📡 Channel Subscriptions: Subscribe to real-time data channels
  • 🔄 Auto Reconnection: Automatic reconnection with configurable retry logic
  • API Key Validation: Periodic API key validation to ensure connection validity
  • 📨 Message Publishing: Publish messages to channels via HTTP API
  • 🎯 Event Handling: Comprehensive event system for connection and message events

Installation

Install from PyPI (Recommended)

pip install digixt-realtime-sdk

Install from Source

If you need to install from the source repository:

# Clone the repository
git clone https://github.com/digixt/digixt-centrifugo.git
cd digixt-centrifugo/clients/python

# Install in development mode
pip install -e .

# Or install in production mode
pip install .

Development Installation

For contributing to the SDK:

cd clients/python
pip install -e ".[dev]"

This installs the package in editable mode with development dependencies including pytest, black, and flake8.

Quick Start

Basic Usage

import asyncio
from digixt_realtime_sdk import DigiXTSDK

async def main():
    # Initialize SDK
    sdk = DigiXTSDK(
        url='ws://localhost:8000/connection/websocket',
        auth_url='http://localhost:3000',
        username='myuser',
        api_key='my-api-key',
        user='myuser',  # User ID for presence tracking
        debug=True
    )
    
    # Set up event handlers
    sdk.on('connected', lambda ctx: print('✅ Connected!', ctx))
    sdk.on('disconnected', lambda ctx: print('❌ Disconnected', ctx))
    sdk.on('error', lambda err: print('❌ Error:', err))
    sdk.on('api_key_expired', lambda err: print('⚠️ API key expired:', err))
    
    # Connect to server
    await sdk.connect()
    
    # Subscribe to channel with message handler
    async def on_message(ctx):
        print(f"📨 Message on {ctx['channel']}: {ctx['data']}")
    
    await sdk.subscribe('updates', on_publication=on_message)
    
    # Keep connection alive
    print("Listening for messages... Press Ctrl+C to stop")
    try:
        await asyncio.sleep(3600)  # Run for 1 hour
    except KeyboardInterrupt:
        print("\nShutting down...")
    
    # Disconnect
    await sdk.disconnect()

if __name__ == '__main__':
    asyncio.run(main())

Advanced Usage

import asyncio
from digixt_realtime_sdk import DigiXTSDK

async def main():
    sdk = DigiXTSDK(
        url='ws://localhost:8000/connection/websocket',
        auth_url='http://localhost:3000',
        username='myuser',
        api_key='my-api-key',
        user='myuser',
        debug=True,
        reconnect_timeout=5,
        max_reconnect_attempts=10,
        api_key_validation_interval=30
    )
    
    # Connection events
    def on_connected(ctx):
        print(f"Connected! Client ID: {ctx.get('client')}")
        print(f"Connection state: {sdk.get_connection_state()}")
    
    def on_disconnected(ctx):
        print(f"Disconnected: {ctx.get('reason')}")
    
    def on_error(err):
        print(f"Error: {err.get('error')}")
    
    def on_api_key_expired(err):
        print(f"API key expired: {err.get('error')}")
        # Handle API key expiration (e.g., refresh token)
    
    sdk.on('connected', on_connected)
    sdk.on('disconnected', on_disconnected)
    sdk.on('error', on_error)
    sdk.on('api_key_expired', on_api_key_expired)
    
    # Connect
    await sdk.connect()
    
    # Subscribe to multiple channels
    async def on_updates(ctx):
        print(f"Updates channel: {ctx['data']}")
    
    async def on_telemetry(ctx):
        print(f"Telemetry channel: {ctx['data']}")
    
    await sdk.subscribe('updates', on_publication=on_updates)
    await sdk.subscribe('telemetry', on_publication=on_telemetry)
    
    # Publish a message (via HTTP API)
    try:
        result = await sdk.publish('updates', {'message': 'Hello from Python!'})
        print(f"Published: {result}")
    except Exception as e:
        print(f"Publish failed: {e}")
    
    # Keep running
    await asyncio.sleep(60)
    
    # Unsubscribe from a channel
    await sdk.unsubscribe('telemetry')
    
    # Disconnect
    await sdk.disconnect()

if __name__ == '__main__':
    asyncio.run(main())

API Reference

DigiXTSDK

Constructor

DigiXTSDK(
    url='ws://localhost:8000/connection/websocket',
    api_url='http://localhost:8000/api',
    auth_url='http://localhost:3000',
    api_key='',
    username='',
    user=None,
    token=None,
    debug=False,
    reconnect_timeout=5,
    max_reconnect_attempts=10,
    api_key_validation_interval=30
)

Parameters:

  • url (str): WebSocket URL for Centrifugo
  • api_url (str): HTTP API URL for Centrifugo
  • auth_url (str): Auth service URL
  • api_key (str): API key for authentication
  • username (str): Username for authentication
  • user (str, optional): User ID for token generation (defaults to username or generates unique ID)
  • token (str, optional): Pre-generated JWT token
  • debug (bool): Enable debug logging
  • reconnect_timeout (int): Seconds to wait before reconnecting
  • max_reconnect_attempts (int): Maximum reconnection attempts
  • api_key_validation_interval (int): Seconds between API key validations

Methods

async connect()

Connect to Centrifugo WebSocket server.

Raises:

  • ConnectionError: If connection fails
  • AuthenticationError: If authentication fails
async disconnect()

Disconnect from server and clean up resources.

async subscribe(channel, on_publication=None, on_join=None, on_leave=None, on_error=None)

Subscribe to a channel.

Parameters:

  • channel (str): Channel name to subscribe to
  • on_publication (callable, optional): Callback for received messages
  • on_join (callable, optional): Callback when user joins channel
  • on_leave (callable, optional): Callback when user leaves channel
  • on_error (callable, optional): Callback for subscription errors

Raises:

  • SubscriptionError: If subscription fails
async unsubscribe(channel)

Unsubscribe from a channel.

Parameters:

  • channel (str): Channel name to unsubscribe from
async publish(channel, data)

Publish a message to a channel via HTTP API.

Parameters:

  • channel (str): Channel name
  • data (any): Message data to publish

Returns:

  • Response from Centrifugo API

Raises:

  • DigiXTError: If publish fails
on(event, handler)

Register event handler.

Parameters:

  • event (str): Event name ('connected', 'disconnected', 'error', etc.)
  • handler (callable): Callback function
off(event, handler)

Unregister event handler.

Parameters:

  • event (str): Event name
  • handler (callable): Callback function to remove
get_connection_state()

Get current connection state.

Returns:

  • Dictionary with connection state information

Events

  • connected: Emitted when connection is established
  • disconnected: Emitted when connection is lost
  • error: Emitted when an error occurs
  • api_key_expired: Emitted when API key validation fails
  • api_key_validated: Emitted when API key validation succeeds
  • subscribed: Emitted when successfully subscribed to a channel
  • reconnect_failed: Emitted when max reconnection attempts reached

Exceptions

  • DigiXTError: Base exception for all SDK errors
  • ConnectionError: Raised when connection fails
  • AuthenticationError: Raised when authentication fails
  • SubscriptionError: Raised when subscription operations fail
  • TokenError: Raised when token operations fail

Usage Examples

Command Line (CLI)

After installing the package, you can use the CLI tool digixt-sdk:

# Basic usage - subscribe to a channel
digixt-sdk \
  --url ws://localhost:8000/connection/websocket \
  --auth-url http://localhost:3000 \
  --username myuser \
  --api-key my-api-key \
  --channel updates

# Subscribe to multiple channels
digixt-sdk \
  --url ws://localhost:8000/connection/websocket \
  --auth-url http://localhost:3000 \
  --username myuser \
  --api-key my-api-key \
  --channels updates telemetry

# Publish a message
digixt-sdk \
  --url ws://localhost:8000/connection/websocket \
  --auth-url http://localhost:3000 \
  --username myuser \
  --api-key my-api-key \
  --channel updates \
  --publish '{"message": "Hello from CLI"}'

# Run with timeout (60 seconds)
digixt-sdk \
  --url ws://localhost:8000/connection/websocket \
  --auth-url http://localhost:3000 \
  --username myuser \
  --api-key my-api-key \
  --channel updates \
  --timeout 60

# JSON output mode
digixt-sdk \
  --url ws://localhost:8000/connection/websocket \
  --auth-url http://localhost:3000 \
  --username myuser \
  --api-key my-api-key \
  --channel updates \
  --json-output

# Debug mode
digixt-sdk \
  --url ws://localhost:8000/connection/websocket \
  --auth-url http://localhost:3000 \
  --username myuser \
  --api-key my-api-key \
  --channel updates \
  --debug

You can also use it as a Python module:

python -m digixt_realtime_sdk.cli --help

Or use it programmatically in Python scripts:

import asyncio
from digixt_realtime_sdk import DigiXTSDK

async def main():
    sdk = DigiXTSDK(
        url='ws://localhost:8000/connection/websocket',
        auth_url='http://localhost:3000',
        username='myuser',
        api_key='my-api-key'
    )
    await sdk.connect()
    await sdk.subscribe('updates', on_publication=lambda ctx: print(ctx['data']))
    await asyncio.sleep(60)
    await sdk.disconnect()

asyncio.run(main())

Jupyter Notebook

Jupyter notebooks support top-level await, making async SDK usage straightforward:

# In a Jupyter notebook cell
import asyncio
from digixt_realtime_sdk import DigiXTSDK

# Initialize
sdk = DigiXTSDK(
    url='ws://localhost:8000/connection/websocket',
    auth_url='http://localhost:3000',
    username='myuser',
    api_key='my-api-key',
    debug=True
)

# Set up handlers
messages = []
def on_message(ctx):
    messages.append(ctx['data'])
    print(f"Received: {ctx['data']}")

sdk.on('connected', lambda ctx: print('Connected!'))
sdk.on('error', lambda err: print(f'Error: {err}'))

# Connect and subscribe (use await at top level)
await sdk.connect()
await sdk.subscribe('updates', on_publication=on_message)

# Analyze messages later
print(f"Total messages: {len(messages)}")

See examples/jupyter_example.ipynb for a complete Jupyter notebook example.

Python Scripts

See the examples/ directory for more usage examples:

  • basic_example.py: Basic connection and subscription
  • production_example.py: Production-ready example with error handling
  • jupyter_example.ipynb: Jupyter notebook example

Requirements

  • Python 3.8+
  • websockets>=12.0
  • aiohttp>=3.9.0

License

MIT

Support

For issues and questions, please open an issue on GitHub.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

digixt_realtime_sdk-1.0.1.tar.gz (17.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

digixt_realtime_sdk-1.0.1-py3-none-any.whl (15.5 kB view details)

Uploaded Python 3

File details

Details for the file digixt_realtime_sdk-1.0.1.tar.gz.

File metadata

  • Download URL: digixt_realtime_sdk-1.0.1.tar.gz
  • Upload date:
  • Size: 17.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.6

File hashes

Hashes for digixt_realtime_sdk-1.0.1.tar.gz
Algorithm Hash digest
SHA256 af7dfd5063d460dea91ee6a0e5d98746238cf317d2e2d59c986861b9d21f5721
MD5 8f7ea8e22152ec0bc69f5db433fce70d
BLAKE2b-256 77d43da4ad56d5c0b795cb30c83737ae9b8fa24ccc98de8703b6a19386885e75

See more details on using hashes here.

File details

Details for the file digixt_realtime_sdk-1.0.1-py3-none-any.whl.

File metadata

File hashes

Hashes for digixt_realtime_sdk-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 17397a698274d22e6491904dc5d8653e51447b179d876bf1d36b29b63ae7a341
MD5 4cd2abd1668a43c305dc1b2660447278
BLAKE2b-256 78ba22e84c5dfc5461b2256f296c846e7ea562913e0583db14ded3ede7a98cc6

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page