EMQX Client Monitor

Prometheus-compatible exporter/monitor for watching EMQX MQTT clients.

This is a work in progress.

EMQX doesn't expose a Prometheus endpoint for watching individual client connections, and relying on subscribed SYS events may not be reliable in some scenarios. This agent uses EMQX's REST API to monitor the connection state of configured clients and exposes a Prometheus-style endpoint for further ingestion.

The primary use case is monitoring IoT devices that connect to the network for short periods of time to publish and receive MQTT messages, especially those that do not transmit any heartbeats.

It is intended to be deployed on a Kubernetes cluster and scraped automatically by Prometheus. Some manual modes are available for convenience.

Installation and prerequisites

PyPI: emqx-client-monitor

pipx install emqx-client-monitor

This tool requires EMQX v5 and an API key and secret created by any EMQX admin user.

Configuration

Prepare a configuration file based on examples/config.yaml. By default, this program reads ~/.config/emqx-client-monitor/config.yaml, but the path can be overridden with the --cfg flag.

emqx

emqx:
  api_key: "01234567890abcde"
  api_secret: "exampleSecretKey1234567890ABCDEFGHIJKLMNOPQRS"
  api_url: "http://emqx:18083/api/"
  ssl: true  # or path to CA bundle
  alias: LocalEMQX

Required:

  • api_key and api_secret are generated by EMQX itself
  • api_url must be the URL of the base EMQX API endpoint (ending with /api/)
  • alias is the name of the EMQX broker; it should be unique across all instances of this program that are ingested into a single Prometheus database

Optional:

  • ssl is passed to the requests library as verify; it is one of:
    • True (default): validate certificates against the root CAs that your Python trusts
    • False: insecure mode (for brave and lazy people)
    • a string path pointing to a CA chain; this is useful for handling a private root CA on systems where Python doesn't use the system chain
  • timeout_seconds is the EMQX API connection timeout (default 5)
  • attempts is the number of attempts made before failing (default 3)
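
As a rough illustration of how these options could fit together, the sketch below turns the emqx section into keyword arguments suitable for requests.get() and wraps a call in a simple retry loop. The helper names build_request_kwargs and with_attempts are hypothetical and not part of this package; sending the API key and secret as HTTP Basic credentials follows the EMQX v5 API convention.

```python
from __future__ import annotations

from typing import Any, Callable, TypeVar

T = TypeVar("T")


def build_request_kwargs(emqx_cfg: dict[str, Any]) -> dict[str, Any]:
    """Translate the emqx config section into keyword arguments for requests.get()."""
    return {
        # The API key and secret are sent as HTTP Basic credentials.
        "auth": (emqx_cfg["api_key"], emqx_cfg["api_secret"]),
        # True, False, or a path to a CA bundle; passed through unchanged as verify.
        "verify": emqx_cfg.get("ssl", True),
        "timeout": emqx_cfg.get("timeout_seconds", 5),
    }


def with_attempts(call: Callable[[], T], attempts: int = 3) -> T:
    """Run call() up to `attempts` times and re-raise the last error (hypothetical helper)."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error: Exception | None = None
    for _ in range(attempts):
        try:
            return call()
        except Exception as exc:  # a real client would narrow this to request errors
            last_error = exc
    assert last_error is not None
    raise last_error
```

With the requests library installed, a call could then look like with_attempts(lambda: requests.get(url, **build_request_kwargs(cfg)), cfg.get("attempts", 3)), where url is built from api_url.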

monitored_clients

monitored_clients:
  - alias: QingpingCO2_Room1
    client_id: "qingping-DEADBEEF1234"
  - alias: QingpingCO2_Room2
    client_id: "qingping-DEADBEEF5678"

This is the list of clients to be monitored. Each entry contains a client_id, which is matched against the MQTT client ID (unique per broker), and an alias, which is used as an extra label.
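
A minimal sketch of how such a list might be matched against an API response is shown below. It assumes each client object returned by EMQX carries clientid and connected fields; the function name match_monitored is hypothetical. Clients missing from the response are reported as disconnected rather than dropped.

```python
from __future__ import annotations

from typing import Any


def match_monitored(
    monitored: list[dict[str, str]],
    api_clients: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Pair each monitored_clients entry with its API record, if any."""
    # Client IDs are unique per broker, so a plain dict lookup is enough.
    by_id = {client["clientid"]: client for client in api_clients}
    result = []
    for entry in monitored:
        found = by_id.get(entry["client_id"])
        result.append(
            {
                "alias": entry["alias"],
                "client_id": entry["client_id"],
                # A client absent from the response counts as disconnected.
                "connected": bool(found and found.get("connected", False)),
            }
        )
    return result
```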

prometheus

prometheus:
  # all values below are defaults
  port: 9671
  address: 0.0.0.0
  ttl_seconds: 15
  enable_processed_counters: true
  enable_qos_split: false
  enable_dropped_counters: false
  enable_reason_split: false
  enable_bytes_metrics: true
  enable_packet_metrics: true
  enable_dates: false
  enable_inflight_metrics: true
  enable_subscription_count: true

All values are optional:

  • port and address define where the exporter binds
  • ttl_seconds defines how long the exporter caches data between calls to the exporter API
  • enable_* are flags that enable various metrics
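
The caching behaviour behind ttl_seconds can be pictured with a small time-based cache like the one below. This is only a sketch of the general technique, not the exporter's actual code; the class name TtlCache and the injectable clock are assumptions made for illustration.

```python
from __future__ import annotations

import time
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class TtlCache(Generic[T]):
    """Cache the result of fetch() and refresh it once ttl_seconds have passed."""

    def __init__(
        self,
        fetch: Callable[[], T],
        ttl_seconds: float = 15.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: T | None = None
        self._expires_at: float | None = None

    def get(self) -> T:
        now = self._clock()
        # Fetch on first use and whenever the cached value has expired.
        if self._expires_at is None or now >= self._expires_at:
            self._value = self._fetch()
            self._expires_at = now + self._ttl
        return self._value
```

With such a cache, several Prometheus scrapes arriving within the TTL window would trigger a single request to the EMQX API.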

Running

Manual check (sub-command check)

This sub-command checks connected clients once and prints a human-readable table. The --all flag can be used to ignore monitored_clients and list all connected clients (the "Alias" column then becomes "Client ID").

Example output:

┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━┳━━━━━┳━━━━━━━┳━━━━━━━━━┳━━━━━━┳━━━━━┳━━━━━━┓
┃                        ┃    Created ┃  Keep ┃  Connected ┃ Sub ┃ MsgIn ┃      RX ┃   RX ┃  TX ┃   TX ┃
┃ Alias                  ┃ (time ago) ┃ alive ┃ (time ago) ┃ Cnt ┃ Flght ┃     Msg ┃ Drop ┃ Msg ┃ Drop ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━╇━━━━━╇━━━━━━━╇━━━━━━━━━╇━━━━━━╇━━━━━╇━━━━━━┩
│ HumidifierWaterRefTerm │   16 hours │   10s │   16 hours │   4 │     0 │   24279 │    0 │  16 │    0 │
│ QingpingIoTMQTT_Client │ 39 minutes │    1m │ 39 minutes │   8 │     0 │       0 │    0 │ 126 │    0 │
│ QingpingCO2_Room1      │   a second │    2m │   a second │   0 │     0 │       0 │    0 │   0 │    0 │
│ QingpingCO2_Room2      │  4 minutes │    2m │  4 minutes │   1 │     0 │       2 │    0 │   0 │    0 │
│ QingpingCO2_Room3      │    13 days │    2m │    13 days │   1 │     0 │   38455 │    0 │  69 │    0 │
│ RTL433_Room1           │    a month │    1m │   23 hours │   0 │     0 │  702456 │    0 │   0 │    0 │
│ RTL433_Room2           │    30 days │    1m │    18 days │   0 │     0 │ 5184897 │    0 │   0 │    0 │
│ ZAMEL                  │     7 days │   32s │     7 days │   0 │     0 │ 6007727 │    0 │   0 │    0 │
└────────────────────────┴────────────┴───────┴────────────┴─────┴───────┴─────────┴──────┴─────┴──────┘

Prometheus exporter (sub-command prometheus)

Labels

All metrics have labels:

  • alias for client alias from configuration
  • broker for EMQX instance alias from configuration
  • client_id for MQTT Client ID

Additionally, most metrics have:

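  • direction, which is either rx (traffic the broker received from the client, i.e. the client's published messages) or tx (traffic the broker sent to the client, i.e. deliveries to its subscriptions)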
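
To make the label layout concrete, here is a small sketch that renders one sample line in the Prometheus text exposition format. The function format_sample is hypothetical; a real exporter would normally rely on a client library rather than format lines by hand.

```python
from __future__ import annotations


def format_sample(name: str, labels: dict[str, str], value: float) -> str:
    """Render one sample line in the Prometheus text exposition format."""

    def escape(text: str) -> str:
        # Label values escape backslash, double quote and newline.
        return text.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")

    # Sorting keeps the label order stable: alias, broker, client_id, direction.
    rendered = ",".join(f'{key}="{escape(val)}"' for key, val in sorted(labels.items()))
    return f"{name}{{{rendered}}} {float(value)}"


line = format_sample(
    "emqx_client_monitor_connected",
    {"alias": "QingpingCO2_Room2", "broker": "emqx", "client_id": "qingping-DEADBEEF5678"},
    1,
)
```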

Metrics

All available metrics with example data:

# HELP emqx_client_monitor_connected Is client connected
# TYPE emqx_client_monitor_connected gauge
emqx_client_monitor_connected{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.0
# HELP emqx_client_monitor_subscriptions Number of subscriptions
# TYPE emqx_client_monitor_subscriptions gauge
emqx_client_monitor_subscriptions{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.0
# HELP emqx_client_monitor_inflights Number of inflight messages
# TYPE emqx_client_monitor_inflights gauge
emqx_client_monitor_inflights{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 0.0
# HELP emqx_client_monitor_created_at Client creation time (epoch seconds)
# TYPE emqx_client_monitor_created_at gauge
emqx_client_monitor_created_at{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.763772688905e+09
# HELP emqx_client_monitor_connected_at Client last connected time (epoch seconds)
# TYPE emqx_client_monitor_connected_at gauge
emqx_client_monitor_connected_at{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.763772688906e+09
# HELP emqx_client_monitor_messages_processed_total Number of received messages processed (total)
# TYPE emqx_client_monitor_messages_processed_total counter
emqx_client_monitor_messages_processed_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 38657.0
emqx_client_monitor_messages_processed_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 69.0
# HELP emqx_client_monitor_messages_processed_by_qos_total Number of received messages processed split by QoS
# TYPE emqx_client_monitor_messages_processed_by_qos_total counter
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",qos="0"} 38657.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",qos="1"} 0.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",qos="2"} 0.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",qos="0"} 69.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",qos="1"} 0.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",qos="2"} 0.0
# HELP emqx_client_monitor_messages_dropped_total Number of received messages dropped (total)
# TYPE emqx_client_monitor_messages_dropped_total counter
emqx_client_monitor_messages_dropped_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 0.0
emqx_client_monitor_messages_dropped_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 0.0
# HELP emqx_client_monitor_messages_dropped_by_reason_total Number of received messages dropped split by reason
# TYPE emqx_client_monitor_messages_dropped_by_reason_total counter
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",reason="await_pubrel_timeout"} 0.0
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",reason="expired"} 0.0
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",reason="queue_full"} 0.0
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",reason="too_large"} 0.0
# HELP emqx_client_monitor_bytes_total Number of received raw octets (bytes)
# TYPE emqx_client_monitor_bytes_total counter
emqx_client_monitor_bytes_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 1.0525736e+07
emqx_client_monitor_bytes_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 8009.0
# HELP emqx_client_monitor_packets_total Number of received MQTT packets
# TYPE emqx_client_monitor_packets_total counter
emqx_client_monitor_packets_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 38761.0
emqx_client_monitor_packets_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 173.0

These are controlled by feature flags:

  • emqx_client_monitor_connected is always exported, even if the client is not returned by the EMQX API
  • enable_subscription_count controls:
    • emqx_client_monitor_subscriptions
  • enable_inflight_metrics controls:
    • emqx_client_monitor_inflights
  • enable_dates controls:
    • emqx_client_monitor_created_at
    • emqx_client_monitor_connected_at
  • enable_processed_counters controls:
    • emqx_client_monitor_messages_processed_total (RX/TX)
  • enable_qos_split controls:
    • emqx_client_monitor_messages_processed_by_qos_total (RX/TX) by qos label (0/1/2)
  • enable_dropped_counters controls:
    • emqx_client_monitor_messages_dropped_total (RX/TX)
  • enable_reason_split controls:
    • emqx_client_monitor_messages_dropped_by_reason_total (RX/TX) by reason label:
      • RX await_pubrel_timeout
      • TX expired
      • TX queue_full
      • TX too_large
  • enable_bytes_metrics controls:
    • emqx_client_monitor_bytes_total (RX/TX)
  • enable_packet_metrics controls:
    • emqx_client_monitor_packets_total (RX/TX)
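
The flag-to-metric mapping above can be summarised as a lookup table. The sketch below is illustrative only: the names DEFAULT_FLAGS, FLAG_TO_METRICS and enabled_metrics are hypothetical, while the flags, their defaults and the metric names are taken from this document.

```python
from __future__ import annotations

# Defaults as listed in the prometheus configuration section.
DEFAULT_FLAGS = {
    "enable_processed_counters": True,
    "enable_qos_split": False,
    "enable_dropped_counters": False,
    "enable_reason_split": False,
    "enable_bytes_metrics": True,
    "enable_packet_metrics": True,
    "enable_dates": False,
    "enable_inflight_metrics": True,
    "enable_subscription_count": True,
}

FLAG_TO_METRICS = {
    "enable_subscription_count": ["emqx_client_monitor_subscriptions"],
    "enable_inflight_metrics": ["emqx_client_monitor_inflights"],
    "enable_dates": [
        "emqx_client_monitor_created_at",
        "emqx_client_monitor_connected_at",
    ],
    "enable_processed_counters": ["emqx_client_monitor_messages_processed_total"],
    "enable_qos_split": ["emqx_client_monitor_messages_processed_by_qos_total"],
    "enable_dropped_counters": ["emqx_client_monitor_messages_dropped_total"],
    "enable_reason_split": ["emqx_client_monitor_messages_dropped_by_reason_total"],
    "enable_bytes_metrics": ["emqx_client_monitor_bytes_total"],
    "enable_packet_metrics": ["emqx_client_monitor_packets_total"],
}


def enabled_metrics(prometheus_cfg: dict[str, bool]) -> list[str]:
    """Return the metric names exported for a given prometheus config section."""
    flags = {**DEFAULT_FLAGS, **prometheus_cfg}
    names = ["emqx_client_monitor_connected"]  # always exported
    for flag, metrics in FLAG_TO_METRICS.items():
        if flags.get(flag):
            names.extend(metrics)
    return names
```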

Important assumptions

Client (and resulting metric) uniqueness

Uniqueness is based on the following attributes:

  • client alias from configuration
  • MQTT client ID (from configuration), which must be unique per broker (so it's also unique per server response)
  • MQTT username (from server response)
  • broker alias from configuration (this allows multiple instances of the monitor to be ingested into one Prometheus)

The following fields are ignored for this purpose:

  • IP address and port number of the client: these may change over time (e.g. from DHCP) and are problematic with NAT anyway; in the future, they may be exposed as a metric (IP converted to integer)
  • EMQX node: it may be random in a round-robin cluster; in the future, it may be exposed as a metric via some mapping coming from agent configuration (right now it's a string like "emqx@emqx-0.emqx-headless.namespace.svc.cluster.local")
  • all connection attributes like clean start, because they are client-configurable
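
One way to picture this identity rule is a frozen dataclass that holds only the attributes that count. The sketch below is an assumption-laden illustration: the class ClientKey, the function key_from_response, and the response field names (clientid, username, ip_address, node, clean_start) are placeholders for whatever the EMQX API actually returns.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ClientKey:
    """Identity of a monitored client; frozen so it can be used as a dict key."""

    broker: str
    alias: str
    client_id: str
    username: str


def key_from_response(
    broker_alias: str, client_alias: str, api_client: dict[str, Any]
) -> ClientKey:
    # IP address, port, node and connection attributes are deliberately left out.
    return ClientKey(
        broker=broker_alias,
        alias=client_alias,
        client_id=api_client["clientid"],
        username=api_client.get("username") or "",
    )
```

Two responses for the same client that differ only in the ignored fields then produce equal keys, so they map to the same metric series.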

Live data from EMQX API

For now, all data is live from the EMQX API. This means that once a client disconnects, all of its gauges and counters disappear (except emqx_client_monitor_connected). By design, once the client connects back, its counters on the EMQX API reset.

In other words, some metrics may not make much sense for clients whose TTL is shorter than their publish interval. For now, it is the responsibility of some other system to aggregate the resetting counters into rate gauges.

Additionally, Prometheus scraping must be more frequent than the shortest TTL of clients that connect and disconnect very often. Usually this is not a problem, as scrape intervals are commonly 30s (Prometheus' built-in default is 1m, so check your configuration). It does mean that clients are going to be reported as disconnected when they have an absurdly short TTL (like 10s), connect very rarely, and disconnect immediately after publishing a single message.
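
As an illustration of what such aggregation involves, the sketch below computes the total increase of a counter across resets, treating any drop in value as a restart from zero. This is the same general idea Prometheus applies when evaluating rate() and increase(); the function name increase_with_resets is hypothetical.

```python
from __future__ import annotations

from typing import Iterable


def increase_with_resets(samples: Iterable[float]) -> float:
    """Sum the growth of a counter, treating every decrease as a reset to zero."""
    total = 0.0
    previous: float | None = None
    for value in samples:
        if previous is not None:
            if value >= previous:
                total += value - previous
            else:
                # The counter restarted (e.g. the client reconnected):
                # everything counted since the reset is new growth.
                total += value
        previous = value
    return total
```

For example, the samples 10, 15, 3, 7 give an increase of 12: 5 before the reset, 3 right after it, and 4 more afterwards. Messages counted between the last scrape before a disconnect and the reset itself are lost, which is why short-lived clients remain hard to measure this way.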

To solve that, this program will need to implement the following:

  • EMQX API scraper running independently of Prometheus scrapes
  • internal state for keeping track of counters and reporting last-known data for disconnected clients
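
A very rough sketch of the second point is given below: a tracker that remembers the last snapshot seen for each client and keeps reporting it after the client disconnects. Nothing here exists in the program today; the class LastKnownTracker and its methods are hypothetical, and an independent polling loop would be expected to call update() on its own schedule.

```python
from __future__ import annotations

import copy
from typing import Any


class LastKnownTracker:
    """Keep the most recent stats per client so they survive a disconnect."""

    def __init__(self) -> None:
        self._last: dict[str, dict[str, Any]] = {}

    def update(self, client_ids: list[str], live: dict[str, dict[str, Any]]) -> None:
        """Record one poll; `live` maps client IDs to stats of connected clients."""
        for client_id in client_ids:
            if client_id in live:
                self._last[client_id] = {"connected": True, "stats": dict(live[client_id])}
            elif client_id in self._last:
                # Keep the previous stats, only flip the connection state.
                self._last[client_id]["connected"] = False
            else:
                self._last[client_id] = {"connected": False, "stats": {}}

    def report(self) -> dict[str, dict[str, Any]]:
        """Return a copy of the last-known state for every tracked client."""
        return copy.deepcopy(self._last)
```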
