Skip to main content

The official Realtime Pub/Sub client for Python

Project description

Realtime Pub/Sub Client for Python

The realtime-pubsub-client is a Python client library for interacting with Realtime Pub/Sub applications. It enables developers to manage real-time WebSocket connections, handle subscriptions, and process messages efficiently. The library provides a simple and flexible API to interact with realtime applications, supporting features like publishing/sending messages, subscribing to topics, handling acknowledgements, and waiting for replies with timeout support.

Features

  • WebSocket Connection Management: Seamlessly connect and disconnect from the Realtime Pub/Sub service with automatic reconnection support.
  • Topic Subscription: Subscribe and unsubscribe to topics for receiving messages.
  • Topic Publishing: Publish messages to specific topics with optional message types and compression.
  • Message Sending: Send messages to backend applications with optional message types and compression.
  • Event Handling: Handle incoming messages with custom event listeners.
  • Acknowledgements and Replies: Wait for gateway acknowledgements or replies to messages with timeout support.
  • Error Handling: Robust error handling and logging capabilities.
  • Asynchronous Support: Built using asyncio for efficient asynchronous programming.

Installation

Install the realtime-pubsub-client library via pip:

pip install realtime-pubsub-client

Getting Started

This guide will help you set up and use the realtime-pubsub-client library in your Python project.

Connecting to the Server

First, import the RealtimeClient class and create a new instance with the required configuration:

import asyncio
import logging
import os
from realtime_pubsub_client import RealtimeClient


async def main():
    async def get_url():
        # replace with your access token retrieval strategy
        access_token = os.environ.get('ACCESS_TOKEN')
        app_id = os.environ.get('APP_ID')

        # return the WebSocket URL with the access token
        return f"wss://genesis.r7.21no.de/apps/{app_id}?access_token={access_token}"

    client_options = {
        'logger': logging.getLogger('RealtimeClient'),
        'websocket_options': {
            'url_provider': get_url,
        },
    }
    client = RealtimeClient(client_options)

    async def on_session_started(connection_info):
        print('Connection ID:', connection_info['id'])
        # Subscribe to topics here
        await client.subscribe_remote_topic('topic1')
        await client.subscribe_remote_topic('topic2')

    client.on('session.started', on_session_started)

    await client.connect()
    await client.wait_for('session.started')


asyncio.run(main())

Subscribing to Incoming Messages

You can handle messages for specific topics and message types:

Note: The topic and message type are separated by a dot (.) in the event name.

def handle_message(message, reply_fn):
    # Message handling logic here
    print('Received message:', message['data']['payload'])


client.on('topic1.action1', handle_message)

Wildcard subscriptions are also supported:

client.on('topic1.*', handle_message)

Publishing Messages

Publish messages to a topic:

await client.publish('topic1', 'Hello, world!', {
    'messageType': 'text-message',
})

Responding to Incoming Messages

Set up event listeners to handle incoming messages:

async def handle_message(message, reply_fn):
    # Processing the message
    print('Received message:', message['data']['payload'])

    # Sending a reply
    await reply_fn('Message received!', 'ok')


client.on('topic1.text-message', handle_message)

Waiting for Acknowledgements and Replies

  • wait_for_ack(timeout=None): Waits for an acknowledgement of the message, with an optional timeout in seconds.
  • wait_for_reply(timeout=None): Waits for a reply to the message, with an optional timeout in seconds.

Wait for the Realtime Gateway acknowledgement after publishing a message:

waiter = await client.publish('secure/peer-to-peer1', 'Hi', {
    'messageType': 'greeting',
})
await waiter.wait_for_ack()

Wait for the Realtime Gateway acknowledgement after sending a message:

waiter = await client.send({
    # Message payload
}, {
    'messageType': 'create',
})
await waiter.wait_for_ack()

Wait for a reply with a timeout:

waiter = await client.send({
    # Message payload
}, {
    'messageType': 'create',
})
await waiter.wait_for_reply(timeout=5)  # Wait for up to 5 seconds

Error Handling

Handle errors and disconnections:

def on_error(error):
    print('WebSocket error:', error)


def on_close(event):
    print('WebSocket closed:', event)


client.on('error', on_error)
client.on('close', on_close)

API Reference

RealtimeClient

Constructor

RealtimeClient(config)

Creates a new RealtimeClient instance.

  • config: Configuration options for the client.

Methods

  • connect(): Connects the client to the WebSocket Messaging Gateway.

    await client.connect()
    
  • disconnect(): Terminates the WebSocket connection.

    await client.disconnect()
    
  • subscribe_remote_topic(topic): Subscribes the connection to a remote topic.

    await client.subscribe_remote_topic(topic)
    
  • unsubscribe_remote_topic(topic): Unsubscribes the connection from a remote topic.

    await client.unsubscribe_remote_topic(topic)
    
  • publish(topic, payload, options=None): Publishes a message to a topic.

    waiter = await client.publish(topic, payload, options)
    

    Returns a WaitFor instance to wait for acknowledgements or replies.

  • send(payload, options=None): Sends a message to the server.

    waiter = await client.send(payload, options)
    

    Returns a WaitFor instance to wait for acknowledgements or replies.

  • wait(ms): Waits for a specified duration in milliseconds. Utility function for waiting in async functions.

    await wait(ms)
    

Events

  • 'session.started': Emitted when the session starts.

    client.on('session.started', on_session_started)
    
  • 'error': Emitted on WebSocket errors.

    client.on('error', on_error)
    
  • 'close': Emitted when the WebSocket connection closes.

    client.on('close', on_close)
    
  • Custom Events: Handle custom events based on topic and message type.

    client.on('TOPIC_NAME.MESSAGE_TYPE', handle_message)
    

    Note: Wildcard subscriptions are also supported.

License

This library is licensed under the MIT License.


For more detailed examples and advanced configurations, please refer to the documentation.

Notes

  • Ensure that you have an account and an app set up with Realtime Pub/Sub.
  • Customize the url_provider or URL to retrieve the access token for connecting to your realtime application.
  • Implement the get_auth_token function according to your authentication mechanism.
  • Optionally use the logger option to integrate with your application's logging system.
  • Handle errors and disconnections gracefully to improve the robustness of your application.
  • Make sure to handle timeouts when waiting for replies to avoid hanging operations.

Feel free to contribute to this project by submitting issues or pull requests on GitHub.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

realtime_pubsub_client-0.1.4.tar.gz (10.5 kB view hashes)

Uploaded Source

Built Distribution

realtime_pubsub_client-0.1.4-py3-none-any.whl (10.2 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page