Skip to main content

RPC Client for camera.ui

Project description

RPC Library for Python

A high-performance, type-safe RPC library built on NATS messaging system for Python applications.

Features

  • 🚀 High Performance: Achieves 300-1500+ MB/s throughput with sub-millisecond latency
  • 🔒 Type Safety: Full type annotations and runtime type checking
  • 🌊 Streaming: Async generators with push/pull patterns
  • 🔄 Auto-reconnection: Resilient connection management
  • 📦 Auto-chunking: Transparent handling of large payloads
  • 🎯 Service Discovery: Automatic service registration and discovery
  • ⚖️ Load Balancing: Built-in queue-based load distribution
  • 🔀 Channels: Bidirectional real-time communication
  • 🎨 Decorators: Clean API with Python decorators
  • 🤝 Cross-language: Full compatibility with TypeScript implementation

Installation

pip install camera-ui-rpc

Quick Start

Basic RPC Handlers

For simple RPC endpoints without service discovery:

from camera_ui_rpc import create_rpc_client, RPCClass
import asyncio

# Define your handlers
@RPCClass
class MathHandlers:
    async def add(self, a: float, b: float) -> float:
        return a + b
    
    async def multiply(self, a: float, b: float) -> float:
        return a * b
    
    # Streaming method
    async def fibonacci(self, n: int):
        a, b = 0, 1
        for _ in range(n):
            yield a
            a, b = b, a + b

async def main():
    # Server: Register RPC handlers
    server = create_rpc_client({
        'servers': ['nats://localhost:4222'],
        'name': 'math-server'
    })
    
    await server.connect()
    
    # Register handlers under a namespace
    await server.register_handler('math', MathHandlers())
    
    print('Math RPC handlers registered')
    await asyncio.Event().wait()

asyncio.run(main())
# Client: Use RPC proxy
async def main():
    client = create_rpc_client({
        'servers': ['nats://localhost:4222'],
        'name': 'math-client'
    })
    
    await client.connect()
    
    # Create a typed proxy (no service discovery)
    math = client.create_proxy('math')
    
    # Call methods
    result = await math.add(5, 3)
    print(f'5 + 3 = {result}')  # 8
    
    # Use streaming
    async for num in math.fibonacci(10):
        print(f'Fibonacci: {num}')

asyncio.run(main())

NATS Micro Services

For production services with discovery, monitoring, and load balancing:

from camera_ui_rpc import ServiceConfig

# Server: Register as NATS micro service
async def main():
    server = create_rpc_client({
        'servers': ['nats://localhost:4222'],
        'name': 'math-service'
    })
    
    await server.connect()
    
    # Register as a proper NATS micro service
    await server.service.register_handler(
        ServiceConfig(
            name='math',
            version='1.0.0',
            description='Math operations service',
            queue_group='math-workers'  # For load balancing
        ),
        MathHandlers()
    )
    
    print('Math micro service is running')
    await asyncio.Event().wait()
# Client: Discover and use service
async def main():
    client = create_rpc_client({
        'servers': ['nats://localhost:4222'],
        'name': 'client'
    })
    
    await client.connect()
    
    # Discover service by name (with load balancing)
    math = await client.create_service_proxy('math')
    
    # Use the service
    result = await math.add(10, 20)

Core Concepts

RPC Client

The foundation of all operations:

from camera_ui_rpc import create_rpc_client

client = create_rpc_client({
    'servers': ['nats://localhost:4222'],
    'name': 'my-app',
    'auth': {'user': 'app', 'pass': 'secret'},
    'timeout': 5000,  # 5 second default timeout
    'reconnect': True,
    'maxReconnectAttempts': -1  # infinite
})

await client.connect()

RPC Handlers vs NATS Services

RPC Handlers (register_handler):

  • Simple namespace-based RPC endpoints
  • Direct addressing via namespace
  • No automatic discovery or load balancing
  • Lightweight for internal communication

NATS Micro Services (service.register_handler):

  • Full NATS micro service features
  • Service discovery via name
  • Automatic load balancing with queue groups
  • Monitoring and stats
  • Ideal for production microservices

Handler Organization

from camera_ui_rpc import RPCClass, RPCNested
from typing import Dict, List, Optional

@RPCClass
class UserHandlers:
    def __init__(self):
        self.users: Dict[str, User] = {}
    
    async def get_user(self, user_id: str) -> Optional[User]:
        return self.users.get(user_id)
    
    async def create_user(self, data: UserData) -> User:
        user = User(id=generate_id(), **data)
        self.users[user.id] = user
        return user
    
    # Nested object for organization
    @RPCNested
    class admin:
        def __init__(self, parent):
            self.parent = parent
        
        async def list_users(self) -> List[User]:
            return list(self.parent.users.values())
        
        async def delete_user(self, user_id: str) -> bool:
            return self.parent.users.pop(user_id, None) is not None

# Register as handlers or service
await client.register_handler('users', UserHandlers())  # Simple RPC
# OR
await client.service.register_handler(                  # NATS service
    ServiceConfig(
        name='users',
        version='1.0.0',
        queue_group='user-workers'
    ),
    UserHandlers()
)

Channels

Real-time bidirectional communication between multiple clients:

# Server/Client A - Join channel and listen
client_a = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'client-a'})
await client_a.connect()

chat_a = await client_a.channel('room:general')

def on_message_a(msg):
    print(f"[Client A received] {msg['user']}: {msg['text']}")

chat_a.on('message', on_message_a)

# Client B - Join same channel
client_b = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'client-b'})
await client_b.connect()

chat_b = await client_b.channel('room:general')

def on_message_b(msg):
    print(f"[Client B received] {msg['user']}: {msg['text']}")

chat_b.on('message', on_message_b)

# Send messages - all clients in channel receive them
await chat_a.send({'user': 'Alice', 'text': 'Hello everyone!'})
# Output on Client B: [Client B received] Alice: Hello everyone!

await chat_b.send({'user': 'Bob', 'text': 'Hi Alice!'})
# Output on Client A: [Client A received] Bob: Hi Alice!

# Request/Reply pattern in channels
# Client B handles requests
async def handle_info_request(msg):
    if msg.get('type') == 'get-info':
        return {'users': 2, 'topic': 'general chat'}

await chat_b.on_request(handle_info_request)

# Client A makes request
info = await chat_a.request({'type': 'get-info'})
print(f"Channel info: {info}")  # {'users': 2, 'topic': 'general chat'}

Private Channels

Direct one-to-one communication between specific clients:

# Client A (Alice) - Create private channel to Bob
alice_client = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'alice'})
await alice_client.connect()

# Both clients must use the same channel_id and specify the target client
alice_to_bob = await alice_client.private_channel('secret-chat', 'bob')

def on_alice_message(msg):
    print(f"[Alice received from Bob] {msg}")

alice_to_bob.on('message', on_alice_message)

# Client B (Bob) - Create private channel to Alice
bob_client = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'bob'})
await bob_client.connect()

# Bob must use the same channel_id ('secret-chat') to connect
bob_to_alice = await bob_client.private_channel('secret-chat', 'alice')

def on_bob_message(msg):
    print(f"[Bob received from Alice] {msg}")

bob_to_alice.on('message', on_bob_message)

# Exchange private messages
await alice_to_bob.send({'text': 'Hi Bob, this is private!', 'timestamp': '10:00'})
# Output: [Bob received from Alice] {'text': 'Hi Bob, this is private!', 'timestamp': '10:00'}

await bob_to_alice.send({'text': 'Hi Alice, got your message!', 'timestamp': '10:01'})
# Output: [Alice received from Bob] {'text': 'Hi Alice, got your message!', 'timestamp': '10:01'}

# Private channels are isolated - other clients cannot see these messages
charlie_client = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'charlie'})
await charlie_client.connect()
# Charlie cannot see Alice-Bob messages even if trying to listen

Advanced Features

Streaming

Two streaming patterns for different use cases:

# Push-based (server controls flow) - better performance
async def generate_data(self, count: int):
    # Method name includes "generate" for push-based iteration
    for i in range(count):
        yield {'index': i, 'data': b'x' * (1024 * 1024)}  # 1MB

# Pull-based (client controls flow) - better backpressure
async def pull_data(self, count: int):
    # Method name includes "pull" for pull-based iteration
    for i in range(count):
        yield {'index': i, 'data': await self.load_data(i)}

# Client usage
service = client.create_proxy('data')

async for item in service.generate_data(100):
    # Process as fast as server sends
    pass

async for item in service.pull_data(100):
    # Pull items at client's pace
    await process_item(item)

Error Handling

from camera_ui_rpc import RPCException, ErrorCode

service = client.create_proxy('myservice')

try:
    await service.some_method()
except RPCException as e:
    if e.code == ErrorCode.METHOD_NOT_FOUND:
        print('Method does not exist')
    elif e.code == ErrorCode.TIMEOUT:
        print('Request timed out')
    elif e.code == ErrorCode.CONNECTION_CLOSED:
        print('Connection lost')

Auto-chunking

Large payloads are automatically chunked:

service = client.create_proxy('data')

