MQTT to InfluxDB v3 bridge with flexible JSONPath transformation

mqtt2influxdb

A Python bridge between MQTT and the InfluxDB v3 time-series database. It subscribes to MQTT topics, processes incoming messages, and writes data points to InfluxDB.


Features

  • Subscribe to multiple MQTT topics with wildcard support (+, #)
  • Write data to InfluxDB v3 with tags and fields
  • Support for both JSON and raw string payloads
  • JSONPath extraction from message payloads
  • Mathematical expressions for computed fields
  • Cron-based scheduling for conditional writes
  • HTTP forwarding of processed data
  • Base64 decoding support
  • Daemon mode with automatic reconnection

Requirements

  • Python 3.10+
  • InfluxDB v3 instance (Cloud or self-hosted)
  • MQTT broker (Mosquitto, etc.)

Installation

Using uv (recommended):

uv tool install mqtt2influxdb

Using pip:

pip install mqtt2influxdb

Quick Start

  1. Create a configuration file config.yml:
mqtt:
  host: localhost
  port: 1883

influxdb:
  host: localhost
  port: 8181
  token: your-api-token
  org: your-organization
  bucket: your-bucket

points:
  - measurement: temperature
    topic: sensors/+/temperature
    fields:
      value: $.payload
    tags:
      sensor_id: $.topic[1]
  2. Run the bridge:
mqtt2influxdb -c config.yml
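
To make the Quick Start configuration concrete, the following Python sketch shows how a message published on sensors/kitchen/temperature could map to a data point. It is an illustration only, not the bridge's actual code: the helper names topic_matches and build_point are hypothetical, and the wildcard rules follow standard MQTT semantics (+ matches one level, # matches all remaining levels).

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a subscription pattern."""
    pattern_parts = pattern.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(pattern_parts):
        if part == "#":
            # Multi-level wildcard: only valid as the last pattern level.
            return i == len(pattern_parts) - 1
        if i >= len(topic_parts):
            return False
        if part != "+" and part != topic_parts[i]:
            return False
    return len(pattern_parts) == len(topic_parts)


def build_point(topic: str, payload):
    """Hypothetical helper mirroring the Quick Start 'points' entry."""
    if not topic_matches("sensors/+/temperature", topic):
        return None
    segments = topic.split("/")  # $.topic[n] is the n-th segment, 0-indexed
    return {
        "measurement": "temperature",
        "tags": {"sensor_id": segments[1]},  # sensor_id: $.topic[1]
        "fields": {"value": payload},        # value: $.payload
    }


point = build_point("sensors/kitchen/temperature", 21.5)
```

Here point carries the tag sensor_id = "kitchen" and the field value = 21.5, while a message on sensors/kitchen/humidity would not match the topic pattern and produces no point.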

CLI Usage

mqtt2influxdb [OPTIONS]

Options:
  -c, --config FILE  Path to configuration file (YAML)  [required]
  -D, --debug        Enable debug logging
  -o, --output FILE  Log output file path
  -t, --test         Validate configuration without running
  -d, --daemon       Daemon mode: retry on error
  --version          Show version
  --help             Show this message and exit

Configuration Reference

MQTT Section

mqtt:
  host: localhost          # Broker hostname
  port: 1883               # Broker port
  username: user           # Optional authentication
  password: pass
  cafile: /path/to/ca.crt  # Optional TLS
  certfile: /path/to/cert
  keyfile: /path/to/key

InfluxDB Section

influxdb:
  host: localhost          # InfluxDB hostname
  port: 8181               # InfluxDB port
  token: your-api-token    # API token
  org: your-organization   # Organization name
  bucket: your-bucket      # Default bucket
  enable_gzip: false       # Optional gzip compression

Points Section

points:
  - measurement: temperature
    topic: node/+/thermometer/+/temperature
    bucket: custom_bucket   # Optional: override default bucket
    schedule: '0 * * * *'   # Optional: cron filter
    fields:
      value: $.payload
      converted:
        value: $.payload.raw
        type: float
      calculated: = 32 + ($.payload.celsius * 9 / 5)
    tags:
      id: $.topic[1]
      channel: $.topic[3]
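
As a worked example of the converted and calculated fields and the two tags above, here is a minimal Python sketch. It assumes a JSON object payload with raw and celsius keys and a topic with five segments; the function name evaluate_point and the sample values are hypothetical, and the bridge's own expression evaluator may behave differently.

```python
def evaluate_point(topic: str, payload: dict) -> dict:
    """Hypothetical evaluation of the 'points' entry shown above."""
    segments = topic.split("/")
    return {
        "fields": {
            # converted: value $.payload.raw with type float
            "converted": float(payload["raw"]),
            # calculated: = 32 + ($.payload.celsius * 9 / 5)
            "calculated": 32 + (payload["celsius"] * 9 / 5),
        },
        "tags": {
            "id": segments[1],       # $.topic[1]
            "channel": segments[3],  # $.topic[3]
        },
    }


result = evaluate_point(
    "node/kitchen/thermometer/0:1/temperature",
    {"raw": "21.5", "celsius": 25},
)
```

With these sample inputs, calculated evaluates to 77.0 (25 °C in Fahrenheit), converted to 21.5, and the tags to id = "kitchen" and channel = "0:1".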

Type Conversion

Fields support optional type conversion:

fields:
  temperature:
    value: $.payload.temp
    type: float

Type        Description                 Example
float       Floating-point number       "123" → 123.0
int         Integer number              "42" → 42
str         String                      123 → "123"
bool        Boolean                     1 → true
booltoint   Boolean converted to 0/1    true → 1
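
The conversions listed above can be approximated with Python built-ins. This is a sketch under the assumption that each type maps to the corresponding Python constructor; the CONVERTERS table and convert function are hypothetical names, not the bridge's API.

```python
# Assumed mapping from the configured type name to a Python callable.
CONVERTERS = {
    "float": float,
    "int": int,
    "str": str,
    "bool": bool,
    "booltoint": lambda value: int(bool(value)),
}


def convert(value, type_name: str):
    """Apply the conversion named by type_name to value."""
    return CONVERTERS[type_name](value)


examples = [
    convert("123", "float"),
    convert("42", "int"),
    convert(123, "str"),
    convert(1, "bool"),
    convert(True, "booltoint"),
]
```

One caveat of this plain-Python approach: bool("false") is True, because any non-empty string is truthy. How the bridge treats string payloads such as "false" or "OFF" is not stated here, so check its behavior before relying on bool with raw string payloads.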

Payload Formats

Both JSON and raw string payloads are supported:

Payload         Parsed As      $.payload Value
25.5            JSON number    25.5 (float)
{"temp": 25}    JSON object    {"temp": 25}
[1, 2, 3]       JSON array     [1, 2, 3]
"hello"         JSON string    "hello"
ON              Raw string     "ON"
Device ready    Raw string     "Device ready"

Raw strings are useful for simple MQTT messages like Tasmota power states (ON/OFF) or status messages.
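
The fallback from JSON to raw string can be pictured as "try to parse as JSON, otherwise keep the text". The sketch below illustrates that idea with the standard json module; parse_payload is a hypothetical name, and UTF-8 decoding is an assumption.

```python
import json


def parse_payload(raw: bytes):
    """Return the parsed JSON value, or the raw text if it is not valid JSON."""
    text = raw.decode("utf-8")  # assumption: payloads are UTF-8 text
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


number = parse_payload(b"25.5")
obj = parse_payload(b'{"temp": 25}')
state = parse_payload(b"ON")
status = parse_payload(b"Device ready")
```

Under this approach a Tasmota-style ON payload stays the string "ON", while 25.5 becomes a float that can be written directly as a field value.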

JSONPath Syntax

  • $.payload - Entire payload (JSON or raw string)
  • $.payload.temperature - Nested field (JSON only)
  • $.payload.data[0] - Array index (JSON only)
  • $.topic[n] - Topic segment (0-indexed)
  • $.payload['pm2.5'] - Field with special characters (dot, space, etc.)

Special Characters: Use bracket notation with quotes for field names containing dots, spaces, or other reserved characters:

# For payload: {"air_quality_sensor": {"pm2.5": 5}}
fields:
  pm25: $.payload.air_quality_sensor['pm2.5']
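
To illustrate how these paths address a message, here is a small hand-written resolver covering only the forms listed above (.name, [n], and ['name']). It assumes the message is presented as a dictionary with a topic list and a payload value; the resolve function is hypothetical and much simpler than a full JSONPath implementation.

```python
import re

# One path step: .name, [index], or ['quoted name']
_STEP = re.compile(r"\.([A-Za-z_]\w*)|\[(\d+)\]|\['([^']*)'\]")


def resolve(path: str, message: dict):
    """Walk a simplified JSONPath expression through a message dictionary."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    current = message
    pos = 1
    while pos < len(path):
        match = _STEP.match(path, pos)
        if match is None:
            raise ValueError(f"unsupported syntax in {path!r} at position {pos}")
        name, index, quoted = match.groups()
        if index is not None:
            current = current[int(index)]
        else:
            current = current[name if name is not None else quoted]
        pos = match.end()
    return current


message = {
    "topic": "node/kitchen/air/0/quality".split("/"),
    "payload": {"air_quality_sensor": {"pm2.5": 5}, "data": [7, 8]},
}
pm25 = resolve("$.payload.air_quality_sensor['pm2.5']", message)
```

With the sample message, the bracket form reaches the pm2.5 key that dot notation could not express, and $.topic[1] resolves to "kitchen".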

Environment Variables

Use ${VAR} or ${VAR:default} syntax to substitute environment variables in any string value.

mqtt:
  host: ${MQTT2INFLUXDB_MQTT_HOST:localhost}
  port: ${MQTT2INFLUXDB_MQTT_PORT:1883}
  username: ${MQTT2INFLUXDB_MQTT_USERNAME:}
  password: ${MQTT2INFLUXDB_MQTT_PASSWORD:}

influxdb:
  host: ${MQTT2INFLUXDB_INFLUXDB_HOST:localhost}
  port: ${MQTT2INFLUXDB_INFLUXDB_PORT:8181}
  token: ${MQTT2INFLUXDB_INFLUXDB_TOKEN}
  org: ${MQTT2INFLUXDB_INFLUXDB_ORG:default}
  bucket: ${MQTT2INFLUXDB_INFLUXDB_BUCKET:metrics}
  • ${VAR} - Required variable (error if not set)
  • ${VAR:default} - Optional variable with default value
  • ${VAR:} - Optional variable with empty default
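
The three forms can be modeled with a single regular expression. The sketch below is an assumption about how such substitution might work, not the bridge's implementation; substitute is a hypothetical name, and it treats a variable that is set to an empty string as set.

```python
import re

# ${NAME} or ${NAME:default}; the default may be empty
_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}")


def substitute(text: str, env: dict[str, str]) -> str:
    """Replace ${VAR} and ${VAR:default} placeholders using env."""

    def replace(match: re.Match) -> str:
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default  # covers both ${VAR:default} and ${VAR:}
        raise KeyError(f"required environment variable {name} is not set")

    return _VAR.sub(replace, text)


env = {"MQTT2INFLUXDB_MQTT_HOST": "broker.local"}
host = substitute("${MQTT2INFLUXDB_MQTT_HOST:localhost}", env)
port = substitute("${MQTT2INFLUXDB_MQTT_PORT:1883}", env)
username = substitute("${MQTT2INFLUXDB_MQTT_USERNAME:}", env)
```

In real use, os.environ would be passed as env. Note that the substituted port is still the string "1883"; converting it to an integer is a separate step.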

Optional HTTP Forwarding

http:
  destination: https://example.com/api
  action: post
  username: user
  password: pass

Optional Base64 Decoding

base64decode:
  source: $.payload.data
  target: data
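
As a rough picture of what this option does, the sketch below decodes a Base64 string found at one payload key and stores the result under a target key. The exact semantics of source and target in the bridge are not spelled out here, so decode_field, its arguments, and the choice to keep the result as bytes are all assumptions.

```python
import base64


def decode_field(payload: dict, source_key: str, target_key: str) -> dict:
    """Return a copy of payload with the Base64 value decoded into target_key."""
    decoded = base64.b64decode(payload[source_key])
    result = dict(payload)  # leave the original payload untouched
    result[target_key] = decoded
    return result


encoded = base64.b64encode(b"25.5").decode("ascii")
decoded_payload = decode_field({"data": encoded}, "data", "data")
```

Here the sample payload's data field round-trips back to the bytes b"25.5"; a further step would be needed to turn those bytes into a number or string.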

Development

git clone https://github.com/hardwario/mqtt2influxdb.git
cd mqtt2influxdb
uv sync
uv run mqtt2influxdb -c config-tower.yml --debug

License

This project is licensed under the MIT License - see the LICENSE file for details.


Made with ❤ by HARDWARIO a.s. in the heart of Europe.
