Skip to main content

Python library for building CyTube microservices via Kryten bridge and NATS

Project description

kryten-py

Python library for building CyTube microservices via Kryten bridge and NATS.

Python Version License

Overview

kryten-py provides a high-level, type-safe API for interacting with CyTube servers through the Kryten bridge and NATS message bus. It enables rapid development of microservices like:

  • Chat bots - Automated chat responses and commands
  • DJ bots - Playlist management and automation
  • Moderation tools - User management and content filtering
  • Analytics systems - Event tracking and statistics
  • Integration services - Connect CyTube to external platforms

Features

  • Async-first API - Built on asyncio for high performance
  • Type safety - Full typing with Pydantic models
  • Decorator-based handlers - Intuitive event subscription
  • Automatic reconnection - Resilient to network failures
  • 18+ command methods - Complete CyTube control
  • Health monitoring - Built-in metrics and status
  • Lifecycle events - Service startup, shutdown, and coordination
  • KeyValue store - State persistence with NATS JetStream
  • Easy testing - Mock client for unit tests
  • Comprehensive docs - Examples and API reference

Installation

Basic Installation

pip install kryten-py

With Optional Dependencies

# For YAML configuration support
pip install kryten-py[yaml]

# For environment variable loading
pip install kryten-py[dotenv]

# Install all extras
pip install kryten-py[all]

Development Installation

# Clone repository
git clone https://github.com/yourusername/kryten-py.git
cd kryten-py

# Install with Poetry
uv sync

# Or with pip in editable mode
pip install -e ".[all]"

Documentation

Quick Start

Simple Echo Bot

import asyncio
from kryten import KrytenClient, ChatMessageEvent


async def main():
    """Simple echo bot that repeats user messages."""
    
    config = {
        "nats": {
            "servers": ["nats://localhost:4222"],
            "user": "kryten",
            "password": "secret"
        },
        "channels": [
            {"domain": "cytu.be", "channel": "lounge"}
        ]
    }
    
    async with KrytenClient(config) as client:
        # Listen for events (1-to-Many broadcast)
        @client.on("chatmsg")
        async def on_chat(event: ChatMessageEvent):
            """Echo user messages."""
            print(f"Chat: {event.username}: {event.message}")
            
            # Send commands (1-to-1 direct)
            if event.message.startswith("!ping"):
                await client.send_command(
                    service="robot",
                    type="say",
                    body=f"Pong! {event.username}"
                )

        await client.run()
        if event.username != "MyBot":  # Don't echo ourselves
            await client.send_chat(
                event.channel,
                f"{event.username} said: {event.message}"
            )
    
    print("Echo bot started!")
    await client.run()  # Run until Ctrl+C

if name == "main": asyncio.run(main())


### Configuration from File

