RPC Client for camera.ui
RPC Library for Python
A high-performance, type-safe RPC library for Python applications, built on the NATS messaging system.
Features
- 🚀 High Performance: Achieves 300-1500+ MB/s throughput with sub-millisecond latency
- 🔒 Type Safety: Full type annotations and runtime type checking
- 🌊 Streaming: Async generators with push/pull patterns
- 🔄 Auto-reconnection: Resilient connection management
- 📦 Auto-chunking: Transparent handling of large payloads
- 🎯 Service Discovery: Automatic service registration and discovery
- ⚖️ Load Balancing: Built-in queue-based load distribution
- 🔀 Channels: Bidirectional real-time communication
- 🎨 Decorators: Clean API with Python decorators
- 🤝 Cross-language: Full compatibility with the TypeScript implementation
Installation
pip install camera-ui-rpc
Quick Start
Basic RPC Handlers
For simple RPC endpoints without service discovery:
from camera_ui_rpc import create_rpc_client, RPCClass
import asyncio

# Define your handlers
@RPCClass
class MathHandlers:
    async def add(self, a: float, b: float) -> float:
        return a + b

    async def multiply(self, a: float, b: float) -> float:
        return a * b

    # Streaming method
    async def fibonacci(self, n: int):
        a, b = 0, 1
        for _ in range(n):
            yield a
            a, b = b, a + b

async def main():
    # Server: Register RPC handlers
    server = create_rpc_client({
        'servers': ['nats://localhost:4222'],
        'name': 'math-server'
    })
    await server.connect()
    # Register handlers under a namespace
    await server.register_handler('math', MathHandlers())
    print('Math RPC handlers registered')
    # Block forever so the handlers stay registered
    await asyncio.Event().wait()

asyncio.run(main())

# Client: Use RPC proxy (run as a separate script with the same imports,
# since the server's main() above never returns)
async def main():
    client = create_rpc_client({
        'servers': ['nats://localhost:4222'],
        'name': 'math-client'
    })
    await client.connect()
    # Create a typed proxy (no service discovery)
    math = client.create_proxy('math')
    # Call methods
    result = await math.add(5, 3)
    print(f'5 + 3 = {result}')  # 8
    # Use streaming
    async for num in math.fibonacci(10):
        print(f'Fibonacci: {num}')

asyncio.run(main())
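To make the proxy idea concrete without a NATS server, here is a minimal in-process sketch of how attribute access on a proxy can be turned into a call on a handler registered under a namespace. `LocalRegistry`, `LocalProxy`, and `demo_proxy` are hypothetical names used only for illustration; they are not part of camera-ui-rpc, and the library's real transport and dispatch may work quite differently.

```python
import asyncio
from typing import Any, Dict


class MathHandlers:
    async def add(self, a: float, b: float) -> float:
        return a + b


class LocalRegistry:
    """In-process stand-in for a server: maps namespaces to handler objects."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Any] = {}

    def register_handler(self, namespace: str, handlers: Any) -> None:
        self._handlers[namespace] = handlers

    async def dispatch(self, namespace: str, method: str, *args: Any) -> Any:
        # Refuse private names and anything the handler object does not define.
        target = getattr(self._handlers[namespace], method, None)
        if target is None or method.startswith("_"):
            raise AttributeError(f"{namespace}.{method} is not registered")
        return await target(*args)


class LocalProxy:
    """Turns attribute access into dispatch calls, e.g. math.add(5, 3)."""

    def __init__(self, registry: LocalRegistry, namespace: str) -> None:
        self._registry = registry
        self._namespace = namespace

    def __getattr__(self, method: str):
        # Only reached for names not found normally, i.e. remote method names.
        async def call(*args: Any) -> Any:
            return await self._registry.dispatch(self._namespace, method, *args)

        return call


async def demo_proxy() -> float:
    registry = LocalRegistry()
    registry.register_handler("math", MathHandlers())
    math = LocalProxy(registry, "math")
    return await math.add(5, 3)
```

Calling `asyncio.run(demo_proxy())` exercises the same call shape as the client example above (`await math.add(5, 3)`), only with the network hop replaced by a dictionary lookup.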
NATS Micro Services
For production services with discovery, monitoring, and load balancing:
import asyncio
from camera_ui_rpc import create_rpc_client, ServiceConfig

# Server: Register as NATS micro service
# (MathHandlers is the class defined in the previous example)
async def main():
    server = create_rpc_client({
        'servers': ['nats://localhost:4222'],
        'name': 'math-service'
    })
    await server.connect()
    # Register as a proper NATS micro service
    await server.service.register_handler(
        ServiceConfig(
            name='math',
            version='1.0.0',
            description='Math operations service',
            queue_group='math-workers'  # For load balancing
        ),
        MathHandlers()
    )
    print('Math micro service is running')
    await asyncio.Event().wait()

# Client: Discover and use service
async def main():
    client = create_rpc_client({
        'servers': ['nats://localhost:4222'],
        'name': 'client'
    })
    await client.connect()
    # Discover service by name (with load balancing)
    math = await client.create_service_proxy('math')
    # Use the service
    result = await math.add(10, 20)
Core Concepts
RPC Client
The foundation of all operations:
from camera_ui_rpc import create_rpc_client
client = create_rpc_client({
    'servers': ['nats://localhost:4222'],
    'name': 'my-app',
    'auth': {'user': 'app', 'pass': 'secret'},
    'timeout': 5000,  # 5 second default timeout
    'reconnect': True,
    'max_reconnect_attempts': -1  # infinite
})
await client.connect()
RPC Handlers vs NATS Services
RPC Handlers (register_handler):
- Simple namespace-based RPC endpoints
- Direct addressing via namespace
- No automatic discovery or load balancing
- Lightweight for internal communication
NATS Micro Services (service.register_handler):
- Full NATS micro service features
- Service discovery via name
- Automatic load balancing with queue groups
- Monitoring and stats
- Ideal for production microservices
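The load-balancing point rests on NATS queue-group semantics: when several subscribers share a queue group, each message is delivered to only one of them. The sketch below simulates that behavior in-process with the standard library only. `QueueGroup`, `subscribe`, `publish`, and `simulate` are hypothetical names for illustration, not part of camera-ui-rpc or any NATS client, and picking a member with a seeded random generator is a simplifying assumption.

```python
import random
from collections import Counter
from typing import Callable, Dict


class QueueGroup:
    """Toy model of a queue group: each message goes to exactly one member."""

    def __init__(self, seed: int = 0) -> None:
        self._members: Dict[str, Callable[[str], None]] = {}
        self._rng = random.Random(seed)  # seeded so runs are repeatable

    def subscribe(self, worker_id: str, handler: Callable[[str], None]) -> None:
        self._members[worker_id] = handler

    def publish(self, message: str) -> str:
        # Pick one member; the other members never see this message.
        worker_id = self._rng.choice(sorted(self._members))
        self._members[worker_id](message)
        return worker_id


def simulate(num_workers: int = 3, num_messages: int = 300) -> Counter:
    """Return how many messages each worker handled."""
    handled: Counter = Counter()
    group = QueueGroup()
    for n in range(num_workers):
        worker_id = f"math-worker-{n}"
        # The default argument binds the current worker_id to this handler.
        group.subscribe(worker_id, lambda _msg, w=worker_id: handled.update([w]))
    for i in range(num_messages):
        group.publish(f"request-{i}")
    return handled
```

Running `simulate()` shows the work spread across the workers while the total handled count equals the number of published messages, which is the property that lets several instances registered with the same `queue_group` share load.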
Handler Organization
from camera_ui_rpc import RPCClass, RPCNested, ServiceConfig
from typing import Dict, List, Optional

# User, UserData and generate_id are application-defined and not shown here
@RPCClass
class UserHandlers:
    def __init__(self):
        self.users: Dict[str, User] = {}

    async def get_user(self, user_id: str) -> Optional[User]:
        return self.users.get(user_id)

    async def create_user(self, data: UserData) -> User:
        user = User(id=generate_id(), **data)
        self.users[user.id] = user
        return user

    # Nested object for organization
    @RPCNested
    class admin:
        def __init__(self, parent):
            self.parent = parent

        async def list_users(self) -> List[User]:
            return list(self.parent.users.values())

        async def delete_user(self, user_id: str) -> bool:
            return self.parent.users.pop(user_id, None) is not None

# Register as handlers or service
await client.register_handler('users', UserHandlers())  # Simple RPC
# OR
await client.service.register_handler(  # NATS service
    ServiceConfig(
        name='users',
        version='1.0.0',
        queue_group='user-workers'
    ),
    UserHandlers()
)
Channels
Real-time bidirectional communication between multiple clients:
# Server/Client A - Join channel and listen
client_a = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'client-a'})
await client_a.connect()
chat_a = await client_a.channel('room:general')
def on_message_a(msg):
    print(f"[Client A received] {msg['user']}: {msg['text']}")
chat_a.on('message', on_message_a)
# Client B - Join same channel
client_b = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'client-b'})
await client_b.connect()
chat_b = await client_b.channel('room:general')
def on_message_b(msg):
    print(f"[Client B received] {msg['user']}: {msg['text']}")
chat_b.on('message', on_message_b)
# Send messages - all clients in channel receive them
await chat_a.send({'user': 'Alice', 'text': 'Hello everyone!'})
# Output on Client B: [Client B received] Alice: Hello everyone!
await chat_b.send({'user': 'Bob', 'text': 'Hi Alice!'})
# Output on Client A: [Client A received] Bob: Hi Alice!
# Request/Reply pattern in channels
# Client B handles requests
async def handle_info_request(msg):
    if msg.get('type') == 'get-info':
        return {'users': 2, 'topic': 'general chat'}
await chat_b.on_request(handle_info_request)
# Client A makes request
info = await chat_a.request({'type': 'get-info'})
print(f"Channel info: {info}") # {'users': 2, 'topic': 'general chat'}
Private Channels
Direct one-to-one communication between specific clients:
# Client A (Alice) - Create private channel to Bob
alice_client = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'alice'})
await alice_client.connect()
# Both clients must use the same channel_id and specify the target client
alice_to_bob = await alice_client.private_channel('secret-chat', 'bob')
def on_alice_message(msg):
    print(f"[Alice received from Bob] {msg}")
alice_to_bob.on('message', on_alice_message)
# Client B (Bob) - Create private channel to Alice
bob_client = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'bob'})
await bob_client.connect()
# Bob must use the same channel_id ('secret-chat') to connect
bob_to_alice = await bob_client.private_channel('secret-chat', 'alice')
def on_bob_message(msg):
    print(f"[Bob received from Alice] {msg}")
bob_to_alice.on('message', on_bob_message)
# Exchange private messages
await alice_to_bob.send({'text': 'Hi Bob, this is private!', 'timestamp': '10:00'})
# Output: [Bob received from Alice] {'text': 'Hi Bob, this is private!', 'timestamp': '10:00'}
await bob_to_alice.send({'text': 'Hi Alice, got your message!', 'timestamp': '10:01'})
# Output: [Alice received from Bob] {'text': 'Hi Alice, got your message!', 'timestamp': '10:01'}
# Private channels are isolated - other clients cannot see these messages
charlie_client = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'charlie'})
await charlie_client.connect()
# Charlie cannot see Alice-Bob messages even if trying to listen
Advanced Features
Streaming
Two streaming patterns for different use cases:
# Push-based (server controls flow) - better performance
async def generate_data(self, count: int):
    # Method name includes "generate" for push-based iteration
    for i in range(count):
        yield {'index': i, 'data': b'x' * (1024 * 1024)}  # 1MB

# Pull-based (client controls flow) - better backpressure
async def pull_data(self, count: int):
    # Method name includes "pull" for pull-based iteration
    for i in range(count):
        yield {'index': i, 'data': await self.load_data(i)}

# Client usage
service = client.create_proxy('data')
async for item in service.generate_data(100):
    # Process as fast as server sends
    pass
async for item in service.pull_data(100):
    # Pull items at client's pace
    await process_item(item)
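The trade-off between the two patterns can be illustrated with a plain `asyncio.Queue`, independent of this library. In the sketch below, an unbounded queue lets the producer run far ahead of the consumer (push-like), while a bounded queue suspends the producer whenever the consumer falls behind (the backpressure that a pull-style flow provides). `producer` and `max_producer_lead` are hypothetical helper names, and this is only a model of the behavior, not how camera-ui-rpc implements either mode.

```python
import asyncio
from typing import List


async def producer(queue: asyncio.Queue, count: int, produced: List[int]) -> None:
    for i in range(count):
        await queue.put(i)   # suspends while a bounded queue is full
        produced.append(i)
    await queue.put(None)    # sentinel: end of stream


async def max_producer_lead(count: int, maxsize: int) -> int:
    """Return the largest number of items the producer was ahead of the consumer.

    maxsize=0 means an unbounded queue (no backpressure).
    """
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    produced: List[int] = []
    task = asyncio.create_task(producer(queue, count, produced))
    consumed = 0
    max_lead = 0
    while True:
        item = await queue.get()
        if item is None:
            break
        consumed += 1
        max_lead = max(max_lead, len(produced) - consumed)
        await asyncio.sleep(0)  # stand-in for per-item processing work
    await task
    return max_lead
```

With a bounded queue the lead can never exceed `maxsize`, so memory use stays flat even if the consumer is slow; with `maxsize=0` the producer may buffer nearly the whole stream before the consumer handles its first item.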
Error Handling
from camera_ui_rpc import RPCException, ErrorCode
service = client.create_proxy('myservice')
try:
    await service.some_method()
except RPCException as e:
    if e.code == ErrorCode.METHOD_NOT_FOUND:
        print('Method does not exist')
    elif e.code == ErrorCode.TIMEOUT:
        print('Request timed out')
    elif e.code == ErrorCode.CONNECTION_CLOSED:
        print('Connection lost')
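One common way to act on these error codes is to retry only the transient ones. The sketch below retries on a timeout with exponential backoff and re-raises everything else immediately. To stay runnable without the library it uses local stand-ins, `StubRPCException` and `StubErrorCode`, which only mimic the `code` attribute shown above; `call_with_retry` and `demo_retry` are likewise hypothetical helpers and not something camera-ui-rpc provides.

```python
import asyncio
from enum import Enum
from typing import Awaitable, Callable, Tuple, TypeVar

T = TypeVar("T")


class StubErrorCode(Enum):
    TIMEOUT = "timeout"
    METHOD_NOT_FOUND = "method_not_found"


class StubRPCException(Exception):
    """Stand-in exception carrying a `code`, like the RPCException above."""

    def __init__(self, code: StubErrorCode) -> None:
        super().__init__(code.value)
        self.code = code


async def call_with_retry(
    call: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.01,
) -> T:
    for attempt in range(attempts):
        try:
            return await call()
        except StubRPCException as exc:
            last_attempt = attempt == attempts - 1
            # Only timeouts are treated as transient; other codes fail fast.
            if exc.code is not StubErrorCode.TIMEOUT or last_attempt:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")


async def demo_retry() -> Tuple[int, int]:
    calls = 0

    async def flaky() -> int:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise StubRPCException(StubErrorCode.TIMEOUT)
        return 42

    result = await call_with_retry(flaky)
    return result, calls
```

In real code the `except` clause would catch the library's `RPCException` and compare against `ErrorCode.TIMEOUT`; whether a retry is safe still depends on the remote method being idempotent.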
Auto-chunking
Large payloads are automatically chunked:
service = client.create_proxy('data')
# Automatically chunks data > server limit
large_data = b'x' * (100 * 1024 * 1024) # 100MB
await service.process_large_data(large_data) # Works transparently!
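Conceptually, chunking means slicing a payload into pieces no larger than the transport limit and joining them again on the receiving side. The following is a minimal sketch of that idea only; `split_into_chunks` and `reassemble` are hypothetical names, and the library's actual wire format (headers, sequence numbers, ordering guarantees) is not described in this document and is not modeled here.

```python
from typing import Iterator, List


def split_into_chunks(payload: bytes, max_chunk_size: int) -> Iterator[bytes]:
    """Yield consecutive slices of payload, each at most max_chunk_size bytes."""
    if max_chunk_size <= 0:
        raise ValueError("max_chunk_size must be positive")
    for offset in range(0, len(payload), max_chunk_size):
        yield payload[offset:offset + max_chunk_size]


def reassemble(chunks: List[bytes]) -> bytes:
    """Join chunks that arrived in their original order."""
    return b"".join(chunks)
```

For example, a 10,240-byte payload with a 4,096-byte limit yields three chunks of 4,096, 4,096, and 2,048 bytes, and `reassemble` returns the original bytes unchanged.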
Isolated Connections
Isolated connections provide separate NATS connections for specific operations, preventing blocking of the main connection. They're available for:
- Proxies: create_proxy() and create_service_proxy()
- Handlers: register_handler()
- Channels: channel() and private_channel()
# 1. Isolated proxy (returns object with proxy and close method)
isolated = client.create_proxy('service', isolated_connection=True)
result = await isolated.proxy.heavy_computation(params)
await isolated.close() # Close when done
# 2. Isolated service proxy
service_isolated = await client.create_service_proxy(
    'service',
    isolated_connection=True
)
await service_isolated.proxy.process_large_dataset()
await service_isolated.close()
# 3. Isolated handler (for CPU-intensive services)
cleanup = await client.register_handler(
    'heavy-service',
    handlers,
    isolated_connection=True
)
# Later: await cleanup() to unregister and close
# 4. Isolated channel
channel = await client.channel('data-stream', isolated_connection=True)
Important:
- Isolated connections must be closed explicitly when no longer needed
- Alternatively, closing the main client (client.disconnect()) will automatically close all isolated connections and channels
- Use isolated connections for: CPU-intensive operations, high-throughput streaming, or operations that might block
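Because an isolated connection has to be closed explicitly, it can help to tie its lifetime to an `async with` block so that `close()` runs even when a call fails. The sketch below assumes only what the examples above show, namely an object with a `proxy` attribute and an awaitable `close()` method. `closing_isolated` is a hypothetical helper, and `FakeIsolated` is a stand-in used so the example runs without a NATS server.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


class FakeIsolated:
    """Stand-in for the object returned with isolated_connection=True."""

    def __init__(self) -> None:
        self.proxy = "proxy-placeholder"
        self.closed = False

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def closing_isolated(isolated: Any) -> AsyncIterator[Any]:
    """Yield isolated.proxy and always await isolated.close() afterwards."""
    try:
        yield isolated.proxy
    finally:
        await isolated.close()


async def demo_close() -> bool:
    isolated = FakeIsolated()
    try:
        async with closing_isolated(isolated) as proxy:
            assert proxy == "proxy-placeholder"
            raise RuntimeError("simulated failure mid-call")
    except RuntimeError:
        pass
    # close() ran even though the body raised
    return isolated.closed
```

With the real client, the argument would be the value returned by `client.create_proxy('service', isolated_connection=True)`.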
Property Access
Expose class properties for remote access using descriptors:
from camera_ui_rpc import RPCClass, RPCProperty
@RPCClass
class ConfigService:
    # Use RPCProperty descriptor
    name = RPCProperty()
    version = RPCProperty()
    max_connections = RPCProperty()

    def __init__(self):
        self.name = 'Config Service'
        self.version = '1.0.0'
        self.max_connections = 100
# Client usage
config = client.create_proxy('config')
# Read properties
name = await config.name
version = await config.version
# Update properties via setter methods
await config.setName('Updated Service')
await config.setMaxConnections(200)
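For readers unfamiliar with descriptors, the sketch below shows the general Python mechanism that makes this style possible: a descriptor stores a per-instance value and, by its mere presence on the class, marks the attribute as one to expose. `ExposedProperty` and `exposed_names` are hypothetical names for illustration; this is not the implementation of `RPCProperty`, whose internals are not described here.

```python
from typing import Any, List, Optional


class ExposedProperty:
    """Data descriptor that stores its value per instance and marks it as exposed."""

    def __set_name__(self, owner: type, name: str) -> None:
        # Called once at class creation with the attribute's name.
        self.public_name = name
        self.private_name = f"_{name}"

    def __get__(self, instance: Any, owner: Optional[type] = None) -> Any:
        if instance is None:
            return self  # class-level access returns the descriptor itself
        return getattr(instance, self.private_name)

    def __set__(self, instance: Any, value: Any) -> None:
        setattr(instance, self.private_name, value)


def exposed_names(cls: type) -> List[str]:
    """List the attributes of cls that were declared with ExposedProperty."""
    return sorted(
        name for name, attr in vars(cls).items()
        if isinstance(attr, ExposedProperty)
    )


class ConfigService:
    name = ExposedProperty()
    version = ExposedProperty()

    def __init__(self) -> None:
        self.name = "Config Service"
        self.version = "1.0.0"
        self.internal_cache: dict = {}  # plain attribute: not exposed
```

A framework can walk the class once with something like `exposed_names` to decide which attributes to publish, while ordinary attributes such as `internal_cache` stay private to the process.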
Configuration
Client Options
from camera_ui_rpc.types import RPCClientOptions
options: RPCClientOptions = {
    'servers': ['nats://localhost:4222'],  # NATS servers
    'name': 'my-client',                   # Client identifier
    'auth': {                              # Authentication
        'user': 'username',
        'pass': 'password',
        'token': 'auth_token',
        'jwt': 'jwt_token'
    },
    'tls': {                               # TLS configuration
        'cert': 'path/to/cert.pem',
        'key': 'path/to/key.pem',
        'ca': 'path/to/ca.pem'
    },
    'timeout': 5000,                       # Default timeout (ms)
    'reconnect': True,                     # Auto-reconnect
    'max_reconnect_attempts': -1,          # -1 for infinite
    'reconnect_time_wait': 2000,           # Reconnect delay (ms)
    'max_payload_size': 4194304            # Override server limit
}
Service Options
from camera_ui_rpc import ServiceConfig
service_config = ServiceConfig(
    name='my-service',                # Service name
    version='1.0.0',                  # Semantic version
    description='My service',         # Service description
    queue_group='my-service-queue',   # Load balancing group
    metadata={                        # Custom metadata
        'region': 'us-east',
        'environment': 'production'
    }
)
# Register service
await server.service.register_handler(service_config, MyHandlers())
Cross-Language Compatibility
This library is fully compatible with the TypeScript/Node.js implementation:
- Identical wire protocol and message format
- Same feature set and API patterns
- Seamless service interoperability
- Shared channel communication
Example: Python calling TypeScript service
# TypeScript service running
math = await client.create_service_proxy('math-service')
result = await math.calculate(10, 20) # Works seamlessly!
Examples
Check the examples/ directory for complete examples:
- service.py - Basic service implementation
- channel_communication.py - Channel messaging
- streaming.py - Streaming patterns
- multi_service.py - Multiple services interaction
- large_data_transfer.py - Handling large payloads
- And 17 more examples...
License
MIT
Part of the camera.ui ecosystem - A comprehensive camera management solution.