Skip to main content

DPSN MQTT client for managing topic subscriptions and publications

Project description

dpsn-client (Python)

Overview

dpsn-client is a Python SDK for interacting with the DPSN infrastructure. It allows you to connect to a DPSN broker using your Ethereum wallet for authentication, publish messages to topics.Explore and subscribe to existing data streams directly from the DPSN Streams Marketplace.

For more information, visit:

Installation

pip install dpsn-client

Note: If installing from source or locally, navigate to the root directory and run pip install -e .

Usage

Prerequisites

  • DPSN URL: The hostname of the DPSN MQTT broker (e.g., betanet.dpsn.org).
  • Wallet Private Key: Your Ethereum private key (starting with 0x). This wallet will be used for authentication. Ensure it's kept secure.
  • Chain Options: Specify the network (mainnet or testnet) and chain type (ethereum).

(Blockchain interaction features like purchasing topics, fetching owned topics, and getting topic prices are planned for future versions and require additional configuration not yet implemented in this Python client.)

Import the Library

from dpsn_client import DpsnClient, DPSNError

Create Client Instance

Initialize the client with your DPSN broker URL, private key, and chain options.

dpsn_url = "your-dpsn-broker.com"  # e.g., betanet.dpsn.org
private_key = "0xYOUR_PRIVATE_KEY" # Replace with your actual private key

client = DpsnClient(
    dpsn_url=dpsn_url,
    private_key=private_key,
    chain_options={
        "network": "testnet",        # or "mainnet"
        "wallet_chain_type": "ethereum"
    },
    # Optional: connection_options={'ssl': True} # Default is True
)

Understanding DPSN Topics

Topics in DPSN are data distribution channels designed for secure, permissioned data streams associated with blockchain wallets.

  • Ownership-based channels: While topic purchase happens on the blockchain (feature planned), the MQTT authentication relies on the owner's wallet.

  • Data streams: Authenticated publishers push data to topics, and subscribers receive data.

  • Authenticated channels: The client uses signatures derived from the owner's private key to authenticate actions like connecting and publishing.

  • When publishing data, the publishing client uses the same private key that purchased the topic to authenticate the request, ensuring only authorized wallets can publish to their owned topics.

(Full topic lifecycle management via smart contracts, including purchasing and ownership verification).

Setup Blockchain Configuration

To interact with the DPSN contract for topic management, you need to set the blockchain configuration:

# Configure blockchain connectivity
rpc_url = "https://sepolia.infura.io/v3/YOUR_INFURA_KEY"  # Replace with your RPC provider
contract_address = "0x5dE897F4Cedf01d6c4eFD5e56A02278ddF93e54f"              # Replace with deployed contract address

# Set up the blockchain config
contract = client.set_blockchain_config(
    rpc_url=rpc_url,
    contract_address=contract_address
)

Purchasing a Topic

To publish data to DPSN, you first need to purchase a topic. This operation registers your topic on the blockchain:

try:
    # Purchase a new topic (this is a blockchain transaction that costs ETH)
    topic_name = "BTC/USDT"  # Human-readable name
    result = client.purchase_topic(
        topic_name=topic_name,
        confirmations=2,      # Number of confirmations to wait for
        timeout=120           # Timeout in seconds
    )
    
    # Save the generated topic hash for publishing
    topic_hash = result["topic"]  # Format: 0x...
    print(f"Successfully purchased topic '{topic_name}' with hash: {topic_hash}")
    print(f"Transaction hash: {result['tx_hash']}")
    
except DPSNError as e:
    print(f"Failed to purchase topic: {e}")

Fetching Your Owned Topics

You can retrieve all topics owned by your wallet:

try:
    topics = client.fetch_owned_topics()
    print(f"Your wallet owns {len(topics)} topics:")
    for topic in topics:
        print(f"- {topic['name']}: {topic['topic']} (registered at {topic['registered_at']})")
except DPSNError as e:
    print(f"Failed to fetch topics: {e}")

Get Topic Price

Check the current price required to register a topic:

try:
    price_wei = client.get_topic_price()
    price_eth = float(client.web3.from_wei(price_wei, 'ether'))
    print(f"Current topic price: {price_eth} ETH ({price_wei} wei)")
except DPSNError as e:
    print(f"Failed to get topic price: {e}")

Setup Event Handlers

Use decorators to define handlers for asynchronous events like receiving messages or errors. Set these up before calling init().

def handle_message(message_data):
    """Handles incoming messages."""
    topic = message_data.get('topic')
    payload = message_data.get('payload')
    logger.info(f"Received message on topic '{topic}': {payload}")

def handle_error(error: DPSNError):
    """Handles errors reported by the client."""
    logger.error(f"DPSN Client Error: {error}")

# You can also add handlers later if needed:
# client.on_msg += another_message_handler
# client.on_error += another_error_handler

Initialize DPSN Client

Connect to the DPSN MQTT broker. This authenticates using a signature generated from your private key.

# Initialize the client connection
try:
    client.init()
    print("Connected to DPSN MQTT broker successfully")
except DPSNError as e:
    print(f"Failed to connect: {e}")

Publish Data

Publish a JSON-serializable message to a specific topic string. The topic must start with your wallet address (0x...).

Caution: You must initialize the client with the private key corresponding to the wallet address used in the topic prefix.

# Example topic
publish_topic = "0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/BTCUSDT/ticker"
data_to_publish = {"ticker": "BTC/USDT", "price":83387.13000000, "timestamp": time.time()}

try:
    #Publishing to topic
    client.publish(publish_topic, data_to_publish, options={'qos': 1})
    #Message published successfully
except DPSNError as e:
    logger.error(f"Failed to publish: {e}")
