MQTT to InfluxDB v3 bridge with flexible JSONPath transformation
Project description
mqtt2influxdb
A Python bridge between MQTT messaging and InfluxDB v3 time-series database. Subscribe to MQTT topics, process incoming messages, and write data points to InfluxDB.
Features
- Subscribe to multiple MQTT topics with wildcard support (
+,#) - Write data to InfluxDB v3 with tags and fields
- Support for both JSON and raw string payloads
- JSONPath extraction from message payloads
- Mathematical expressions for computed fields
- Cron-based scheduling for conditional writes
- HTTP forwarding of processed data
- Base64 decoding support
- Daemon mode with automatic reconnection
Requirements
- Python 3.10+
- InfluxDB v3 instance (Cloud or self-hosted)
- MQTT broker (Mosquitto, etc.)
Installation
Using uv (recommended):
uv tool install mqtt2influxdb
Using pip:
pip install mqtt2influxdb
Quick Start
- Create a configuration file
config.yml:
mqtt:
host: localhost
port: 1883
influxdb:
host: localhost
port: 8181
token: your-api-token
org: your-organization
bucket: your-bucket
points:
- measurement: temperature
topic: sensors/+/temperature
fields:
value: $.payload
tags:
sensor_id: $.topic[1]
- Run the bridge:
mqtt2influxdb -c config.yml
CLI Usage
mqtt2influxdb [OPTIONS]
Options:
-c, --config FILE Path to configuration file (YAML) [required]
-D, --debug Enable debug logging
-o, --output FILE Log output file path
-t, --test Validate configuration without running
-d, --daemon Daemon mode: retry on error
--version Show version
--help Show this message and exit
Configuration Reference
MQTT Section
mqtt:
host: localhost # Broker hostname
port: 1883 # Broker port
username: user # Optional authentication
password: pass
cafile: /path/to/ca.crt # Optional TLS
certfile: /path/to/cert
keyfile: /path/to/key
InfluxDB Section
influxdb:
host: localhost # InfluxDB hostname
port: 8181 # InfluxDB port
token: your-api-token # API token
org: your-organization # Organization name
bucket: your-bucket # Default bucket
enable_gzip: false # Optional gzip compression
Points Section
points:
- measurement: temperature
topic: node/+/thermometer/+/temperature
bucket: custom_bucket # Optional: override default bucket
schedule: '0 * * * *' # Optional: cron filter
fields:
value: $.payload
converted:
value: $.payload.raw
type: float
calculated: = 32 + ($.payload.celsius * 9 / 5)
tags:
id: $.topic[1]
channel: $.topic[3]
Type Conversion
Fields support optional type conversion:
fields:
temperature:
value: $.payload.temp
type: float
| Type | Description | Example |
|---|---|---|
float |
Floating-point number | "123" → 123.0 |
int |
Integer number | "42" → 42 |
str |
String | 123 → "123" |
bool |
Boolean | 1 → true |
booltoint |
Boolean converted to 0/1 | true → 1 |
Payload Formats
Both JSON and raw string payloads are supported:
| Payload | Parsed As | $.payload Value |
|---|---|---|
25.5 |
JSON number | 25.5 (float) |
{"temp": 25} |
JSON object | {"temp": 25} |
[1, 2, 3] |
JSON array | [1, 2, 3] |
"hello" |
JSON string | "hello" |
ON |
Raw string | "ON" |
Device ready |
Raw string | "Device ready" |
Raw strings are useful for simple MQTT messages like Tasmota power states (ON/OFF) or status messages.
JSONPath Syntax
$.payload- Entire payload (JSON or raw string)$.payload.temperature- Nested field (JSON only)$.payload.data[0]- Array index (JSON only)$.topic[n]- Topic segment (0-indexed)$.payload['pm2.5']- Field with special characters (dot, space, etc.)
Special Characters: Use bracket notation with quotes for field names containing dots, spaces, or other reserved characters:
# For payload: {"air_quality_sensor": {"pm2.5": 5}}
fields:
pm25: $.payload.air_quality_sensor['pm2.5']
Environment Variables
Use ${VAR} or ${VAR:default} syntax to substitute environment variables in any string value.
mqtt:
host: ${MQTT2INFLUXDB_MQTT_HOST:localhost}
port: ${MQTT2INFLUXDB_MQTT_PORT:1883}
username: ${MQTT2INFLUXDB_MQTT_USERNAME:}
password: ${MQTT2INFLUXDB_MQTT_PASSWORD:}
influxdb:
host: ${MQTT2INFLUXDB_INFLUXDB_HOST:localhost}
port: ${MQTT2INFLUXDB_INFLUXDB_PORT:8181}
token: ${MQTT2INFLUXDB_INFLUXDB_TOKEN}
org: ${MQTT2INFLUXDB_INFLUXDB_ORG:default}
bucket: ${MQTT2INFLUXDB_INFLUXDB_BUCKET:metrics}
${VAR}- Required variable (error if not set)${VAR:default}- Optional variable with default value${VAR:}- Optional variable with empty default
Optional HTTP Forwarding
http:
destination: https://example.com/api
action: post
username: user
password: pass
Optional Base64 Decoding
base64decode:
source: $.payload.data
target: data
Development
git clone https://github.com/hardwario/mqtt2influxdb.git
cd mqtt2influxdb
uv sync
uv run mqtt2influxdb -c config-tower.yml --debug
License
This project is licensed under the MIT License - see the LICENSE file for details.
Made with ❤ by HARDWARIO a.s. in the heart of Europe.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file mqtt2influxdb-2.0.6.tar.gz.
File metadata
- Download URL: mqtt2influxdb-2.0.6.tar.gz
- Upload date:
- Size: 68.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4eb0fdca8441cfa1f42312054f095a189ecba82435b82d7da4eb207811efe83b
|
|
| MD5 |
6317437b1165026c9d16c4ed1e689560
|
|
| BLAKE2b-256 |
5560ca16c182fd4e7ea0b67b58542c54f9e563543731986643498bc6b3e47cf3
|
Provenance
The following attestation bundles were made for mqtt2influxdb-2.0.6.tar.gz:
Publisher:
publish.yml on hardwario/mqtt2influxdb
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
mqtt2influxdb-2.0.6.tar.gz -
Subject digest:
4eb0fdca8441cfa1f42312054f095a189ecba82435b82d7da4eb207811efe83b - Sigstore transparency entry: 782223632
- Sigstore integration time:
-
Permalink:
hardwario/mqtt2influxdb@5d2ca8f4e209796ba59a7629e44e68d095e40758 -
Branch / Tag:
refs/tags/v2.0.6 - Owner: https://github.com/hardwario
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@5d2ca8f4e209796ba59a7629e44e68d095e40758 -
Trigger Event:
push
-
Statement type:
File details
Details for the file mqtt2influxdb-2.0.6-py3-none-any.whl.
File metadata
- Download URL: mqtt2influxdb-2.0.6-py3-none-any.whl
- Upload date:
- Size: 14.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0b6ef3aa61557a9c0300b6b56edaf5ce92cccfdaa2c2a4d509fc05e00c567d90
|
|
| MD5 |
bd6c3b6dcf9a3f4c2eb550dfbedb3973
|
|
| BLAKE2b-256 |
4166fc1e2bee35c44e721907b791f4590371abd66c18f954b2c07619487bd082
|
Provenance
The following attestation bundles were made for mqtt2influxdb-2.0.6-py3-none-any.whl:
Publisher:
publish.yml on hardwario/mqtt2influxdb
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
mqtt2influxdb-2.0.6-py3-none-any.whl -
Subject digest:
0b6ef3aa61557a9c0300b6b56edaf5ce92cccfdaa2c2a4d509fc05e00c567d90 - Sigstore transparency entry: 782223633
- Sigstore integration time:
-
Permalink:
hardwario/mqtt2influxdb@5d2ca8f4e209796ba59a7629e44e68d095e40758 -
Branch / Tag:
refs/tags/v2.0.6 - Owner: https://github.com/hardwario
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@5d2ca8f4e209796ba59a7629e44e68d095e40758 -
Trigger Event:
push
-
Statement type: