Python bindings for rsolace
Project description
pyrsolace
Python bindings for rsolace with full async/await support and proper GIL release.
✨ Key Features
- 🐍 Full asyncio Support: Native async/await patterns with asyncio
- 🔓 GIL Release: Properly releases GIL during blocking operations
- 🔄 Sync + Async: Choose the best pattern for your use case
- 📦 Complete API: Pub/Sub, Request/Reply, Message Caching, Events
- ⚡ High Performance: Zero-copy message handling from Rust
- 🛡️ Type Safe: Complete type hints with `.pyi` files
🚀 Installation
# Using pip
pip install pyrsolace
# Using uv (recommended)
uv add pyrsolace
🔥 Quick Start
Async Example (NEW!)
import asyncio
import pyrsolace
async def main():
# Initialize client
client = pyrsolace.Client()
# Connect to Solace broker
connected = client.connect(
host="tcp://localhost:55555",
vpn="default",
username="admin",
password="admin",
compression_level=5
)
if not connected:
print("Failed to connect")
return
# Subscribe to topics
client.subscribe_ext("test/topic/*", pyrsolace.SubscribeFlag.RequestConfirm)
# Get async receivers
async_msg_receiver = client.get_async_msg_receiver()
async_event_receiver = client.get_async_event_receiver()
# Handle messages asynchronously
async def message_handler():
while True:
try:
msg = await async_msg_receiver.recv()
print(f"Async received: {msg.topic} - {msg.data}")
except Exception as e:
print(f"Message handler error: {e}")
break
# Handle events asynchronously
async def event_handler():
while True:
try:
event = await async_event_receiver.recv()
print(f"Event: {event.session_event}")
except Exception as e:
print(f"Event handler error: {e}")
break
# Start async handlers
msg_task = asyncio.create_task(message_handler())
event_task = asyncio.create_task(event_handler())
# Send some test messages
for i in range(5):
msg = pyrsolace.Msg(
topic="test/topic/async",
data=f"Async message {i}".encode()
)
client.send_msg(msg)
await asyncio.sleep(1)
# Send async request (true async, no blocking!)
try:
request_msg = pyrsolace.Msg(
topic="test/request",
data=b"Hello async world!",
corr_id="req123"
)
response = await client.send_request_async(request_msg)
print(f"Async response: {response.data}")
except Exception as e:
print(f"Async request failed: {e}")
# Cleanup
await asyncio.sleep(2)
msg_task.cancel()
event_task.cancel()
client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
Sync Example (Enhanced with GIL Release)
import pyrsolace
import threading
import time
def message_handler(receiver, name):
"""Handle messages synchronously with proper GIL release."""
while True:
try:
# This properly releases GIL, allowing other threads to run
msg = receiver.recv()
print(f"{name} received: {msg.topic} - {msg.data}")
except Exception as e:
print(f"{name} handler error: {e}")
break
def main():
client = pyrsolace.Client()
# Connect
connected = client.connect(
host="tcp://localhost:55555",
vpn="default",
username="admin",
password="admin"
)
if not connected:
print("Failed to connect")
return
# Subscribe
client.subscribe("test/topic/*")
# Get receivers
msg_receiver = client.get_msg_receiver()
event_receiver = client.get_event_receiver()
# Start background threads (GIL is properly released)
msg_thread = threading.Thread(
target=message_handler,
args=(msg_receiver, "Messages")
)
event_thread = threading.Thread(
target=message_handler,
args=(event_receiver, "Events")
)
msg_thread.start()
event_thread.start()
# Send messages
for i in range(5):
msg = pyrsolace.Msg(
topic="test/topic/sync",
data=f"Sync message {i}".encode()
)
client.send_msg(msg)
time.sleep(1)
client.disconnect()
if __name__ == "__main__":
main()
Callback-based Example
import pyrsolace
import time
def on_message(msg):
"""Message callback function."""
print(f"Callback received: {msg.topic} - {msg.data}")
def on_event(event):
"""Event callback function."""
print(f"Event: {event.session_event} - {event.info}")
def main():
client = pyrsolace.Client()
# Set callbacks
client.set_msg_callback(on_message)
client.set_event_callback(on_event)
# Connect and subscribe
client.connect(
host="tcp://localhost:55555",
vpn="default",
username="admin",
password="admin"
)
client.subscribe("test/topic/*")
# Send messages
for i in range(5):
msg = pyrsolace.Msg(
topic="test/topic/callback",
data=f"Callback message {i}".encode()
)
client.send_msg(msg)
time.sleep(1)
client.disconnect()
if __name__ == "__main__":
main()
🔄 Sync vs Async
| Pattern | Best For | Usage | GIL Behavior |
|---|---|---|---|
| Callbacks | Simple event handling | `client.set_msg_callback(fn)` | Released during callback |
| Sync Receivers | Threading, blocking I/O | `receiver.recv()` | Released during recv |
| Async Receivers | High concurrency | `await async_receiver.recv()` | N/A (async) |
Migration from Sync to Async
# Before: Sync only
receiver = client.get_msg_receiver()
msg = receiver.recv() # Blocks thread (but releases GIL)
# After: True async
async_receiver = client.get_async_msg_receiver()
msg = await async_receiver.recv() # Non-blocking, async
# Mixed: Use both in same application
sync_receiver = client.get_msg_receiver() # For background threads
async_receiver = client.get_async_msg_receiver() # For async tasks
📋 API Reference
Client Class
class Client:
def connect(self, host: str, vpn: str, username: str, password: str, ...) -> bool
def disconnect(self) -> None
def subscribe(self, topic: str) -> ReturnCode
def subscribe_ext(self, topic: str, flag: SubscribeFlag) -> ReturnCode
# Message sending
def send_msg(self, msg: Msg) -> ReturnCode
def send_reply(self, rx_msg: Msg, reply_msg: Msg) -> ReturnCode
# Sync receivers (with GIL release)
def get_msg_receiver(self) -> MsgReceiver
def get_request_receiver(self) -> MsgReceiver
def get_p2p_receiver(self) -> MsgReceiver
def get_event_receiver(self) -> EventReceiver
# Async receivers (NEW!)
def get_async_msg_receiver(self) -> AsyncMsgReceiver
def get_async_request_receiver(self) -> AsyncMsgReceiver
def get_async_p2p_receiver(self) -> AsyncMsgReceiver
def get_async_event_receiver(self) -> AsyncEventReceiver
# Request/Reply
def send_request(self, msg: Msg, timeout: int) -> MsgReceiver
async def send_request_async(self, msg: Msg) -> Msg # NEW!
# Callbacks
def set_msg_callback(self, callback: Callable[[Msg], None]) -> None
def set_event_callback(self, callback: Callable[[Event], None]) -> None
Message Class
class Msg:
def __init__(self, topic: str = None, data: bytes = None, ...) -> None
# Properties
topic: str
data: bytes
corr_id: str
reply_topic: str
delivery_mode: DeliveryMode
# Methods
def set_user_prop(self, key: str, value: str) -> None
def get_user_prop(self, key: str) -> str
def dump(self) -> str
Receiver Classes
class MsgReceiver:
def recv(self) -> Msg # Releases GIL
class AsyncMsgReceiver:
async def recv(self) -> Msg # True async
class EventReceiver:
def recv(self) -> Event # Releases GIL
class AsyncEventReceiver:
async def recv(self) -> Event # True async
🛠️ Development
Building from Source
# Clone repository
git clone https://github.com/Yvictor/rsolace.git
cd rsolace/pyrsolace
# Using uv (recommended)
uv build
uv pip install -e .
# Using maturin
pip install maturin
maturin develop --release
Running Tests
# Run tests
uv run pytest tests/
# Run specific tests
uv run pytest tests/test_msg.py -v
🔧 Configuration
Connection Parameters
client.connect(
host="tcp://broker:55555", # Broker URL
vpn="vpn_name", # VPN name
username="user", # Username
password="pass", # Password
client_name="my_client", # Client identifier
compression_level=5, # 1-9 (higher = more compression)
connect_timeout_ms=30000, # Connection timeout
connect_retries=3, # Retry attempts
reconnect_retries=10, # Auto-reconnect attempts
keep_alive_ms=3000, # Keep-alive interval
reapply_subscriptions=True, # Restore subs on reconnect
generate_sender_id=True, # Add sender ID to messages
generate_timestamps=True, # Add timestamps
)
Message Properties
msg = pyrsolace.Msg(
topic="my/topic",
data=b"payload",
corr_id="request-123",
reply_topic="reply/topic",
delivery_mode=pyrsolace.DeliveryMode.Persistent
)
# User properties
msg.set_user_prop("priority", "high")
msg.set_user_prop("version", "1.0")
🎯 Advanced Examples
Async Producer-Consumer Pattern
import asyncio
from asyncio import Queue
async def producer(client, queue):
"""Produce messages to queue."""
for i in range(100):
msg = pyrsolace.Msg(
topic=f"data/stream/{i % 10}",
data=f"Data packet {i}".encode()
)
await queue.put(msg)
await asyncio.sleep(0.1)
async def consumer(client, queue):
"""Consume messages from queue."""
while True:
msg = await queue.get()
client.send_msg(msg)
queue.task_done()
async def message_processor(client):
"""Process incoming messages."""
receiver = client.get_async_msg_receiver()
while True:
msg = await receiver.recv()
# Process message asynchronously
await process_message(msg)
async def main():
client = pyrsolace.Client()
client.connect(...)
queue = Queue(maxsize=100)
# Start producer, consumer, and processor
await asyncio.gather(
producer(client, queue),
consumer(client, queue),
message_processor(client)
)
Request/Reply Service
async def request_handler(client):
"""Handle incoming requests asynchronously."""
receiver = client.get_async_request_receiver()
while True:
request = await receiver.recv()
# Process request
response_data = await process_request(request.data)
# Send reply
reply = pyrsolace.Msg(
topic=request.reply_topic,
data=response_data,
corr_id=request.corr_id
)
client.send_reply(request, reply)
🚀 Performance Tips
Async Best Practices
- Use Semaphores: Limit concurrent operations
semaphore = asyncio.Semaphore(10)
async with semaphore:
await process_message(msg)
- Batch Operations: Group related operations
messages = []
async for msg in message_stream():
messages.append(msg)
if len(messages) >= 100:
await process_batch(messages)
messages.clear()
- Graceful Shutdown: Cancel tasks properly
try:
await main_task
except asyncio.CancelledError:
await cleanup()
📚 Documentation
- Main Project: See root README for complete documentation
- Rust Library: Check rsolace for Rust-specific features
- Type Hints: Complete API in `pyrsolace.pyi`
- Examples: More examples in the `tests/` directory
🤝 Contributing
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Run tests: `uv run pytest`
- Submit a pull request
📄 License
GPL-3.0-only License - see LICENSE for details.
Powered by rsolace ⚡ - High-performance Rust Solace bindings
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distributions
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pyrsolace-0.3.4-cp37-abi3-win_amd64.whl.
File metadata
- Download URL: pyrsolace-0.3.4-cp37-abi3-win_amd64.whl
- Upload date:
- Size: 2.0 MB
- Tags: CPython 3.7+, Windows x86-64
- Uploaded using Trusted Publishing? No
- Uploaded via: maturin/1.9.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 28380d32bc5f449e2eef658bc2ff0027568646b194b1c5a6ca5d7e577ee98aad |
| MD5 | 9c67543c191b66c0608de8dc3f8c9f2d |
| BLAKE2b-256 | 816336836dceb3bb489e5dacfa7e77fc1a013ec499aa2e9107cc129d1a841aac |
File details
Details for the file pyrsolace-0.3.4-cp37-abi3-win32.whl.
File metadata
- Download URL: pyrsolace-0.3.4-cp37-abi3-win32.whl
- Upload date:
- Size: 1.7 MB
- Tags: CPython 3.7+, Windows x86
- Uploaded using Trusted Publishing? No
- Uploaded via: maturin/1.9.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 94d120ddce6c29be17c99dec64be605a7922bd799b2169bcfbf8aa665821c05f |
| MD5 | 929519063919cd9604ca1bbfc86fb105 |
| BLAKE2b-256 | 84fb3c7080cbaf13b0e5945c314df65232e7ab757b38abd7c183e3f4fbe6156e |
File details
Details for the file pyrsolace-0.3.4-cp37-abi3-manylinux_2_34_aarch64.whl.
File metadata
- Download URL: pyrsolace-0.3.4-cp37-abi3-manylinux_2_34_aarch64.whl
- Upload date:
- Size: 1.3 MB
- Tags: CPython 3.7+, manylinux: glibc 2.34+ ARM64
- Uploaded using Trusted Publishing? No
- Uploaded via: maturin/1.9.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 4c2132444a936241d9b667ece647e591ba0da8e0b06cbce98dfeffd1ba6aa242 |
| MD5 | 1fccb6e462c92f324724cb22909a563c |
| BLAKE2b-256 | 577bce9066c22dce0c9a61b0f62d03f5bf7b52b6fe702b379aa707161dc9d5d0 |
File details
Details for the file pyrsolace-0.3.4-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.
File metadata
- Download URL: pyrsolace-0.3.4-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
- Upload date:
- Size: 1.2 MB
- Tags: CPython 3.7+, manylinux: glibc 2.17+ x86-64
- Uploaded using Trusted Publishing? No
- Uploaded via: maturin/1.9.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 60d9059b5fc1ec88663a0dfb2a9b4a13b02f529b805789dfcd9f21692202c779 |
| MD5 | 13bf51e3d02a4a45c30e104752c1e4f7 |
| BLAKE2b-256 | c46564ba9c35b4f6ba2030b6fe24910df1163b74cf3e1a28f54304779d0cc38a |
File details
Details for the file pyrsolace-0.3.4-cp37-abi3-macosx_11_0_arm64.whl.
File metadata
- Download URL: pyrsolace-0.3.4-cp37-abi3-macosx_11_0_arm64.whl
- Upload date:
- Size: 2.0 MB
- Tags: CPython 3.7+, macOS 11.0+ ARM64
- Uploaded using Trusted Publishing? No
- Uploaded via: maturin/1.9.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7ca2cbd9406f2e9ea3862f39ddef77903f2586419696227feb03dd36d3aace4f |
| MD5 | 3b1328cec1bf51aca508becc26aec735 |
| BLAKE2b-256 | 831820af595199de55c352de4a95de3dc372df4e27d3f4df1ff9ab7c6bda7ed7 |
File details
Details for the file pyrsolace-0.3.4-cp37-abi3-macosx_10_12_x86_64.whl.
File metadata
- Download URL: pyrsolace-0.3.4-cp37-abi3-macosx_10_12_x86_64.whl
- Upload date:
- Size: 2.2 MB
- Tags: CPython 3.7+, macOS 10.12+ x86-64
- Uploaded using Trusted Publishing? No
- Uploaded via: maturin/1.9.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 62878d94cd8cc41c1dd1cabbc498dc62c704c9b9bc0e0fc9635de5ff10a94535 |
| MD5 | aed03a43db996d7bbf7b50664dc3f079 |
| BLAKE2b-256 | 13bba38d668e226bde1679b4ead9ed4d5780062125106922fa209689a389c85b |