
A robust mqtt library with message caching


PersistMQ

PersistMQ is a reliability layer on top of an MQTT client, designed to cope with unreliable connections. The project aims to provide a robust, easy-to-use messaging solution.

It is mainly designed for edge devices that need to reliably transfer the data they produce, even through power outages or prolonged connection problems. Messages are cached on the file system and reloaded once data transfer is possible again.

Features

  • High Reliability: Ensures message delivery even in the event of network failures or node crashes.
  • Message Caching: Messages that are not received by the broker are cached on the file system for later retries.
  • Caching Methods: Cache large messages directly to pickle files, or lightweight ones to an SQLite database.
  • Ease of Use: Simple API for integrating with your application, making it easy to migrate from a bare paho.mqtt implementation.
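
To illustrate the SQLite caching method named above, here is a minimal sketch of a file-backed FIFO message cache. The class name SqliteMessageCache, its methods, and the table layout are assumptions made for this example; they are not PersistMQ's actual internals.

```python
import sqlite3
from pathlib import Path
from typing import Optional, Tuple


class SqliteMessageCache:
    """Hypothetical FIFO cache for (topic, payload) pairs stored in SQLite."""

    def __init__(self, db_path: Path) -> None:
        self._conn = sqlite3.connect(str(db_path))
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS messages ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "topic TEXT NOT NULL, "
            "payload BLOB NOT NULL)"
        )
        self._conn.commit()

    def push(self, topic: str, payload: bytes) -> None:
        """Append a message and commit so it survives a crash or power loss."""
        self._conn.execute(
            "INSERT INTO messages (topic, payload) VALUES (?, ?)",
            (topic, payload),
        )
        self._conn.commit()

    def peek(self) -> Optional[Tuple[int, str, bytes]]:
        """Return the oldest message as (id, topic, payload), or None if empty."""
        return self._conn.execute(
            "SELECT id, topic, payload FROM messages ORDER BY id LIMIT 1"
        ).fetchone()

    def delete(self, message_id: int) -> None:
        """Remove a message once it has been published successfully."""
        self._conn.execute("DELETE FROM messages WHERE id = ?", (message_id,))
        self._conn.commit()

    def close(self) -> None:
        self._conn.close()
```

Keeping peek and delete separate means a message only leaves the cache after its delivery has been confirmed.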

Intended Use

✅ When you should use this library:

  • Long-running applications that produce data cyclically (e.g. measurement devices)
  • Historic data matters, not only the most recent value (provide a timestamp in your payload!)
  • Typical message transmission time is shorter than your message period (otherwise a backlog could build up)
  • QoS=2 is necessary
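
Because cached messages may arrive long after they were produced, the timestamp should be taken when the data is created, not when it is sent. A minimal sketch follows; the helper name build_payload and the JSON field names are assumptions for illustration only.

```python
import json
import time
from typing import Optional


def build_payload(value: float, timestamp: Optional[float] = None) -> str:
    """Hypothetical helper: wrap a measurement with its creation time."""
    if timestamp is None:
        # Seconds since the Unix epoch, taken at production time.
        timestamp = time.time()
    return json.dumps({"timestamp": timestamp, "value": value})
```

The resulting string can be passed as the payload argument of publish, so the receiver can order the data correctly even when it is delivered late.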

❌ When you are better off with other solutions:

  • High message rates (more than 10 per second)
  • QoS=0 is enough

Installation

For easy use, this package can be installed via pip from pypi:

pip install persistmq

As an alternative, you can clone the repository and install the required dependencies:

git clone https://github.com/DaqOpen/persistmq.git
cd persistmq
pip install .

Usage

Here is a simple example of how to use PersistMQ in your project:

import time
from pathlib import Path
from persistmq.client import PersistClient

# Create a PersistClient instance
my_robust_client = PersistClient(client_id="testclient", cache_path=Path("/tmp/mymqttcache"))
# Establish a connection to the mqtt broker
my_robust_client.connect_async(mqtt_host="localhost")

# Send some messages
for i in range(20):
    my_robust_client.publish("dt/blah", f"Test Message {i:d}")
    time.sleep(1)

# Stop the client process
my_robust_client.stop()

How it works

Calling connect_async spawns a separate worker process using the multiprocessing module. Two queues are created for communicating with this process.

The main goal is to hand messages to the MQTT client's queue one at a time, each only after the previous one has been sent successfully. This may reduce performance, but it allows messages to be preserved when errors occur.
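
The process-and-queues arrangement described above could look roughly like the following sketch. The names worker_loop, start_worker, and the STOP sentinel are assumptions for this example, and the status strings are invented; PersistMQ's real worker may be organised differently.

```python
import multiprocessing as mp

# Hypothetical sentinel that tells the worker to shut down.
STOP = None


def worker_loop(inbox, status) -> None:
    """Consume (topic, payload) tuples one at a time until STOP arrives."""
    while True:
        item = inbox.get()  # blocks until a message is available
        if item is STOP:
            status.put("stopped")
            break
        topic, _payload = item
        # A real worker would hand the message to the MQTT client here
        # and wait for the acknowledgement before taking the next one.
        status.put(f"handled {topic}")


def start_worker():
    """Spawn the worker process and return it with its two queues."""
    inbox = mp.Queue()   # application -> worker: messages to publish
    status = mp.Queue()  # worker -> application: status feedback
    process = mp.Process(target=worker_loop, args=(inbox, status), daemon=True)
    process.start()
    return process, inbox, status
```

When using this pattern in a script, call start_worker from inside an `if __name__ == "__main__":` guard so that it also works with the spawn start method.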

When publishing a message with the PersistClient, it follows this flow:

  1. The message is put into the multiprocessing queue.
  2. The worker process checks the queue and consumes one message if available.
  3. This message is published via the underlying paho.mqtt client.
  4. The process waits until the message has reached its destination (on_publish callback).
    1. If a timeout is reached, the whole input queue (including the message currently stuck in publishing) is cached on the file system.
    2. Caching continues until the message has been published successfully.
  5. Once the publish succeeds, the copy of that message that was cached in the meantime is deleted.
  6. On the next loop iteration, the worker checks whether cached data is available and, if so, starts publishing it.
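
The flow above can be sketched as a single loop step. This is a simplified illustration, not the library's code: in-memory deques stand in for the multiprocessing queue and the file-system cache, and publish_and_wait is a hypothetical callable that returns True when the broker acknowledged the message before the timeout.

```python
from collections import deque
from typing import Callable, Deque, Tuple

Message = Tuple[str, str]  # (topic, payload)


def process_once(
    inbox: Deque[Message],
    cache: Deque[Message],
    publish_and_wait: Callable[[str, str], bool],
) -> bool:
    """Try to deliver one message; return True if it was delivered."""
    # Cached messages are older, so they are sent first to keep the order.
    source = cache if cache else inbox
    if not source:
        return False
    topic, payload = source[0]
    if publish_and_wait(topic, payload):
        source.popleft()  # delivered: remove it from the queue or cache
        return True
    # Timeout: move everything still waiting in memory to the cache,
    # including the message that is stuck in publishing.
    while inbox:
        cache.append(inbox.popleft())
    return False
```

A message is only removed after a confirmed delivery, so a failure at any point leaves it either in the queue or in the cache.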

Roadmap

A quick and dirty roadmap to show what is planned for the future:

  • Transmission of "bulk" messages: In unreliable networks, the QoS=2 handshake of MQTT takes a long time and can cause unnecessary delays when transmitting many messages.

Contributing

I welcome contributions to PersistMQ! If you'd like to contribute, please fork the repository, create a new branch, and submit a pull request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

