A robust MQTT library with message caching
PersistMQ
PersistMQ is a reliable MQTT module built on top of paho.mqtt for dealing with unreliable connections. The project aims to provide a robust, easy-to-use messaging solution.
It is mainly designed for edge devices that need to transfer the data they produce reliably, even through power outages or prolonged connection problems. Messages are cached on the file system and reloaded once data transfer is possible again.
Features
- High Reliability: Ensures message delivery even in the event of network failures or node crashes.
- Message Caching: Messages that are not received by the broker are cached on the file system for later retries.
- Caching Methods: Cache large messages directly to pickle files, or lightweight ones to an SQLite database.
- Ease of Use: Simple API for integrating with your application, allowing you to migrate easily from a bare paho.mqtt implementation.
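To illustrate the SQLite caching idea from the feature list, here is a minimal sketch of a first-in, first-out message cache. It is not PersistMQ's actual implementation: the class name `SqliteMessageCache`, the table layout, and all method names are assumptions made for this example.

```python
import sqlite3
from typing import Optional, Tuple


class SqliteMessageCache:
    """Hypothetical FIFO cache for unsent messages (not PersistMQ's real schema)."""

    def __init__(self, db_path: str = ":memory:") -> None:
        # Use a file path instead of ":memory:" to survive restarts.
        self._conn = sqlite3.connect(db_path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS messages ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "topic TEXT NOT NULL, "
            "payload BLOB NOT NULL)"
        )
        self._conn.commit()

    def push(self, topic: str, payload: bytes) -> None:
        """Append a message to the end of the cache."""
        self._conn.execute(
            "INSERT INTO messages (topic, payload) VALUES (?, ?)",
            (topic, payload),
        )
        self._conn.commit()

    def peek(self) -> Optional[Tuple[int, str, bytes]]:
        """Return the oldest cached message as (id, topic, payload), or None."""
        return self._conn.execute(
            "SELECT id, topic, payload FROM messages ORDER BY id LIMIT 1"
        ).fetchone()

    def delete(self, message_id: int) -> None:
        """Remove a message once it has been published successfully."""
        self._conn.execute("DELETE FROM messages WHERE id = ?", (message_id,))
        self._conn.commit()

    def __len__(self) -> int:
        return self._conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
```

Reading the oldest row with `peek` and deleting it only after a successful publish means a crash between the two steps leaves the message in the cache rather than losing it.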
Intended Use
✅ When you should use this library:
- Long-running applications that produce data cyclically (e.g. measurement devices)
- Historic data is important, not only the most recent value (provide a timestamp in your payload!)
- Typical message transmission time is lower than your message period (otherwise a jam could occur)
- QoS=2 is necessary
❌ When you are better off with other solutions:
- High message rates (more than 10 per second)
- QoS=0 is enough
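Because cached messages may reach the broker long after they were produced, the advice above to put a timestamp in the payload matters. The following is a small sketch of one way to do that with JSON. The helper name `build_payload` and the field names `timestamp` and `value` are assumptions chosen for this example, not part of PersistMQ.

```python
import json
import time
from typing import Optional


def build_payload(value: float, timestamp: Optional[float] = None) -> str:
    """Serialize a measurement together with the time it was taken.

    The timestamp is seconds since the Unix epoch; it defaults to "now",
    i.e. the moment of production, not the moment of delivery.
    """
    if timestamp is None:
        timestamp = time.time()
    return json.dumps({"timestamp": timestamp, "value": value})
```

The resulting string can be passed as the second argument to `publish`, in the same way as the plain text messages in the usage example.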
Installation
For easy use, this package can be installed via pip from PyPI:
pip install persistmq
As an alternative, you can clone the repository and install the required dependencies:
git clone https://github.com/DaqOpen/persistmq.git
cd persistmq
pip install .
Usage
Here is a simple example of how to use PersistMQ in your project:
import time
from pathlib import Path
from persistmq.client import PersistClient
# Create a PersistClient instance
my_robust_client = PersistClient(client_id="testclient", cache_path=Path("/tmp/mymqttcache"))
# Establish a connection to the mqtt broker
my_robust_client.connect_async(mqtt_host="localhost")
# Send some messages
for i in range(20):
    my_robust_client.publish("dt/blah", f"Test Message {i:d}")
    time.sleep(1)
# Stop the client process
my_robust_client.stop()
How it works
First, a separate process is spawned when connect_async is called. This is done with the multiprocessing module. Two queues are created for further communication with this process.
The main goal is to hand messages to the queue of the MQTT client one at a time, passing on the next message only after the previous one was sent successfully. This may reduce performance, but it preserves messages when errors occur.
When publishing a message with the PersistClient, it follows this flow:
- the message is put into the multiprocessing queue
- the worker process checks the queue and consumes one message if available
- this message is then published via the underlying paho.mqtt client
- the process waits until the message has reached its destination (on_publish callback)
- if a timeout is reached, the whole input queue (including the message currently stuck in publishing) is cached on the file system
- caching continues until the message has been published successfully
- once the publish succeeds, the cached copy of that message is deleted
- on the next loop iteration, the worker checks whether cached data is available and, if so, starts publishing it
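The flow above can be sketched in a few lines. This is a simplified, single-process illustration and not PersistMQ's actual code: it uses a plain `queue.Queue` instead of a multiprocessing queue, a single pickle file as the cache, and a hypothetical `publish` callable that returns `True` when the publish was confirmed in time and `False` on timeout. All names here are assumptions.

```python
import pickle
import queue
import tempfile
from pathlib import Path
from typing import Callable, List, Tuple

Message = Tuple[str, str]  # (topic, payload)


def drain(inbox: "queue.Queue[Message]") -> List[Message]:
    """Take every message currently waiting in the input queue."""
    items: List[Message] = []
    while True:
        try:
            items.append(inbox.get_nowait())
        except queue.Empty:
            return items


def worker_step(
    inbox: "queue.Queue[Message]",
    cache_file: Path,
    publish: Callable[[Message], bool],
) -> None:
    """One loop iteration: cached messages go first, then one new message."""
    pending: List[Message] = []
    if cache_file.exists():
        pending = pickle.loads(cache_file.read_bytes())
    if not pending:
        try:
            pending = [inbox.get_nowait()]
        except queue.Empty:
            return  # nothing to do in this iteration

    if publish(pending[0]):
        # Confirmed: drop the message that was just delivered.
        remaining = pending[1:]
    else:
        # Timeout: keep the stuck message and cache the whole input queue.
        remaining = pending + drain(inbox)

    if remaining:
        cache_file.write_bytes(pickle.dumps(remaining))
    elif cache_file.exists():
        cache_file.unlink()


if __name__ == "__main__":
    demo_inbox: "queue.Queue[Message]" = queue.Queue()
    demo_inbox.put(("dt/blah", "Test Message 0"))
    demo_cache = Path(tempfile.mkdtemp()) / "cache.pkl"
    # Simulate an unreachable broker: every publish times out.
    worker_step(demo_inbox, demo_cache, lambda msg: False)
    print("cached on disk:", pickle.loads(demo_cache.read_bytes()))
```

Publishing only the head of the list and rewriting the cache after each attempt keeps messages in their original order, at the cost of one file write per message while a backlog exists.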
Roadmap
A quick and dirty roadmap to show what is planned for the future:
- Transmission of "Bulk" messages: In unreliable networks, the QoS=2 ping-pong of MQTT takes a long time and can generate unnecessary delay when transmitting many messages
Contributing
I welcome contributions to PersistMQ! If you'd like to contribute, please fork the repository, create a new branch, and submit a pull request.
License
This project is licensed under the MIT License - see the LICENSE file for details.