DPSN MQTT client for managing topic subscriptions and publications

dpsn-client (Python)

Overview

dpsn-client is a Python SDK for interacting with the DPSN infrastructure. It lets you connect to a DPSN broker using your Ethereum wallet for authentication and publish messages to topics. You can also explore and subscribe to existing data streams directly from the DPSN Streams Marketplace.

For more information, visit:

Installation

pip install dpsn-client

Note: If installing from source or locally, navigate to the root directory and run pip install -e .

Usage

Prerequisites

  • DPSN URL: The hostname of the DPSN MQTT broker (e.g., betanet.dpsn.org).
  • Wallet Private Key: Your Ethereum private key (starting with 0x). This wallet will be used for authentication. Ensure it's kept secure.
  • Chain Options: Specify the network (mainnet or testnet) and chain type (ethereum).

(Blockchain interaction features like purchasing topics, fetching owned topics, and getting topic prices are planned for future versions and require additional configuration not yet implemented in this Python client.)

Import the Library

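import logging
import time

from dpsn_client import DpsnClient, DPSNError

# The examples in this README log through this module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)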

Create Client Instance

Initialize the client with your DPSN broker URL, private key, and chain options.

dpsn_url = "your-dpsn-broker.com"  # e.g., betanet.dpsn.org
private_key = "0xYOUR_PRIVATE_KEY" # Replace with your actual private key

client = DpsnClient(
    dpsn_url=dpsn_url,
    private_key=private_key,
    chain_options={
        "network": "testnet",        # or "mainnet"
        "wallet_chain_type": "ethereum"
    },
    # Optional: connection_options={'ssl': True} # Default is True
)
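
Because the private key must be kept secure, you may prefer not to hardcode it. The following is a minimal sketch that reads the constructor arguments from environment variables and validates them before use. The helper load_client_kwargs and the variable names DPSN_URL, DPSN_PRIVATE_KEY, and DPSN_NETWORK are assumptions made for this example; they are not part of dpsn-client.

```python
import os
import re
from typing import Any, Dict, Mapping

# Assumed variable names for this sketch; pick whatever fits your deployment.
ENV_URL = "DPSN_URL"
ENV_KEY = "DPSN_PRIVATE_KEY"
ENV_NETWORK = "DPSN_NETWORK"

# An Ethereum private key is 32 bytes: "0x" followed by 64 hex characters.
_PRIVATE_KEY_RE = re.compile(r"0x[0-9a-fA-F]{64}")


def load_client_kwargs(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build keyword arguments for DpsnClient from environment variables."""
    url = env.get(ENV_URL, "").strip()
    key = env.get(ENV_KEY, "").strip()
    network = env.get(ENV_NETWORK, "testnet").strip()

    if not url:
        raise ValueError(f"{ENV_URL} is not set")
    if not _PRIVATE_KEY_RE.fullmatch(key):
        # Never echo the key itself in an error message.
        raise ValueError(f"{ENV_KEY} must be 0x followed by 64 hex characters")
    if network not in ("mainnet", "testnet"):
        raise ValueError(f"{ENV_NETWORK} must be 'mainnet' or 'testnet'")

    return {
        "dpsn_url": url,
        "private_key": key,
        "chain_options": {"network": network, "wallet_chain_type": "ethereum"},
    }
```

With the variables exported in your shell, the client could then be created with DpsnClient(**load_client_kwargs()).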

Understanding DPSN Topics

Topics in DPSN are data distribution channels designed for secure, permissioned data streams associated with blockchain wallets.

  • Ownership-based channels: While topic purchase happens on the blockchain (feature planned), the MQTT authentication relies on the owner's wallet.

  • Data streams: Authenticated publishers push data to topics, and subscribers receive data.

  • Authenticated channels: The client uses signatures derived from the owner's private key to authenticate actions like connecting and publishing.

  • When publishing data, the publishing client uses the same private key that purchased the topic to authenticate the request, ensuring only authorized wallets can publish to their owned topics.

(Full topic lifecycle management via smart contracts, including purchasing and ownership verification, is planned for a future version.)

Setup Event Handlers

Use decorators to define handlers for asynchronous events like receiving messages or errors. Set these up before calling init().

@client.on_msg
def handle_message(message_data):
    """Handles incoming messages."""
    topic = message_data.get('topic')
    payload = message_data.get('payload')
    logger.info(f"Received message on topic '{topic}': {payload}")

@client.on_error
def handle_error(error: DPSNError):
    """Handles errors reported by the client."""
    logger.error(f"DPSN Client Error: {error}")

# You can also add handlers later if needed:
# client.on_msg += another_message_handler
# client.on_error += another_error_handler

Initialize DPSN Client

Call client.init() to connect to the DPSN MQTT broker. The call authenticates using a signature generated from your private key and raises DPSNError on failure.
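
Per the API reference, the connection is opened with client.init(), which raises DPSNError on failure. The sketch below wraps any such connect call in a small retry loop with exponential backoff. call_with_retry is a hypothetical helper written for this example, not part of dpsn-client; the library's own retry_options may already cover this, but their shape is not documented here.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    connect: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call connect() until it succeeds or the attempts are used up."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except Exception:  # in real code, narrow this to DPSNError
            if attempt == attempts:
                raise  # out of attempts: surface the last error unchanged
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

A typical call would be call_with_retry(client.init, attempts=5).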

Publish Data

Publish a JSON-serializable message to a specific topic string. The topic must start with your wallet address (0x...).

Caution: You must initialize the client with the private key corresponding to the wallet address used in the topic prefix.

# Example topic
publish_topic = "0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/BTCUSDT/ticker"
data_to_publish = {"ticker": "BTC/USDT", "price":83387.13000000, "timestamp": time.time()}

try:
    #Publishing to topic
    client.publish(publish_topic, data_to_publish, options={'qos': 1})
    #Message published successfully
except DPSNError as e:
    logger.error(f"Failed to publish: {e}")
except ValueError as e:
    logger.error(f"Publish error (invalid topic format?): {e}")
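
Since publish() raises ValueError when the topic does not start with the client's wallet address, it can help to build and check topic strings before publishing. The following is a minimal sketch; build_topic and is_owned_topic are hypothetical helpers, and the case-insensitive comparison is an assumption (the library may compare prefixes differently).

```python
def build_topic(wallet_address: str, *parts: str) -> str:
    """Join a wallet-address prefix and path segments into a topic string."""
    if not wallet_address.startswith("0x"):
        raise ValueError("wallet address must start with 0x")
    cleaned = [part.strip("/") for part in parts]
    if not cleaned or any(not part for part in cleaned):
        raise ValueError("at least one non-empty topic segment is required")
    return "/".join([wallet_address, *cleaned])


def is_owned_topic(topic: str, wallet_address: str) -> bool:
    """Return True if the first topic level equals the given wallet address."""
    prefix, _, rest = topic.partition("/")
    # Assumption: hex addresses are compared without regard to letter case.
    return prefix.lower() == wallet_address.lower() and bool(rest)
```

For example, build_topic(client.wallet_address, "BTCUSDT", "ticker") yields a topic under the wallet that the client was initialized with.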

Subscribing to Topics

Subscribe to a topic pattern to receive messages.

subscribe_topic = "0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/BTCUSDT/ticker" # Subscribe to topic 

try:
    #Subscribing to topic
    client.subscribe(subscribe_topic, options={'qos': 1})
    #Successfully subscribed to the topic

    # Keep the script running to receive messages
    logger.info("Waiting for messages... Press Ctrl+C to exit.")
    while True:
        time.sleep(1)

except DPSNError as e:
    logger.error(f"Failed to subscribe: {e}")
except KeyboardInterrupt:
    logger.info("Interrupted by user.")
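
When you subscribe to several topics, every message arrives through the same on_msg event. One way to keep per-topic logic separate is a small router that dispatches on the 'topic' field of message_data. TopicRouter below is a hypothetical helper for illustration, not part of dpsn-client; it matches topics by exact string only.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Any], None]


class TopicRouter:
    """Dispatch incoming messages to one handler per exact topic string."""

    def __init__(self) -> None:
        self._routes: Dict[str, Handler] = {}

    def route(self, topic: str) -> Callable[[Handler], Handler]:
        """Decorator that registers a payload handler for a topic."""
        def register(func: Handler) -> Handler:
            self._routes[topic] = func
            return func
        return register

    def dispatch(self, message_data: Dict[str, Any]) -> bool:
        """Call the matching handler; return False if none is registered."""
        handler = self._routes.get(message_data.get("topic"))
        if handler is None:
            return False
        handler(message_data.get("payload"))
        return True
```

The router can then be attached with client.on_msg += router.dispatch, using the handler-addition form shown in the event handler section.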

Unsubscribing from Topics

Stop receiving messages from a specific topic.

# Unsubscribe ongoing topic subscription
try:
    client.unsubscribe(subscribe_topic)
except DPSNError as e:
    print(f"Failed to unsubscribe: {e}")

Disconnect

Disconnect cleanly from the MQTT broker.

# Disconnect client connection
try:
    client.disconnect()
except DPSNError as e:
    print(f"Error during disconnect: {e}")
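
To make sure disconnect() runs even when your own code raises, the connect and disconnect calls can be paired in a context manager. This is a sketch under the assumption that the client exposes init(), disconnect(), and the connected property as listed in the API reference; dpsn_session itself is a hypothetical helper, not part of dpsn-client.

```python
from contextlib import contextmanager


@contextmanager
def dpsn_session(client):
    """Connect on entry and always attempt a clean disconnect on exit."""
    client.init()  # if this raises, there is nothing to disconnect
    try:
        yield client
    finally:
        # Skip the call if the connection has already dropped.
        if getattr(client, "connected", True):
            client.disconnect()
```

Usage would look like: with dpsn_session(client) as c: c.subscribe(subscribe_topic). Note that an error raised by disconnect() itself would replace any error raised inside the with block.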

API Reference

Class DpsnClient

Manages connection, authentication, and messaging with the DPSN broker. Inherits from events.Events.

Constructor

DpsnClient(
    dpsn_url: str,
    private_key: str,
    chain_options: Dict[str, Any],
    connection_options: Dict[str, Any] = None
)
  • dpsn_url: Hostname of the DPSN broker.
  • private_key: Ethereum wallet private key (0x...).
  • chain_options: Dictionary, requires {"network": "mainnet"|"testnet", "wallet_chain_type": "ethereum"}.
  • connection_options: Optional dictionary, e.g., {"ssl": True} (default).

Methods

  • init(options: Dict[str, Any] = None) -> mqtt.Client: Connects and authenticates to the broker. options can include retry_options. Raises DPSNError on failure.
  • publish(topic: str, message: Any, options: Dict[str, Any] = None) -> None: Publishes a JSON-serializable message to topic. topic must start with self.wallet_address. options can include qos (default 1) and retain (default False). Raises DPSNError or ValueError.
  • subscribe(topic: str, options: Dict[str, Any] = None) -> None: Subscribes to topic. options can include qos (default 1). Raises DPSNError.
  • unsubscribe(topic: str) -> None: Unsubscribes from topic. Raises DPSNError.
  • disconnect() -> None: Disconnects from the broker. Raises DPSNError.
  • generate_topic_hash(topic_name: str) -> str: Generates a unique hash for a given topic_name using a nonce and keccak256. (Note: This is currently a utility function and not directly used for publish/subscribe which rely on the wallet address prefix).

Events

Register handlers using decorators (@client.event_name) or addition (client.event_name += handler_func).

  • on_msg(message_data: Dict): Triggered when a message is received on a subscribed topic. message_data is {'topic': str, 'payload': Any} (payload is decoded JSON if possible, else string).
  • on_error(error: DPSNError): Triggered when an operational error occurs within the client (e.g., message handling failure, connection issue post-init).
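
Because the payload is decoded JSON when possible and a plain string otherwise, a handler should not assume it always receives a dictionary. The following is a minimal sketch of a defensive normalization step; as_record is a hypothetical helper name, not part of dpsn-client.

```python
import json
from typing import Any, Dict, Optional


def as_record(message_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the payload as a dict, or None if it is not a JSON object."""
    payload = message_data.get("payload")
    if isinstance(payload, dict):
        return payload  # already decoded by the client
    if isinstance(payload, (str, bytes, bytearray)):
        try:
            decoded = json.loads(payload)
        except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
            return None
        return decoded if isinstance(decoded, dict) else None
    return None
```

Inside an on_msg handler you could call record = as_record(message_data) and skip the message when the result is None.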

Properties

  • wallet_address: The Ethereum address derived from the provided private key.
  • connected: Boolean indicating the current connection state.

Class DPSNError

Custom exception class for client-specific errors.

  • code: A DPSN_ERROR_CODES enum member.
  • message: Descriptive error message string.
  • status: Optional connection status hint ('connected' or 'disconnected').

Enum DPSN_ERROR_CODES

Defines specific error types:

  • CONNECTION_ERROR (400)
  • UNAUTHORIZED (401)
  • PUBLISH_ERROR (402)
  • INITIALIZATION_FAILED (403)
  • CLIENT_NOT_INITIALIZED (404)
  • CLIENT_NOT_CONNECTED (405)
  • SUBSCRIBE_ERROR (406)
  • SUBSCRIBE_NO_GRANT (407) - Currently unused
  • SUBSCRIBE_SETUP_ERROR (408) - Currently unused
  • DISCONNECT_ERROR (409)
  • BLOCKCHAIN_CONFIG_ERROR (410) - Currently unused
  • INVALID_PRIVATE_KEY (411)
  • ETHERS_ERROR (412) - Currently unused
  • MESSAGE_HANDLING_ERROR (414)
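
When handling errors, it can be useful to separate transient failures from ones that need a configuration change. The sketch below mirrors the numeric codes listed above in a local enum and classifies them. ErrorCode and is_retryable are hypothetical names for this example, not the library's DPSN_ERROR_CODES, and which codes count as retryable is an assumption, not something the library specifies.

```python
from enum import IntEnum


class ErrorCode(IntEnum):
    """Local mirror of the codes listed above; not the library's own enum."""
    CONNECTION_ERROR = 400
    UNAUTHORIZED = 401
    PUBLISH_ERROR = 402
    INITIALIZATION_FAILED = 403
    CLIENT_NOT_INITIALIZED = 404
    CLIENT_NOT_CONNECTED = 405
    SUBSCRIBE_ERROR = 406
    SUBSCRIBE_NO_GRANT = 407
    SUBSCRIBE_SETUP_ERROR = 408
    DISCONNECT_ERROR = 409
    BLOCKCHAIN_CONFIG_ERROR = 410
    INVALID_PRIVATE_KEY = 411
    ETHERS_ERROR = 412
    MESSAGE_HANDLING_ERROR = 414


# Assumption: only connection-level failures are worth retrying as-is.
_RETRYABLE = {ErrorCode.CONNECTION_ERROR, ErrorCode.CLIENT_NOT_CONNECTED}


def is_retryable(code: int) -> bool:
    """Return True for codes this sketch treats as transient."""
    try:
        return ErrorCode(code) in _RETRYABLE
    except ValueError:
        return False  # unknown code: do not retry blindly
```

If the code attribute of DPSNError carries one of the numeric values above, is_retryable can be called with that number inside an on_error handler.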

License

MIT License
