Python SDK for EnSync Engine - high-performance real-time messaging via gRPC

EnSync SDK

Python SDK for EnSync - High-performance message streaming with end-to-end encryption. Built on gRPC for production use.

Installation

pip install ensync-sdk

Quick Start

import asyncio
import os
from dotenv import load_dotenv
from ensync_sdk import EnSyncEngine

load_dotenv()

async def quick_start():
    try:
        # 1. Initialize engine and create client
        engine = EnSyncEngine()  # Uses default: grpcs://node.gms.ensync.cloud
        client = await engine.create_client(
            os.environ.get("ENSYNC_APP_KEY"),
            {
                "app_decrypt_key": os.environ.get("ENSYNC_SECRET_KEY")
            }
        )
        
        # 2. Publish a message
        await client.publish(
            "orders/status",
            ["appId"],  # The appId of the receiving party
            {"order_id": "order-123", "status": "completed"}
        )
        
        # 3. Subscribe to messages with decorator pattern
        subscription = client.subscribe("orders/status")
        
        @subscription.handler
        async def handle_message(message):
            print(f"Received order update: {message['payload']['order_id']}")
            # Process message...
        
        # 4. Keep the program running until it is cancelled (e.g. Ctrl+C)
        try:
            await asyncio.Future()  # Run indefinitely
        finally:
            # asyncio.run() cancels this task on Ctrl+C, so clean up in `finally`
            await subscription.unsubscribe()
            await client.close()
            
    except Exception as e:
        print(f'Error: {e}')

if __name__ == "__main__":
    asyncio.run(quick_start())

Usage

Importing

Default (gRPC)

# Import the default engine class (gRPC)
from ensync_sdk import EnSyncEngine

# EnSync Cloud messaging (uses default URL)
engine = EnSyncEngine()  # Default: grpcs://node.gms.ensync.cloud

# Or specify self-hosted EnSync Messaging Engine URL
# engine = EnSyncEngine("grpcs://custom-server.com")

# Create authenticated client
client = await engine.create_client("your-app-key")

Note: The default URL (grpcs://node.gms.ensync.cloud) is used for EnSync Cloud messaging. The grpcs:// protocol is required for secure communication.

API Reference

EnSyncEngine (gRPC - Default)

engine = EnSyncEngine(url, options=None)

Parameters

  • url (str, optional): Server URL for EnSync messaging service (default: grpcs://node.gms.ensync.cloud)
  • options (dict, optional): Configuration options
    • enableLogging (bool, default: False): Enable debug logging
    • reconnect_interval (int, default: 5000): Reconnection delay in ms
    • max_reconnect_attempts (int, default: 10): Maximum reconnection attempts
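
As a rough illustration of how these options combine, the sketch below builds the options dict from environment-style settings and estimates the longest time spent waiting between reconnection attempts. The `ENSYNC_*` variable names and both helper functions are hypothetical and not part of the SDK; the estimate also assumes a fixed delay between attempts, which the parameter list does not confirm.

```python
from typing import Mapping


def build_engine_options(env: Mapping[str, str]) -> dict:
    """Build an options dict for EnSyncEngine from environment-style settings.

    The ENSYNC_* variable names are hypothetical. Only the option keys
    (enableLogging, reconnect_interval, max_reconnect_attempts) and their
    defaults come from the parameter list above.
    """
    options = {
        "enableLogging": env.get("ENSYNC_DEBUG", "").lower() in {"1", "true", "yes"},
        "reconnect_interval": int(env.get("ENSYNC_RECONNECT_MS", "5000")),
        "max_reconnect_attempts": int(env.get("ENSYNC_MAX_RECONNECTS", "10")),
    }
    if options["reconnect_interval"] <= 0:
        raise ValueError("reconnect_interval must be a positive number of milliseconds")
    if options["max_reconnect_attempts"] < 0:
        raise ValueError("max_reconnect_attempts must not be negative")
    return options


def worst_case_reconnect_seconds(options: dict) -> float:
    """Upper bound on waiting time, assuming a fixed delay between attempts."""
    return options["reconnect_interval"] * options["max_reconnect_attempts"] / 1000


defaults = build_engine_options({})
print(defaults)
print(worst_case_reconnect_seconds(defaults))  # 5000 ms * 10 attempts = 50.0 s
```

The resulting dict can be passed as `EnSyncEngine(options=build_engine_options(os.environ))`.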

Creating a Client

Initialize the engine (optionally passing your own server URL), then create a client with your app key.

# Initialize the engine (uses default URL)
engine = EnSyncEngine()

# Enable logs for debugging
engine_verbose = EnSyncEngine(options={
    "enableLogging": True
})

# Create a client
client = await engine.create_client("your-app-key")

Client Creation Parameters

  • app_key (str): Your application access key
  • options (dict, optional): Additional options
    • app_decrypt_key (str, optional): Secret key for decryption

Returns

EnSyncClient: Authenticated client instance

Publishing Messages

# Basic publish
await client.publish(
    "company/service/message-type",  # Message name
    ["appId"],                      # Recipients (appIds of receiving parties)
    {"data": "your payload"}        # Message payload
)

# With optional metadata
await client.publish(
    "company/service/message-type",
    ["appId"],                      # The appId of the receiving party
    {"data": "your payload"},
    {"custom_field": "value"}       # Optional metadata
)

Publish Parameters

  • message_name (str): Name/type of the message
  • recipients (list[str]): List of recipient appIds
  • payload (dict): Message data to send
  • metadata (dict, optional): Additional metadata (not encrypted)
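
A mistake such as passing a single appId string instead of a list is easy to make, so it may help to check arguments before calling `publish`. The helper below is a minimal sketch based only on the parameter types listed above; `check_publish_args` is a hypothetical name, and the SDK may perform its own validation.

```python
def check_publish_args(message_name, recipients, payload, metadata=None):
    """Raise early if the arguments do not match the documented publish() types."""
    if not isinstance(message_name, str) or not message_name:
        raise TypeError("message_name must be a non-empty str")
    if not isinstance(recipients, list) or not recipients:
        raise TypeError("recipients must be a non-empty list of appIds")
    if not all(isinstance(r, str) and r for r in recipients):
        raise TypeError("every recipient appId must be a non-empty str")
    if not isinstance(payload, dict):
        raise TypeError("payload must be a dict")
    if metadata is not None and not isinstance(metadata, dict):
        raise TypeError("metadata must be a dict when provided")
    # Return positional arguments in the order publish() expects them.
    args = (message_name, recipients, payload)
    return args if metadata is None else args + (metadata,)


print(check_publish_args("orders/status", ["appId"], {"order_id": "order-123"}))
```

Inside a coroutine, the result can be unpacked directly: `await client.publish(*check_publish_args(name, recipients, payload))`.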

Replying to Messages

Use the sender field of a received message to reply to its sender:

async def handle_message(message):
    # Process the message
    print(f"Received message: {message['payload']}")
    
    # Reply back to the sender
    sender_app_id = message.get('sender')
    if sender_app_id:
        await client.publish(
            message.get('messageName'),
            [sender_app_id],  # Send back to the original sender
            {"status": "received", "response": "Processing complete"}
        )

Subscribing to Messages

# Using decorator pattern (recommended)
engine = EnSyncEngine()
client = await engine.create_client("your-app-key")

# Create subscription with decorator
subscription = client.subscribe("orders/status")

@subscription.handler
async def handle_message(message):
    print(f"Order {message['payload']['order_id']} updated")

# Access subscription control methods
await subscription.pause("Maintenance")
await subscription.resume()

# With custom decryption key
subscription2 = client.subscribe("orders/status", app_decrypt_key="secret-key")

@subscription2.handler
async def handle_secure_message(message):
    print(f"Received: {message['payload']}")

Message Structure

Messages received by handlers have the following structure:

{
    "idem": "message-unique-id",
    "messageName": "orders/status",
    "block": 12345,
    "timestamp": None,
    "payload": {"order_id": "123", "status": "completed"},
    "sender": "sender-public-key",
    "metadata": {"custom_field": "value"}
}
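
Handlers that index into the raw dict repeatedly can be easier to read with a small typed wrapper. The sketch below maps the fields shown above onto a dataclass; `ReceivedMessage` is a hypothetical helper, not an SDK type, and it assumes that `idem` and `messageName` are always present while the other fields may be missing or `None`.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass(frozen=True)
class ReceivedMessage:
    """Typed view over the message dict passed to a handler (illustrative only)."""

    idem: str
    message_name: str
    payload: dict
    sender: Optional[str] = None
    block: Optional[int] = None
    timestamp: Optional[Any] = None
    metadata: dict = field(default_factory=dict)

    @classmethod
    def from_dict(cls, raw: dict) -> "ReceivedMessage":
        return cls(
            idem=raw["idem"],
            message_name=raw["messageName"],
            payload=raw.get("payload") or {},
            sender=raw.get("sender"),
            block=raw.get("block"),
            timestamp=raw.get("timestamp"),
            metadata=raw.get("metadata") or {},
        )


raw_message = {
    "idem": "message-unique-id",
    "messageName": "orders/status",
    "block": 12345,
    "timestamp": None,
    "payload": {"order_id": "123", "status": "completed"},
    "sender": "sender-public-key",
    "metadata": {"custom_field": "value"},
}
message = ReceivedMessage.from_dict(raw_message)
print(message.message_name, message.payload["order_id"])
```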

Subscription Control

# Pause message processing
await subscription.pause("Maintenance in progress")

# Resume message processing
await subscription.resume()

# Defer a message (requeue for later)
await subscription.defer(
    message['idem'],
    delay_ms=5000,
    reason="Temporary unavailability"
)

# Discard a message permanently
await subscription.discard(
    message['idem'],
    reason="Invalid data"
)

# Replay a specific message
replayed_message = await subscription.replay(message['idem'])

# Unsubscribe
await subscription.unsubscribe()
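
One way to combine these controls inside a handler is to discard messages that can never be processed and defer those that fail for a temporary reason. The sketch below uses a stand-in subscription object that only mimics the `defer` and `discard` signatures shown above, so the routing logic can be run without a server. `RecordingSubscription`, `route_message`, and the `service_available` flag are hypothetical, and how these calls interact with automatic acknowledgement is not covered here.

```python
import asyncio


class RecordingSubscription:
    """Stand-in that records calls; mimics only the defer/discard signatures above."""

    def __init__(self):
        self.calls = []

    async def defer(self, idem, delay_ms=0, reason=""):
        self.calls.append(("defer", idem, delay_ms, reason))

    async def discard(self, idem, reason=""):
        self.calls.append(("discard", idem, reason))


async def route_message(subscription, message, service_available=True):
    """Discard malformed messages, defer while a dependency is down, else process."""
    payload = message.get("payload") or {}
    if "order_id" not in payload:
        # The message can never succeed, so drop it permanently.
        await subscription.discard(message["idem"], reason="Invalid data")
        return "discarded"
    if not service_available:
        # Temporary problem: ask for redelivery after 5 seconds.
        await subscription.defer(
            message["idem"], delay_ms=5000, reason="Temporary unavailability"
        )
        return "deferred"
    return "processed"


demo_subscription = RecordingSubscription()
outcome = asyncio.run(route_message(demo_subscription, {"idem": "m0", "payload": {}}))
print(outcome, demo_subscription.calls)
```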

Closing Connections

# Close the client connection
await client.close()

# Using context manager (automatic cleanup)
async with engine.create_client("your-app-key") as client:
    await client.publish("message/name", ["appId"], {"data": "value"})
    # Connection automatically closed

Error Handling

from ensync_sdk import EnSyncEngine
from ensync_core import EnSyncError

try:
    engine = EnSyncEngine()
    client = await engine.create_client("your-app-key")
    
    await client.publish(
        "orders/order",
        ["appId"],
        {"order_id": "123"}
    )
except EnSyncError as e:
    print(f"EnSync error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
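
For transient failures, a retry wrapper with exponential backoff is a common complement to the try/except above. The following is a sketch under stated assumptions: `publish_with_retry` is a hypothetical helper, the SDK is not known to provide one, and which exceptions are safe to retry depends on your deployment. Retrying a publish may also deliver a message more than once.

```python
import asyncio


async def publish_with_retry(publish, *args, retry_on=(Exception,), attempts=3,
                             base_delay=0.5, sleep=asyncio.sleep):
    """Await publish(*args), retrying with exponential backoff on `retry_on` errors."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await publish(*args)
        except retry_on:
            if attempt == attempts - 1:
                raise  # Out of attempts: surface the last error to the caller.
            # Wait base_delay, then 2x, then 4x, ... before the next attempt.
            await sleep(base_delay * 2 ** attempt)
```

Inside a coroutine this could be called as `await publish_with_retry(client.publish, "orders/order", ["appId"], {"order_id": "123"}, retry_on=(EnSyncError,))`, assuming `EnSyncError` is the error you want to retry.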

Debugging with Logs

Enable logging to debug connection issues:

engine = EnSyncEngine(options={
    "enableLogging": True
})

Complete Examples

Publishing Example

import asyncio
import os
from dotenv import load_dotenv
from ensync_sdk import EnSyncEngine

load_dotenv()

async def publishing_example():
    # Create client
    engine = EnSyncEngine()
    client = await engine.create_client(os.environ.get("ENSYNC_APP_KEY"))
    
    # Basic publish - returns message ID
    message_id = await client.publish(
        "notifications/email",
        ["appId"],  # The appId of the receiving party
        {"to": "user@example.com", "subject": "Welcome!"}
    )
    print(f"Published message: {message_id}")
    
    # With metadata
    message_id = await client.publish(
        "notifications/email",
        ["appId"],
        {"to": "user@example.com", "subject": "Welcome!"},
        {"source": "email-service", "priority": "high"}
    )
    print(f"Published message with metadata: {message_id}")
    
    await client.close()

if __name__ == "__main__":
    asyncio.run(publishing_example())

Subscribing Example

import asyncio
import os
from dotenv import load_dotenv
from ensync_sdk import EnSyncEngine

load_dotenv()

async def subscribing_example():
    engine = EnSyncEngine()
    client = await engine.create_client(os.environ.get("ENSYNC_APP_KEY"))
    
    # Subscribe to messages with decorator pattern
    subscription = client.subscribe("notifications/email")
    
    @subscription.handler
    async def handle_email_notification(message):
        email_data = message['payload']
        print(f"Email sent to: {email_data['to']}")
        print(f"Subject: {email_data['subject']}")
        
        # Message is automatically acknowledged
    
    # Keep running until cancelled (e.g. Ctrl+C)
    try:
        await asyncio.Future()
    finally:
        # asyncio.run() cancels this task on Ctrl+C, so clean up in `finally`
        await subscription.unsubscribe()
        await client.close()

if __name__ == "__main__":
    asyncio.run(subscribing_example())

Best Practices

Connection Management

  • Use context managers for automatic cleanup
  • Handle reconnection gracefully with appropriate intervals
  • Close connections properly when shutting down
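
If you want guaranteed cleanup without relying on a particular SDK feature, a thin wrapper built on `contextlib.asynccontextmanager` is one option. This sketch assumes only what the examples above show: `create_client` is awaitable and the client has an awaitable `close()`. `managed_client` is a hypothetical helper, and `StubClient` stands in for a real client so the pattern can be run in isolation.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def managed_client(create_client, *args, **kwargs):
    """Create a client and always close it, even if the body raises or is cancelled."""
    client = await create_client(*args, **kwargs)
    try:
        yield client
    finally:
        await client.close()


class StubClient:
    """Stand-in for an EnSync client; only close() is modelled."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def create_stub_client(app_key):
    return StubClient()


async def demo():
    async with managed_client(create_stub_client, "your-app-key") as client:
        assert not client.closed  # Still open inside the block.
    return client.closed


print(asyncio.run(demo()))
```

With the real SDK the call would look like `async with managed_client(engine.create_client, "your-app-key") as client:`.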

Message Design

  • Ensure the message name already exists in EnSync (provisioned via the UI or API) before publishing; only registered names are accepted by the platform
  • Use hierarchical message names: company/service/message-type
  • Ensure payloads comply with any schema registered for that message name (schemas are optional but enforced when present)
  • Use metadata for non-sensitive routing information
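
A naming convention is easier to keep when it is checked in code. The pattern below is an assumed convention for illustration (two or more `/`-separated segments of lowercase letters, digits, and hyphens); the platform's actual naming rules are not stated here and may be looser or stricter. It also does not check that a name is registered in EnSync.

```python
import re

# Assumed convention: at least two "/"-separated segments, each made of
# lowercase letters, digits, and hyphens. Adjust to your platform's rules.
MESSAGE_NAME = re.compile(r"[a-z0-9][a-z0-9-]*(?:/[a-z0-9][a-z0-9-]*)+")


def is_hierarchical_name(name: str) -> bool:
    """Return True if `name` follows the assumed hierarchical convention."""
    return MESSAGE_NAME.fullmatch(name) is not None


for candidate in ("company/service/message-type", "orders/status", "orders", "orders//status"):
    print(candidate, is_hierarchical_name(candidate))
```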

Security Best Practices

  • Store access keys in environment variables
  • Use app_decrypt_key for an additional decryption layer
  • Never log or expose encryption keys
  • Validate message payloads before processing
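
Two of these points lend themselves to a short sketch: failing fast when a key is missing from the environment (`os.environ.get` returns `None` silently), and checking a payload's fields before acting on it. Both helpers are hypothetical and not part of the SDK. The error message deliberately names the variable but never includes its value.

```python
import os


def load_required(name, env=os.environ):
    """Return the value of an environment variable, or fail with a clear error."""
    value = env.get(name)
    if not value:
        # Report only the variable name, never a key's value.
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


def validate_payload(payload, required):
    """Return a list of problems; an empty list means the payload looks usable."""
    problems = []
    for key, expected_type in required.items():
        if key not in payload:
            problems.append(f"missing field: {key}")
        elif not isinstance(payload[key], expected_type):
            problems.append(f"{key} should be {expected_type.__name__}")
    return problems


order_fields = {"order_id": str, "status": str}
print(validate_payload({"order_id": "order-123", "status": "completed"}, order_fields))
print(validate_payload({"order_id": 5}, order_fields))
```

A handler can call `validate_payload(message["payload"], order_fields)` first and skip or discard the message when the returned list is not empty.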

Performance Optimization

  • Use gRPC (default) for better performance than WebSocket
  • Enable connection pooling for high-throughput scenarios
  • Batch related messages when possible
  • Use appropriate reconnect_interval based on your use case

Documentation

For complete documentation, examples, and API reference, visit:

Related Packages

  • ensync-core: Core utilities and error handling (automatically installed)
  • ensync-websocket: WebSocket alternative client

License

MIT License - see LICENSE file for details

