Skip to main content

Channels Worker providing async (qmqtt) MQTT proxy

Project description

channels-mqtt-proxy

A Channels 3 compatible MQTT worker

This worker is a standard Channels Consumer which contains a fully async MQTT server allowing channels messages to be used to publish, subscribe to and receive MQTT messages

The proxy understands mqtt.subscribe and mqtt.publish so you can change the topics dynamically.

When an MQTT subscribe is performed it is done on behalf of a channels group and all Channels Consumers in that group will receive mqtt messages as mqtt.message

The overview is :

MQTT <> Channels-MQTT-Proxy (in runworker) <> Redis/Channels-layer <> ASGI applications (in Daphne/runserver) <> Websockets/HTTP <> Browser

Installation

pip install chanmqttproxy

Usage

In Channels the asgi application handles all types of connection routing. Websocket and http connections are listened for by a suitable server (eg daphne run by manage.py runserver) which will instantiate classes and run objects to handle them. The 'mqtt' Consumer does not accept incoming connections, just channel messages. So it must be started as a worker which handles the Channel messages; the MQTT client connection is then created inside the MqttConsumer worker when the first channel message is received.

Setup

The code may look familiar if you've used the Channels Chat tutorial :)

In fact it will add the ability to monitor the chat on the chat/ MQTT channel and messages sent to that channel will appear on all clients.

In site/asgi.py:

	from chanmqttproxy import MqttConsumer
	from channels.routing import ChannelNameRouter
	
	application = ProtocolTypeRouter({
		"channel": ChannelNameRouter({
			"mqtt": MqttConsumer.as_asgi()
		}),
		... # rest of http/websocket routes
	})

To define the MQTT broker, in site/settings.py:

# Local mqtt settings
MQTT_HOST = "mqtt.example.com"
MQTT_USER = "mqtt-test"
MQTT_PASSWORD = "mqtt-test"
MQTT_VERSION = 311  # defaults to 50

At this point you have a working async Channels/MQTT bridge

To subscribe to a topic in your AsyncConsumer

    async def connect(self):
        ... # existing group_add() calls

        # Join mqtt group
        await self.channel_layer.group_add(
            "mqttgroup",
            self.channel_name
        )
        # Ensure MQTT messages come to the room
        # This simplistic approach subscribes the room every
        # time a websocket connects but that's OK
        await self.channel_layer.send(
            "mqtt",
            {
                "type": "mqtt_subscribe",
                "topic": f"chat/{self.room_name}",
                "group": "mqttgroup",
        })

        await self.accept()  # existing accept() call

To handle messages from a topic in your AsyncConsumer

    # Receive message from mqtt group and send to websocket
    async def mqtt_message(self, event):
        message = event['message']
        topic = messagep["topic"]
        payload = messagep["payload"]

        # Send message to WebSocket
        await self.send(text_data=json.dumps({
            'message': payload
        }))

To publish messages to a topic in your AsyncConsumer

   # Receive message from WebSocket
    async def receive(self, text_data):

        ... # existing group_send etc

        # Publish on mqtt too
        await self.channel_layer.send(
            "mqtt",
            {
                "type": "mqtt_publish",
                "publish": {  # These form the kwargs for mqtt.publish
                    "topic":  f"chat/{self.room_name}_out",
                    "payload": message,
                    "qos": 2,
                    "retain": False,
                    }
        })

For debug logging I use this at the end of settings.py:

import logging.config

LOGGING_CONFIG = None

logging.config.dictConfig({
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'console': {
            # exact format is not important, this is the minimum information
            'format': '%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'console',
        },
    },
    'loggers': {
    # root logger
        'chanmqttproxy': {
            'level': 'DEBUG',
            'handlers': ['console'],
        },
        'mysite': {
            'level': 'DEBUG',
            'handlers': ['console'],
        },
        'chat': {
            'level': 'DEBUG',
            'handlers': ['console'],
        },
    },
})

The complete code for the Channels Chat tutorial application (up to part 3) with the channels-mqtt-proxy additions is here: https://github.com/lbt/channels-mqtt-proxy/tree/main/examples

