Python SDK for EnSync Engine - high-performance real-time messaging via gRPC

EnSync SDK

Python SDK for EnSync - High-performance message streaming with end-to-end encryption. Built on gRPC for production use.

Installation

pip install ensync-sdk

Quick Start

import asyncio
import os
from dotenv import load_dotenv
from ensync_sdk import EnSyncEngine

load_dotenv()

async def quick_start():
    try:
        # 1. Initialize engine and create client
        engine = EnSyncEngine("node.ensync.cloud")
        client = await engine.create_client(
            os.environ.get("ENSYNC_APP_KEY"),
            {
                "app_secret_key": os.environ.get("ENSYNC_SECRET_KEY")
            }
        )
        
        # 2. Publish a message
        await client.publish(
            "orders/status/updated",
            ["appId"],  # The appId of the receiving party
            {"order_id": "order-123", "status": "completed"}
        )
        
        # 3. Subscribe to messages
        subscription = await client.subscribe("orders/status/updated")
        
        # 4. Handle incoming messages
        async def handle_message(message):
            print(f"Received order update: {message['payload']['order_id']}")
            # Process message...
        
        subscription.on(handle_message)
        
        # 5. Keep the program running until it is cancelled (e.g. Ctrl+C)
        try:
            await asyncio.Future()  # Run indefinitely
        finally:
            # Runs on cancellation too, which is how Ctrl+C reaches a coroutine
            await subscription.unsubscribe()
            await client.close()
            
    except Exception as e:
        print(f'Error: {e}')

if __name__ == "__main__":
    asyncio.run(quick_start())

Usage

Importing

Default (gRPC)

# Import the default engine class (gRPC)
from ensync_sdk import EnSyncEngine

# Production - uses secure TLS on port 443 by default
engine = EnSyncEngine("node.ensync.cloud")

# Development - uses insecure connection on port 50051 by default
# engine = EnSyncEngine("localhost")

# Create authenticated client
client = await engine.create_client("your-app-key")

WebSocket Alternative

# Import the WebSocket engine class
from ensync_sdk import EnSyncWebSocketEngine

# Initialize WebSocket client
engine = EnSyncWebSocketEngine("wss://node.ensync.cloud")
client = await engine.create_client("your-app-key")

Connection URL Guidelines:

  • Production URLs automatically use secure TLS (port 443)
  • localhost automatically uses insecure connection (port 50051)
  • Explicit protocols: grpcs:// (secure) or grpc:// (insecure)
  • Custom ports: node.ensync.cloud:9090
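
The guidelines above can be read as a small set of resolution rules. The sketch below is only an illustration of those rules, not the SDK's own parser: `resolve_endpoint` and `Endpoint` are hypothetical names, and the choice of port 50051 for an explicit `grpc://` URL without a port is an assumption.

```python
from typing import NamedTuple


class Endpoint(NamedTuple):
    host: str
    port: int
    secure: bool


def resolve_endpoint(url: str) -> Endpoint:
    """Hypothetical helper that mirrors the URL guidelines; not part of ensync_sdk."""
    secure = None
    if url.startswith("grpcs://"):
        secure, url = True, url[len("grpcs://"):]
    elif url.startswith("grpc://"):
        secure, url = False, url[len("grpc://"):]

    host, sep, port_text = url.partition(":")
    if secure is None:
        # No explicit protocol: localhost is insecure, everything else uses TLS.
        secure = host != "localhost"
    # An explicit port wins; otherwise fall back to 443 (TLS) or 50051 (insecure).
    port = int(port_text) if sep else (443 if secure else 50051)
    return Endpoint(host, port, secure)
```

For example, `resolve_endpoint("node.ensync.cloud:9090")` keeps TLS enabled and only changes the port.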

API Reference

EnSyncEngine (gRPC - Default)

engine = EnSyncEngine(url, options=None)

Parameters

  • url (str): Server URL for EnSync service
  • options (dict, optional): Configuration options
    • enableLogging (bool, default: False): Enable debug logging
    • disable_tls (bool, default: False): Disable TLS encryption
    • reconnect_interval (int, default: 5000): Reconnection delay in ms
    • max_reconnect_attempts (int, default: 10): Maximum reconnection attempts
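
To get a feel for the reconnection settings, it can help to work out how long a client keeps retrying before giving up. The sketch below builds an options dict from the documented defaults and estimates that window. `engine_options` and `reconnect_window_seconds` are hypothetical helpers, and the estimate assumes a fixed delay between attempts, which the SDK may not use.

```python
# Documented defaults for the EnSyncEngine options dict.
DEFAULTS = {
    "enableLogging": False,
    "disable_tls": False,
    "reconnect_interval": 5000,      # milliseconds
    "max_reconnect_attempts": 10,
}


def engine_options(**overrides):
    """Merge overrides into the defaults, rejecting unknown or invalid values."""
    unknown = set(overrides) - set(DEFAULTS)
    if unknown:
        raise KeyError(f"Unknown option(s): {sorted(unknown)}")
    options = {**DEFAULTS, **overrides}
    if options["reconnect_interval"] <= 0:
        raise ValueError("reconnect_interval must be positive")
    if options["max_reconnect_attempts"] < 0:
        raise ValueError("max_reconnect_attempts must not be negative")
    return options


def reconnect_window_seconds(options):
    """Rough upper bound on time spent reconnecting, assuming a fixed delay."""
    return options["reconnect_interval"] * options["max_reconnect_attempts"] / 1000
```

With the defaults, the window comes to 50 seconds (10 attempts at 5000 ms each). The resulting dict can be passed as the second argument to `EnSyncEngine`.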

Creating a Client

Initialize the engine with your server URL and create a client with your app key.

# Initialize the engine (gRPC with TLS)
engine = EnSyncEngine("node.ensync.cloud")

# Enable logs for debugging
engine_verbose = EnSyncEngine("node.ensync.cloud", {
    "enableLogging": True
})

# Create a client
client = await engine.create_client("your-app-key")

Client Creation Parameters

  • app_key (str): Your application access key
  • options (dict, optional): Additional options
    • app_secret_key (str, optional): Secret key for encryption

Returns

EnSyncClient: Authenticated client instance

Publishing Events

# Basic publish
await client.publish(
    "company/service/message-type",  # Event name
    ["appId"],                      # Recipients (appIds of receiving parties)
    {"data": "your payload"}        # Event payload
)

# With optional metadata
await client.publish(
    "company/service/message-type",
    ["appId"],                      # The appId of the receiving party
    {"data": "your payload"},
    {"custom_field": "value"}       # Optional metadata
)

Publish Parameters

  • message_name (str): Name/type of the message
  • recipients (list[str]): List of recipient appIds
  • payload (dict): Event data to send
  • metadata (dict, optional): Additional metadata (not encrypted)
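
Checking arguments before calling `publish` can surface mistakes earlier than a failed network call. The sketch below validates the four documented parameters. `check_publish_args` is a hypothetical helper, and the requirement that the payload be JSON-serializable is an assumption about how payloads are encoded.

```python
import json


def check_publish_args(message_name, recipients, payload, metadata=None):
    """Raise if the arguments do not match the documented publish parameters."""
    if not isinstance(message_name, str) or not message_name:
        raise ValueError("message_name must be a non-empty string")
    if not isinstance(recipients, list) or not recipients:
        raise ValueError("recipients must be a non-empty list of appIds")
    if not all(isinstance(r, str) and r for r in recipients):
        raise ValueError("every recipient must be a non-empty string")
    if not isinstance(payload, dict):
        raise TypeError("payload must be a dict")
    if metadata is not None and not isinstance(metadata, dict):
        raise TypeError("metadata must be a dict when provided")
    try:
        # Assumption: payloads need to survive JSON encoding.
        json.dumps(payload)
    except (TypeError, ValueError) as exc:
        raise ValueError("payload must be JSON-serializable") from exc
```

Because metadata is not encrypted, a check like this is also a natural place to reject sensitive fields in `metadata`.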

Replying to Events

Use the sender field from received messages to reply back:

async def handle_message(message):
    # Process the message
    print(f"Received: {message['payload']}")
    
    # Reply back to the sender
    sender_public_key = message.get('sender')
    if sender_public_key:
        await client.publish(
            message.get('messageName'),
            [sender_public_key],  # Send back to the original sender
            {"status": "received", "response": "Processing complete"}
        )

Subscribing to Events

# Subscribe to a message
subscription = await client.subscribe("orders/status/updated")

# Register message handler
async def handle_message(message):
    print(f"Order {message['payload']['order_id']} updated")
    
    # Manual acknowledgment (if auto_ack is False)
    await subscription.ack(message['idem'], message['block'])

subscription.on(handle_message)

# Subscribe with options
subscription = await client.subscribe(
    "orders/status/updated",
    auto_ack=False,              # Disable automatic acknowledgment
    app_secret_key="secret-key"  # Override default encryption key
)

Event Structure

Events received by handlers have the following structure:

{
    "idem": "message-unique-id",
    "messageName": "orders/status/updated",
    "block": 12345,
    "timestamp": None,
    "payload": {"order_id": "123", "status": "completed"},
    "sender": "sender-public-key",
    "metadata": {"custom_field": "value"}
}
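
Handlers that read several fields may be easier to maintain with a typed wrapper around this dict. The following is a minimal sketch: `ReceivedEvent` is a hypothetical class, not something the SDK provides, and it treats `sender`, `timestamp`, and `metadata` as optional, which is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass(frozen=True)
class ReceivedEvent:
    idem: str
    message_name: str
    block: int
    payload: dict
    sender: Optional[str] = None
    timestamp: Optional[Any] = None
    metadata: dict = field(default_factory=dict)

    @classmethod
    def from_message(cls, message: dict) -> "ReceivedEvent":
        """Build a typed event from the dict passed to a handler."""
        return cls(
            idem=message["idem"],
            message_name=message["messageName"],
            block=message["block"],
            payload=message["payload"],
            sender=message.get("sender"),
            timestamp=message.get("timestamp"),
            metadata=message.get("metadata") or {},
        )
```

Inside a handler, `event = ReceivedEvent.from_message(message)` then gives attribute access such as `event.payload["order_id"]`.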

Subscription Control

# Pause message processing
await subscription.pause("Maintenance in progress")

# Resume message processing
await subscription.resume()

# Defer a message (requeue for later)
await subscription.defer(
    message['idem'],
    delay_ms=5000,
    reason="Temporary unavailability"
)

# Discard a message permanently
await subscription.discard(
    message['idem'],
    reason="Invalid data"
)

# Replay a specific message
replayed_message = await subscription.replay(message['idem'])

# Unsubscribe
await subscription.unsubscribe()
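
One way to combine these controls is to decide per message whether to acknowledge, defer, or discard it. The sketch below shows that decision with a stand-in subscription so it runs without a server. `RecordingSubscription`, `TemporaryFailure`, and `process_order` are hypothetical, and the stand-in copies only the `ack`, `defer`, and `discard` call shapes shown above.

```python
import asyncio


class TemporaryFailure(Exception):
    """Hypothetical error type for failures worth retrying later."""


class RecordingSubscription:
    """Stand-in for a real subscription; it only records the calls it receives."""

    def __init__(self):
        self.calls = []

    async def ack(self, idem, block):
        self.calls.append(("ack", idem))

    async def defer(self, idem, delay_ms, reason):
        self.calls.append(("defer", idem))

    async def discard(self, idem, reason):
        self.calls.append(("discard", idem))


async def process_order(payload):
    # Placeholder business logic: pretend "pending" orders cannot be handled yet.
    if payload["status"] == "pending":
        raise TemporaryFailure("order not ready")


async def handle(subscription, message):
    payload = message.get("payload") or {}
    if "order_id" not in payload or "status" not in payload:
        # Malformed messages will never succeed, so drop them.
        await subscription.discard(message["idem"], reason="Invalid data")
        return
    try:
        await process_order(payload)
    except TemporaryFailure as exc:
        # Transient problem: ask for redelivery in five seconds.
        await subscription.defer(message["idem"], delay_ms=5000, reason=str(exc))
        return
    await subscription.ack(message["idem"], message["block"])


async def demo():
    sub = RecordingSubscription()
    await handle(sub, {"idem": "a", "block": 1, "payload": {"order_id": "1", "status": "completed"}})
    await handle(sub, {"idem": "b", "block": 2, "payload": {"order_id": "2", "status": "pending"}})
    await handle(sub, {"idem": "c", "block": 3, "payload": {}})
    return sub.calls
```

With a real subscription created using `auto_ack=False`, the handler could be registered through a closure, for example `subscription.on(lambda m: handle(subscription, m))`, assuming `on` accepts any callable that returns a coroutine.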

Closing Connections

# Close the client connection
await client.close()

# Using context manager (automatic cleanup)
async with engine.create_client("your-app-key") as client:
    await client.publish("message/name", ["appId"], {"data": "value"})
    # Connection automatically closed

Error Handling

from ensync_sdk import EnSyncEngine
from ensync_core import EnSyncError

try:
    engine = EnSyncEngine("node.ensync.cloud")
    client = await engine.create_client("your-app-key")
    
    await client.publish(
        "orders/created",
        ["appId"],
        {"order_id": "123"}
    )
except EnSyncError as e:
    print(f"EnSync error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
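
For transient failures, a small retry wrapper around the failing call is a common pattern. The sketch below is generic asyncio code rather than an SDK feature. `call_with_retry` is a hypothetical helper, and whether a given `EnSyncError` is safe to retry (and whether retrying a publish can produce duplicates) is something to confirm for your use case.

```python
import asyncio


async def call_with_retry(operation, attempts=3, delay_seconds=0.5, retry_on=(Exception,)):
    """Await operation() up to `attempts` times, sleeping between failures."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error to the caller.
            await asyncio.sleep(delay_seconds)
```

A call might look like `await call_with_retry(lambda: client.publish("orders/created", ["appId"], {"order_id": "123"}), retry_on=(EnSyncError,))`.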

Debugging with Logs

Enable logging to debug connection issues:

engine = EnSyncEngine("node.ensync.cloud", {
    "enableLogging": True
})

Complete Examples

Publishing Example

import asyncio
import os
from dotenv import load_dotenv
from ensync_sdk import EnSyncEngine

load_dotenv()

async def publishing_example():
    # Create client
    engine = EnSyncEngine("node.ensync.cloud")
    client = await engine.create_client(os.environ.get("ENSYNC_APP_KEY"))
    
    # Basic publish - returns message ID
    message_id = await client.publish(
        "notifications/email/sent",
        ["appId"],  # The appId of the receiving party
        {"to": "user@example.com", "subject": "Welcome!"}
    )
    print(f"Published message: {message_id}")
    
    # With metadata
    message_id = await client.publish(
        "notifications/email/sent",
        ["appId"],
        {"to": "user@example.com", "subject": "Welcome!"},
        {"source": "email-service", "priority": "high"}
    )
    print(f"Published message with metadata: {message_id}")
    
    await client.close()

if __name__ == "__main__":
    asyncio.run(publishing_example())

Subscribing Example

import asyncio
import os
from dotenv import load_dotenv
from ensync_sdk import EnSyncEngine

load_dotenv()

async def subscribing_example():
    engine = EnSyncEngine("node.ensync.cloud")
    client = await engine.create_client(os.environ.get("ENSYNC_APP_KEY"))
    
    # Subscribe to messages
    subscription = await client.subscribe("notifications/email/sent")
    
    # Handle messages
    async def handle_email_notification(message):
        email_data = message['payload']
        print(f"Email sent to: {email_data['to']}")
        print(f"Subject: {email_data['subject']}")
        
        # Event is automatically acknowledged
    
    subscription.on(handle_email_notification)
    
    # Keep running until the task is cancelled (e.g. Ctrl+C)
    try:
        await asyncio.Future()
    finally:
        # Runs on cancellation too, which is how Ctrl+C reaches a coroutine
        await subscription.unsubscribe()
        await client.close()

if __name__ == "__main__":
    asyncio.run(subscribing_example())

Best Practices

Connection Management

  • Use context managers for automatic cleanup
  • Handle reconnection gracefully with appropriate intervals
  • Close connections properly when shutting down

Event Design

  • Use hierarchical message names: company/service/message-type
  • Keep payloads focused and minimal
  • Use metadata for non-sensitive routing information
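
A naming convention is easier to keep when it is checked in code. The sketch below accepts slash-separated names such as `company/service/message-type`. `is_hierarchical_name` is a hypothetical helper, and the rules it enforces (lowercase letters, digits, and hyphens, with at least two segments) are an assumption inferred from the example names in this document rather than a documented server requirement.

```python
import re

# One segment: lowercase alphanumerics, optionally joined by single hyphens.
_SEGMENT = r"[a-z0-9]+(?:-[a-z0-9]+)*"
_MESSAGE_NAME = re.compile(rf"{_SEGMENT}(?:/{_SEGMENT})+")


def is_hierarchical_name(name: str) -> bool:
    """Return True for names made of two or more slash-separated segments."""
    return _MESSAGE_NAME.fullmatch(name) is not None
```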

Security Best Practices

  • Store access keys in environment variables
  • Use app_secret_key for an additional encryption layer
  • Never log or expose encryption keys
  • Validate message payloads before processing
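
The first and third points can be sketched as a loader that fails fast when a key is missing and a logging helper that never prints key material. Both `load_credentials` and `describe_secret` are hypothetical helpers. The environment variable names match the Quick Start, and treating the secret key as optional is an assumption based on the client creation parameters.

```python
import os


def load_credentials(env=None):
    """Return (app_key, app_secret_key) from the environment, failing fast."""
    env = os.environ if env is None else env
    app_key = env.get("ENSYNC_APP_KEY")
    if not app_key:
        raise RuntimeError("ENSYNC_APP_KEY is not set")
    # The secret key is optional; normalize an empty string to None.
    return app_key, env.get("ENSYNC_SECRET_KEY") or None


def describe_secret(secret):
    """Report only whether a secret is present, never its value."""
    return "<set>" if secret else "<unset>"
```

A startup log line such as `print(f"secret key: {describe_secret(secret)}")` then confirms configuration without exposing the key.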

Performance Optimization

  • Use gRPC (default) for better performance than WebSocket
  • Enable connection pooling for high-throughput scenarios
  • Batch related messages when possible
  • Use appropriate reconnect_interval based on your use case
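
Batching can be as simple as grouping related items so that several travel in one payload. The sketch below only does the grouping. `chunked` is a hypothetical helper, and wrapping each chunk in a payload such as `{"orders": chunk}` is an assumption that the receiving application would have to understand.

```python
def chunked(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]
```

Each chunk could then be sent with a single `client.publish` call instead of one call per item.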

Documentation

For complete documentation, examples, and API reference, visit:

Related Packages

  • ensync-core: Core utilities and error handling (automatically installed)
  • ensync-websocket: WebSocket alternative client

License

MIT License - see LICENSE file for details