# Automatically chunks data > server limit
large_data = b'x' * (100 * 1024 * 1024)  # 100MB
await service.process_large_data(large_data)  # Works transparently!

Isolated Connections

Isolated connections provide separate NATS connections for specific operations, preventing blocking of the main connection. They're available for:

  • Proxies: create_proxy() and create_service_proxy()
  • Handlers: register_handler()
  • Channels: channel() and private_channel()
# 1. Isolated proxy (returns object with proxy and close method)
isolated = client.create_proxy('service', isolated_connection=True)
result = await isolated.proxy.heavy_computation(params)
await isolated.close()  # Close when done

# 2. Isolated service proxy
service_isolated = await client.create_service_proxy('service', 
    isolated_connection=True
)
await service_isolated.proxy.process_large_dataset()
await service_isolated.close()

# 3. Isolated handler (for CPU-intensive services)
cleanup = await client.register_handler('heavy-service', handlers, 
    isolated_connection=True
)
# Later: await cleanup() to unregister and close

# 4. Isolated channel
channel = await client.channel('data-stream', isolated_connection=True)

Important:

  • Isolated connections must be closed explicitly when no longer needed
  • Alternatively, closing the main client (client.disconnect()) will automatically close all isolated connections and channels
  • Use isolated connections for: CPU-intensive operations, high-throughput streaming, or operations that might block

Property Access

Expose class properties for remote access using descriptors:

from camera_ui_rpc import RPCClass, RPCProperty

@RPCClass
class ConfigService:
    # Use RPCProperty descriptor
    name = RPCProperty()
    version = RPCProperty()
    max_connections = RPCProperty()
    
    def __init__(self):
        self.name = 'Config Service'
        self.version = '1.0.0'
        self.max_connections = 100

# Client usage
config = client.create_proxy('config')

# Read properties
name = await config.name
version = await config.version

# Update properties via setter methods
await config.setName('Updated Service')
await config.setMaxConnections(200)

Configuration

Client Options

from camera_ui_rpc.types import RPCClientOptions

options: RPCClientOptions = {
    'servers': ['nats://localhost:4222'],  # NATS servers
    'name': 'my-client',                   # Client identifier
    'auth': {                              # Authentication
        'user': 'username',
        'pass': 'password',
        'token': 'auth_token',
        'jwt': 'jwt_token'
    },
    'tls': {                               # TLS configuration
        'cert': 'path/to/cert.pem',
        'key': 'path/to/key.pem',
        'ca': 'path/to/ca.pem'
    },
    'timeout': 5000,                       # Default timeout (ms)
    'reconnect': True,                     # Auto-reconnect
    'max_reconnect_attempts': -1,          # -1 for infinite
    'reconnect_time_wait': 2000,           # Reconnect delay (ms)
    'max_payload_size': 4194304            # Override server limit
}

Service Options

from camera_ui_rpc import ServiceConfig

service_config = ServiceConfig(
    name='my-service',                     # Service name
    version='1.0.0',                       # Semantic version
    description='My service',              # Service description
    queue_group='my-service-queue',        # Load balancing group
    metadata={                             # Custom metadata
        'region': 'us-east',
        'environment': 'production'
    }
)

# Register service
await server.service.register_handler(service_config, MyHandlers())

Cross-Language Compatibility

This library is fully compatible with the TypeScript/Node.js implementation:

  • Identical wire protocol and message format
  • Same feature set and API patterns
  • Seamless service interoperability
  • Shared channel communication

Example: Python calling TypeScript service

# TypeScript service running
math = await client.create_service_proxy('math-service')
result = await math.calculate(10, 20)  # Works seamlessly!

Examples

Check the examples/ directory for complete examples:

  • service.py - Basic service implementation
  • channel_communication.py - Channel messaging
  • streaming.py - Streaming patterns
  • multi_service.py - Multiple services interaction
  • large_data_transfer.py - Handling large payloads
  • And 17 more examples...

License

MIT


Part of the camera.ui ecosystem - A comprehensive camera management solution.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

camera_ui_rpc-1.0.1.tar.gz (38.2 kB view details)

Uploaded Source

File details

Details for the file camera_ui_rpc-1.0.1.tar.gz.

File metadata

  • Download URL: camera_ui_rpc-1.0.1.tar.gz
  • Upload date:
  • Size: 38.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for camera_ui_rpc-1.0.1.tar.gz
Algorithm Hash digest
SHA256 689dc939846de1c5ecee339129dd550de56fc43f2fdc31028c54d82f810436b9
MD5 505e75588c048445a3780d77cff5792f
BLAKE2b-256 1c5fc96bd3d1d4a8a3644ad09b507dd20676b351789f60c4d324f8246aa99730

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page