Usage

Now run both of these (in different consoles)

./manage.py runserver
./manage.py runworker mqtt

Use your mqtt listener to listen to the topic chat/lobby_out and publish to the topic chat/lobby

Notice that if you use chat/ for both topics then when the proxy client publishes to the MQTT topic the message appears twice. This is because even if you're the one that publishes a message, if you're subscribed to the topic, you will receive it too.

If you make changes to the code note that the Channels runworker does not auto-reload and will still hold old subscribe/group information.

TODO/Issues

Connect to MQTT at startup

The MqttConsumer is only instantiated after the first message arrives rather than when the worker starts. This means it may not be connected so the await self.mqttproxy.connected.wait() is required on every subscribe/publish (which is not a lot of overhead but...

Unsubscribing and no-more-clients

It's not clear how to issue an unsubscribe or deal with all clients disconnecting. If this is done in the AsyncConsumer disconnect() then it needs a client-count which is probably unreliable. Currently MQTT messages will always be sent to the Channels group and it handles member timeout as per https://channels.readthedocs.io/en/stable/channel_layer_spec.html#capacity and https://channels.readthedocs.io/en/stable/channel_layer_spec.html#persistence

retain'ed messages

On initial subscribtion all retained messages are dropped. This is not ideal when retained messages are used to indicate 'last known state' for clients.

However this is an MQTT concept and doesn't carry over to Channels unless we handle retention and continue to store messages for each Channel client that subscribes - and then somehow only send retained messages to that new client and not the existing clients who've seen them once..

Ideally there would be a mechanism for the app to pre-subscribe and send retained (and new) messages to code that could update the 'current state' model which would then be maintained and used to initialise new browser clients.

MqttConsumer in worker doesn't exit

There doesn't seem to be a way to tell the worker to exit on Ctrl-C if we trap it to clean up the MQTT connections. Also note that in some situations the Ctrl-C fails. Eg if the broker doesn't support V5.0 and fallback to V311 is underway.

Thanks

Thanks to Gurtam for https://github.com/wialon/gmqtt which is a great asyncio MQTT client that I use extensively in my HA systems and integrate with PyQt too.

The Channels tutorial was really helpful in understanding the concepts.

Also to Xavier Lesa for https://github.com/xavierlesa/channels-asgi-mqtt which is based on the paho-mqtt synchronous library and inspired me to write this.

Project details


Release history Release notifications | RSS feed

This version

0.1

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

chanmqttproxy-0.1.tar.gz (20.2 kB view details)

Uploaded Source

Built Distribution

chanmqttproxy-0.1-py3-none-any.whl (20.8 kB view details)

Uploaded Python 3

File details

Details for the file chanmqttproxy-0.1.tar.gz.

File metadata

  • Download URL: chanmqttproxy-0.1.tar.gz
  • Upload date:
  • Size: 20.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.7.1 importlib_metadata/4.10.0 pkginfo/1.8.2 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.2

File hashes

Hashes for chanmqttproxy-0.1.tar.gz
Algorithm Hash digest
SHA256 b040f68e32638d35b560891df7be35a7638ed56ce464ddf78823715260c67b4e
MD5 e0863d2fc045fd74fbd8d14f6654d563
BLAKE2b-256 a57ae3634525978406064c05f1396c212e2465bb5f80cbde38c76e65dbef43f9

See more details on using hashes here.

File details

Details for the file chanmqttproxy-0.1-py3-none-any.whl.

File metadata

  • Download URL: chanmqttproxy-0.1-py3-none-any.whl
  • Upload date:
  • Size: 20.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.7.1 importlib_metadata/4.10.0 pkginfo/1.8.2 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.2

File hashes

Hashes for chanmqttproxy-0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 812226edafe8b3e3378f065326b43132c52cbacbfd85d69bf355563914f2d51e
MD5 a185ffcfc0c5aece76b2771cc79d86e3
BLAKE2b-256 5039496c12e8c3737fc36b8ab50ba004261f3e36b37eccaf81dc68ac554c41a0

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page