except ValueError as e:
    logger.error(f"Publish error (invalid topic format?): {e}")

Subscribing to Topics

Subscribe to a topic pattern to receive messages.

subscribe_topic = "0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/BTCUSDT/ticker" # Subscribe to topic 

try:
    #Subscribing to topic
    client.subscribe(subscribe_topic, options={'qos': 1})
    #Successfully subscribed to the topic

    # Keep the script running to receive messages
    logger.info("Waiting for messages... Press Ctrl+C to exit.")
    while True:
        time.sleep(1)

except DPSNError as e:
    logger.error(f"Failed to subscribe: {e}")
except KeyboardInterrupt:
    logger.info("Interrupted by user.")

Unsubscribing from Topics

Stop receiving messages from a specific topic.

# Unsubscribe ongoing topic subscription
try:
    client.unsubscribe(subscribe_topic)
except DPSNError as e:
    print(f"Failed to unsubscribe: {e}")

Disconnect

Disconnect cleanly from the MQTT broker.

# Disconnect client connection
try:
    client.disconnect()
except DPSNError as e:
    print(f"Error during disconnect: {e}")

API Reference

Class DpsnClient

Manages the connection, authentication, and messaging with the DPSN client. Inherits from events.Events.

Constructor

DpsnClient(
    dpsn_url: str,
    private_key: str,
    chain_options: Dict[str, Any],
    connection_options: Dict[str, Any] = None
)
  • dpsn_url: Hostname of the DPSN broker.
  • private_key: Ethereum wallet private key (0x...).
  • chain_options: Dictionary, requires {"network": "mainnet"|"testnet", "wallet_chain_type": "ethereum"}.
  • connection_options: Optional dictionary, e.g., {"ssl": True} (default).

Methods

  • init(options: Dict[str, Any] = None) -> mqtt.Client: Connects and authenticates to the broker. options can include retry_options. Raises DPSNError on failure.
  • publish(topic: str, message: Any, options: Dict[str, Any] = None) -> None: Publishes a JSON-serializable message to topic. topic must start with self.wallet_address. options can include qos (default 1) and retain (default False). Raises DPSNError or ValueError.
  • subscribe(topic: str, options: Dict[str, Any] = None) -> None: Subscribes to topic. options can include qos (default 1). Raises DPSNError.
  • unsubscribe(topic: str) -> None: Unsubscribes from topic. Raises DPSNError.
  • disconnect() -> None: Disconnects from the broker. Raises DPSNError.
  • generate_topic_hash(topic_name: str) -> str: Generates a unique hash for a given topic_name using a nonce and keccak256. (Note: This is currently a utility function and not directly used for publish/subscribe which rely on the wallet address prefix).

Events

Register handlers using decorators (@client.event_name) or addition (client.event_name += handler_func).

  • on_msg(message_data: Dict): Triggered when a message is received on a subscribed topic. message_data is {'topic': str, 'payload': Any} (payload is decoded JSON if possible, else string).
  • on_error(error: DPSNError): Triggered when an operational error occurs within the client (e.g., message handling failure, connection issue post-init).

Properties

  • wallet_address: The Ethereum address derived from the provided private key.
  • connected: Boolean indicating the current connection state.

Class DPSNError

Custom exception class for client-specific errors.

  • code: A DPSN_ERROR_CODES enum member.
  • message: Descriptive error message string.
  • status: Optional connection status hint ('connected' or 'disconnected').

Enum DPSN_ERROR_CODES

Defines specific error types:

  • CONNECTION_ERROR (400)
  • UNAUTHORIZED (401)
  • PUBLISH_ERROR (402)
  • INITIALIZATION_FAILED (403)
  • CLIENT_NOT_INITIALIZED (404)
  • CLIENT_NOT_CONNECTED (405)
  • SUBSCRIBE_ERROR (406)
  • SUBSCRIBE_NO_GRANT (407) - Currently unused
  • SUBSCRIBE_SETUP_ERROR (408) - Currently unused
  • DISCONNECT_ERROR (409)
  • BLOCKCHAIN_CONFIG_ERROR (410) - Currently unused
  • INVALID_PRIVATE_KEY (411)
  • ETHERS_ERROR (412) - Currently unused
  • MESSAGE_HANDLING_ERROR (414)

License

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dpsn_client-1.0.3.tar.gz (16.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dpsn_client-1.0.3-py3-none-any.whl (12.7 kB view details)

Uploaded Python 3

File details

Details for the file dpsn_client-1.0.3.tar.gz.

File metadata

  • Download URL: dpsn_client-1.0.3.tar.gz
  • Upload date:
  • Size: 16.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.9.6

File hashes

Hashes for dpsn_client-1.0.3.tar.gz
Algorithm Hash digest
SHA256 f3074eddb279771f68e256b91f42856d03e0bc69104a73493ad3f9e2d1cb4a82
MD5 c9511918d5c6b6640838d25dcec4efb7
BLAKE2b-256 26dd4da6285baebb107d7617eeeedfccf5355d8d4723ba35a134b97f57038af9

See more details on using hashes here.

File details

Details for the file dpsn_client-1.0.3-py3-none-any.whl.

File metadata

  • Download URL: dpsn_client-1.0.3-py3-none-any.whl
  • Upload date:
  • Size: 12.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.9.6

File hashes

Hashes for dpsn_client-1.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 4813500a0feeeccef77ac4eb5dd9c7251f892ce116b0e830b76ef721e3ad02f6
MD5 facecaa0b035b74902279aa77007832f
BLAKE2b-256 0ff4aa4dcc791fe1d8bd9286af00caba8f3e1a83d240c45484323872bfe00b72

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page