EMQX Client Monitor

Prometheus-compatible exporter/monitor for watching EMQX MQTT clients.

This is a work in progress.

EMQX doesn't expose a Prometheus endpoint for watching individual client connections, and relying on events from subscribed SYS topics may not be reliable in some scenarios. This agent uses EMQX's REST API to monitor the connection state of configured clients and exposes a Prometheus-style endpoint for further ingestion.

The primary use case is monitoring IoT devices that connect to the network for short periods of time to publish and receive MQTT messages, especially those that do not transmit any heartbeats.

It is intended to be deployed on a Kubernetes cluster and scraped automatically by Prometheus. Some manual modes are available for convenience.

Installation and prerequisites

Local environment

PyPI: emqx-client-monitor

pipx install emqx-client-monitor

This tool requires EMQX v5 and an API key and secret from any EMQX admin user.

Docker image and Helm chart

The Docker image is hosted at ghcr.io/danielskowronski/emqx-client-monitor and the Helm chart is in this repo at charts/emqx-client-monitor.

At minimum, the following options have to be set:

config:
  targetEmqx:
    api_key: "..."
    api_secret: "..."
  monitoredClients:
    - alias: some_alias
      client_id: "some_client_id"

This will target EMQX in the same namespace, configure Prometheus to scrape metrics, and create the EmqxClientDisconnectedTooLong alert rule.

In the future, this chart will be documented better and published as an OCI artifact. Some automation to obtain the API key and secret is also being considered.

Configuration

Prepare a configuration file based on examples/config.yaml. By default, this program uses ~/.config/emqx-client-monitor/config.yaml, but this can be overridden with the --cfg flag.

emqx

emqx:
  api_key: "01234567890abcde"
  api_secret: "exampleSecretKey1234567890ABCDEFGHIJKLMNOPQRS"
  api_url: "http://emqx:18083/api/"
  ssl: true  # or path to CA bundle
  alias: LocalEMQX

Required:

  • api_key and api_secret are generated by EMQX itself
  • api_url must be the URL of the base EMQX API endpoint (ending with /api/)
  • alias is the name of the EMQX broker; it should be unique among all instances of this program ingested by a single Prometheus database

Optional:

  • ssl is passed to the requests library as verify; it is one of:
    • True (default), which validates certificates against the root CAs that your Python installation trusts
    • False, which is insecure mode (for brave and lazy people)
    • a string path pointing to a CA chain; this is useful for handling a private root CA on systems where Python doesn't use the system chain
  • timeout_seconds is the EMQX API connection timeout (default 5)
  • attempts is the number of attempts made before failing (default 3)
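
As a rough illustration of how these options could map onto an HTTP client, the sketch below normalizes ssl into a value usable as the verify argument of requests and wraps a call in a simple retry loop. The helper names resolve_verify and call_with_attempts are hypothetical and not taken from this project; the request itself is passed in as a callable, so the sketch runs without a broker.

```python
from typing import Callable, Optional, TypeVar, Union

T = TypeVar("T")


def resolve_verify(ssl: Union[bool, str, None] = None) -> Union[bool, str]:
    """Turn the `ssl` config value into a value usable as requests' `verify`."""
    if ssl is None:
        return True  # default: validate against the CAs Python trusts
    if isinstance(ssl, bool):
        return ssl  # False disables validation (insecure)
    if isinstance(ssl, str) and ssl:
        return ssl  # path to a CA bundle
    raise ValueError(f"unsupported ssl value: {ssl!r}")


def call_with_attempts(fn: Callable[[], T], attempts: int = 3) -> T:
    """Call fn up to `attempts` times and re-raise the last error if all fail."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    last_error: Optional[Exception] = None
    for _ in range(attempts):
        try:
            return fn()
        except Exception as exc:  # a real client would catch narrower errors
            last_error = exc
    assert last_error is not None
    raise last_error
```

With requests, the resolved value would be passed as verify= and timeout_seconds as timeout=.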

monitored_clients

monitored_clients:
  - alias: QingpingCO2_Room1
    client_id: "qingping-DEADBEEF1234"
  - alias: QingpingCO2_Room2
    client_id: "qingping-DEADBEEF5678"

This is the list of clients to be monitored. Each entry contains client_id, which is matched against the MQTT client ID (unique per broker), and alias, which is used as an extra label.
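
A minimal sketch of how such a list could be matched against a broker response follows. The function name connection_states is hypothetical, and the response field names clientid and connected are assumptions about the EMQX API payload rather than something this README specifies.

```python
from typing import Dict, Iterable, List


def connection_states(
    monitored: Iterable[Dict[str, str]],
    api_clients: Iterable[Dict[str, object]],
) -> List[Dict[str, object]]:
    """Report, per monitored client, whether the broker lists it as connected."""
    # Index the broker's answer by client ID, which is unique per broker.
    # "clientid" and "connected" are assumed field names.
    by_id = {c["clientid"]: c for c in api_clients}
    states = []
    for entry in monitored:
        found = by_id.get(entry["client_id"])
        states.append(
            {
                "alias": entry["alias"],
                "client_id": entry["client_id"],
                # A client missing from the response counts as disconnected.
                "connected": bool(found and found.get("connected", False)),
            }
        )
    return states
```

Clients returned by the broker but absent from monitored_clients are simply ignored here.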

prometheus

prometheus:
  # all values below are defaults
  port: 9671
  address: 0.0.0.0
  ttl_seconds: 15
  enable_processed_counters: true
  enable_qos_split: false
  enable_dropped_counters: false
  enable_reason_split: false
  enable_bytes_metrics: true
  enable_packet_metrics: true
  enable_dates: false
  enable_inflight_metrics: true
  enable_subscription_count: true

All values are optional:

  • port and address define where the exporter binds
  • ttl_seconds defines how long the exporter caches data between calls to the exporter API
  • enable_* are flags that enable various metrics
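
The caching behaviour behind ttl_seconds can be pictured with the small sketch below. It is not the project's implementation: the class name TtlCache and the injectable clock are assumptions made so the example is testable on its own.

```python
import time
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class TtlCache(Generic[T]):
    """Keep the last fetched value and refresh it once ttl_seconds have passed."""

    def __init__(
        self,
        fetch: Callable[[], T],
        ttl_seconds: float = 15.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Optional[T] = None
        self._expires_at: Optional[float] = None

    def get(self) -> T:
        now = self._clock()
        if self._expires_at is None or now >= self._expires_at:
            # Cache is empty or stale: query the backend again.
            self._value = self._fetch()
            self._expires_at = now + self._ttl
        return self._value  # type: ignore[return-value]
```

With such a cache, several scrapes arriving within ttl_seconds of each other would trigger only one call to the EMQX API.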

Running

Manual check (sub-command check)

This sub-command checks connected clients once and prints a human-readable table. The --all flag can be used to ignore monitored_clients and list all connected clients (the "Alias" column becomes "Client ID").

Example output:

┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━┳━━━━━┳━━━━━━━┳━━━━━━━━━┳━━━━━━┳━━━━━┳━━━━━━┓
┃                        ┃    Created ┃  Keep ┃  Connected ┃ Sub ┃ MsgIn ┃      RX ┃   RX ┃  TX ┃   TX ┃
┃ Alias                  ┃ (time ago) ┃ alive ┃ (time ago) ┃ Cnt ┃ Flght ┃     Msg ┃ Drop ┃ Msg ┃ Drop ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━╇━━━━━╇━━━━━━━╇━━━━━━━━━╇━━━━━━╇━━━━━╇━━━━━━┩
│ HumidifierWaterRefTerm │   16 hours │   10s │   16 hours │   4 │     0 │   24279 │    0 │  16 │    0 │
│ QingpingIoTMQTT_Client │ 39 minutes │    1m │ 39 minutes │   8 │     0 │       0 │    0 │ 126 │    0 │
│ QingpingCO2_Room1      │   a second │    2m │   a second │   0 │     0 │       0 │    0 │   0 │    0 │
│ QingpingCO2_Room2      │  4 minutes │    2m │  4 minutes │   1 │     0 │       2 │    0 │   0 │    0 │
│ QingpingCO2_Room3      │    13 days │    2m │    13 days │   1 │     0 │   38455 │    0 │  69 │    0 │
│ RTL433_Room1           │    a month │    1m │   23 hours │   0 │     0 │  702456 │    0 │   0 │    0 │
│ RTL433_Room2           │    30 days │    1m │    18 days │   0 │     0 │ 5184897 │    0 │   0 │    0 │
│ ZAMEL                  │     7 days │   32s │     7 days │   0 │     0 │ 6007727 │    0 │   0 │    0 │
└────────────────────────┴────────────┴───────┴────────────┴─────┴───────┴─────────┴──────┴─────┴──────┘

Prometheus exporter (sub-command prometheus)

Labels

All metrics have labels:

  • alias for client alias from configuration
  • broker for EMQX instance alias from configuration
  • client_id for MQTT Client ID

Additionally, most metrics have:

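  • direction, which is either rx (messages the broker received from the client, i.e. the client's publishes) or tx (messages the broker delivered to the client's subscriptions)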
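
To make the label layout concrete, here is a small sketch that renders one sample line in the Prometheus text exposition format. The helper names are hypothetical, and the project may well rely on a client library for this instead.

```python
from typing import Mapping


def escape_label_value(value: str) -> str:
    """Escape backslash, double quote and newline as the text format requires."""
    return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def format_sample(name: str, labels: Mapping[str, str], value: float) -> str:
    """Render `name{label="value",...} value` with labels sorted by name."""
    rendered = ",".join(
        f'{key}="{escape_label_value(val)}"' for key, val in sorted(labels.items())
    )
    return f"{name}{{{rendered}}} {float(value)}"


line = format_sample(
    "emqx_client_monitor_connected",
    {
        "alias": "QingpingCO2_Room2",
        "broker": "emqx",
        "client_id": "qingping-DEADBEEF5678",
    },
    1,
)
print(line)
```

Metrics that carry the direction label would simply add one more entry to the labels mapping.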

Metrics

All available metrics with example data:

# HELP emqx_client_monitor_connected Is client connected
# TYPE emqx_client_monitor_connected gauge
emqx_client_monitor_connected{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.0
# HELP emqx_client_monitor_subscriptions Number of subscriptions
# TYPE emqx_client_monitor_subscriptions gauge
emqx_client_monitor_subscriptions{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.0
# HELP emqx_client_monitor_inflights Number of inflight messages
# TYPE emqx_client_monitor_inflights gauge
emqx_client_monitor_inflights{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 0.0
# HELP emqx_client_monitor_created_at Client creation time (epoch seconds)
# TYPE emqx_client_monitor_created_at gauge
emqx_client_monitor_created_at{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.763772688905e+09
# HELP emqx_client_monitor_connected_at Client last connected time (epoch seconds)
# TYPE emqx_client_monitor_connected_at gauge
emqx_client_monitor_connected_at{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.763772688906e+09
# HELP emqx_client_monitor_messages_processed_total Number of received messages processed (total)
# TYPE emqx_client_monitor_messages_processed_total counter
emqx_client_monitor_messages_processed_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 38657.0
emqx_client_monitor_messages_processed_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 69.0
# HELP emqx_client_monitor_messages_processed_by_qos_total Number of received messages processed split by QoS
# TYPE emqx_client_monitor_messages_processed_by_qos_total counter
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",qos="0"} 38657.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",qos="1"} 0.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",qos="2"} 0.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",qos="0"} 69.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",qos="1"} 0.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",qos="2"} 0.0
# HELP emqx_client_monitor_messages_dropped_total Number of received messages dropped (total)
# TYPE emqx_client_monitor_messages_dropped_total counter
emqx_client_monitor_messages_dropped_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 0.0
emqx_client_monitor_messages_dropped_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 0.0
# HELP emqx_client_monitor_messages_dropped_by_reason_total Number of received messages dropped split by reason
# TYPE emqx_client_monitor_messages_dropped_by_reason_total counter
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",reason="await_pubrel_timeout"} 0.0
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",reason="expired"} 0.0
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",reason="queue_full"} 0.0
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",reason="too_large"} 0.0
# HELP emqx_client_monitor_bytes_total Number of received raw octets (bytes)
# TYPE emqx_client_monitor_bytes_total counter
emqx_client_monitor_bytes_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 1.0525736e+07
emqx_client_monitor_bytes_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 8009.0
# HELP emqx_client_monitor_packets_total Number of received MQTT packets
# TYPE emqx_client_monitor_packets_total counter
emqx_client_monitor_packets_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 38761.0
emqx_client_monitor_packets_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 173.0

These are controlled by feature flags:

  • emqx_client_monitor_connected is always exported, even if the client is not returned by the EMQX API
  • enable_subscription_count controls:
    • emqx_client_monitor_subscriptions
  • enable_inflight_metrics controls:
    • emqx_client_monitor_inflights
  • enable_dates controls:
    • emqx_client_monitor_created_at
    • emqx_client_monitor_connected_at
  • enable_processed_counters controls:
    • emqx_client_monitor_messages_processed_total (RX/TX)
  • enable_qos_split controls:
    • emqx_client_monitor_messages_processed_by_qos_total (RX/TX) by qos label (0/1/2)
  • enable_dropped_counters controls:
    • emqx_client_monitor_messages_dropped_total (RX/TX)
  • enable_reason_split controls:
    • emqx_client_monitor_messages_dropped_by_reason_total (RX/TX) by reason label:
      • RX await_pubrel_timeout
      • TX expired
      • TX queue_full
      • TX too_large
  • enable_bytes_metrics controls:
    • emqx_client_monitor_bytes_total (RX/TX)
  • enable_packet_metrics controls:
    • emqx_client_monitor_packets_total (RX/TX)
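
The mapping above can be summarized as a lookup table. The sketch below is only an illustration of that mapping, using the defaults listed in the prometheus section; the names FLAG_TO_METRICS, DEFAULT_FLAGS and enabled_metrics are hypothetical.

```python
from typing import Dict, List, Optional

# Which metrics each feature flag switches on, as listed above.
FLAG_TO_METRICS: Dict[str, List[str]] = {
    "enable_subscription_count": ["emqx_client_monitor_subscriptions"],
    "enable_inflight_metrics": ["emqx_client_monitor_inflights"],
    "enable_dates": [
        "emqx_client_monitor_created_at",
        "emqx_client_monitor_connected_at",
    ],
    "enable_processed_counters": ["emqx_client_monitor_messages_processed_total"],
    "enable_qos_split": ["emqx_client_monitor_messages_processed_by_qos_total"],
    "enable_dropped_counters": ["emqx_client_monitor_messages_dropped_total"],
    "enable_reason_split": ["emqx_client_monitor_messages_dropped_by_reason_total"],
    "enable_bytes_metrics": ["emqx_client_monitor_bytes_total"],
    "enable_packet_metrics": ["emqx_client_monitor_packets_total"],
}

# Defaults copied from the prometheus configuration section.
DEFAULT_FLAGS: Dict[str, bool] = {
    "enable_processed_counters": True,
    "enable_qos_split": False,
    "enable_dropped_counters": False,
    "enable_reason_split": False,
    "enable_bytes_metrics": True,
    "enable_packet_metrics": True,
    "enable_dates": False,
    "enable_inflight_metrics": True,
    "enable_subscription_count": True,
}


def enabled_metrics(flags: Optional[Dict[str, bool]] = None) -> List[str]:
    """Return the metric names exported for the given flag overrides."""
    merged = {**DEFAULT_FLAGS, **(flags or {})}
    names = ["emqx_client_monitor_connected"]  # always exported
    for flag, metrics in FLAG_TO_METRICS.items():
        if merged[flag]:
            names.extend(metrics)
    return names
```

Calling enabled_metrics({"enable_dates": True}) would add the two timestamp gauges to the default set.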

Important assumptions

Client (and resulting metric) uniqueness

Uniqueness is based on the following attributes:

  • client alias from configuration
  • MQTT client ID (from configuration), which must be unique per broker (so it's also unique per server response)
  • MQTT username (from server response)
  • broker alias from configuration (this allows multiple instances of the monitor to be ingested into one Prometheus)

The following fields are ignored for this purpose:

  • IP address and port number of the client: these may change over time (e.g. from DHCP) and are problematic with NAT anyway; in the future, they may be exposed as a metric (IP converted to an integer)
  • EMQX node: it may be random in a round-robin cluster; in the future, this may be exposed as a metric via some mapping from the agent configuration (right now it's a string like "emqx@emqx-0.emqx-headless.namespace.svc.cluster.local")
  • all connection attributes such as clean start, because they are client-configurable

Live data from EMQX API

For now, all data comes live from the EMQX API. This means that once a client disconnects, all of its gauges and counters disappear (except emqx_client_monitor_connected). By design, once the client reconnects, its counters in the EMQX API reset.

In other words, some metrics may not make much sense for clients whose TTL is shorter than their publish interval. For now, it is the responsibility of some other system to aggregate resetting counters into rate gauges.
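
As an illustration of what such an aggregating system has to do, the sketch below sums the growth of a counter across resets: a drop in value is treated as a restart from zero. This is a simplified version of the reset handling that Prometheus applies in rate() and increase(), without extrapolation; the function name increase_with_resets is hypothetical.

```python
from typing import Iterable, Optional


def increase_with_resets(samples: Iterable[float]) -> float:
    """Total growth of a counter over observed samples, tolerating resets."""
    total = 0.0
    previous: Optional[float] = None
    for value in samples:
        if previous is not None:
            if value >= previous:
                total += value - previous
            else:
                # Counter went down: the client reconnected and restarted at 0,
                # so everything counted since the reset is new.
                total += value
        previous = value
    return total


# Client counted up to 15, reconnected, then counted up to 7 again.
print(increase_with_resets([10, 15, 3, 7]))
```

Anything the client sent between the last scrape before a disconnect and the disconnect itself is lost in this scheme, which is the limitation described above.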

Additionally, Prometheus must scrape more frequently than the shortest TTL of clients that connect and disconnect very often. Usually this is not a problem at a scrape interval of 30s (note that Prometheus' own global default is 1m). However, clients will be reported as disconnected if they have an absurdly short TTL (like 10s), connect very rarely, and disconnect immediately after publishing a single message.

To solve that, this program will need to implement the following:

  • EMQX API scraper running independently of Prometheus scrapes
  • internal state for keeping track of counters and reporting last-known data for disconnected clients
