Skip to main content

MQTT client protocol package for Twisted

Project description

MQTT Client protocol for Twisted.

Description

twisted-mqtt is a library using the Twisted framework and implementing the MQTT protocol (v3.1 & v3.1.1) in these flavours:

  • pure subscriber
  • pure publisher
  • or a mixing of both. This is useful to subscribe and publish through the same broker using only one TCP connection.

Instalation

Just type:

sudo pip install twisted-mqtt

or from GitHub:

git clone https://github.com/astrorafael/twisted-mqtt.git
cd twisted-mqtt
sudo python setup.py install

Credits

I started writting this software after finding Adam Rudd’s MQTT.py code. A small part his code is still there. However, I soon began taking my own direction both in design and scope.

Function/methods docstrings contain quotes of the OASIS mqtt-v3.1.1 standard.

MQTT Version 3.1.1. Edited by Andrew Banks and Rahul Gupta. 29 October 2014. OASIS Standard. http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. Latest version: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html.

Usage

The APIs are described in the library defined interfaces

This library builds MQTTProtocol objects and is designed to be used rather than inherited.

Examples

These examples show my library intended usage: managed by a service. Your Twisted application should probably be designed as a collection of services and one of these would be an MQTT Service. Note that a service is simply an object that can be started by startService() and stopped by stopService().

Probably you also want your service to handle automatic reconnections to the MQTT broker and that’s where Twisted’s ClientService class comes in. A ClientService instance detects its transport has been closed and re-opens the connection to the MQTT Broker.

However, this is not enough for the MQTT protocol since the broker expects a CONNECT packet request shortly after the socket has been opened. For this reason, we must subclass ClientService to override startService(). Also we will add some MQTT connection/disconnection handling code. This requires us to obtain somehow the protocol instance built by the factory.

In the startup code, we create a ClientService instance, passing the proper MQTT protocol factory and we simply start the service. Inside startService() we invoke ClientService’s method whenConnected() that returns a Deferred. This Deferred - when fired - will invoke a user function with the protocol object been created as the parameter.

Our custom ClientService subclass defines a connectToBroker() method, receiving the protocol object just built. At minimun, we will store a reference to this protocol for further reference. If we wish to handle automatic reconnections, we should set the MQTT protocol onDisconnection attribute to a callback that will handle what to do in such cases. Our service onDisconnection() callback will simple tell us to rebuild a new protocol instance and call connectToBroker() again when done. In this way, we start the whole MQTT CONNECT thing all over again.

Finally, our custom ClientService example subclass may define a custom retry policy by customizing backoffPolicy() default arguments initialDelay, maxDelay and factor. See the twisted.application.internet.backoffPolicy() API reference for further details.

Publisher Example

A publisher is built by obtaining a factory for the MQTTFactory.PUBLISHER profile.

Your MQTT Publisher service should configure a couple of things in the connectToBroker() method:

  • The MQTT protocol onDisconnection attribute storing a callback that will be invoked when a disconnection occurs.
  • The maximun Window Size - that is - how many asynchronous PUBLISH request you will issue in a row to the library, before getting and acknowledge from the Broker (Qos=1 and 2 only). By thefault, the window size is 1 and this guarantees in-order delivery of published messages.

This example additionally starts a periodic task to publish sample data.

import sys

from twisted.internet             import reactor, task
from twisted.internet.defer       import inlineCallbacks, DeferredList
from twisted.application.internet import ClientService, backoffPolicy
from twisted.internet.endpoints   import clientFromString
from twisted.logger   import (
    Logger, LogLevel, globalLogBeginner, textFileLogObserver,
    FilteringLogObserver, LogLevelFilterPredicate)

from mqtt.client.factory import MQTTFactory

# ----------------
# Global variables
# ----------------

# Global object to control globally namespace logging
logLevelFilterPredicate = LogLevelFilterPredicate(defaultLogLevel=LogLevel.info)

BROKER = "tcp:test.mosquitto.org:1883"

# -----------------
# Utility Functions
# -----------------

def startLogging(console=True, filepath=None):
    '''
    Starts the global Twisted logger subsystem with maybe
    stdout and/or a file specified in the config file
    '''
    global logLevelFilterPredicate

    observers = []
    if console:
        observers.append( FilteringLogObserver(observer=textFileLogObserver(sys.stdout),
            predicates=[logLevelFilterPredicate] ))

    if filepath is not None and filepath != "":
        observers.append( FilteringLogObserver(observer=textFileLogObserver(open(filepath,'a')),
            predicates=[logLevelFilterPredicate] ))
    globalLogBeginner.beginLoggingTo(observers)


def setLogLevel(namespace=None, levelStr='info'):
    '''
    Set a new log level for a given namespace
    LevelStr is: 'critical', 'error', 'warn', 'info', 'debug'
    '''
    level = LogLevel.levelWithName(levelStr)
    logLevelFilterPredicate.setLogLevelForNamespace(namespace=namespace, level=level)


