The official Realtime Pub/Sub client for Python
Realtime Pub/Sub Client for Python
The realtime-pubsub-client is a Python client library for interacting
with Realtime Pub/Sub applications. It enables developers to manage real-time WebSocket
connections, handle subscriptions, and process messages efficiently. The library provides a simple and flexible API to
interact with realtime applications, supporting features like publishing/sending messages, subscribing to topics,
handling acknowledgements, and waiting for replies with timeout support.
Features
- WebSocket Connection Management: Seamlessly connect and disconnect from the Realtime Pub/Sub service with automatic reconnection support.
- Topic Subscription: Subscribe to and unsubscribe from topics to control which messages you receive.
- Topic Publishing: Publish messages to specific topics with optional message types and compression.
- Message Sending: Send messages to backend applications with optional message types and compression.
- Event Handling: Handle incoming messages with custom event listeners.
- Acknowledgements and Replies: Wait for gateway acknowledgements or replies to messages with timeout support.
- Error Handling: Robust error handling and logging capabilities.
- Asynchronous Support: Built using asyncio for efficient asynchronous programming.
Installation
Install the realtime-pubsub-client library via pip:
pip install realtime-pubsub-client
Getting Started
This guide will help you set up and use the realtime-pubsub-client library in your Python project.
Connecting to the Server
First, import the RealtimeClient class and create a new instance with the required configuration:
import asyncio
import logging
import os

from realtime_pubsub_client import RealtimeClient


async def main():
    async def get_url():
        # replace with your access token retrieval strategy
        access_token = os.environ.get('ACCESS_TOKEN')
        app_id = os.environ.get('APP_ID')
        # return the WebSocket URL with the access token
        return f"wss://genesis.r7.21no.de/apps/{app_id}?access_token={access_token}"

    client_options = {
        'logger': logging.getLogger('RealtimeClient'),
        'websocket_options': {
            'url_provider': get_url,
        },
    }
    client = RealtimeClient(client_options)

    async def on_session_started(connection_info):
        print('Connection ID:', connection_info['id'])
        # Subscribe to topics here
        await client.subscribe_remote_topic('topic1')
        await client.subscribe_remote_topic('topic2')

    client.on('session.started', on_session_started)
    await client.connect()
    await client.wait_for('session.started')


asyncio.run(main())
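Because the access token travels in the query string, it can help to percent-encode it and to fail early when configuration is missing. The following is a minimal sketch of one way to write such a url_provider. The names `build_ws_url` and `get_url` are illustrative helpers, not part of the library, and the host is simply the one used in the example above.

```python
import os
from urllib.parse import quote


def build_ws_url(app_id: str, access_token: str,
                 host: str = "genesis.r7.21no.de") -> str:
    """Build the WebSocket URL, percent-encoding both dynamic parts."""
    if not app_id or not access_token:
        raise ValueError("app_id and access_token must be non-empty")
    return (
        f"wss://{host}/apps/{quote(app_id, safe='')}"
        f"?access_token={quote(access_token, safe='')}"
    )


async def get_url() -> str:
    """Hypothetical url_provider that reads credentials from the environment."""
    return build_ws_url(
        os.environ.get("APP_ID", ""),
        os.environ.get("ACCESS_TOKEN", ""),
    )
```

Passing `get_url` as `'url_provider'` would then raise a `ValueError` when either environment variable is unset, instead of producing a URL that contains the literal text `None`.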
Subscribing to Incoming Messages
You can handle messages for specific topics and message types:
Note: The topic and message type are separated by a dot (.) in the event name.
def handle_message(message, reply_fn):
    # Message handling logic here
    print('Received message:', message['data']['payload'])

client.on('topic1.action1', handle_message)
Wildcard subscriptions are also supported:
client.on('topic1.*', handle_message)
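To make the naming scheme concrete, the sketch below shows how an event name built from a topic and a message type could be compared against a listener pattern. It uses shell-style globbing from the standard library as a stand-in. The library's actual matching rules are not specified here, and `event_matches` is a hypothetical helper.

```python
from fnmatch import fnmatchcase


def event_matches(pattern: str, topic: str, message_type: str) -> bool:
    """Return True if the event name 'topic.message_type' matches the pattern."""
    event_name = f"{topic}.{message_type}"
    # fnmatchcase does case-sensitive glob matching; '*' matches any run of characters.
    return fnmatchcase(event_name, pattern)
```

Under these assumptions, `event_matches('topic1.*', 'topic1', 'action1')` is true, while the same pattern does not match an event on `topic2`.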
Publishing Messages
Publish messages to a topic:
await client.publish('topic1', 'Hello, world!', message_type='text-message')
Responding to Incoming Messages
Set up event listeners to handle incoming messages:
async def handle_message(message, reply_fn):
    # Processing the message
    print('Received message:', message['data']['payload'])
    # Sending a reply
    await reply_fn('Message received!', 'ok')

client.on('topic1.text-message', handle_message)
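A handler like this can be exercised without a live connection by calling it with a stand-in message and a recording `reply_fn`. The sketch below assumes the message shape used above (`message['data']['payload']`) and the two-argument reply call shown in the example. `run_handler_offline` and `fake_reply_fn` are hypothetical names.

```python
import asyncio


async def handle_message(message, reply_fn):
    # Same shape as the handler above: read the payload, then reply.
    print('Received message:', message['data']['payload'])
    await reply_fn('Message received!', 'ok')


async def run_handler_offline(payload):
    """Call the handler with a stand-in message and collect its replies."""
    replies = []

    async def fake_reply_fn(data, status):
        # Assumed argument order: reply data first, then a status string.
        replies.append((data, status))

    await handle_message({'data': {'payload': payload}}, fake_reply_fn)
    return replies


if __name__ == '__main__':
    print(asyncio.run(run_handler_offline('Hello, world!')))
```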
Waiting for Acknowledgements and Replies
- wait_for_ack(timeout=None): Waits for an acknowledgement of the message, with an optional timeout in seconds.
- wait_for_reply(timeout=None): Waits for a reply to the message, with an optional timeout in seconds.
Wait for the Realtime Gateway acknowledgement after publishing a message:
waiter = await client.publish('secure/peer-to-peer1', 'Hi', message_type='text-message')
await waiter.wait_for_ack()
Wait for the Realtime Gateway acknowledgement after sending a message:
waiter = await client.send({
    # Message payload
}, message_type='create')
await waiter.wait_for_ack()
Wait for a reply with a timeout:
waiter = await client.send({
    # Message payload
}, message_type='create')
await waiter.wait_for_reply(timeout=5)  # Wait for up to 5 seconds
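This section does not say which exception is raised when a wait times out. One defensive option, sketched below, is to wrap the awaitable in `asyncio.wait_for` so that the timeout behaviour is the standard library's. `await_with_deadline` is a hypothetical helper, not part of the client API.

```python
import asyncio


async def await_with_deadline(awaitable, timeout, default=None):
    """Await `awaitable`; return `default` if it takes longer than `timeout` seconds."""
    try:
        return await asyncio.wait_for(awaitable, timeout=timeout)
    except asyncio.TimeoutError:
        # asyncio.wait_for cancels the inner awaitable before raising.
        return default
```

With a waiter from `client.send(...)`, this might be used as `reply = await await_with_deadline(waiter.wait_for_reply(), timeout=5)`, after which `reply is None` indicates that no reply arrived in time.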
Error Handling
Handle errors and disconnections:
def on_error(error):
    print('WebSocket error:', error)

def on_close(event):
    print('WebSocket closed:', event)

client.on('error', on_error)
client.on('close', on_close)
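In a longer-running application, these callbacks would typically write to a logger rather than print. The sketch below builds both callbacks around a standard `logging.Logger`. The helper name `make_connection_handlers` is hypothetical, and the log levels are a design choice, not something the library prescribes.

```python
import logging


def make_connection_handlers(logger: logging.Logger):
    """Return (on_error, on_close) callbacks that log instead of printing."""

    def on_error(error):
        logger.error("WebSocket error: %s", error)

    def on_close(event):
        logger.warning("WebSocket closed: %s", event)

    return on_error, on_close
```

The returned functions can be registered exactly like the ones above, for example `on_error, on_close = make_connection_handlers(logging.getLogger('RealtimeClient'))` followed by `client.on('error', on_error)` and `client.on('close', on_close)`.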
API Reference
RealtimeClient
Constructor
RealtimeClient(config)
Creates a new RealtimeClient instance.
- config: Configuration options for the client.
Methods
- connect(): Connects the client to the WebSocket Messaging Gateway.
  await client.connect()
- disconnect(): Terminates the WebSocket connection.
  await client.disconnect()
- subscribe_remote_topic(topic): Subscribes the connection to a remote topic.
  await client.subscribe_remote_topic(topic)
- unsubscribe_remote_topic(topic): Unsubscribes the connection from a remote topic.
  await client.unsubscribe_remote_topic(topic)
- publish(topic, payload, message_type="broadcast", compress=False, message_id=None): Publishes a message to a topic. Returns a WaitFor instance to wait for acknowledgements or replies.
  waiter = await client.publish(topic, payload)
- send(payload, compress=False, message_id=None): Sends a message to the server. Returns a WaitFor instance to wait for acknowledgements or replies.
  waiter = await client.send(payload)
- wait(ms): Waits for a specified duration in milliseconds. Utility function for waiting in async functions.
  await wait(ms)
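Note the unit difference: `wait` takes milliseconds, whereas the `timeout` arguments of `wait_for_ack` and `wait_for_reply` are in seconds. As an illustration only, a millisecond-based pause can be expressed with `asyncio.sleep` as below. `wait_ms` is a hypothetical stand-in and not the library's implementation.

```python
import asyncio


async def wait_ms(ms: float) -> None:
    """Pause the current coroutine for `ms` milliseconds."""
    # asyncio.sleep expects seconds, so convert from milliseconds.
    await asyncio.sleep(ms / 1000)
```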
Events
- 'session.started': Emitted when the session starts.
  client.on('session.started', on_session_started)
- 'error': Emitted on WebSocket errors.
  client.on('error', on_error)
- 'close': Emitted when the WebSocket connection closes.
  client.on('close', on_close)
- Custom Events: Handle custom events based on topic and message type.
  client.on('TOPIC_NAME.MESSAGE_TYPE', handle_message)
  Note: Wildcard subscriptions are also supported.
License
This library is licensed under the MIT License.
For more detailed examples and advanced configurations, please refer to the documentation.
Notes
- Ensure that you have an account and an app set up with Realtime Pub/Sub.
- Customize the url_provider or URL to retrieve the access token for connecting to your realtime application.
- Implement the get_auth_token function according to your authentication mechanism.
- Optionally use the logger option to integrate with your application's logging system.
- Handle errors and disconnections gracefully to improve the robustness of your application.
- Make sure to handle timeouts when waiting for replies to avoid hanging operations.
Feel free to contribute to this project by submitting issues or pull requests on GitHub.