
pyrsolace


Python bindings for rsolace with full async/await support and proper GIL release.

✨ Key Features

  • 🐍 Full asyncio Support: Native async/await patterns with asyncio
  • 🔓 GIL Release: Properly releases GIL during blocking operations
  • 🔄 Sync + Async: Choose the best pattern for your use case
  • 📦 Complete API: Pub/Sub, Request/Reply, Message Caching, Events
  • ⚡ High Performance: Zero-copy message handling from Rust
  • 🛡️ Type Safe: Complete type hints with .pyi files

🚀 Installation

# Using pip
pip install pyrsolace

# Using uv (recommended)
uv add pyrsolace

🔥 Quick Start

Async Example (NEW!)

import asyncio
import pyrsolace

async def main():
    # Initialize client
    client = pyrsolace.Client()
    
    # Connect to Solace broker
    connected = client.connect(
        host="tcp://localhost:55555",
        vpn="default",
        username="admin",
        password="admin",
        compression_level=5
    )
    
    if not connected:
        print("Failed to connect")
        return
    
    # Subscribe to topics
    client.subscribe_ext("test/topic/*", pyrsolace.SubscribeFlag.RequestConfirm)
    
    # Get async receivers
    async_msg_receiver = client.get_async_msg_receiver()
    async_event_receiver = client.get_async_event_receiver()
    
    # Handle messages asynchronously
    async def message_handler():
        while True:
            try:
                msg = await async_msg_receiver.recv()
                print(f"Async received: {msg.topic} - {msg.data}")
            except Exception as e:
                print(f"Message handler error: {e}")
                break
    
    # Handle events asynchronously
    async def event_handler():
        while True:
            try:
                event = await async_event_receiver.recv()
                print(f"Event: {event.session_event}")
            except Exception as e:
                print(f"Event handler error: {e}")
                break
    
    # Start async handlers
    msg_task = asyncio.create_task(message_handler())
    event_task = asyncio.create_task(event_handler())
    
    # Send some test messages
    for i in range(5):
        msg = pyrsolace.Msg(
            topic="test/topic/async",
            data=f"Async message {i}".encode()
        )
        client.send_msg(msg)
        await asyncio.sleep(1)
    
    # Send async request (true async, no blocking!)
    try:
        request_msg = pyrsolace.Msg(
            topic="test/request", 
            data=b"Hello async world!", 
            corr_id="req123"
        )
        
        response = await client.send_request_async(request_msg)
        print(f"Async response: {response.data}")
    except Exception as e:
        print(f"Async request failed: {e}")
    
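    # Cleanup: cancel the handlers and wait for them to finish
    await asyncio.sleep(2)
    msg_task.cancel()
    event_task.cancel()
    await asyncio.gather(msg_task, event_task, return_exceptions=True)
    client.disconnect()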

if __name__ == "__main__":
    asyncio.run(main())
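
The quick start subscribes to test/topic/* and publishes to test/topic/async. The sketch below is a simplified, pure-Python model of how Solace topic wildcards are usually described: a trailing * matches the rest of a single level, and a final > matches one or more remaining levels. The function topic_matches is hypothetical and not part of pyrsolace; the broker performs the real matching, and its rules may differ in edge cases.

```python
def topic_matches(subscription: str, topic: str) -> bool:
    """Simplified model of Solace topic wildcard matching (illustrative only)."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")

    for i, sub in enumerate(sub_levels):
        # A final ">" matches one or more remaining levels.
        if sub == ">" and i == len(sub_levels) - 1:
            return len(topic_levels) > i
        if i >= len(topic_levels):
            return False
        if sub.endswith("*"):
            # "*" matches the rest of one level; "abc*" is a prefix match.
            if not topic_levels[i].startswith(sub[:-1]):
                return False
        elif sub != topic_levels[i]:
            return False

    # Without a final ">", both sides must have the same number of levels.
    return len(sub_levels) == len(topic_levels)
```

Under this model, test/topic/* matches test/topic/async but not test/topic/a/b, which would need test/topic/> instead.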

Sync Example (Enhanced with GIL Release)

import pyrsolace
import threading
import time

def message_handler(receiver, name):
    """Handle messages synchronously with proper GIL release."""
    while True:
        try:
            # This properly releases GIL, allowing other threads to run
            msg = receiver.recv()
            print(f"{name} received: {msg.topic} - {msg.data}")
        except Exception as e:
            print(f"{name} handler error: {e}")
            break

def event_handler(receiver, name):
    """Handle session events synchronously with proper GIL release."""
    while True:
        try:
            # Events expose session_event rather than topic/data
            event = receiver.recv()
            print(f"{name} received: {event.session_event}")
        except Exception as e:
            print(f"{name} handler error: {e}")
            break

def main():
    client = pyrsolace.Client()

    # Connect
    connected = client.connect(
        host="tcp://localhost:55555",
        vpn="default",
        username="admin",
        password="admin"
    )

    if not connected:
        print("Failed to connect")
        return

    # Subscribe
    client.subscribe("test/topic/*")

    # Get receivers
    msg_receiver = client.get_msg_receiver()
    event_receiver = client.get_event_receiver()

    # Start background threads (GIL is properly released).
    # daemon=True keeps a blocked recv() from holding the process open.
    msg_thread = threading.Thread(
        target=message_handler,
        args=(msg_receiver, "Messages"),
        daemon=True
    )
    event_thread = threading.Thread(
        target=event_handler,
        args=(event_receiver, "Events"),
        daemon=True
    )
    
    msg_thread.start()
    event_thread.start()
    
    # Send messages
    for i in range(5):
        msg = pyrsolace.Msg(
            topic="test/topic/sync",
            data=f"Sync message {i}".encode()
        )
        client.send_msg(msg)
        time.sleep(1)
    
    client.disconnect()

if __name__ == "__main__":
    main()
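
The handler threads above loop until recv() raises. How recv() behaves after disconnect() is not stated here, so the following sketch shows one common way to stop a blocking worker thread deterministically: a sentinel object pushed through the same channel. It uses queue.Queue as a stand-in for a pyrsolace receiver; worker, run_demo and _STOP are illustrative names, not library API.

```python
import queue
import threading

_STOP = object()  # sentinel telling the worker to exit


def worker(inbox: "queue.Queue", results: list) -> None:
    """Drain the inbox until the stop sentinel arrives."""
    while True:
        item = inbox.get()  # blocks; the GIL is released while waiting
        if item is _STOP:
            break
        results.append(item.upper())


def run_demo() -> list:
    """Feed three items to a worker thread, then shut it down cleanly."""
    inbox: "queue.Queue" = queue.Queue()
    results: list = []
    thread = threading.Thread(target=worker, args=(inbox, results), daemon=True)
    thread.start()
    for text in ("a", "b", "c"):
        inbox.put(text)
    inbox.put(_STOP)
    thread.join(timeout=5)
    return results
```

With a real receiver, the worker would call receiver.recv() instead of inbox.get(), and the stop signal would have to come from whatever mechanism the application has available.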

Callback-based Example

import pyrsolace
import time

def on_message(msg):
    """Message callback function."""
    print(f"Callback received: {msg.topic} - {msg.data}")

def on_event(event):
    """Event callback function."""
    print(f"Event: {event.session_event} - {event.info}")

def main():
    client = pyrsolace.Client()
    
    # Set callbacks
    client.set_msg_callback(on_message)
    client.set_event_callback(on_event)
    
    # Connect and subscribe
    client.connect(
        host="tcp://localhost:55555",
        vpn="default",
        username="admin",
        password="admin"
    )
    
    client.subscribe("test/topic/*")
    
    # Send messages
    for i in range(5):
        msg = pyrsolace.Msg(
            topic="test/topic/callback",
            data=f"Callback message {i}".encode()
        )
        client.send_msg(msg)
        time.sleep(1)
    
    client.disconnect()

if __name__ == "__main__":
    main()
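
Callbacks can also feed an asyncio application. The sketch below assumes the callback is invoked on a thread other than the event loop's (the thread pyrsolace actually uses is not documented here), so it hands each message to the loop with call_soon_threadsafe. A plain Python thread stands in for the library, and collect_from_callback is an illustrative name.

```python
import asyncio
import threading


async def collect_from_callback(count: int) -> list:
    """Receive `count` messages that arrive through a thread-invoked callback."""
    loop = asyncio.get_running_loop()
    inbox: asyncio.Queue = asyncio.Queue()

    def on_message(msg) -> None:
        # Runs on a foreign thread: schedule the put on the event loop.
        loop.call_soon_threadsafe(inbox.put_nowait, msg)

    def fake_library_thread() -> None:
        # Stand-in for the library calling the registered callback.
        for i in range(count):
            on_message(f"message {i}")

    threading.Thread(target=fake_library_thread, daemon=True).start()
    return [await inbox.get() for _ in range(count)]
```

In a real program, on_message would be passed to client.set_msg_callback(...) and the rest of the coroutine would stay the same.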

🔄 Sync vs Async

Pattern          Best For                 Usage                        GIL Behavior
Callbacks        Simple event handling    client.set_msg_callback(fn)  Released during callback
Sync Receivers   Threading, blocking I/O  receiver.recv()              Released during recv
Async Receivers  High concurrency         await async_receiver.recv()  N/A (async)

Migration from Sync to Async

# Before: Sync only
receiver = client.get_msg_receiver()
msg = receiver.recv()  # Blocks thread (but releases GIL)

# After: True async
async_receiver = client.get_async_msg_receiver()
msg = await async_receiver.recv()  # Non-blocking, async

# Mixed: Use both in same application
sync_receiver = client.get_msg_receiver()     # For background threads
async_receiver = client.get_async_msg_receiver()  # For async tasks
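
When a blocking receiver has to be consumed from async code during a gradual migration, one option is to run recv() in a worker thread through the event loop's executor. This is a minimal sketch using a stand-in class; BlockingReceiver and recv_in_executor are hypothetical names, and only the recv() shape is taken from the API above.

```python
import asyncio
import queue


class BlockingReceiver:
    """Stand-in for a sync receiver whose recv() blocks until data arrives."""

    def __init__(self) -> None:
        self._q: "queue.Queue" = queue.Queue()

    def put(self, item) -> None:
        self._q.put(item)

    def recv(self):
        return self._q.get()


async def recv_in_executor(receiver):
    """Await a blocking recv() without stalling the event loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, receiver.recv)


async def demo() -> str:
    receiver = BlockingReceiver()
    loop = asyncio.get_running_loop()
    # Deliver a message shortly after we start waiting for it.
    loop.call_later(0.05, receiver.put, "hello")
    return await recv_in_executor(receiver)
```

The native async receivers remain the simpler choice when they are available; the executor approach mainly helps when existing sync code cannot be changed yet.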

📋 API Reference

Client Class

class Client:
    def connect(self, host: str, vpn: str, username: str, password: str, ...) -> bool
    def disconnect(self) -> None
    def subscribe(self, topic: str) -> ReturnCode
    def subscribe_ext(self, topic: str, flag: SubscribeFlag) -> ReturnCode
    
    # Message sending
    def send_msg(self, msg: Msg) -> ReturnCode
    def send_reply(self, rx_msg: Msg, reply_msg: Msg) -> ReturnCode
    
    # Sync receivers (with GIL release)
    def get_msg_receiver(self) -> MsgReceiver
    def get_request_receiver(self) -> MsgReceiver
    def get_p2p_receiver(self) -> MsgReceiver
    def get_event_receiver(self) -> EventReceiver
    
    # Async receivers (NEW!)
    def get_async_msg_receiver(self) -> AsyncMsgReceiver
    def get_async_request_receiver(self) -> AsyncMsgReceiver
    def get_async_p2p_receiver(self) -> AsyncMsgReceiver
    def get_async_event_receiver(self) -> AsyncEventReceiver
    
    # Request/Reply
    def send_request(self, msg: Msg, timeout: int) -> MsgReceiver
    async def send_request_async(self, msg: Msg) -> Msg  # NEW!
    
    # Callbacks
    def set_msg_callback(self, callback: Callable[[Msg], None]) -> None
    def set_event_callback(self, callback: Callable[[Event], None]) -> None

Message Class

class Msg:
    def __init__(self, topic: str = None, data: bytes = None, ...) -> None
    
    # Properties
    topic: str
    data: bytes
    corr_id: str
    reply_topic: str
    delivery_mode: DeliveryMode
    
    # Methods
    def set_user_prop(self, key: str, value: str) -> None
    def get_user_prop(self, key: str) -> str
    def dump(self) -> str

Receiver Classes

class MsgReceiver:
    def recv(self) -> Msg  # Releases GIL

class AsyncMsgReceiver:
    async def recv(self) -> Msg  # True async

class EventReceiver:
    def recv(self) -> Event  # Releases GIL

class AsyncEventReceiver:
    async def recv(self) -> Event  # True async

🛠️ Development

Building from Source

# Clone repository
git clone https://github.com/Yvictor/rsolace.git
cd rsolace/pyrsolace

# Using uv (recommended)
uv build
uv pip install -e .

# Using maturin
pip install maturin
maturin develop --release

Running Tests

# Run tests
uv run pytest tests/

# Run specific tests
uv run pytest tests/test_msg.py -v

🔧 Configuration

Connection Parameters

client.connect(
    host="tcp://broker:55555",          # Broker URL
    vpn="vpn_name",                     # VPN name
    username="user",                    # Username
    password="pass",                    # Password
    client_name="my_client",            # Client identifier
    compression_level=5,                # 1-9 (higher = more compression)
    connect_timeout_ms=30000,           # Connection timeout
    connect_retries=3,                  # Retry attempts
    reconnect_retries=10,               # Auto-reconnect attempts
    keep_alive_ms=3000,                 # Keep-alive interval
    reapply_subscriptions=True,         # Restore subs on reconnect
    generate_sender_id=True,            # Add sender ID to messages
    generate_timestamps=True,           # Add timestamps
)
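
Credentials are usually kept out of source code. The following sketch builds the keyword arguments for connect() from environment variables; the SOLACE_* variable names and the connect_kwargs_from_env helper are assumptions made for illustration, while the parameter names come from the listing above.

```python
import os


def connect_kwargs_from_env(env=None) -> dict:
    """Build connect() keyword arguments from environment-style settings."""
    env = os.environ if env is None else env
    kwargs = {
        "host": env.get("SOLACE_HOST", "tcp://localhost:55555"),
        "vpn": env.get("SOLACE_VPN", "default"),
        "username": env["SOLACE_USERNAME"],  # required
        "password": env["SOLACE_PASSWORD"],  # required
    }
    if "SOLACE_COMPRESSION_LEVEL" in env:
        level = int(env["SOLACE_COMPRESSION_LEVEL"])
        if not 1 <= level <= 9:
            raise ValueError("compression level must be between 1 and 9")
        kwargs["compression_level"] = level
    return kwargs
```

Usage would look like client.connect(**connect_kwargs_from_env()).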

Message Properties

msg = pyrsolace.Msg(
    topic="my/topic",
    data=b"payload",
    corr_id="request-123",
    reply_topic="reply/topic",
    delivery_mode=pyrsolace.DeliveryMode.Persistent
)

# User properties
msg.set_user_prop("priority", "high")
msg.set_user_prop("version", "1.0")

🎯 Advanced Examples

Async Producer-Consumer Pattern

import asyncio
from asyncio import Queue

import pyrsolace

async def producer(client, queue):
    """Produce messages to queue."""
    for i in range(100):
        msg = pyrsolace.Msg(
            topic=f"data/stream/{i % 10}",
            data=f"Data packet {i}".encode()
        )
        await queue.put(msg)
        await asyncio.sleep(0.1)

async def consumer(client, queue):
    """Consume messages from queue."""
    while True:
        msg = await queue.get()
        client.send_msg(msg)
        queue.task_done()

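async def process_message(msg):
    """Placeholder for application-specific processing."""
    print(f"Processing: {msg.topic}")

async def message_processor(client):
    """Process incoming messages."""
    receiver = client.get_async_msg_receiver()
    while True:
        msg = await receiver.recv()
        # Process message asynchronously
        await process_message(msg)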

async def main():
    client = pyrsolace.Client()
    client.connect(...)
    
    queue = Queue(maxsize=100)
    
    # Start producer, consumer, and processor
    await asyncio.gather(
        producer(client, queue),
        consumer(client, queue),
        message_processor(client)
    )
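
In the pattern above the consumer and processor loop forever, so asyncio.gather never returns. The sketch below shows one way to let the pipeline finish: the producer enqueues a sentinel after its last item and the consumer exits when it sees it. A plain callable stands in for client.send_msg, and _DONE and run_pipeline are illustrative names.

```python
import asyncio

_DONE = object()  # sentinel marking the end of the stream


async def producer(queue: asyncio.Queue, count: int) -> None:
    for i in range(count):
        # put() waits while the bounded queue is full (backpressure).
        await queue.put(f"Data packet {i}")
    await queue.put(_DONE)


async def consumer(queue: asyncio.Queue, send) -> None:
    while True:
        item = await queue.get()
        if item is _DONE:
            break
        send(item)


async def run_pipeline(count: int, maxsize: int = 2) -> list:
    """Run producer and consumer to completion and return what was sent."""
    sent: list = []
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    await asyncio.gather(producer(queue, count), consumer(queue, sent.append))
    return sent
```

The small maxsize makes the producer wait for the consumer, which is the backpressure a bounded queue provides.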

Request/Reply Service

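import pyrsolace

async def process_request(data):
    """Placeholder for application-specific request handling."""
    return data

async def request_handler(client):
    """Handle incoming requests asynchronously."""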
    receiver = client.get_async_request_receiver()
    
    while True:
        request = await receiver.recv()
        
        # Process request
        response_data = await process_request(request.data)
        
        # Send reply
        reply = pyrsolace.Msg(
            topic=request.reply_topic,
            data=response_data,
            corr_id=request.corr_id
        )
        client.send_reply(request, reply)
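
If process_request raises in the loop above, the exception ends the service and the requester never gets an answer. One option is to turn failures into an error reply, as in this sketch. The OK:/ERR: prefixes are an invented convention, and process_request and build_reply here are stand-ins rather than library functions.

```python
import asyncio


async def process_request(data: bytes) -> bytes:
    """Stand-in request logic: reject empty input, otherwise upper-case it."""
    if not data:
        raise ValueError("empty request")
    return data.upper()


async def build_reply(data: bytes) -> bytes:
    """Return a success payload, or an error payload if processing fails."""
    try:
        return b"OK:" + await process_request(data)
    except Exception as exc:
        return b"ERR:" + str(exc).encode()
```

The handler would then pass the result of build_reply(request.data) as the reply's data, so every request receives some response.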

🚀 Performance Tips

Async Best Practices

  1. Use Semaphores: Limit concurrent operations

semaphore = asyncio.Semaphore(10)
async with semaphore:
    await process_message(msg)

  2. Batch Operations: Group related operations

messages = []
async for msg in message_stream():
    messages.append(msg)
    if len(messages) >= 100:
        await process_batch(messages)
        messages.clear()
if messages:
    # Flush the final partial batch
    await process_batch(messages)

  3. Graceful Shutdown: Cancel tasks properly

try:
    await main_task
except asyncio.CancelledError:
    await cleanup()
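
The semaphore tip can be exercised end to end without a broker. This sketch processes a list of items with at most `limit` running at once and records the peak concurrency that was observed; process_all is an illustrative name, and the sleep stands in for real message processing.

```python
import asyncio


async def process_all(items, limit: int):
    """Process items concurrently, never exceeding `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def process(item):
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for real work
            active -= 1
            return item * 2

    # gather() returns results in the same order as the inputs.
    results = await asyncio.gather(*(process(i) for i in items))
    return results, peak
```

Creating the semaphore inside the coroutine ties it to the running event loop, which avoids loop-mismatch errors on older Python versions.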

📚 Documentation

  • Main Project: See root README for complete documentation
  • Rust Library: Check rsolace for Rust-specific features
  • Type Hints: Complete API in pyrsolace.pyi
  • Examples: More examples in tests/ directory

🤝 Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Run tests: uv run pytest
  5. Submit a pull request

📄 License

GPL-3.0-only License - see LICENSE for details.


Powered by rsolace ⚡ - High-performance Rust Solace bindings


