Autobahn Client
A modern, production-ready Python client for Autobahn WebSocket communication with built-in RPC support, automatic reconnection, and comprehensive error handling.
Features
- 🚀 WebSocket Communication: Reliable WebSocket client with automatic reconnection
- 🔧 RPC System: Powerful decorator-based RPC with type safety and validation
- ⚡ Async/Await: Fully async/await compatible
- 🛡️ Error Handling: Comprehensive error handling with detailed logging
- 🔄 Auto-Reconnection: Automatic reconnection with configurable intervals
- 📝 Type Safety: Full type annotations and validation
- 🏗️ Protobuf: Protocol Buffers for efficient message serialization
Installation
pip install autobahn_client
Quick Start
import asyncio
from autobahn_client import Autobahn
from autobahn_client.util import Address

async def main():
    # Create client
    client = Autobahn(Address("localhost", 8080))
    # Connect
    await client.begin()
    # Subscribe to a topic
    async def message_handler(payload: bytes):
        print(f"Received: {payload}")
    await client.subscribe("my-topic", message_handler)
    # Publish a message
    await client.publish("my-topic", b"Hello, World!")
    # Keep running
    await asyncio.sleep(1)

asyncio.run(main())
Configuration
Basic Configuration
from autobahn_client import Autobahn
from autobahn_client.util import Address

# Basic configuration
client = Autobahn(
    address=Address("localhost", 8080),
    reconnect=True,                  # Enable auto-reconnection
    reconnect_interval_seconds=1.0   # Reconnect every 1 second
)
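To make the two reconnection options concrete, here is a minimal sketch of a fixed-interval retry loop. It is not the library's implementation: `connect_with_retry`, `make_flaky_connect`, and `max_attempts` are hypothetical names used only for illustration, and the fake connection simply fails a set number of times before succeeding.

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_retry(
    connect: Callable[[], Awaitable[None]],
    reconnect_interval_seconds: float = 1.0,
    max_attempts: int = 5,  # hypothetical cap so the sketch always terminates
) -> int:
    """Call `connect` until it succeeds; return the number of attempts used."""
    for attempt in range(1, max_attempts + 1):
        try:
            await connect()
            return attempt
        except ConnectionError:
            if attempt == max_attempts:
                raise
            # Wait a fixed interval before the next attempt.
            await asyncio.sleep(reconnect_interval_seconds)
    raise RuntimeError("max_attempts must be at least 1")


def make_flaky_connect(failures: int) -> Callable[[], Awaitable[None]]:
    """Build a fake connect() that fails `failures` times, then succeeds."""
    remaining = failures

    async def connect() -> None:
        nonlocal remaining
        if remaining > 0:
            remaining -= 1
            raise ConnectionError("server not reachable yet")

    return connect


attempts = asyncio.run(
    connect_with_retry(make_flaky_connect(2), reconnect_interval_seconds=0.01)
)
print(f"connected after {attempts} attempts")  # connected after 3 attempts
```

A fixed interval keeps the behaviour easy to reason about; whether the real client also caps the number of attempts is not stated in this README.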
Connection Management
# Start the client
await client.begin()

# Check connection status
await client.ping()

# Manual publish (will auto-reconnect if needed)
await client.publish("topic", b"data")

# Subscribe with error handling
async def safe_handler(payload: bytes):
    try:
        # Process message
        print(f"Got: {payload}")
    except Exception as e:
        print(f"Handler error: {e}")

await client.subscribe("events", safe_handler)

# Unsubscribe
await client.unsubscribe("events")
Pub/Sub System Architecture
The following diagram shows how the publish/subscribe system works:
graph TD
A["Publisher Client"] --> B["WebSocket Connection"]
C["Subscriber Client 1"] --> D["WebSocket Connection"]
E["Subscriber Client 2"] --> F["WebSocket Connection"]
B --> G["Autobahn Server"]
D --> G
F --> G
G --> H["Topic: 'chat'"]
G --> I["Topic: 'events'"]
G --> J["Topic: 'notifications'"]
A --> K["1. Publish Message<br/>topic='chat'<br/>payload='Hello World!'"]
K --> B
B --> G
G --> L["2. Forward to Subscribers"]
L --> M["3. Deliver to Client 1<br/>topic='chat'<br/>payload='Hello World!'"]
L --> N["3. Deliver to Client 2<br/>topic='chat'<br/>payload='Hello World!'"]
M --> D
N --> F
D --> O["4. Callback Executed<br/>message_handler(payload)"]
F --> P["4. Callback Executed<br/>message_handler(payload)"]
Q["Subscribe Process"] --> R["client.subscribe('chat', handler)"]
R --> S["SUBSCRIBE message sent"]
S --> G
G --> T["Client added to topic subscribers"]
style A fill:#e3f2fd
style C fill:#e8f5e8
style E fill:#e8f5e8
style G fill:#fff3e0
style H fill:#f3e5f5
style I fill:#f3e5f5
style J fill:#f3e5f5
style Q fill:#fce4ec
How it works:
- Subscription: Clients subscribe to topics they're interested in
- Publication: Any client can publish messages to any topic
- Distribution: The server forwards messages to all subscribers of that topic
- Delivery: Each subscriber's callback function is executed with the message
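The four steps above can be sketched with a toy in-memory broker. This is only an illustration of the topic-to-callbacks idea, not the Autobahn server: `InMemoryBroker` and the handler names are hypothetical, and no WebSocket is involved.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, DefaultDict, Dict, List

Callback = Callable[[bytes], Awaitable[None]]


class InMemoryBroker:
    """Toy stand-in for the server: maps each topic to its subscriber callbacks."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    async def subscribe(self, topic: str, callback: Callback) -> None:
        # Subscription: remember who is interested in this topic.
        self._subscribers[topic].append(callback)

    async def publish(self, topic: str, payload: bytes) -> None:
        # Distribution and delivery: run every callback registered for the topic.
        for callback in list(self._subscribers.get(topic, [])):
            await callback(payload)


async def demo() -> Dict[str, List[bytes]]:
    broker = InMemoryBroker()
    received: Dict[str, List[bytes]] = {"client1": [], "client2": []}

    async def handler1(payload: bytes) -> None:
        received["client1"].append(payload)

    async def handler2(payload: bytes) -> None:
        received["client2"].append(payload)

    await broker.subscribe("chat", handler1)
    await broker.subscribe("chat", handler2)
    await broker.publish("chat", b"Hello World!")
    await broker.publish("events", b"nobody is listening")  # no subscribers
    return received


received = asyncio.run(demo())
print(received)
```

Both subscribers of `chat` receive the same payload, while the message on `events` is dropped because nothing subscribed to it.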
RPC System
The Autobahn client includes a powerful RPC system with decorators for both client calls and server functions. RPC functions are automatically discovered and registered when you call begin() - no manual registration required!
RPC Server (Service Provider)
Create RPC services using the @rpc_function decorator:
from autobahn_client import Autobahn
from autobahn_client.util import Address
from autobahn_client.proto.message_pb2 import PublishMessage
from google.protobuf.message import Message
import asyncio

# Define your RPC functions - they will be auto-registered!
@Autobahn.rpc_function()
async def get_user(request: PublishMessage) -> PublishMessage:
    """Get user information."""
    # Process the request
    user_data = f"User: {request.topic}"
    # Return response
    return PublishMessage(
        topic="user-response",
        payload=user_data.encode()
    )

@Autobahn.rpc_function()
async def process_data(request: PublishMessage) -> None:
    """Process data without returning anything."""
    print(f"Processing: {request.payload.decode()}")
    # No return value

async def main():
    client = Autobahn(Address("localhost", 8080))
    # Just call begin() - RPC functions are automatically registered!
    await client.begin()
    print("RPC server running with auto-registered functions...")
    # Keep the server running
    while True:
        await asyncio.sleep(1)

asyncio.run(main())
RPC Client (Service Consumer)
Create RPC client calls using the @rpc_callable decorator:
from autobahn_client import Autobahn
from autobahn_client.util import Address
from autobahn_client.proto.message_pb2 import PublishMessage
import asyncio

# Define RPC client methods
@Autobahn.rpc_callable(timeout_ms=5000)
async def call_get_user(request: PublishMessage) -> PublishMessage:
    """Client method to call get_user RPC."""
    pass  # Implementation handled by decorator

@Autobahn.rpc_callable(timeout_ms=3000)
async def call_process_data(request: PublishMessage) -> None:
    """Client method to call process_data RPC."""
    pass  # Implementation handled by decorator

async def main():
    client = Autobahn(Address("localhost", 8080))
    await client.begin()
    # Make RPC calls
    try:
        # Call with return value
        request = PublishMessage(topic="john_doe", payload=b"get user info")
        response = await call_get_user(client, request)
        print(f"User info: {response.payload.decode()}")
        # Call without return value
        data_request = PublishMessage(topic="data", payload=b"some data")
        await call_process_data(client, data_request)
        print("Data processed successfully")
    except TimeoutError as e:
        print(f"RPC timeout: {e}")
    except Exception as e:
        print(f"RPC error: {e}")

asyncio.run(main())
RPC Type Safety
All RPC functions must have proper type annotations:
from typing import Optional
from google.protobuf.message import Message
from autobahn_client import Autobahn
from autobahn_client.proto.message_pb2 import PublishMessage

# ✅ Correct: Proper type annotations
@Autobahn.rpc_function()
async def valid_rpc(request: PublishMessage) -> Optional[PublishMessage]:
    if request.payload:
        return PublishMessage(topic="response", payload=b"data")
    return None

# ❌ Invalid: Missing type annotations
@Autobahn.rpc_function()
async def invalid_rpc(request):  # No type annotation
    return "error"

# ❌ Invalid: Wrong parameter count
@Autobahn.rpc_function()
async def invalid_rpc2(req1: Message, req2: Message) -> Message:  # Too many params
    pass
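The rules illustrated above (exactly one annotated parameter, plus an annotated return type) could be checked at decoration time roughly as follows. This is a sketch under assumptions, not the library's validator: `validate_rpc_signature`, `Request`, and `Response` are hypothetical names, and requiring a return annotation is an assumption inferred from the examples.

```python
import inspect
from typing import Any, Callable, List, get_type_hints


def validate_rpc_signature(func: Callable[..., Any]) -> None:
    """Raise TypeError unless `func` has one annotated parameter and a return annotation."""
    params = list(inspect.signature(func).parameters.values())
    if len(params) != 1:
        raise TypeError(
            f"{func.__name__} must take exactly one parameter, got {len(params)}"
        )
    hints = get_type_hints(func)
    if params[0].name not in hints:
        raise TypeError(f"{func.__name__}: parameter '{params[0].name}' has no type annotation")
    if "return" not in hints:
        raise TypeError(f"{func.__name__}: missing return type annotation")


class Request:  # placeholder for a protobuf request type
    pass


class Response:  # placeholder for a protobuf response type
    pass


async def good(request: Request) -> Response:
    return Response()


async def missing_annotation(request):
    return "error"


async def too_many(req1: Request, req2: Request) -> Response:
    return Response()


def check(func: Callable[..., Any]) -> str:
    """Return 'ok' or a rejection message instead of raising."""
    try:
        validate_rpc_signature(func)
        return "ok"
    except TypeError as exc:
        return f"rejected: {exc}"


results: List[str] = [check(f) for f in (good, missing_annotation, too_many)]
for line in results:
    print(line)
```

Validating when the decorator runs means a malformed service fails at import time rather than on the first incoming call.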
How RPC Auto-Registration Works
- Decoration Time: When you use @Autobahn.rpc_function(), the function is added to a global registry
- Connection Time: When you call client.begin(), all registered RPC functions are automatically discovered and registered with that client instance
- Ready to Serve: Your RPC services are immediately available to handle incoming requests
# Step 1: Decorate your functions (happens at import time)
@Autobahn.rpc_function()
async def my_service(request: MyRequest) -> MyResponse:
    return MyResponse(data="processed")

# Step 2: Create client and call begin() - auto-registration happens here!
async def main():
    client = Autobahn(Address("localhost", 8080))
    await client.begin()  # 🎉 my_service is now automatically registered!
    # Your RPC service is ready to handle calls
    await asyncio.sleep(float('inf'))
RPC System Architecture
The following diagram shows the complete RPC request/response flow:
graph TD
A["RPC Client"] --> B["@rpc_callable decorated function"]
C["RPC Server"] --> D["@rpc_function decorated function"]
B --> E["1. Generate Call ID<br/>uuid4()"]
E --> F["2. Serialize Request<br/>protobuf → bytes"]
F --> G["3. Subscribe to Response Topic<br/>RPC/FUNCTIONAL_SERVICE/OUTPUT/{call_id}"]
G --> H["4. Publish Request<br/>Topic: RPC/FUNCTIONAL_SERVICE/{func_signature}<br/>Payload: RPCRequestMessage"]
H --> I["WebSocket Transport"]
I --> J["5. Server Receives Request"]
J --> K["6. Deserialize Request<br/>bytes → protobuf"]
K --> L["7. Execute Business Logic<br/>await rpc_function(request)"]
L --> M["8. Serialize Response<br/>protobuf → bytes"]
M --> N["9. Publish Response<br/>Topic: RPC/FUNCTIONAL_SERVICE/OUTPUT/{call_id}<br/>Payload: RPCResponseMessage"]
N --> O["WebSocket Transport"]
O --> P["10. Client Receives Response"]
P --> Q["11. Deserialize Response<br/>bytes → protobuf"]
Q --> R["12. Return Result<br/>or raise Exception"]
S["Auto-Registration Process"] --> T["@rpc_function adds to global registry"]
T --> U["client.begin() called"]
U --> V["Auto-discover registered functions"]
V --> W["Subscribe to RPC topics"]
W --> X["Ready to handle RPC calls"]
Y["Error Handling"] --> Z["Timeout: asyncio.wait_for()"]
Y --> AA["Serialization Errors"]
Y --> BB["Business Logic Exceptions"]
Y --> CC["Connection Failures"]
style A fill:#e3f2fd
style C fill:#e8f5e8
style I fill:#fff3e0
style O fill:#fff3e0
style S fill:#f1f8e9
style Y fill:#ffebee
RPC Flow Explained:
- Client Side: Decorated function generates unique call ID and subscribes to response topic
- Request: Serialized request published to function-specific topic
- Server Side: Auto-registered function receives and processes the request
- Response: Result serialized and published back to the response topic
- Completion: Client receives response, deserializes it, and returns the result
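The flow above can be approximated end to end with an in-memory bus. The topic names follow the diagram; everything else is an assumption made for the sketch: `Bus`, `serve_upper`, and `call_upper` are hypothetical, and JSON replaces the protobuf `RPCRequestMessage`/`RPCResponseMessage` payloads so the example runs with the standard library only.

```python
import asyncio
import json
import uuid
from collections import defaultdict
from typing import Awaitable, Callable, DefaultDict, List, Tuple

Callback = Callable[[bytes], Awaitable[None]]


class Bus:
    """In-memory pub/sub standing in for the WebSocket transport."""

    def __init__(self) -> None:
        self.subs: DefaultDict[str, List[Callback]] = defaultdict(list)

    async def subscribe(self, topic: str, cb: Callback) -> None:
        self.subs[topic].append(cb)

    async def unsubscribe(self, topic: str) -> None:
        self.subs.pop(topic, None)

    async def publish(self, topic: str, payload: bytes) -> None:
        for cb in list(self.subs.get(topic, [])):
            await cb(payload)


async def serve_upper(bus: Bus) -> None:
    """Server side: handle requests on the function topic, reply on the call's topic."""

    async def on_request(raw: bytes) -> None:
        msg = json.loads(raw)
        reply = json.dumps({"result": msg["arg"].upper()}).encode()
        await bus.publish(f"RPC/FUNCTIONAL_SERVICE/OUTPUT/{msg['call_id']}", reply)

    await bus.subscribe("RPC/FUNCTIONAL_SERVICE/upper", on_request)


async def call_upper(bus: Bus, arg: str, timeout_ms: float = 3000) -> str:
    """Client side: subscribe to a per-call response topic, publish, await the reply."""
    call_id = str(uuid.uuid4())
    response_topic = f"RPC/FUNCTIONAL_SERVICE/OUTPUT/{call_id}"
    future: "asyncio.Future[str]" = asyncio.get_running_loop().create_future()

    async def on_response(raw: bytes) -> None:
        if not future.done():
            future.set_result(json.loads(raw)["result"])

    await bus.subscribe(response_topic, on_response)
    try:
        request = json.dumps({"call_id": call_id, "arg": arg}).encode()
        await bus.publish("RPC/FUNCTIONAL_SERVICE/upper", request)
        return await asyncio.wait_for(future, timeout=timeout_ms / 1000)
    finally:
        # Always drop the per-call subscription, even when the call times out.
        await bus.unsubscribe(response_topic)


async def demo() -> Tuple[str, bool, List[str]]:
    bus = Bus()
    await serve_upper(bus)
    answer = await call_upper(bus, "hello")
    try:
        await call_upper(Bus(), "no server here", timeout_ms=50)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return answer, timed_out, list(bus.subs)


answer, timed_out, remaining_topics = asyncio.run(demo())
print(answer, timed_out, remaining_topics)
```

Because each call listens on its own `OUTPUT/{call_id}` topic, concurrent calls cannot receive each other's responses, and the `finally` block keeps abandoned subscriptions from accumulating after a timeout.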
Key Features:
- 🔄 Automatic Registration: Functions decorated with @rpc_function are auto-discovered
- 🆔 Unique Call IDs: Each RPC call gets a UUID to match requests with responses
- ⏱️ Timeout Handling: Configurable timeouts with proper cleanup
- 🛡️ Error Handling: Comprehensive error handling at every step
- 🔒 Type Safety: Full type validation for requests and responses
Advanced Features
Custom Message Types
from google.protobuf.message import Message

# Define your custom protobuf message types
# (You would typically generate these from .proto files)
@Autobahn.rpc_function()
async def handle_custom_message(request: YourCustomMessage) -> YourResponseMessage:
    # Process custom message
    response = YourResponseMessage()
    response.field = request.input_field
    return response
Error Handling
import json
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)

async def robust_handler(payload: bytes):
    try:
        # Your message processing logic
        data = json.loads(payload.decode())
        # Process data...
    except json.JSONDecodeError:
        logging.error("Invalid JSON received")
    except Exception as e:
        logging.error(f"Handler error: {e}")

await client.subscribe("topic", robust_handler)
Connection Monitoring
async def monitor_connection():
    client = Autobahn(Address("localhost", 8080), reconnect=True)
    await client.begin()
    while True:
        try:
            await client.ping()
            print("Connection healthy")
        except ConnectionError:
            print("Connection lost - will auto-reconnect")
        await asyncio.sleep(5)
Error Handling & Logging
The client provides comprehensive error handling and logging:
import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

# The client will automatically log:
# - Connection events
# - RPC calls and responses
# - Errors and exceptions
# - Reconnection attempts

# Handle specific errors
try:
    await client.publish("topic", b"data")
except ConnectionError:
    print("Not connected")
except Exception as e:
    print(f"Unexpected error: {e}")
API Reference
Autobahn Class
Constructor
Autobahn(
    address: Address,                         # WebSocket server address
    reconnect: bool = True,                   # Enable auto-reconnection
    reconnect_interval_seconds: float = 1.0   # Reconnection interval
)
Methods
- async begin() - Start the client, connect to server, and auto-register all RPC functions
- async publish(topic: str, payload: bytes) - Publish message to topic
- async subscribe(topic: str, callback: Callable) - Subscribe to topic
- async unsubscribe(topic: str) - Unsubscribe from topic
- async ping() - Send ping to server
Decorators
- @Autobahn.rpc_function() - Mark function as RPC service (auto-registered on begin())
- @Autobahn.rpc_callable(timeout_ms: float = 3000) - Create RPC client call
RPC Auto-Registration
When you call begin(), the client automatically:
- Discovers all functions decorated with @rpc_function
- Registers them as RPC services
- Sets up WebSocket subscriptions to handle incoming RPC calls
- Logs the registration process
No manual registration required!
Address Class
Address(host: str, port: int)
Simple dataclass for WebSocket server address.
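As an illustration of what such a dataclass might look like, here is a stand-in written from the constructor signature above. `AddressSketch` and its `ws_url()` helper are hypothetical; the real `Address` class may have different fields, defaults, or methods.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AddressSketch:
    """Hypothetical stand-in mirroring Address(host: str, port: int)."""

    host: str
    port: int

    def ws_url(self) -> str:
        # Illustrative helper only; not part of the documented API.
        return f"ws://{self.host}:{self.port}"


addr = AddressSketch("localhost", 8080)
print(addr.ws_url())  # ws://localhost:8080
```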
Protocol Buffers
This package uses Protocol Buffers for message serialization. Key message types:
- PublishMessage - Standard publish/subscribe messages
- TopicMessage - Topic subscription/unsubscription
- RPCRequestMessage - RPC request messages
- RPCResponseMessage - RPC response messages
Compiling Proto Files
If you modify the .proto files:
# Install protobuf compiler
# macOS: brew install protobuf
# Ubuntu: apt-get install protobuf-compiler
# Compile proto files
protoc --python_out=. proto/message.proto
Best Practices
1. Always Use Type Annotations
# ✅ Good
@Autobahn.rpc_function()
async def process_user(request: UserRequest) -> UserResponse:
    pass

# ❌ Bad
@Autobahn.rpc_function()
async def process_user(request):
    pass
2. Handle Errors Gracefully
async def safe_rpc_call():
    try:
        result = await my_rpc_call(client, request)
        return result
    except TimeoutError:
        logging.warning("RPC call timed out")
        return None
    except Exception as e:
        logging.error(f"RPC call failed: {e}")
        raise
3. Use Connection Monitoring
# Enable auto-reconnection
client = Autobahn(address, reconnect=True)

# Monitor connection health
async def health_check():
    while True:
        try:
            await client.ping()
        except Exception:
            pass  # Auto-reconnection will handle it
        await asyncio.sleep(30)
4. Proper Resource Cleanup
async def main():
    client = Autobahn(Address("localhost", 8080))
    try:
        await client.begin()
        # Your application logic
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        # Cleanup is handled automatically
        pass
Examples
Check out the examples/ directory for complete working examples:
- basic_client.py - Basic pub/sub client
- rpc_server.py - RPC service provider
- rpc_client.py - RPC service consumer
- chat_application.py - Complete chat application
Contributing
- Fork the repository
- Create a feature branch
- Add tests for new features
- Ensure all tests pass
- Submit a pull request
License
MIT License - see LICENSE file for details.
Changelog
v0.1.1 (Latest)
- 🎉 Complete RPC system overhaul
- ✅ Fixed type extraction bug
- ✅ Fixed response handling logic
- ✅ Fixed race conditions in service registration
- ✅ Added instance-specific service registry
- ✅ Comprehensive error handling and logging
- ✅ Added input validation for RPC decorators
- ✅ Improved Optional type handling
- ✅ Added resource cleanup mechanisms
v0.1.0
- Basic WebSocket client
- Simple pub/sub functionality
- Initial RPC implementation