# -----------------------
# MQTT Publishing Service
# -----------------------

class MQTTService(ClientService):

    def __init(self, endpoint, factory):
        ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())


    def startService(self):
        log.info("starting MQTT Client Publisher Service")
        # invoke whenConnected() inherited method
        self.whenConnected().addCallback(self.connectToBroker)
        ClientService.startService(self)


    @inlineCallbacks
    def connectToBroker(self, protocol):
        '''
        Connect to MQTT broker
        '''
        self.protocol                 = protocol
        self.protocol.onDisconnection = self.onDisconnection
        # We are issuing 3 publish in a row
        # if order matters, then set window size to 1
        # Publish requests beyond window size are enqueued
        self.protocol.setWindowSize(3)
        self.task = task.LoopingCall(self.publish)
        self.task.start(5.0)
        try:
            yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
        except Exception as e:
            log.error("Connecting to {broker} raised {excp!s}",
               broker=BROKER, excp=e)
        else:
            log.info("Connected to {broker}", broker=BROKER)


    def onDisconnection(self, reason):
        '''
        get notfied of disconnections
        and get a deferred for a new protocol object (next retry)
        '''
        log.debug(" >< Connection was lost ! ><, reason={r}", r=reason)
        self.whenConnected().addCallback(self.connectToBroker)


    def publish(self):


        def _logFailure(failure):
            log.debug("reported {message}", message=failure.getErrorMessage())
            return failure

        def _logAll(*args):
            log.debug("all publihing complete args={args!r}",args=args)

        log.debug(" >< Starting one round of publishing >< ")
        d1 = self.protocol.publish(topic="foo/bar/baz1", qos=0, message="hello world 0")
        d1.addErrback(_logFailure)
        d2 = self.protocol.publish(topic="foo/bar/baz2", qos=1, message="hello world 1")
        d2.addErrback(_logFailure)
        d3 = self.protocol.publish(topic="foo/bar/baz3", qos=2, message="hello world 2")
        d3.addErrback(_logFailure)
        dlist = DeferredList([d1,d2,d3], consumeErrors=True)
        dlist.addCallback(_logAll)
        return dlist



if __name__ == '__main__':
    import sys
    log = Logger()
    startLogging()
    setLogLevel(namespace='mqtt',     levelStr='debug')
    setLogLevel(namespace='__main__', levelStr='debug')

    factory    = MQTTFactory(profile=MQTTFactory.PUBLISHER)
    myEndpoint = clientFromString(reactor, BROKER)
    serv       = MQTTService(myEndpoint, factory)
    serv.startService()
    reactor.run()

Subscriber Example

A subscriber is built by obtaining a factory for the MQTTFactory.SUBSCRIBER profile.

Your MQTT Subscriber service should configure the following things in the connectToBroker() method:

  • The MQTT protocol onDisconnection attribute storing a callback that will be invoked when a disconnection occurs.
  • The maximun Window Size - that is - how many asynchronous SUBSCRIBE or UNSUBSCRIBE request you will issue in a row to the library, before getting and acknowledge from the Broker.
  • The MQTT protocol onPublish attribute storing a callback that will be fired whenever a new PUBLISH packed is delivered to the subscriber.
import sys

from twisted.internet.defer       import inlineCallbacks, DeferredList
from twisted.internet             import reactor
from twisted.internet.endpoints   import clientFromString
from twisted.application.internet import ClientService, backoffPolicy

from twisted.logger   import (
    Logger, LogLevel, globalLogBeginner, textFileLogObserver,
    FilteringLogObserver, LogLevelFilterPredicate)

from mqtt.client.factory import MQTTFactory

# ----------------
# Global variables
# ----------------

# Global object to control globally namespace logging
logLevelFilterPredicate = LogLevelFilterPredicate(defaultLogLevel=LogLevel.info)

BROKER = "tcp:test.mosquitto.org:1883"

# -----------------
# Utility Functions
# -----------------

def startLogging(console=True, filepath=None):
    '''
    Starts the global Twisted logger subsystem with maybe
    stdout and/or a file specified in the config file
    '''
    global logLevelFilterPredicate

    observers = []
    if console:
        observers.append( FilteringLogObserver(observer=textFileLogObserver(sys.stdout),
            predicates=[logLevelFilterPredicate] ))

    if filepath is not None and filepath != "":
        observers.append( FilteringLogObserver(observer=textFileLogObserver(open(filepath,'a')),
            predicates=[logLevelFilterPredicate] ))
    globalLogBeginner.beginLoggingTo(observers)


def setLogLevel(namespace=None, levelStr='info'):
    '''
    Set a new log level for a given namespace
    LevelStr is: 'critical', 'error', 'warn', 'info', 'debug'
    '''
    level = LogLevel.levelWithName(levelStr)
    logLevelFilterPredicate.setLogLevelForNamespace(namespace=namespace, level=level)

