Python SDK for DigiXT Real-Time Data Delivery Infrastructure
Project description
DigiXT Real-Time SDK for Python
A Python SDK for connecting to DigiXT real-time data delivery infrastructure powered by Centrifugo and Kafka.
Features
- 🔌 WebSocket Connection: Async WebSocket connection to Centrifugo
- 🔐 Authentication: JWT token-based authentication via auth service
- 📡 Channel Subscriptions: Subscribe to real-time data channels
- 🔄 Auto Reconnection: Automatic reconnection with configurable retry logic
- ✅ API Key Validation: Periodic API key validation to ensure connection validity
- 📨 Message Publishing: Publish messages to channels via HTTP API
- 🎯 Event Handling: Comprehensive event system for connection and message events
Installation
Install from PyPI (Recommended)
pip install digixt-realtime-sdk
Install from Source
If you need to install from the source repository:
# Clone the repository
git clone https://github.com/digixt/digixt-centrifugo.git
cd digixt-centrifugo/clients/python
# Install in development mode
pip install -e .
# Or install in production mode
pip install .
Development Installation
For contributing to the SDK:
cd clients/python
pip install -e ".[dev]"
This installs the package in editable mode with development dependencies including pytest, black, and flake8.
Quick Start
Basic Usage
import asyncio
from digixt_realtime_sdk import DigiXTSDK
async def main():
# Initialize SDK
sdk = DigiXTSDK(
url='ws://localhost:8000/connection/websocket',
auth_url='http://localhost:3000',
username='myuser',
api_key='my-api-key',
user='myuser', # User ID for presence tracking
debug=True
)
# Set up event handlers
sdk.on('connected', lambda ctx: print('✅ Connected!', ctx))
sdk.on('disconnected', lambda ctx: print('❌ Disconnected', ctx))
sdk.on('error', lambda err: print('❌ Error:', err))
sdk.on('api_key_expired', lambda err: print('⚠️ API key expired:', err))
# Connect to server
await sdk.connect()
# Subscribe to channel with message handler
async def on_message(ctx):
print(f"📨 Message on {ctx['channel']}: {ctx['data']}")
await sdk.subscribe('updates', on_publication=on_message)
# Keep connection alive
print("Listening for messages... Press Ctrl+C to stop")
try:
await asyncio.sleep(3600) # Run for 1 hour
except KeyboardInterrupt:
print("\nShutting down...")
# Disconnect
await sdk.disconnect()
if __name__ == '__main__':
asyncio.run(main())
Advanced Usage
import asyncio
from digixt_realtime_sdk import DigiXTSDK
async def main():
sdk = DigiXTSDK(
url='ws://localhost:8000/connection/websocket',
auth_url='http://localhost:3000',
username='myuser',
api_key='my-api-key',
user='myuser',
debug=True,
reconnect_timeout=5,
max_reconnect_attempts=10,
api_key_validation_interval=30
)
# Connection events
def on_connected(ctx):
print(f"Connected! Client ID: {ctx.get('client')}")
print(f"Connection state: {sdk.get_connection_state()}")
def on_disconnected(ctx):
print(f"Disconnected: {ctx.get('reason')}")
def on_error(err):
print(f"Error: {err.get('error')}")
def on_api_key_expired(err):
print(f"API key expired: {err.get('error')}")
# Handle API key expiration (e.g., refresh token)
sdk.on('connected', on_connected)
sdk.on('disconnected', on_disconnected)
sdk.on('error', on_error)
sdk.on('api_key_expired', on_api_key_expired)
# Connect
await sdk.connect()
# Subscribe to multiple channels
async def on_updates(ctx):
print(f"Updates channel: {ctx['data']}")
async def on_telemetry(ctx):
print(f"Telemetry channel: {ctx['data']}")
await sdk.subscribe('updates', on_publication=on_updates)
await sdk.subscribe('telemetry', on_publication=on_telemetry)
# Publish a message (via HTTP API)
try:
result = await sdk.publish('updates', {'message': 'Hello from Python!'})
print(f"Published: {result}")
except Exception as e:
print(f"Publish failed: {e}")
# Keep running
await asyncio.sleep(60)
# Unsubscribe from a channel
await sdk.unsubscribe('telemetry')
# Disconnect
await sdk.disconnect()
if __name__ == '__main__':
asyncio.run(main())
API Reference
DigiXTSDK
Constructor
DigiXTSDK(
url='ws://localhost:8000/connection/websocket',
api_url='http://localhost:8000/api',
auth_url='http://localhost:3000',
api_key='',
username='',
user=None,
token=None,
debug=False,
reconnect_timeout=5,
max_reconnect_attempts=10,
api_key_validation_interval=30
)
Parameters:
url(str): WebSocket URL for Centrifugoapi_url(str): HTTP API URL for Centrifugoauth_url(str): Auth service URLapi_key(str): API key for authenticationusername(str): Username for authenticationuser(str, optional): User ID for token generation (defaults to username or generates unique ID)token(str, optional): Pre-generated JWT tokendebug(bool): Enable debug loggingreconnect_timeout(int): Seconds to wait before reconnectingmax_reconnect_attempts(int): Maximum reconnection attemptsapi_key_validation_interval(int): Seconds between API key validations
Methods
async connect()
Connect to Centrifugo WebSocket server.
Raises:
ConnectionError: If connection failsAuthenticationError: If authentication fails
async disconnect()
Disconnect from server and clean up resources.
async subscribe(channel, on_publication=None, on_join=None, on_leave=None, on_error=None)
Subscribe to a channel.
Parameters:
channel(str): Channel name to subscribe toon_publication(callable, optional): Callback for received messageson_join(callable, optional): Callback when user joins channelon_leave(callable, optional): Callback when user leaves channelon_error(callable, optional): Callback for subscription errors
Raises:
SubscriptionError: If subscription fails
async unsubscribe(channel)
Unsubscribe from a channel.
Parameters:
channel(str): Channel name to unsubscribe from
async publish(channel, data)
Publish a message to a channel via HTTP API.
Parameters:
channel(str): Channel namedata(any): Message data to publish
Returns:
- Response from Centrifugo API
Raises:
DigiXTError: If publish fails
on(event, handler)
Register event handler.
Parameters:
event(str): Event name ('connected', 'disconnected', 'error', etc.)handler(callable): Callback function
off(event, handler)
Unregister event handler.
Parameters:
event(str): Event namehandler(callable): Callback function to remove
get_connection_state()
Get current connection state.
Returns:
- Dictionary with connection state information
Events
connected: Emitted when connection is establisheddisconnected: Emitted when connection is losterror: Emitted when an error occursapi_key_expired: Emitted when API key validation failsapi_key_validated: Emitted when API key validation succeedssubscribed: Emitted when successfully subscribed to a channelreconnect_failed: Emitted when max reconnection attempts reached
Exceptions
DigiXTError: Base exception for all SDK errorsConnectionError: Raised when connection failsAuthenticationError: Raised when authentication failsSubscriptionError: Raised when subscription operations failTokenError: Raised when token operations fail
Usage Examples
Command Line (CLI)
After installing the package, you can use the CLI tool digixt-sdk:
# Basic usage - subscribe to a channel
digixt-sdk \
--url ws://localhost:8000/connection/websocket \
--auth-url http://localhost:3000 \
--username myuser \
--api-key my-api-key \
--channel updates
# Subscribe to multiple channels
digixt-sdk \
--url ws://localhost:8000/connection/websocket \
--auth-url http://localhost:3000 \
--username myuser \
--api-key my-api-key \
--channels updates telemetry
# Publish a message
digixt-sdk \
--url ws://localhost:8000/connection/websocket \
--auth-url http://localhost:3000 \
--username myuser \
--api-key my-api-key \
--channel updates \
--publish '{"message": "Hello from CLI"}'
# Run with timeout (60 seconds)
digixt-sdk \
--url ws://localhost:8000/connection/websocket \
--auth-url http://localhost:3000 \
--username myuser \
--api-key my-api-key \
--channel updates \
--timeout 60
# JSON output mode
digixt-sdk \
--url ws://localhost:8000/connection/websocket \
--auth-url http://localhost:3000 \
--username myuser \
--api-key my-api-key \
--channel updates \
--json-output
# Debug mode
digixt-sdk \
--url ws://localhost:8000/connection/websocket \
--auth-url http://localhost:3000 \
--username myuser \
--api-key my-api-key \
--channel updates \
--debug
You can also use it as a Python module:
python -m digixt_realtime_sdk.cli --help
Or use it programmatically in Python scripts:
import asyncio
from digixt_realtime_sdk import DigiXTSDK
async def main():
sdk = DigiXTSDK(
url='ws://localhost:8000/connection/websocket',
auth_url='http://localhost:3000',
username='myuser',
api_key='my-api-key'
)
await sdk.connect()
await sdk.subscribe('updates', on_publication=lambda ctx: print(ctx['data']))
await asyncio.sleep(60)
await sdk.disconnect()
asyncio.run(main())
Jupyter Notebook
Jupyter notebooks support top-level await, making async SDK usage straightforward:
# In a Jupyter notebook cell
import asyncio
from digixt_realtime_sdk import DigiXTSDK
# Initialize
sdk = DigiXTSDK(
url='ws://localhost:8000/connection/websocket',
auth_url='http://localhost:3000',
username='myuser',
api_key='my-api-key',
debug=True
)
# Set up handlers
messages = []
def on_message(ctx):
messages.append(ctx['data'])
print(f"Received: {ctx['data']}")
sdk.on('connected', lambda ctx: print('Connected!'))
sdk.on('error', lambda err: print(f'Error: {err}'))
# Connect and subscribe (use await at top level)
await sdk.connect()
await sdk.subscribe('updates', on_publication=on_message)
# Analyze messages later
print(f"Total messages: {len(messages)}")
See examples/jupyter_example.ipynb for a complete Jupyter notebook example.
Python Scripts
See the examples/ directory for more usage examples:
basic_example.py: Basic connection and subscriptionproduction_example.py: Production-ready example with error handlingjupyter_example.ipynb: Jupyter notebook example
Requirements
- Python 3.8+
- websockets>=12.0
- aiohttp>=3.9.0
License
MIT
Support
For issues and questions, please open an issue on GitHub.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file digixt_realtime_sdk-1.0.0.tar.gz.
File metadata
- Download URL: digixt_realtime_sdk-1.0.0.tar.gz
- Upload date:
- Size: 16.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
74eefb6f0d9836ef0d1ac89b214d528cd4cf7bf29d15561e45783941982f03c4
|
|
| MD5 |
ccf97e9b73347b5bf6dafd0424832070
|
|
| BLAKE2b-256 |
cc8e28f545817ce52b0211ba5b3cfbdb8ce3c63bb4ac91f4af6edb5f7ee836eb
|
File details
Details for the file digixt_realtime_sdk-1.0.0-py3-none-any.whl.
File metadata
- Download URL: digixt_realtime_sdk-1.0.0-py3-none-any.whl
- Upload date:
- Size: 14.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
963aa30d0bd485b504f620e55a1b2c4004b42783b718a4006156a5c13feba025
|
|
| MD5 |
0b39fd98ef086264df02999324321973
|
|
| BLAKE2b-256 |
2d0c6cfeb75fa9d313f8ca330f65a80c8647bae6061bfba5a7f8741de9488be7
|