Python client for NotifyRelay pub/sub messaging service
Project description
NotifyRelay Python Client
A Python client library for the NotifyRelay pub/sub messaging service. Provides a queue-based message retrieval system with background polling for seamless integration into Python applications.
Features
- Queue-based message retrieval: Background thread polls for messages, main loop retrieves at its own pace
- Thread-safe: All operations are thread-safe for concurrent access
- Optional JSON deserialization: Automatic JSON parsing per topic with error handling
- Topic management: Dynamic subscription and unsubscription
- Publisher support: Simple API for publishing messages
- Context manager support: Clean resource management
- No callbacks required: Pull-based model gives your application full control
Installation
From source
cd python-client
pip install -e .
From PyPI (when published)
pip install notifyrelay
Quick Start
Subscribing to Messages
from notifyrelay import NotifyRelayClient
import time
# Create client
client = NotifyRelayClient(
base_url="https://your-app.herokuapp.com",
subscribe_key="your-subscribe-key"
)
# Create subscriber
subscriber = client.create_subscriber(
subscriber_id="my-unique-id",
subscriber_name="my-app"
)
# Subscribe to topics
subscriber.subscribe(["alerts", "logs"])
# Start background polling
subscriber.start()
# Main application loop
while True:
# Do your application work...
process_data()
# Check for new messages (non-blocking)
messages = subscriber.get_messages()
for msg in messages:
print(f"[{msg['topic']}] {msg['message']}")
time.sleep(0.1)
# Clean up
subscriber.stop()
Publishing Messages
from notifyrelay import NotifyRelayClient
client = NotifyRelayClient(
base_url="https://your-app.herokuapp.com",
publish_key="your-publish-key"
)
# Publish a message
result = client.publish("alerts", "System alert: High CPU usage")
print(f"Message published with ID: {result['messageId']}")
JSON Messages
import json
# Subscribe with automatic JSON parsing
subscriber.subscribe(["data"], json_mode=True)
subscriber.start()
# Later in your loop...
messages = subscriber.get_messages()
for msg in messages:
if msg['topic'] == 'data':
data = msg['message'] # Already parsed as dict/list
# Check for parse errors
if isinstance(data, dict) and '_json_parse_error' in data:
print(f"JSON parse error: {data['_json_parse_error']}")
print(f"Raw message: {data['_raw_message']}")
else:
print(f"Received data: {data}")
# Publishing JSON (serialize to string)
client.publish("data", json.dumps({"temperature": 72, "humidity": 45}))
API Reference
NotifyRelayClient
Main client for interacting with NotifyRelay service.
Constructor:
client = NotifyRelayClient(
base_url: str,
publish_key: Optional[str] = None,
subscribe_key: Optional[str] = None
)
Methods:
publish(topic: str, message: str) -> dict: Publish a message to a topiccreate_subscriber(subscriber_id: str, subscriber_name: str) -> Subscriber: Create a new subscriberget_status() -> dict: Get server statusget_subscribers() -> list: Get list of active subscribers
Subscriber
Handles message polling and queueing in a background thread.
Methods:
subscribe(topics: List[str], json_mode: bool = False): Set topics to subscribe toadd_topics(topics: List[str], json_mode: bool = False): Add more topicsremove_topics(topics: List[str]): Remove topicsstart(): Start background polling threadstop(): Stop background polling threadis_running() -> bool: Check if subscriber is runningget_messages(topic: Optional[str] = None, block: bool = False, timeout: Optional[float] = None) -> List[dict]: Get pending messagespeek_messages(topic: Optional[str] = None) -> List[dict]: View messages without removingclear_messages(): Clear all pending messagesqueue_size() -> int: Get number of pending messages
Message Format:
{
'topic': 'topic-name',
'message': 'message-content', # or parsed dict/list if json_mode=True
'timestamp': '2024-01-01T12:00:00.000Z', # Server timestamp
'received_at': 1234567890.123 # Local timestamp when received
}
Context Manager:
with subscriber:
# Subscriber automatically starts
while running:
messages = subscriber.get_messages()
process(messages)
# Subscriber automatically stops
Usage Patterns
Integration with Existing Event Loop
class MyApplication:
def __init__(self):
client = NotifyRelayClient(
base_url=config.NOTIFYRELAY_URL,
subscribe_key=config.SUBSCRIBE_KEY
)
self.subscriber = client.create_subscriber("my-app", "MyApp")
self.subscriber.subscribe(["commands", "events"])
self.subscriber.start()
def run(self):
while self.running:
# Your application's main work
self.process_frames()
self.update_ui()
# Check for relay messages
messages = self.subscriber.get_messages()
for msg in messages:
self.handle_relay_message(msg)
time.sleep(0.016) # ~60 FPS
def shutdown(self):
self.subscriber.stop()
Topic-Specific Filtering
# Get only messages for a specific topic
alert_messages = subscriber.get_messages(topic="alerts")
# Process different topics separately
for msg in subscriber.get_messages(topic="logs"):
logger.log(msg['message'])
for msg in subscriber.get_messages(topic="commands"):
executor.execute(msg['message'])
Blocking Wait for Messages
# Block until at least one message arrives (with timeout)
messages = subscriber.get_messages(block=True, timeout=5.0)
if messages:
print(f"Received {len(messages)} messages")
else:
print("Timeout - no messages received")
Examples
See the examples/ directory for complete working examples:
simple_subscriber.py: Basic message subscriptionjson_messages.py: Automatic JSON parsingpublisher.py: Publishing messages
Error Handling
from notifyrelay import NotifyRelayClient, AuthenticationError, ConnectionError
try:
client = NotifyRelayClient(base_url=url, publish_key=key)
client.publish("topic", "message")
except AuthenticationError:
print("Invalid authentication key")
except ConnectionError:
print("Failed to connect to server")
Requirements
- Python >= 3.7
- requests >= 2.25.0
License
MIT License - see LICENSE file for details.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Deploying to PyPI
For maintainers, to publish a new version to PyPI:
-
Update version: Increment the version number in both
setup.pyandpyproject.toml -
Install build tools (if not already installed):
pip install --upgrade build twine
-
Build the distribution packages:
cd python-client python -m build
-
Upload to PyPI:
python -m twine upload dist/*
You will be prompted for your PyPI username and password (or API token).
-
Verify the upload: Check https://pypi.org/project/notifyrelay/ to confirm the new version is available
Note: Make sure to clean the dist/ directory between releases to avoid uploading old versions:
rm -rf dist/
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file notifyrelay-0.1.0.tar.gz.
File metadata
- Download URL: notifyrelay-0.1.0.tar.gz
- Upload date:
- Size: 13.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a88c58b22966ddb1eef57e81bbc61611b365ae9f17239fac78d522d47d4cdbb7
|
|
| MD5 |
24d41a9522b1ba7bf4e5b6ee163e6fad
|
|
| BLAKE2b-256 |
ee36ce945b53a6ff551cd6f01ef9cc975daf57c405786e8e3b0ff978f6605816
|
File details
Details for the file notifyrelay-0.1.0-py3-none-any.whl.
File metadata
- Download URL: notifyrelay-0.1.0-py3-none-any.whl
- Upload date:
- Size: 11.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
91d4a511029ac7f86dfe506e921c51e139d4448f0819e688b555c2cb4df3633b
|
|
| MD5 |
c641c3a786d52e9fb2ea5ce9b943bf44
|
|
| BLAKE2b-256 |
7dc4353246316ee44856060d83edb3ae52858fb774a890ed3406ce11baf59c9e
|