Python SDK for EnSync Engine - high-performance real-time messaging via gRPC
Project description
EnSync SDK
Python SDK for EnSync - High-performance message streaming with end-to-end encryption. Built on gRPC for production use.
Installation
pip install ensync-sdk
Quick Start
import asyncio
import os
from dotenv import load_dotenv
from ensync_sdk import EnSyncEngine
load_dotenv()
async def quick_start():
try:
# 1. Initialize engine and create client
engine = EnSyncEngine() # Uses default: grpcs://node.gms.ensync.cloud
client = await engine.create_client(
os.environ.get("ENSYNC_APP_KEY"),
{
"app_decrypt_key": os.environ.get("ENSYNC_SECRET_KEY")
}
)
# 2. Publish a message
await client.publish(
"orders/status",
["appId"], # The appId of the receiving party
{"order_id": "order-123", "status": "completed"}
)
# 3. Subscribe to messages with decorator pattern
subscription = client.subscribe("orders/status")
@subscription.handler
async def handle_message(message):
print(f"Received order update: {message['payload']['order_id']}")
# Process message...
# 5. Keep the program running
try:
await asyncio.Future() # Run indefinitely
except KeyboardInterrupt:
await subscription.unsubscribe()
await client.close()
except Exception as e:
print(f'Error: {e}')
if __name__ == "__main__":
asyncio.run(quick_start())
Usage
Importing
Default (gRPC)
# Import the default engine class (gRPC)
from ensync_sdk import EnSyncEngine
# EnSync Cloud messaging (uses default URL)
engine = EnSyncEngine() # Default: grpcs://node.gms.ensync.cloud
# Or specify self-hosted EnSync Messaging Engine URL
# engine = EnSyncEngine("grpcs://custom-server.com")
# Create authenticated client
client = await engine.create_client("your-app-key")
Note: The default URL (grpcs://node.gms.ensync.cloud) is used for EnSync Cloud messaging. The grpcs:// protocol is required for secure communication.
API Reference
EnSyncEngine (gRPC - Default)
engine = EnSyncEngine(url, options=None)
Parameters
- url (
str, optional): Server URL for EnSync messaging service (default:grpcs://node.gms.ensync.cloud) - options (
dict, optional): Configuration optionsenableLogging(bool, default:False): Enable debug loggingreconnect_interval(int, default:5000): Reconnection delay in msmax_reconnect_attempts(int, default:10): Maximum reconnection attempts
Creating a Client
Initialize the engine with your server URL and create a client with your app key.
# Initialize the engine (uses default URL)
engine = EnSyncEngine()
# Enable logs for debugging
engine_verbose = EnSyncEngine(options={
"enableLogging": True
})
# Create a client
client = await engine.create_client("your-app-key")
Client Creation Parameters
- app_key (
str): Your application access key - options (
dict, optional): Additional optionsapp_decrypt_key(str, optional): Secret key for decryption
Returns
EnSyncClient: Authenticated client instance
Publishing Messages
# Basic publish
await client.publish(
"company/service/message-type", # Message name
["appId"], # Recipients (appIds of receiving parties)
{"data": "your payload"} # Message payload
)
# With optional metadata
await client.publish(
"company/service/message-type",
["appId"], # The appId of the receiving party
{"data": "your payload"},
{"custom_field": "value"} # Optional metadata
)
Publish Parameters
- message_name (
str): Name/type of the message - recipients (
list[str]): List of recipient appIds - payload (
dict): Message data to send - metadata (
dict, optional): Additional metadata (not encrypted)
Replying to Messages
Use the sender field from received messages to reply back:
async def handle_message(message):
# Process the message
print(f"Received message: {message['payload']}")
# Reply back to the sender
sender_app_id = message.get('sender')
if sender_app_id:
await client.publish(
message.get('messageName'),
[sender_app_id], # Send back to the original sender
{"status": "received", "response": "Processing complete"}
)
Subscribing to Messages
# Using decorator pattern (recommended)
engine = EnSyncEngine()
client = await engine.create_client("your-app-key")
# Create subscription with decorator
subscription = client.subscribe("orders/status")
@subscription.handler
async def handle_message(message):
print(f"Order {message['payload']['order_id']} updated")
# Access subscription control methods
await subscription.pause("Maintenance")
await subscription.resume()
# With custom decryption key
subscription2 = client.subscribe("orders/status", app_decrypt_key="secret-key")
@subscription2.handler
async def handle_secure_message(message):
print(f"Received: {message['payload']}")
Message Structure
Messages received by handlers have the following structure:
{
"idem": "message-unique-id",
"messageName": "orders/status",
"block": 12345,
"timestamp": None,
"payload": {"order_id": "123", "status": "completed"},
"sender": "sender-public-key",
"metadata": {"custom_field": "value"}
}
Subscription Control
# Pause message processing
await subscription.pause("Maintenance in progress")
# Resume message processing
await subscription.resume()
# Defer a message (requeue for later)
await subscription.defer(
message['idem'],
delay_ms=5000,
reason="Temporary unavailability"
)
# Discard a message permanently
await subscription.discard(
message['idem'],
reason="Invalid data"
)
# Replay a specific message
replayed_message = await subscription.replay(message['idem'])
# Unsubscribe
await subscription.unsubscribe()
Closing Connections
# Close the client connection
await client.close()
# Using context manager (automatic cleanup)
async with engine.create_client("your-app-key") as client:
await client.publish("message/name", ["appId"], {"data": "value"})
# Connection automatically closed
Error Handling
from ensync_sdk import EnSyncEngine
from ensync_core import EnSyncError
try:
engine = EnSyncEngine()
client = await engine.create_client("your-app-key")
await client.publish(
"orders/order",
["appId"],
{"order_id": "123"}
)
except EnSyncError as e:
print(f"EnSync error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
Debugging with Logs
Enable logging to debug connection issues:
engine = EnSyncEngine(options={
"enableLogging": True
})
Complete Examples
Publishing Example
import asyncio
import os
from dotenv import load_dotenv
from ensync_sdk import EnSyncEngine
load_dotenv()
async def publishing_example():
# Create client
engine = EnSyncEngine()
client = await engine.create_client(os.environ.get("ENSYNC_APP_KEY"))
# Basic publish - returns message ID
message_id = await client.publish(
"notifications/email",
["appId"], # The appId of the receiving party
{"to": "user@example.com", "subject": "Welcome!"}
)
print(f"Published message: {message_id}")
# With metadata
message_id = await client.publish(
"notifications/email",
["appId"],
{"to": "user@example.com", "subject": "Welcome!"},
{"source": "email-service", "priority": "high"}
)
print(f"Published message with metadata: {message_id}")
await client.close()
if __name__ == "__main__":
asyncio.run(publishing_example())
Subscribing Example
import asyncio
import os
from dotenv import load_dotenv
from ensync_sdk import EnSyncEngine
load_dotenv()
async def subscribing_example():
engine = EnSyncEngine()
client = await engine.create_client(os.environ.get("ENSYNC_APP_KEY"))
# Subscribe to messages with decorator pattern
subscription = client.subscribe("notifications/email")
@subscription.handler
async def handle_email_notification(message):
email_data = message['payload']
print(f"Email sent to: {email_data['to']}")
print(f"Subject: {email_data['subject']}")
# Message is automatically acknowledged
# Keep running
try:
await asyncio.Future()
except KeyboardInterrupt:
await subscription.unsubscribe()
await client.close()
if __name__ == "__main__":
asyncio.run(subscribing_example())
Best Practices
Connection Management
- Use context managers for automatic cleanup
- Handle reconnection gracefully with appropriate intervals
- Close connections properly when shutting down
Message Design
- Ensure the message name already exists in EnSync (provisioned via the UI or API) before publishing; only registered names are accepted by the platform
- Use hierarchical message names:
company/service/message-type - Ensure payloads comply with any schema registered for that message name (schemas are optional but enforced when present)
- Use metadata for non-sensitive routing information
Security Best Practices
- Store access keys in environment variables
- Use
app_decrypt_keyfor additional decryption layer - Never log or expose encryption keys
- Validate message payloads before processing
Performance Optimization
- Use gRPC (default) for better performance than WebSocket
- Enable connection pooling for high-throughput scenarios
- Batch related messages when possible
- Use appropriate
reconnect_intervalbased on your use case
Documentation
For complete documentation, examples, and API reference, visit:
Related Packages
- ensync-core: Core utilities and error handling (automatically installed)
- ensync-websocket: WebSocket alternative client
License
MIT License - see LICENSE file for details
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file ensync_sdk-0.4.4.tar.gz.
File metadata
- Download URL: ensync_sdk-0.4.4.tar.gz
- Upload date:
- Size: 20.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
880817c48f13d8405c376e12b761dfe8702f0d695b87b1c95c24adde2b088c6f
|
|
| MD5 |
6825688dd6d5abfe57f8b14844c9eef3
|
|
| BLAKE2b-256 |
0ee26efb57bbc3f6ea1bb800afbb277e285829c11999aea314d4d0c659f089ea
|
File details
Details for the file ensync_sdk-0.4.4-py3-none-any.whl.
File metadata
- Download URL: ensync_sdk-0.4.4-py3-none-any.whl
- Upload date:
- Size: 18.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0319c0acf7d99e031a608da21bff39e579c03c6f6bc0f34b956970cfe983476e
|
|
| MD5 |
439b0f20969f6e488b6aa4ac89569809
|
|
| BLAKE2b-256 |
4b9457ab295bc2499e83b37663e8e3bc53f558ce81da47af740cfac5894bdb9f
|