# -----------------------
# MQTT Subscriber Service
# ------------------------

class MQTTService(ClientService):


    def __init(self, endpoint, factory):
        ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())


    def startService(self):
        log.info("starting MQTT Client Subscriber Service")
        # invoke whenConnected() inherited method
        self.whenConnected().addCallback(self.connectToBroker)
        ClientService.startService(self)


    @inlineCallbacks
    def connectToBroker(self, protocol):
        '''
        Connect to MQTT broker
        '''
        self.protocol                 = protocol
        self.protocol.onPublish       = self.onPublish
        self.protocol.onDisconnection = self.onDisconnection
        self.protocol.setWindowSize(3)
        try:
            yield self.protocol.connect("TwistedMQTT-subs", keepalive=60)
            yield self.subscribe()
        except Exception as e:
            log.error("Connecting to {broker} raised {excp!s}",
               broker=BROKER, excp=e)
        else:
            log.info("Connected and subscribed to {broker}", broker=BROKER)


    def subscribe(self):

        def _logFailure(failure):
            log.debug("reported {message}", message=failure.getErrorMessage())
            return failure

        def _logGrantedQoS(value):
            log.debug("response {value!r}", value=value)
            return True

        def _logAll(*args):
            log.debug("all subscriptions complete args={args!r}",args=args)

        d1 = self.protocol.subscribe("foo/bar/baz1", 2 )
        d1.addCallbacks(_logGrantedQoS, _logFailure)

        d2 = self.protocol.subscribe("foo/bar/baz2", 2 )
        d2.addCallbacks(_logGrantedQoS, _logFailure)

        d3 = self.protocol.subscribe("foo/bar/baz3", 2 )
        d3.addCallbacks(_logGrantedQoS, _logFailure)

        dlist = DeferredList([d1,d2,d3], consumeErrors=True)
        dlist.addCallback(_logAll)
        return dlist


    def onPublish(self, topic, payload, qos, dup, retain, msgId):
        '''
        Callback Receiving messages from publisher
        '''
        log.debug("msg={payload}", payload=payload)


    def onDisconnection(self, reason):
        '''
        get notfied of disconnections
        and get a deferred for a new protocol object (next retry)
        '''
        log.debug(" >< Connection was lost ! ><, reason={r}", r=reason)
        self.whenConnected().addCallback(self.connectToBroker)


if __name__ == '__main__':
    import sys
    log = Logger()
    startLogging()
    setLogLevel(namespace='mqtt',     levelStr='debug')
    setLogLevel(namespace='__main__', levelStr='debug')

    factory    = MQTTFactory(profile=MQTTFactory.SUBSCRIBER)
    myEndpoint = clientFromString(reactor, BROKER)
    serv       = MQTTService(myEndpoint, factory)
    serv.startService()
    reactor.run()

Publisher/Subscriber Example

A Publisher/Subscriber example is no more than a mix of the previous examples, not forgetting to set the MQTT factory profile to MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER.

Design Notes

There is a separate MQTTProtocol in each module implementing a different profile (subscriber, publiser, publisher/subscriber). The MQTTBaseProtocol and the various MQTTProtocol classes implement a State Pattern to avoid the “if spaghetti code” in the connection states. A basic state machine is built into the MQTTBaseProtocol and the ConnectedState is patched according to the profile.

Previous 0.1.x implementations used two separate (subclases, publisher) and with separate logic for both roles. The publisher/subscriber was a mixin class implemented by delegation that managed the connection state and forwarded all client requests and network events to the proper delegate.

However, this approach had some quirks and issues with sharing state. It has been re-written to a single publisher/subscriber class that manages everything.

To maintain the former API, separate subclasses has been derived to implement a pure subscriber or publisher roles. The subclassing simply patches the state machine in order to honor only the methods for a given role.

Limitations

The current implementation has the following limitations:

  • This library does not claim to be full comformant to the standard.
  • There is a limited form of session persistance for the publisher. Pending acknowledges for PUBLISH and PUBREL are kept in RAM and outlive the connection and the protocol object while Twisted is running. However, they are not stored in a persistent medium.

TODO

I wrote this library for my pet projects and learn Twisted. However, it goes a long way from an apparently looking good library to an industrial-strength, polished product. I don’t simply have the time, energy and knowledge to do so.

Some areas in which this can be improved:

  • Bug fixing
  • Include a thorough test battery.
  • Improve documentation.
  • etc.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for twisted-mqtt, version 0.3.6
Filename, size File type Python version Upload date Hashes
Filename, size twisted-mqtt-0.3.6.tar.gz (44.6 kB) File type Source Python version None Upload date Hashes View hashes

Supported by

Elastic Elastic Search Pingdom Pingdom Monitoring Google Google BigQuery Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN SignalFx SignalFx Supporter DigiCert DigiCert EV certificate StatusPage StatusPage Status page