**config.json:**
```json
{
  "nats": {
    "servers": ["nats://localhost:4222"],
    "user": "${NATS_USER}",
    "password": "${NATS_PASSWORD}"
  },
  "channels": [
    {"domain": "cytu.be", "channel": "lounge"}
  ]
}

bot.py:

from kryten import KrytenClient, KrytenConfig

# Load configuration with environment variable substitution
config = KrytenConfig.from_json("config.json")

async with KrytenClient(config) as client:
    # ... your bot logic
    await client.run()

Core Concepts

Event Subscription

Use decorators to register event handlers:

@client.on("chatmsg")
async def handle_chat(event: ChatMessageEvent):
    """Handle all chat messages."""
    print(f"{event.username}: {event.message}")

@client.on("chatmsg", channel="lounge")
async def handle_lounge_only(event: ChatMessageEvent):
    """Handle chat only from lounge channel."""
    if "!ping" in event.message:
        await client.send_chat(event.channel, "Pong!")

Command Publishing

Send commands to CyTube:

# Chat
await client.send_chat("lounge", "Hello world!")
await client.send_pm("lounge", "alice", "Private message")

# Playlist management
await client.add_media("lounge", "yt", "dQw4w9WgXcQ")
await client.delete_media("lounge", uid=42)
await client.shuffle_playlist("lounge")

# Playback control
await client.pause("lounge")
await client.play("lounge")
await client.seek("lounge", 30.0)

# Moderation
await client.kick_user("lounge", "spammer", reason="Spam")
await client.ban_user("lounge", "troll")
await client.mute_user("lounge", "noisy_user")
await client.shadow_mute_user("lounge", "subtle_troll")
await client.unmute_user("lounge", "reformed_user")

# Advanced moderation
await client.assign_leader("lounge", "trusted_dj")
await client.play_next("lounge")  # Skip to next video

Health Monitoring

# Get client health status
health = client.health()

print(f"Connected: {health.connected}")
print(f"Events received: {health.events_received}")
print(f"Commands sent: {health.commands_sent}")
print(f"Avg latency: {health.avg_event_latency_ms:.2f}ms")

# Check connection
if client.is_connected:
    print("Client is connected to NATS")

# Get channel info
for channel in client.channels:
    print(f"{channel.domain}/{channel.channel}: {channel.events_received} events")

Lifecycle Events

The LifecycleEventPublisher helps coordinate service lifecycle across your microservices architecture. It publishes events when services start up, shut down, connect, or disconnect, and enables group-wide restart coordination.

Basic Usage

from kryten import KrytenClient, LifecycleEventPublisher

async def main():
    config = {
        "nats": {"servers": ["nats://localhost:4222"]},
        "channels": [{"domain": "cytu.be", "channel": "lounge"}]
    }
    
    async with KrytenClient(config) as client:
        # Create lifecycle publisher using client's NATS connection
        lifecycle = LifecycleEventPublisher(
            nats_client=client._nats,
            service_name="my-bot",
            service_version="1.0.0"
        )
        
        # Start lifecycle management
        await lifecycle.start()
        
        try:
            # Publish startup event
            await lifecycle.publish_startup()
            
            # Your service logic here...
            await client.run()
            
        finally:
            # Publish shutdown and stop
            await lifecycle.publish_shutdown()
            await lifecycle.stop()

Lifecycle Events

All lifecycle events include automatic metadata (hostname, timestamp, uptime):

# Startup - service is starting
await lifecycle.publish_startup(additional={"config_version": "2.1"})
# Subject: kryten.lifecycle.my-bot.startup

# Shutdown - service is stopping
await lifecycle.publish_shutdown(reason="Planned maintenance")
# Subject: kryten.lifecycle.my-bot.shutdown

# Connected - connection established
await lifecycle.publish_connected(target="NATS cluster")
# Subject: kryten.lifecycle.my-bot.connected

# Disconnected - connection lost
await lifecycle.publish_disconnected(reason="Network error")
# Subject: kryten.lifecycle.my-bot.disconnected

Group Restart Coordination

Coordinate graceful restarts across multiple service instances:

# Register callback for restart notices
async def handle_restart(restart_data: dict):
    print(f"Restart requested by {restart_data['service_name']}")
    print(f"Reason: {restart_data.get('reason', 'None')}")
    
    # Perform graceful shutdown
    await save_state()
    await cleanup()
    
    # Exit for process manager to restart
    sys.exit(0)

lifecycle.on_restart_notice(handle_restart)

# Request group-wide restart
await lifecycle.publish_group_restart(
    reason="Configuration updated",
    delay_seconds=30
)
# Subject: kryten.lifecycle.group.restart

# Handle restart requests from other services
client.on_group_restart(handle_restart)

Monitoring Lifecycle Events

Other services can subscribe to lifecycle events:

import json
from kryten import KrytenClient

async def monitor_services():
    config = {
        "nats": {"servers": ["nats://localhost:4222"]},
        "channels": [{"domain": "cytu.be", "channel": "lounge"}]
    }
    
    async with KrytenClient(config) as client:
        async def lifecycle_handler(msg):
            event = json.loads(msg.data.decode())
            service = event["service_name"]
            event_type = event["event_type"]
            
            print(f"{service}: {event_type}")
            print(f"  Uptime: {event.get('uptime_seconds', 0)}s")
            print(f"  Hostname: {event['hostname']}")
        
        # Subscribe to all lifecycle events using client's NATS connection
        await client._nats.subscribe("kryten.lifecycle.>", cb=lifecycle_handler)
        await client.run()

KeyValue Store

The KV store helpers provide a simple interface to NATS JetStream KeyValue stores for state persistence and sharing data between services.

Basic Operations

from kryten import KrytenClient

async def main():
    config = {
        "nats": {"servers": ["nats://localhost:4222"]},
        "channels": [{"domain": "cytu.be", "channel": "lounge"}]
    }
    
    async with KrytenClient(config) as client:
        # Store simple values
        await client.kv_put("my-service-state", "counter", 42, as_json=True)
        await client.kv_put("my-service-state", "status", "running")
        
        # Retrieve values
        counter = await client.kv_get("my-service-state", "counter", default=0, parse_json=True)  # Returns 42
        status_bytes = await client.kv_get("my-service-state", "status", default=b"unknown")
        status = status_bytes.decode() if isinstance(status_bytes, bytes) else status_bytes  # "running"
        
        # Delete values
        await client.kv_delete("my-service-state", "counter")
        
        # Your bot logic here...
        await client.run()

JSON Serialization

Automatically serialize/deserialize complex data:

# Store complex objects
user_data = {
    "username": "alice",
    "rank": 3,
    "joined": "2024-01-15T10:00:00Z",
    "badges": ["verified", "moderator"]
}

await client.kv_put("my-service-state", "user:alice", user_data, as_json=True)

# Retrieve and parse JSON
user = await client.kv_get("my-service-state", "user:alice", parse_json=True, default={})
print(user["username"])  # "alice"
print(user["badges"])    # ["verified", "moderator"]

Bulk Operations

# List all keys
all_keys = await client.kv_keys("my-service-state")
print(f"Found {len(all_keys)} keys")

# Get all key-value pairs
all_data = await client.kv_get_all("my-service-state", parse_json=True)
for key, value in all_data.items():
    print(f"{key}: {value}")

Practical Example: State Persistence

from kryten import KrytenClient, LifecycleEventPublisher

async def main():
    config = {
        "nats": {"servers": ["nats://localhost:4222"]},
        "channels": [{"domain": "cytu.be", "channel": "lounge"}]
    }
    
    async with KrytenClient(config) as client:
        # Create lifecycle publisher
        lifecycle = LifecycleEventPublisher(
            nats_client=client._nats,
            service_name="echo-bot",
            service_version="1.0.0"
        )
        
        # Load state from KV store
        message_count = await client.kv_get("bot-state", "message_count", default=0, parse_json=True)
        
        await lifecycle.start()
        await lifecycle.publish_startup()
        
        @client.on("chatmsg")
        async def handle_chat(event):
            nonlocal message_count
            message_count += 1
            
            # Persist state every 10 messages
            if message_count % 10 == 0:
                await client.kv_put("bot-state", "message_count", message_count, as_json=True)
        
        try:
            await client.run()
        finally:
            # Save final state
            await client.kv_put("bot-state", "message_count", message_count, as_json=True)
            await lifecycle.publish_shutdown()
            await lifecycle.stop()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Testing

Using MockKrytenClient

import pytest
from kryten import MockKrytenClient, ChatMessageEvent


@pytest.mark.asyncio
async def test_bot_responds_to_ping():
    """Test bot responds to !ping command."""
    
    config = {
        "nats": {"servers": ["nats://localhost:4222"]},
        "channels": [{"domain": "test.local", "channel": "test"}]
    }
    
    client = MockKrytenClient(config)
    
    @client.on("chatmsg")
    async def handle_command(event: ChatMessageEvent):
        if event.message == "!ping":
            await client.send_chat(event.channel, "Pong!")
    
    async with client:
        # Simulate user sending !ping
        await client.simulate_event("chatmsg", {
            "username": "alice",
            "message": "!ping",
            "timestamp": "2024-01-15T10:00:00Z",
            "rank": 1
        })
        
        # Verify bot responded
        commands = client.get_published_commands()
        assert len(commands) == 1
        assert commands[0]["data"]["message"] == "Pong!"

Running Tests

# Run all tests
pytest

# Run with coverage
pytest --cov=kryten --cov-report=html

# Run specific test file
pytest tests/test_mock.py

# Run tests in parallel
pytest -n auto

Examples

See the examples/ directory for complete examples:

  • echo_bot.py - Simple echo bot
  • dj_bot.py - Automated DJ with playlist management
  • moderator_bot.py - Chat moderation tool
  • analytics.py - Event tracking and statistics
  • multi_channel.py - Bot operating on multiple channels
  • lifecycle_and_kv_example.py - Lifecycle events and KeyValue store integration

Architecture

Your Bot/Service
       ↓
   kryten-py (this library)
       ↓
     NATS Message Bus
       ↓
   Kryten Bridge
       ↓
   CyTube Server
  • kryten-py abstracts NATS complexity and provides high-level API
  • NATS handles pub/sub messaging between services
  • Kryten Bridge translates between NATS and CyTube Socket.IO
  • CyTube manages channels, users, and media streaming

Configuration

NATS Settings

{
  "nats": {
    "servers": ["nats://localhost:4222"],  # NATS server URLs
    "user": "username",                     # Optional authentication
    "password": "password",
    "token": "auth_token",                  # Alternative to user/pass
    "connect_timeout": 10,                  # Connection timeout (seconds)
    "reconnect_time_wait": 2,               # Reconnect delay (seconds)
    "max_reconnect_attempts": -1,           # -1 = infinite retries
    "ping_interval": 120,                   # Keepalive ping interval
  }
}

Channel Settings

{
  "channels": [
    {"domain": "cytu.be", "channel": "lounge"},
    {"domain": "cytu.be", "channel": "movies"},
    {"domain": "test.cytube.local", "channel": "testing"}
  ]
}

Client Settings

{
  "retry_attempts": 3,           # Command retry attempts
  "retry_delay": 1.0,            # Initial retry delay (seconds)
  "handler_timeout": 30.0,       # Max handler execution time (seconds)
  "max_concurrent_handlers": 1000,  # Max concurrent handlers
  "log_level": "INFO"            # Logging level
}

API Reference

Note: For a complete API reference including all methods, parameters, and exceptions, see LIBRARY_REFERENCE.md.

KrytenClient

Main client class for interacting with CyTube via NATS.

Methods:

  • connect() - Establish NATS connection
  • disconnect() - Close connection gracefully
  • on(event_name, channel=None, domain=None) - Register event handler (decorator)
  • run() - Start event processing loop
  • stop() - Stop event processing loop
  • health() - Get health status and metrics
  • send_chat(channel, message, domain=None) - Send chat message
  • send_pm(channel, username, message, domain=None) - Send private message
  • add_media(channel, media_type, media_id, position="end", domain=None) - Add media to playlist
  • delete_media(channel, uid, domain=None) - Delete media from playlist
  • move_media(channel, uid, position, domain=None) - Move media in playlist
  • jump_to(channel, uid, domain=None) - Jump to media in playlist
  • clear_playlist(channel, domain=None) - Clear entire playlist
  • shuffle_playlist(channel, domain=None) - Shuffle playlist
  • set_temp(channel, uid, is_temp=True, domain=None) - Set temporary flag on media
  • pause(channel, domain=None) - Pause playback
  • play(channel, domain=None) - Resume playback
  • seek(channel, time_seconds, domain=None) - Seek to time
  • kick_user(channel, username, reason=None, domain=None) - Kick user
  • ban_user(channel, username, reason=None, domain=None) - Ban user
  • mute_user(channel, username, domain=None) - Mute user from chatting
  • shadow_mute_user(channel, username, domain=None) - Shadow mute user (only mods see messages)
  • unmute_user(channel, username, domain=None) - Remove mute/shadow mute
  • voteskip(channel, domain=None) - Vote to skip media
  • assign_leader(channel, username, domain=None) - Give/remove leader status
  • play_next(channel, domain=None) - Skip to next video immediately

Properties:

  • is_connected - Check if connected to NATS
  • channels - Get list of configured channels

Event Models

  • RawEvent - Raw CyTube event with metadata
  • ChatMessageEvent - Chat message event
  • UserJoinEvent - User joined channel
  • UserLeaveEvent - User left channel
  • ChangeMediaEvent - Media changed in playlist
  • PlaylistUpdateEvent - Playlist modified

Exceptions

  • KrytenError - Base exception
  • KrytenConnectionError - Connection failed or lost
  • KrytenValidationError - Invalid configuration or data
  • KrytenTimeoutError - Operation timed out
  • PublishError - Failed to publish command
  • HandlerError - Event handler raised exception

Requirements

  • Python 3.11+
  • NATS server (local or remote)
  • Kryten bridge (deployed and configured)
  • CyTube server (accessible from Kryten bridge)

Contributing

Contributions are welcome! Please:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes with tests
  4. Run tests and linting (pytest && ruff check)
  5. Commit your changes (git commit -m 'Add amazing feature')
  6. Push to branch (git push origin feature/amazing-feature)
  7. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support

Acknowledgments

  • Built for the CyTube platform
  • Uses NATS for messaging
  • Powered by Pydantic for data validation

Related Projects

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kryten_py-0.11.6.tar.gz (209.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kryten_py-0.11.6-py3-none-any.whl (51.2 kB view details)

Uploaded Python 3

File details

Details for the file kryten_py-0.11.6.tar.gz.

File metadata

  • Download URL: kryten_py-0.11.6.tar.gz
  • Upload date:
  • Size: 209.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for kryten_py-0.11.6.tar.gz
Algorithm Hash digest
SHA256 11da8c02ca80f000a689ba08486300279ddeb240b8eaef0a487d611393d80a26
MD5 64b3a720bee12ee34dbc127491ee5104
BLAKE2b-256 bd9c9fbf77f10a8ee77e0fa947e28c94600270d51c235b83be2bcacdf05a90fc

See more details on using hashes here.

Provenance

The following attestation bundles were made for kryten_py-0.11.6.tar.gz:

Publisher: python-publish.yml on grobertson/kryten-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file kryten_py-0.11.6-py3-none-any.whl.

File metadata

  • Download URL: kryten_py-0.11.6-py3-none-any.whl
  • Upload date:
  • Size: 51.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for kryten_py-0.11.6-py3-none-any.whl
Algorithm Hash digest
SHA256 7f26a809b4dc8687631b42393f928d6131d1b12c2c588d4d9242dd811ea69e2a
MD5 174400584119a79e04c4a31cf7d0f913
BLAKE2b-256 7750824ade81d9f45c09bba0d8c3baca6e6bf912a1307479febd0bbbcfbfe8cb

See more details on using hashes here.

Provenance

The following attestation bundles were made for kryten_py-0.11.6-py3-none-any.whl:

Publisher: python-publish.yml on grobertson/kryten-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page