Python library for building CyTube microservices via Kryten bridge and NATS
Project description
kryten-py
Python library for building CyTube microservices via Kryten bridge and NATS.
Overview
kryten-py provides a high-level, type-safe API for interacting with CyTube servers through the Kryten bridge and NATS message bus. It enables rapid development of microservices like:
- Chat bots - Automated chat responses and commands
- DJ bots - Playlist management and automation
- Moderation tools - User management and content filtering
- Analytics systems - Event tracking and statistics
- Integration services - Connect CyTube to external platforms
Features
- ✅ Async-first API - Built on asyncio for high performance
- ✅ Type safety - Full typing with Pydantic models
- ✅ Decorator-based handlers - Intuitive event subscription
- ✅ Automatic reconnection - Resilient to network failures
- ✅ 18+ command methods - Complete CyTube control
- ✅ Health monitoring - Built-in metrics and status
- ✅ Lifecycle events - Service startup, shutdown, and coordination
- ✅ KeyValue store - State persistence with NATS JetStream
- ✅ Easy testing - Mock client for unit tests
- ✅ Comprehensive docs - Examples and API reference
Installation
Basic Installation
pip install kryten-py
With Optional Dependencies
# For YAML configuration support
pip install kryten-py[yaml]
# For environment variable loading
pip install kryten-py[dotenv]
# Install all extras
pip install kryten-py[all]
Development Installation
# Clone repository
git clone https://github.com/yourusername/kryten-py.git
cd kryten-py
# Install with Poetry
uv sync
# Or with pip in editable mode
pip install -e ".[all]"
Documentation
- Command Protocol - Guide to sending commands (READ THIS FIRST)
- Library Reference - Comprehensive API guide
- Deployment & Monitoring - Service lifecycle, heartbeats, and metrics
- State Management - KV Store best practices
- Error Handling - Exceptions and retry logic
- Examples - Complete code examples
Quick Start
Simple Echo Bot
import asyncio
from kryten import KrytenClient, ChatMessageEvent
async def main():
"""Simple echo bot that repeats user messages."""
config = {
"nats": {
"servers": ["nats://localhost:4222"],
"user": "kryten",
"password": "secret"
},
"channels": [
{"domain": "cytu.be", "channel": "lounge"}
]
}
async with KrytenClient(config) as client:
# Listen for events (1-to-Many broadcast)
@client.on("chatmsg")
async def on_chat(event: ChatMessageEvent):
"""Echo user messages."""
print(f"Chat: {event.username}: {event.message}")
# Send commands (1-to-1 direct)
if event.message.startswith("!ping"):
await client.send_command(
service="robot",
type="say",
body=f"Pong! {event.username}"
)
await client.run()
if event.username != "MyBot": # Don't echo ourselves
await client.send_chat(
event.channel,
f"{event.username} said: {event.message}"
)
print("Echo bot started!")
await client.run() # Run until Ctrl+C
if name == "main": asyncio.run(main())
### Configuration from File
**config.json:**
```json
{
"nats": {
"servers": ["nats://localhost:4222"],
"user": "${NATS_USER}",
"password": "${NATS_PASSWORD}"
},
"channels": [
{"domain": "cytu.be", "channel": "lounge"}
]
}
bot.py:
from kryten import KrytenClient, KrytenConfig
# Load configuration with environment variable substitution
config = KrytenConfig.from_json("config.json")
async with KrytenClient(config) as client:
# ... your bot logic
await client.run()
Core Concepts
Event Subscription
Use decorators to register event handlers:
@client.on("chatmsg")
async def handle_chat(event: ChatMessageEvent):
"""Handle all chat messages."""
print(f"{event.username}: {event.message}")
@client.on("chatmsg", channel="lounge")
async def handle_lounge_only(event: ChatMessageEvent):
"""Handle chat only from lounge channel."""
if "!ping" in event.message:
await client.send_chat(event.channel, "Pong!")
Command Publishing
Send commands to CyTube:
# Chat
await client.send_chat("lounge", "Hello world!")
await client.send_pm("lounge", "alice", "Private message")
# Playlist management
await client.add_media("lounge", "yt", "dQw4w9WgXcQ")
await client.delete_media("lounge", uid=42)
await client.shuffle_playlist("lounge")
# Playback control
await client.pause("lounge")
await client.play("lounge")
await client.seek("lounge", 30.0)
# Moderation
await client.kick_user("lounge", "spammer", reason="Spam")
await client.ban_user("lounge", "troll")
await client.mute_user("lounge", "noisy_user")
await client.shadow_mute_user("lounge", "subtle_troll")
await client.unmute_user("lounge", "reformed_user")
# Advanced moderation
await client.assign_leader("lounge", "trusted_dj")
await client.play_next("lounge") # Skip to next video
Health Monitoring
# Get client health status
health = client.health()
print(f"Connected: {health.connected}")
print(f"Events received: {health.events_received}")
print(f"Commands sent: {health.commands_sent}")
print(f"Avg latency: {health.avg_event_latency_ms:.2f}ms")
# Check connection
if client.is_connected:
print("Client is connected to NATS")
# Get channel info
for channel in client.channels:
print(f"{channel.domain}/{channel.channel}: {channel.events_received} events")
Lifecycle Events
The LifecycleEventPublisher helps coordinate service lifecycle across your microservices architecture. It publishes events when services start up, shut down, connect, or disconnect, and enables group-wide restart coordination.
Basic Usage
from kryten import KrytenClient, LifecycleEventPublisher
async def main():
config = {
"nats": {"servers": ["nats://localhost:4222"]},
"channels": [{"domain": "cytu.be", "channel": "lounge"}]
}
async with KrytenClient(config) as client:
# Create lifecycle publisher using client's NATS connection
lifecycle = LifecycleEventPublisher(
nats_client=client._nats,
service_name="my-bot",
service_version="1.0.0"
)
# Start lifecycle management
await lifecycle.start()
try:
# Publish startup event
await lifecycle.publish_startup()
# Your service logic here...
await client.run()
finally:
# Publish shutdown and stop
await lifecycle.publish_shutdown()
await lifecycle.stop()
Lifecycle Events
All lifecycle events include automatic metadata (hostname, timestamp, uptime):
# Startup - service is starting
await lifecycle.publish_startup(additional={"config_version": "2.1"})
# Subject: kryten.lifecycle.my-bot.startup
# Shutdown - service is stopping
await lifecycle.publish_shutdown(reason="Planned maintenance")
# Subject: kryten.lifecycle.my-bot.shutdown
# Connected - connection established
await lifecycle.publish_connected(target="NATS cluster")
# Subject: kryten.lifecycle.my-bot.connected
# Disconnected - connection lost
await lifecycle.publish_disconnected(reason="Network error")
# Subject: kryten.lifecycle.my-bot.disconnected
Group Restart Coordination
Coordinate graceful restarts across multiple service instances:
# Register callback for restart notices
async def handle_restart(restart_data: dict):
print(f"Restart requested by {restart_data['service_name']}")
print(f"Reason: {restart_data.get('reason', 'None')}")
# Perform graceful shutdown
await save_state()
await cleanup()
# Exit for process manager to restart
sys.exit(0)
lifecycle.on_restart_notice(handle_restart)
# Request group-wide restart
await lifecycle.publish_group_restart(
reason="Configuration updated",
delay_seconds=30
)
# Subject: kryten.lifecycle.group.restart
# Handle restart requests from other services
client.on_group_restart(handle_restart)
Monitoring Lifecycle Events
Other services can subscribe to lifecycle events:
import json
from kryten import KrytenClient
async def monitor_services():
config = {
"nats": {"servers": ["nats://localhost:4222"]},
"channels": [{"domain": "cytu.be", "channel": "lounge"}]
}
async with KrytenClient(config) as client:
async def lifecycle_handler(msg):
event = json.loads(msg.data.decode())
service = event["service_name"]
event_type = event["event_type"]
print(f"{service}: {event_type}")
print(f" Uptime: {event.get('uptime_seconds', 0)}s")
print(f" Hostname: {event['hostname']}")
# Subscribe to all lifecycle events using client's NATS connection
await client._nats.subscribe("kryten.lifecycle.>", cb=lifecycle_handler)
await client.run()
KeyValue Store
The KV store helpers provide a simple interface to NATS JetStream KeyValue stores for state persistence and sharing data between services.
Basic Operations
from kryten import KrytenClient
async def main():
config = {
"nats": {"servers": ["nats://localhost:4222"]},
"channels": [{"domain": "cytu.be", "channel": "lounge"}]
}
async with KrytenClient(config) as client:
# Store simple values
await client.kv_put("my-service-state", "counter", 42, as_json=True)
await client.kv_put("my-service-state", "status", "running")
# Retrieve values
counter = await client.kv_get("my-service-state", "counter", default=0, parse_json=True) # Returns 42
status_bytes = await client.kv_get("my-service-state", "status", default=b"unknown")
status = status_bytes.decode() if isinstance(status_bytes, bytes) else status_bytes # "running"
# Delete values
await client.kv_delete("my-service-state", "counter")
# Your bot logic here...
await client.run()
JSON Serialization
Automatically serialize/deserialize complex data:
# Store complex objects
user_data = {
"username": "alice",
"rank": 3,
"joined": "2024-01-15T10:00:00Z",
"badges": ["verified", "moderator"]
}
await client.kv_put("my-service-state", "user:alice", user_data, as_json=True)
# Retrieve and parse JSON
user = await client.kv_get("my-service-state", "user:alice", parse_json=True, default={})
print(user["username"]) # "alice"
print(user["badges"]) # ["verified", "moderator"]
Bulk Operations
# List all keys
all_keys = await client.kv_keys("my-service-state")
print(f"Found {len(all_keys)} keys")
# Get all key-value pairs
all_data = await client.kv_get_all("my-service-state", parse_json=True)
for key, value in all_data.items():
print(f"{key}: {value}")
Practical Example: State Persistence
from kryten import KrytenClient, LifecycleEventPublisher
async def main():
config = {
"nats": {"servers": ["nats://localhost:4222"]},
"channels": [{"domain": "cytu.be", "channel": "lounge"}]
}
async with KrytenClient(config) as client:
# Create lifecycle publisher
lifecycle = LifecycleEventPublisher(
nats_client=client._nats,
service_name="echo-bot",
service_version="1.0.0"
)
# Load state from KV store
message_count = await client.kv_get("bot-state", "message_count", default=0, parse_json=True)
await lifecycle.start()
await lifecycle.publish_startup()
@client.on("chatmsg")
async def handle_chat(event):
nonlocal message_count
message_count += 1
# Persist state every 10 messages
if message_count % 10 == 0:
await client.kv_put("bot-state", "message_count", message_count, as_json=True)
try:
await client.run()
finally:
# Save final state
await client.kv_put("bot-state", "message_count", message_count, as_json=True)
await lifecycle.publish_shutdown()
await lifecycle.stop()
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Testing
Using MockKrytenClient
import pytest
from kryten import MockKrytenClient, ChatMessageEvent
@pytest.mark.asyncio
async def test_bot_responds_to_ping():
"""Test bot responds to !ping command."""
config = {
"nats": {"servers": ["nats://localhost:4222"]},
"channels": [{"domain": "test.local", "channel": "test"}]
}
client = MockKrytenClient(config)
@client.on("chatmsg")
async def handle_command(event: ChatMessageEvent):
if event.message == "!ping":
await client.send_chat(event.channel, "Pong!")
async with client:
# Simulate user sending !ping
await client.simulate_event("chatmsg", {
"username": "alice",
"message": "!ping",
"timestamp": "2024-01-15T10:00:00Z",
"rank": 1
})
# Verify bot responded
commands = client.get_published_commands()
assert len(commands) == 1
assert commands[0]["data"]["message"] == "Pong!"
Running Tests
# Run all tests
pytest
# Run with coverage
pytest --cov=kryten --cov-report=html
# Run specific test file
pytest tests/test_mock.py
# Run tests in parallel
pytest -n auto
Examples
See the examples/ directory for complete examples:
echo_bot.py- Simple echo botdj_bot.py- Automated DJ with playlist managementmoderator_bot.py- Chat moderation toolanalytics.py- Event tracking and statisticsmulti_channel.py- Bot operating on multiple channelslifecycle_and_kv_example.py- Lifecycle events and KeyValue store integration
Architecture
Your Bot/Service
↓
kryten-py (this library)
↓
NATS Message Bus
↓
Kryten Bridge
↓
CyTube Server
- kryten-py abstracts NATS complexity and provides high-level API
- NATS handles pub/sub messaging between services
- Kryten Bridge translates between NATS and CyTube Socket.IO
- CyTube manages channels, users, and media streaming
Configuration
NATS Settings
{
"nats": {
"servers": ["nats://localhost:4222"], # NATS server URLs
"user": "username", # Optional authentication
"password": "password",
"token": "auth_token", # Alternative to user/pass
"connect_timeout": 10, # Connection timeout (seconds)
"reconnect_time_wait": 2, # Reconnect delay (seconds)
"max_reconnect_attempts": -1, # -1 = infinite retries
"ping_interval": 120, # Keepalive ping interval
}
}
Channel Settings
{
"channels": [
{"domain": "cytu.be", "channel": "lounge"},
{"domain": "cytu.be", "channel": "movies"},
{"domain": "test.cytube.local", "channel": "testing"}
]
}
Client Settings
{
"retry_attempts": 3, # Command retry attempts
"retry_delay": 1.0, # Initial retry delay (seconds)
"handler_timeout": 30.0, # Max handler execution time (seconds)
"max_concurrent_handlers": 1000, # Max concurrent handlers
"log_level": "INFO" # Logging level
}
API Reference
Note: For a complete API reference including all methods, parameters, and exceptions, see LIBRARY_REFERENCE.md.
KrytenClient
Main client class for interacting with CyTube via NATS.
Methods:
connect()- Establish NATS connectiondisconnect()- Close connection gracefullyon(event_name, channel=None, domain=None)- Register event handler (decorator)run()- Start event processing loopstop()- Stop event processing loophealth()- Get health status and metricssend_chat(channel, message, domain=None)- Send chat messagesend_pm(channel, username, message, domain=None)- Send private messageadd_media(channel, media_type, media_id, position="end", domain=None)- Add media to playlistdelete_media(channel, uid, domain=None)- Delete media from playlistmove_media(channel, uid, position, domain=None)- Move media in playlistjump_to(channel, uid, domain=None)- Jump to media in playlistclear_playlist(channel, domain=None)- Clear entire playlistshuffle_playlist(channel, domain=None)- Shuffle playlistset_temp(channel, uid, is_temp=True, domain=None)- Set temporary flag on mediapause(channel, domain=None)- Pause playbackplay(channel, domain=None)- Resume playbackseek(channel, time_seconds, domain=None)- Seek to timekick_user(channel, username, reason=None, domain=None)- Kick userban_user(channel, username, reason=None, domain=None)- Ban usermute_user(channel, username, domain=None)- Mute user from chattingshadow_mute_user(channel, username, domain=None)- Shadow mute user (only mods see messages)unmute_user(channel, username, domain=None)- Remove mute/shadow mutevoteskip(channel, domain=None)- Vote to skip mediaassign_leader(channel, username, domain=None)- Give/remove leader statusplay_next(channel, domain=None)- Skip to next video immediately
Properties:
is_connected- Check if connected to NATSchannels- Get list of configured channels
Event Models
RawEvent- Raw CyTube event with metadataChatMessageEvent- Chat message eventUserJoinEvent- User joined channelUserLeaveEvent- User left channelChangeMediaEvent- Media changed in playlistPlaylistUpdateEvent- Playlist modified
Exceptions
KrytenError- Base exceptionKrytenConnectionError- Connection failed or lostKrytenValidationError- Invalid configuration or dataKrytenTimeoutError- Operation timed outPublishError- Failed to publish commandHandlerError- Event handler raised exception
Requirements
- Python 3.11+
- NATS server (local or remote)
- Kryten bridge (deployed and configured)
- CyTube server (accessible from Kryten bridge)
Contributing
Contributions are welcome! Please:
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Make your changes with tests
- Run tests and linting (
pytest && ruff check) - Commit your changes (
git commit -m 'Add amazing feature') - Push to branch (
git push origin feature/amazing-feature) - Open a Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.
Support
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Documentation: Read the Docs
Acknowledgments
Related Projects
- Kryten Bridge - CyTube to NATS gateway
- CyTube - Synchronized media streaming platform
- nats-py - Python NATS client
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file kryten_py-0.11.6.tar.gz.
File metadata
- Download URL: kryten_py-0.11.6.tar.gz
- Upload date:
- Size: 209.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
11da8c02ca80f000a689ba08486300279ddeb240b8eaef0a487d611393d80a26
|
|
| MD5 |
64b3a720bee12ee34dbc127491ee5104
|
|
| BLAKE2b-256 |
bd9c9fbf77f10a8ee77e0fa947e28c94600270d51c235b83be2bcacdf05a90fc
|
Provenance
The following attestation bundles were made for kryten_py-0.11.6.tar.gz:
Publisher:
python-publish.yml on grobertson/kryten-py
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
kryten_py-0.11.6.tar.gz -
Subject digest:
11da8c02ca80f000a689ba08486300279ddeb240b8eaef0a487d611393d80a26 - Sigstore transparency entry: 785344636
- Sigstore integration time:
-
Permalink:
grobertson/kryten-py@2331b1f571e24f7f38c265961c4288a6fdb13bb6 -
Branch / Tag:
refs/tags/v0.4.4 - Owner: https://github.com/grobertson
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-publish.yml@2331b1f571e24f7f38c265961c4288a6fdb13bb6 -
Trigger Event:
push
-
Statement type:
File details
Details for the file kryten_py-0.11.6-py3-none-any.whl.
File metadata
- Download URL: kryten_py-0.11.6-py3-none-any.whl
- Upload date:
- Size: 51.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7f26a809b4dc8687631b42393f928d6131d1b12c2c588d4d9242dd811ea69e2a
|
|
| MD5 |
174400584119a79e04c4a31cf7d0f913
|
|
| BLAKE2b-256 |
7750824ade81d9f45c09bba0d8c3baca6e6bf912a1307479febd0bbbcfbfe8cb
|
Provenance
The following attestation bundles were made for kryten_py-0.11.6-py3-none-any.whl:
Publisher:
python-publish.yml on grobertson/kryten-py
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
kryten_py-0.11.6-py3-none-any.whl -
Subject digest:
7f26a809b4dc8687631b42393f928d6131d1b12c2c588d4d9242dd811ea69e2a - Sigstore transparency entry: 785344661
- Sigstore integration time:
-
Permalink:
grobertson/kryten-py@2331b1f571e24f7f38c265961c4288a6fdb13bb6 -
Branch / Tag:
refs/tags/v0.4.4 - Owner: https://github.com/grobertson
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-publish.yml@2331b1f571e24f7f38c265961c4288a6fdb13bb6 -
Trigger Event:
push
-
Statement type: