EMQX Client Monitor
Prometheus-compatible exporter/monitor for watching EMQX MQTT clients.
This is a work in progress.
EMQX doesn't expose a Prometheus endpoint for watching individual client connections, and relying on events from subscribed $SYS topics may not be reliable in some scenarios. This agent uses EMQX's REST API to monitor the connection state of configured clients and exposes a Prometheus-style endpoint for further ingestion.
The primary use case is monitoring IoT devices that connect to the network for short periods of time to publish and receive MQTT messages, especially devices that do not transmit any heartbeats.
It is intended to be deployed on a Kubernetes cluster and automatically ingested into Prometheus. Some manual modes are available for convenience.
Installation and prerequisites
Local environment
pipx install emqx-client-monitor
This tool requires EMQX v5 and an API key and secret from any EMQX admin user.
Docker image and Helm chart
The Docker image is hosted at ghcr.io/danielskowronski/emqx-client-monitor, and the Helm chart is in this repository at charts/emqx-client-monitor.
At minimum, the following options have to be set:
config:
  targetEmqx:
    api_key: "..."
    api_secret: "..."
  monitoredClients:
    - alias: some_alias
      client_id: "some_client_id"
This will target EMQX in the same namespace, configure Prometheus to scrape the metrics, and create an EmqxClientDisconnectedTooLong alert rule.
In the future, this chart will be better documented and published as an OCI artifact. Some automation for obtaining the API key and secret is also being considered.
Configuration
Prepare a configuration file based on examples/config.yaml. By default, this program uses ~/.config/emqx-client-monitor/config.yaml, but this can be overridden with the --cfg flag.
emqx
emqx:
  api_key: "01234567890abcde"
  api_secret: "exampleSecretKey1234567890ABCDEFGHIJKLMNOPQRS"
  api_url: "http://emqx:18083/api/"
  ssl: true # or path to CA bundle
  alias: LocalEMQX
Required:
- api_key and api_secret are generated by EMQX itself (when an API key is created)
- api_url must be the URL of the base EMQX API endpoint (ending with /api/)
- alias is the name of the EMQX broker; it should be unique among multiple instances of this program that are ingested by a single Prometheus DB
Optional:
- ssl is passed to the requests library as verify; it's either:
  - True (default) for validation of Root CA certs using whatever your Python trusts
  - False for insecure mode (for brave and lazy people)
  - a string path pointing to a CA chain; this is useful for handling a private Root CA on systems where Python doesn't use the system chains
- timeout_seconds is the EMQX API connection timeout in seconds (default 5)
- attempts is the number of attempts made before failing (default 3)
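The following is a minimal sketch, not the program's actual code, of how the emqx options could map onto a requests call, with a simple retry loop driven by attempts. It makes two assumptions. First, that the per-client endpoint is the EMQX v5 one under api_url (v5/clients/<client_id>). Second, that the key and secret are sent as HTTP Basic credentials, which is how EMQX v5 API keys are used. The helper names build_request_kwargs and with_attempts are hypothetical.

```python
import time
import urllib.parse
from typing import Any, Callable, Dict, TypeVar

T = TypeVar("T")


def build_request_kwargs(emqx_cfg: Dict[str, Any], client_id: str) -> Dict[str, Any]:
    """Hypothetical helper: map the `emqx` config section to requests.get() kwargs."""
    # Assumption: per-client endpoint of the EMQX v5 REST API, below api_url (".../api/").
    quoted_id = urllib.parse.quote(client_id, safe="")
    url = emqx_cfg["api_url"].rstrip("/") + "/v5/clients/" + quoted_id
    return {
        "url": url,
        # API key and secret are sent as HTTP Basic credentials.
        "auth": (emqx_cfg["api_key"], emqx_cfg["api_secret"]),
        # Passed through unchanged: True, False or a path to a CA bundle.
        "verify": emqx_cfg.get("ssl", True),
        "timeout": emqx_cfg.get("timeout_seconds", 5),
    }


def with_attempts(call: Callable[[], T], attempts: int = 3, delay_seconds: float = 1.0) -> T:
    """Hypothetical helper: retry `call` up to `attempts` times, re-raising the last error."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return call()
        except Exception:  # a real client would narrow this to requests.RequestException
            if attempt == attempts:
                raise
            time.sleep(delay_seconds)
    raise AssertionError("unreachable")
```

With requests installed, a call could then look like with_attempts(lambda: requests.get(**kwargs), attempts=cfg.get("attempts", 3)), where kwargs = build_request_kwargs(cfg, "qingping-DEADBEEF1234"). The client ID is URL-encoded because MQTT client IDs may contain characters such as "/".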
monitored_clients
monitored_clients:
  - alias: QingpingCO2_Room1
    client_id: "qingping-DEADBEEF1234"
  - alias: QingpingCO2_Room2
    client_id: "qingping-DEADBEEF5678"
It's a list of clients to be monitored. Each entry contains client_id, which is matched against the MQTT client ID (unique on the broker), and alias, which is used as an extra label.
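Matching API results against this list is a dictionary lookup by client ID. The following is a minimal sketch with a hypothetical helper name. Rejecting duplicate aliases is an extra assumption of this sketch, based on aliases ending up as a label value.

```python
from typing import Dict, List


def index_monitored_clients(entries: List[Dict[str, str]]) -> Dict[str, str]:
    """Hypothetical helper: build a client_id -> alias lookup from monitored_clients.

    Dict insertion order follows the configuration file, so reports can keep that order.
    """
    by_client_id: Dict[str, str] = {}
    seen_aliases = set()
    for entry in entries:
        client_id, alias = entry["client_id"], entry["alias"]
        # A client ID is unique on the broker, so listing it twice is a config mistake.
        if client_id in by_client_id:
            raise ValueError(f"duplicate client_id in monitored_clients: {client_id}")
        # Assumption: aliases should be unique too, since they become a label value.
        if alias in seen_aliases:
            raise ValueError(f"duplicate alias in monitored_clients: {alias}")
        by_client_id[client_id] = alias
        seen_aliases.add(alias)
    return by_client_id
```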
prometheus
prometheus:
  # all values below are defaults
  port: 9671
  address: 0.0.0.0
  ttl_seconds: 15
  enable_processed_counters: true
  enable_qos_split: false
  enable_dropped_counters: false
  enable_reason_split: false
  enable_bytes_metrics: true
  enable_packet_metrics: true
  enable_dates: false
  enable_inflight_metrics: true
  enable_subscription_count: true
All values are optional:
- port and address define where the exporter binds
- ttl_seconds defines how long the exporter caches data between multiple calls to the exporter API
- enable_* are flags that enable various metrics
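The ttl_seconds behaviour amounts to a small time-based cache in front of the EMQX API. Here is a minimal sketch of that idea. The TtlCache class is hypothetical, and the injectable clock is only there to make the sketch testable.

```python
import time
from typing import Any, Callable, Optional


class TtlCache:
    """Hypothetical cache: reuse the last EMQX API result for ttl_seconds."""

    def __init__(
        self,
        fetch: Callable[[], Any],
        ttl_seconds: float = 15,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Any = None
        self._fetched_at: Optional[float] = None

    def get(self) -> Any:
        now = self._clock()
        # Refetch on first use or once the cached data is at least ttl_seconds old.
        if self._fetched_at is None or now - self._fetched_at >= self._ttl:
            self._value = self._fetch()
            self._fetched_at = now
        return self._value
```

With this design, several Prometheus servers (or a manual curl) hitting the exporter within one TTL window cost a single EMQX API round trip. The trade-off is that the data served can be up to ttl_seconds old.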
Running
Manual check (sub-command check)
This sub-command checks connected clients once and prints a human-readable table. The --all flag ignores monitored_clients and lists all connected clients (the "Alias" column then becomes "Client ID").
Example output:
┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━┳━━━━━┳━━━━━━━┳━━━━━━━━━┳━━━━━━┳━━━━━┳━━━━━━┓
┃                        ┃ Created    ┃ Keep  ┃ Connected  ┃ Sub ┃ MsgIn ┃ RX      ┃ RX   ┃ TX  ┃ TX   ┃
┃ Alias                  ┃ (time ago) ┃ alive ┃ (time ago) ┃ Cnt ┃ Flght ┃ Msg     ┃ Drop ┃ Msg ┃ Drop ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━╇━━━━━╇━━━━━━━╇━━━━━━━━━╇━━━━━━╇━━━━━╇━━━━━━┩
│ HumidifierWaterRefTerm │ 16 hours   │ 10s   │ 16 hours   │ 4   │ 0     │ 24279   │ 0    │ 16  │ 0    │
│ QingpingIoTMQTT_Client │ 39 minutes │ 1m    │ 39 minutes │ 8   │ 0     │ 0       │ 0    │ 126 │ 0    │
│ QingpingCO2_Room1      │ a second   │ 2m    │ a second   │ 0   │ 0     │ 0       │ 0    │ 0   │ 0    │
│ QingpingCO2_Room2      │ 4 minutes  │ 2m    │ 4 minutes  │ 1   │ 0     │ 2       │ 0    │ 0   │ 0    │
│ QingpingCO2_Room3      │ 13 days    │ 2m    │ 13 days    │ 1   │ 0     │ 38455   │ 0    │ 69  │ 0    │
│ RTL433_Room1           │ a month    │ 1m    │ 23 hours   │ 0   │ 0     │ 702456  │ 0    │ 0   │ 0    │
│ RTL433_Room2           │ 30 days    │ 1m    │ 18 days    │ 0   │ 0     │ 5184897 │ 0    │ 0   │ 0    │
│ ZAMEL                  │ 7 days     │ 32s   │ 7 days     │ 0   │ 0     │ 6007727 │ 0    │ 0   │ 0    │
└────────────────────────┴────────────┴───────┴────────────┴─────┴───────┴─────────┴──────┴─────┴──────┘
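The --all switch only changes which clients are listed and what the first column shows. Here is a minimal sketch of that selection, under three assumptions. The EMQX client list is a list of dicts with a clientid field. monitored_clients has been turned into a client_id -> alias mapping. Monitored clients that are not currently connected are simply omitted. The name table_rows is hypothetical.

```python
from typing import Any, Dict, List, Tuple

Row = Tuple[str, Dict[str, Any]]


def table_rows(
    api_clients: List[Dict[str, Any]], monitored: Dict[str, str], show_all: bool
) -> List[Row]:
    """Hypothetical helper: pick (first-column label, client data) pairs for the table."""
    by_id = {client["clientid"]: client for client in api_clients}
    if show_all:
        # --all: every connected client, labelled by its MQTT client ID.
        return [(client_id, client) for client_id, client in by_id.items()]
    # Default: only configured clients, labelled by alias, in configuration order.
    return [(alias, by_id[client_id]) for client_id, alias in monitored.items() if client_id in by_id]
```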
Prometheus exporter (sub-command prometheus)
Labels
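All metrics have the following labels:
- alias for the client alias from the configuration
- broker for the EMQX instance alias from the configuration
- client_id for the MQTT client ID
Additionally, most metrics have:
- direction, either rx for traffic the broker received from the client (e.g. messages it published) or tx for traffic the broker sent to the client (e.g. messages delivered through its subscriptions)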
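Each direction-split metric is one label set per client, extended with direction. Here is a minimal sketch of building such samples. The helper names are hypothetical, and the EMQX field names recv_msg and send_msg used in the example are an assumption about the API response.

```python
from typing import Any, Dict, List, Tuple

Labels = Dict[str, str]


def base_labels(alias: str, broker: str, client_id: str) -> Labels:
    """Labels shared by every metric of one monitored client."""
    return {"alias": alias, "broker": broker, "client_id": client_id}


def split_by_direction(
    client: Dict[str, Any], labels: Labels, rx_field: str, tx_field: str
) -> List[Tuple[Labels, float]]:
    """One sample per direction, taken from a pair of (assumed) EMQX client fields."""
    return [
        ({**labels, "direction": "rx"}, float(client[rx_field])),
        ({**labels, "direction": "tx"}, float(client[tx_field])),
    ]


# Example: reproduces the two messages_processed_total samples shown below.
samples = split_by_direction(
    {"recv_msg": 38657, "send_msg": 69},
    base_labels("QingpingCO2_Room2", "emqx", "qingping-DEADBEEF5678"),
    "recv_msg",
    "send_msg",
)
```

In a real exporter, such samples would typically be fed to prometheus_client's CounterMetricFamily.add_metric() from a custom collector. The source does not say whether this project works that way.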
Metrics
All available metrics with example data:
# HELP emqx_client_monitor_connected Is client connected
# TYPE emqx_client_monitor_connected gauge
emqx_client_monitor_connected{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.0
# HELP emqx_client_monitor_subscriptions Number of subscriptions
# TYPE emqx_client_monitor_subscriptions gauge
emqx_client_monitor_subscriptions{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.0
# HELP emqx_client_monitor_inflights Number of inflight messages
# TYPE emqx_client_monitor_inflights gauge
emqx_client_monitor_inflights{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 0.0
# HELP emqx_client_monitor_created_at Client creation time (epoch seconds)
# TYPE emqx_client_monitor_created_at gauge
emqx_client_monitor_created_at{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.763772688905e+09
# HELP emqx_client_monitor_connected_at Client last connected time (epoch seconds)
# TYPE emqx_client_monitor_connected_at gauge
emqx_client_monitor_connected_at{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.763772688906e+09
# HELP emqx_client_monitor_messages_processed_total Number of received messages processed (total)
# TYPE emqx_client_monitor_messages_processed_total counter
emqx_client_monitor_messages_processed_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 38657.0
emqx_client_monitor_messages_processed_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 69.0
# HELP emqx_client_monitor_messages_processed_by_qos_total Number of received messages processed split by QoS
# TYPE emqx_client_monitor_messages_processed_by_qos_total counter
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",qos="0"} 38657.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",qos="1"} 0.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",qos="2"} 0.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",qos="0"} 69.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",qos="1"} 0.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",qos="2"} 0.0
# HELP emqx_client_monitor_messages_dropped_total Number of received messages dropped (total)
# TYPE emqx_client_monitor_messages_dropped_total counter
emqx_client_monitor_messages_dropped_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 0.0
emqx_client_monitor_messages_dropped_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 0.0
# HELP emqx_client_monitor_messages_dropped_by_reason_total Number of received messages dropped split by reason
# TYPE emqx_client_monitor_messages_dropped_by_reason_total counter
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",reason="await_pubrel_timeout"} 0.0
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",reason="expired"} 0.0
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",reason="queue_full"} 0.0
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",reason="too_large"} 0.0
# HELP emqx_client_monitor_bytes_total Number of received raw octets (bytes)
# TYPE emqx_client_monitor_bytes_total counter
emqx_client_monitor_bytes_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 1.0525736e+07
emqx_client_monitor_bytes_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 8009.0
# HELP emqx_client_monitor_packets_total Number of received MQTT packets
# TYPE emqx_client_monitor_packets_total counter
emqx_client_monitor_packets_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 38761.0
emqx_client_monitor_packets_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 173.0
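The created_at and connected_at metrics are exported as epoch seconds. This sketch assumes EMQX returns these fields as RFC 3339 strings with a UTC offset. Under that assumption, the conversion needs only the standard library. iso_to_epoch_seconds is a hypothetical helper.

```python
from datetime import datetime


def iso_to_epoch_seconds(value: str) -> float:
    """Hypothetical helper: convert an RFC 3339 timestamp to epoch seconds."""
    if value.endswith("Z"):
        # datetime.fromisoformat() before Python 3.11 does not accept a "Z" suffix.
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # A naive timestamp would silently be interpreted in the exporter's local time zone.
        raise ValueError(f"timestamp without UTC offset: {value}")
    return parsed.timestamp()
```

For reference, the example value 1.763772688905e+09 corresponds to 2025-11-22T00:51:28.905Z.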
They are controlled by feature flags:
- emqx_client_monitor_connected is always exported, even if the client is not returned by the EMQX API
- enable_subscription_count controls emqx_client_monitor_subscriptions
- enable_inflight_metrics controls emqx_client_monitor_inflights
- enable_dates controls emqx_client_monitor_created_at and emqx_client_monitor_connected_at
- enable_processed_counters controls emqx_client_monitor_messages_processed_total (RX/TX)
- enable_qos_split controls emqx_client_monitor_messages_processed_by_qos_total (RX/TX), split by the qos label (0/1/2)
- enable_dropped_counters controls emqx_client_monitor_messages_dropped_total (RX/TX)
- enable_reason_split controls emqx_client_monitor_messages_dropped_by_reason_total (RX/TX), split by the reason label:
  - RX: await_pubrel_timeout
  - TX: expired, queue_full, too_large
- enable_bytes_metrics controls emqx_client_monitor_bytes_total (RX/TX)
- enable_packet_metrics controls emqx_client_monitor_packets_total (RX/TX)
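The flag list above is a plain lookup table. Here is a minimal sketch that resolves a prometheus config section into the set of exported metric families, using the defaults documented in the configuration section. The names DEFAULT_FLAGS, FLAG_TO_METRICS and enabled_metrics are hypothetical.

```python
from typing import Any, Dict, List, Set

# Defaults from the prometheus section of the configuration.
DEFAULT_FLAGS: Dict[str, bool] = {
    "enable_processed_counters": True,
    "enable_qos_split": False,
    "enable_dropped_counters": False,
    "enable_reason_split": False,
    "enable_bytes_metrics": True,
    "enable_packet_metrics": True,
    "enable_dates": False,
    "enable_inflight_metrics": True,
    "enable_subscription_count": True,
}

FLAG_TO_METRICS: Dict[str, List[str]] = {
    "enable_subscription_count": ["emqx_client_monitor_subscriptions"],
    "enable_inflight_metrics": ["emqx_client_monitor_inflights"],
    "enable_dates": ["emqx_client_monitor_created_at", "emqx_client_monitor_connected_at"],
    "enable_processed_counters": ["emqx_client_monitor_messages_processed_total"],
    "enable_qos_split": ["emqx_client_monitor_messages_processed_by_qos_total"],
    "enable_dropped_counters": ["emqx_client_monitor_messages_dropped_total"],
    "enable_reason_split": ["emqx_client_monitor_messages_dropped_by_reason_total"],
    "enable_bytes_metrics": ["emqx_client_monitor_bytes_total"],
    "enable_packet_metrics": ["emqx_client_monitor_packets_total"],
}


def enabled_metrics(prometheus_cfg: Dict[str, Any]) -> Set[str]:
    """Hypothetical helper: metric families exported for a given prometheus config."""
    flags = {**DEFAULT_FLAGS, **prometheus_cfg}  # non-flag keys like port are ignored below
    names = {"emqx_client_monitor_connected"}  # always exported, regardless of flags
    for flag, metrics in FLAG_TO_METRICS.items():
        if flags.get(flag):
            names.update(metrics)
    return names
```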
Important assumptions
Client (and resulting metric) uniqueness
It is based on the following attributes:
- client alias from the configuration
- MQTT client ID (from the configuration), which must be unique per broker (so it's also unique per server response)
- MQTT username (from the server response)
- broker alias from the configuration (this allows multiple instances of the monitor to be ingested into one Prometheus)
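The identity described by this list can be expressed as a small immutable key. The following is a minimal sketch. ClientKey and client_key are hypothetical names, and the EMQX response field names (clientid, username, ip_address, port, node) are assumptions.

```python
from typing import Any, Dict, NamedTuple


class ClientKey(NamedTuple):
    """Identity of one monitored client, mirroring the attributes listed above."""

    alias: str      # client alias from configuration
    client_id: str  # MQTT client ID, unique per broker
    username: str   # MQTT username reported by EMQX
    broker: str     # broker alias from configuration


def client_key(alias: str, broker: str, client: Dict[str, Any]) -> ClientKey:
    """Hypothetical helper; EMQX response field names are assumptions.

    Volatile attributes such as IP address, port, EMQX node and connection
    flags are intentionally not part of the key.
    """
    return ClientKey(alias, client["clientid"], client.get("username") or "", broker)
```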
The following fields are ignored for this purpose:
- IP address and port number of the client: those may change over time (e.g. via DHCP) and are problematic with NAT anyway; in the future, they may be exposed as a metric (IP converted to an integer)
- EMQX node: it may be random in a round-robin cluster; in the future, it may be exposed as a metric via some mapping coming from the agent configuration (right now it's a string like "emqx@emqx-0.emqx-headless.namespace.svc.cluster.local")
- all connection attributes like clean start, because they are client-configurable
Live data from EMQX API
For now, all data comes live from the EMQX API. This means that once a client disconnects, all its gauges and counters disappear (except emqx_client_monitor_connected). By design, once the client reconnects, its counters on the EMQX API reset.
In other words, some metrics may not make much sense for clients whose connection TTL is shorter than their publish interval. For now, it's the responsibility of some other system to aggregate resetting counters into rate gauges.
Additionally, Prometheus must scrape more often than the shortest TTL of clients that connect and disconnect very frequently. Usually this is not a problem, as the default scrape interval is 30s. The failure case is a client with an absurdly short TTL (like 10s) that connects very rarely and disconnects immediately after publishing a single message: such a client is likely to be marked as disconnected.
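Here is a back-of-the-envelope model of this problem. The project does not compute it. Suppose a client stays connected for d seconds, and Prometheus scrapes every T seconds at a phase that is uniformly random relative to the connection. Then the chance that at least one scrape observes the connection is min(1, d/T). For the 10s connection and the 30s interval above, that is about one in three. The model ignores the exporter-side ttl_seconds cache, and detection_probability is a hypothetical name.

```python
def detection_probability(connected_seconds: float, scrape_interval_seconds: float) -> float:
    """Chance that at least one scrape lands inside a single connection window,
    assuming a uniformly random scrape phase (exporter caching ignored)."""
    if scrape_interval_seconds <= 0:
        raise ValueError("scrape interval must be positive")
    if connected_seconds < 0:
        raise ValueError("connection duration cannot be negative")
    return min(1.0, connected_seconds / scrape_interval_seconds)
```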
To solve that, this program will need to implement the following:
- EMQX API scraper running independently of Prometheus scrapes
- internal state for keeping track of counters and reporting last-known data for disconnected clients
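Keeping counters across reconnects mostly comes down to detecting resets. Here is a minimal sketch of one possible approach. It uses the same rule Prometheus applies in rate() and increase(): a drop in a counter's value is treated as a reset. ResetTolerantCounter is hypothetical and not part of the current code.

```python
from typing import Optional


class ResetTolerantCounter:
    """Hypothetical accumulator turning a per-connection EMQX counter into a monotonic total.

    EMQX restarts client counters at zero on reconnect, so a value lower than the
    previous observation is treated as a reset.
    """

    def __init__(self) -> None:
        self.total = 0.0
        self._last: Optional[float] = None

    def observe(self, value: float) -> float:
        if self._last is None or value < self._last:
            # First sample, or the counter restarted: everything seen now is new.
            self.total += value
        else:
            self.total += value - self._last
        self._last = value
        return self.total
```

While a client is disconnected, observe() is simply not called, so total keeps reporting the last-known value. One limitation: a client might reconnect and exceed its previous value before the next poll. In that case the reset goes unnoticed. Comparing connected_at between polls would be one way to catch it.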
Download files
File details
Details for the file emqx_client_monitor-0.3.1.tar.gz.
File metadata
- Download URL: emqx_client_monitor-0.3.1.tar.gz
- Size: 15.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: Hatch/1.16.1 cpython/3.14.1 HTTPX/0.28.1
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 86d21ebc14e18617626c69fba2a803b82e87d0779c40cc278ce3d75a7eb69d4e |
| MD5 | 382936e756bffb6f26e569a27feb20cc |
| BLAKE2b-256 | 81b8d472aac1a3e79b08aaa37fe86e358c9423c4f084b6009dbd51e02996924d |
File details
Details for the file emqx_client_monitor-0.3.1-py3-none-any.whl.
File metadata
- Download URL: emqx_client_monitor-0.3.1-py3-none-any.whl
- Size: 15.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: Hatch/1.16.1 cpython/3.14.1 HTTPX/0.28.1
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 405ba1d2bc8796e11772e7ffff0ffed6fdc49b4777d73f9ee3a285dc1f582bd3 |
| MD5 | 78ee24144d15de1221844a2335f85778 |
| BLAKE2b-256 | 82459d8766d64c7515ae458f46f559a3d3e532a5e66c0eb5a733dafae94e6ad3 |