MQTT extension for the Cognite extractor-utils framework
Project description
Cognite extractor-utils
MQTT extension
The MQTT extension for Cognite extractor-utils
provides a way
to easily write your own extractors for systems exposing an MQTT interface.
The library is currently under development, and should not be used in production environments yet.
Overview
The MQTT extension for extractor utils subscribes to MQTT topics, automatically serializes the payload into user-defined DTO classes, and handles uploading of data to CDF.
The only part of the extractor necessary to for a user to implement are
- Describing the payload schema using Python
dataclass
es - Implementing a mapping from the source data model to the CDF data model
As an example, consider this example payload:
{
"elements": [
{
"pumpId": "bridge-pump-1453",
"startTime": "2022-02-27T12:13:00",
"duration": 16,
},
{
"pumpId": "bridge-pump-254",
"startTime": "2022-02-26T16:12:23",
"duration": 124,
},
]
}
We want to make an extractor that can turn these MQTT messages into CDF events. First, we need to create some data classes describing the expected schema of the payloads:
@dataclass
class PumpEvent:
pumpId: str
startTime: str
duration: int
@dataclass
class PumpEventList:
elements: List[PumpEvent]
Then, we can create an MqttExtractor
instance, subscribe to the appropriate topic, and convert the payload into CDF
events:
extractor = MqttExtractor(
name="PumpMqttExtractor",
description="Extracting pumping events from an MQTT source",
version="1.0.0",
)
@extractor.topic(topic="mytopic", qos=1, response_type=PumpEventList)
def subscribe_pump_events(events: PumpEventList) -> Iterable[Event]:
external_id_prefix = MqttExtractor.get_current_config_file()
for pump_event in events.elements:
start_time = arrow.get(pump_event.startTime)
end_time = start_time.shift(seconds=pump_event.duration)
yield Event(
external_id=f"{external_id_prefix}{pump_event.pumpId}-{uuid.uuid4()}",
start_time=start_time.int_timestamp*1000,
end_time=end_time.int_timestamp*1000,
)
with extractor:
extractor.run()
A demo example is provided in the example.py
file.
Contributing
See the contribution guide for extractor-utils
for details on contributing.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file cognite-extractor-utils-mqtt-0.1.0.tar.gz
.
File metadata
- Download URL: cognite-extractor-utils-mqtt-0.1.0.tar.gz
- Upload date:
- Size: 8.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.8 tqdm/4.63.0 importlib-metadata/4.11.3 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.9.10
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | fc659b0db762d25f2805613a03d25346b4cd26605be90e7ce686a78dd88b6081 |
|
MD5 | 9f70fdbe1310efb5472cd2fe5a6fa3db |
|
BLAKE2b-256 | 445bd54f4f776253ccf5b3b1f3cb56a87c6648f682cf55e6ed86a3a5521d35d6 |
File details
Details for the file cognite_extractor_utils_mqtt-0.1.0-py3-none-any.whl
.
File metadata
- Download URL: cognite_extractor_utils_mqtt-0.1.0-py3-none-any.whl
- Upload date:
- Size: 9.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.8 tqdm/4.63.0 importlib-metadata/4.11.3 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.9.10
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | f0c25393b350ee529ac47b6bd22b16f6c9afc930f54da56dd7cb2a3c75a4c2f8 |
|
MD5 | bd8fb74bc5ef00f230e81638befb31ee |
|
BLAKE2b-256 | 4fc7087e5d922a1466c8f28243c3978d25c07b0d2f71cae230ca36d6a61c44f2 |