MQTT extension for the Cognite extractor-utils framework

Cognite extractor-utils MQTT extension

The MQTT extension for Cognite extractor-utils provides a way to easily write your own extractors for systems exposing an MQTT interface.

The library is currently under development, and should not be used in production environments yet.

Overview

The MQTT extension for extractor-utils subscribes to MQTT topics, automatically deserializes each payload into user-defined DTO classes, and handles uploading the data to CDF.

The only parts of the extractor that a user needs to implement are:

  • Describing the payload schema using Python dataclasses
  • Implementing a mapping from the source data model to the CDF data model

As an example, consider this payload:

{
    "elements": [
        {
            "pumpId": "bridge-pump-1453",
            "startTime": "2022-02-27T12:13:00",
            "duration": 16
        },
        {
            "pumpId": "bridge-pump-254",
            "startTime": "2022-02-26T16:12:23",
            "duration": 124
        }
    ]
}

We want to make an extractor that can turn these MQTT messages into CDF events. First, we need to create some data classes describing the expected schema of the payloads:

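from dataclasses import dataclass
from typing import List

# One entry in the payload's "elements" array
@dataclass
class PumpEvent:
    pumpId: str
    startTime: str
    duration: int

# The top-level payload object
@dataclass
class PumpEventList:
    elements: List[PumpEvent]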
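
The extension performs this deserialization automatically. To make the mapping from payload to data classes concrete, here is a minimal hand-written sketch of an equivalent step using only the standard library. The helper name `parse_pump_events` is hypothetical and is not part of the library, and the library's own implementation may differ.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass
class PumpEvent:
    pumpId: str
    startTime: str
    duration: int


@dataclass
class PumpEventList:
    elements: List[PumpEvent]


def parse_pump_events(raw: bytes) -> PumpEventList:
    """Hypothetical helper: decode a JSON payload into the data classes above."""
    document = json.loads(raw)
    # Build one PumpEvent per entry; a missing or unexpected key inside an
    # entry raises TypeError from the dataclass constructor.
    return PumpEventList(elements=[PumpEvent(**item) for item in document["elements"]])


payload = (
    b'{"elements": [{"pumpId": "bridge-pump-1453", '
    b'"startTime": "2022-02-27T12:13:00", "duration": 16}]}'
)
events = parse_pump_events(payload)
print(events.elements[0].pumpId)  # bridge-pump-1453
```

Because the field names in the data classes match the JSON keys exactly, each entry can be unpacked straight into the constructor.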

Then, we can create an MqttExtractor instance, subscribe to the appropriate topic, and convert the payload into CDF events:

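import uuid
from typing import Iterable

import arrow
from cognite.client.data_classes import Event
# Import path assumed from the package layout; check example.py if it differs
from cognite.extractorutils.mqtt.extractor import MqttExtractor

extractor = MqttExtractor(
    name="PumpMqttExtractor",
    description="Extracting pumping events from an MQTT source",
    version="1.0.0",
)

# Called once per message on "mytopic", with the payload already parsed
@extractor.topic(topic="mytopic", qos=1, response_type=PumpEventList)
def subscribe_pump_events(events: PumpEventList) -> Iterable[Event]:
    external_id_prefix = MqttExtractor.get_current_config_file()

    for pump_event in events.elements:
        start_time = arrow.get(pump_event.startTime)
        end_time = start_time.shift(seconds=pump_event.duration)

        # CDF event timestamps are milliseconds since the Unix epoch
        yield Event(
            external_id=f"{external_id_prefix}{pump_event.pumpId}-{uuid.uuid4()}",
            start_time=start_time.int_timestamp * 1000,
            end_time=end_time.int_timestamp * 1000,
        )

with extractor:
    extractor.run()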
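
The time arithmetic in the mapping (an ISO 8601 start time plus a duration in seconds, converted to epoch milliseconds) can be checked in isolation. The following is a minimal sketch using only the standard library instead of arrow. The helper name `event_window_ms` is hypothetical, and the sketch assumes that timestamps without a UTC offset are in UTC, which appears to match arrow's default.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def event_window_ms(start_time: str, duration_s: int) -> Tuple[int, int]:
    """Hypothetical helper: return (start, end) as milliseconds since the epoch."""
    start = datetime.fromisoformat(start_time)
    if start.tzinfo is None:
        # Assumption: timestamps without an offset are interpreted as UTC
        start = start.replace(tzinfo=timezone.utc)
    end = start + timedelta(seconds=duration_s)
    # Truncate to whole seconds before scaling, like int_timestamp * 1000
    return int(start.timestamp()) * 1000, int(end.timestamp()) * 1000


start_ms, end_ms = event_window_ms("2022-02-27T12:13:00", 16)
print(start_ms, end_ms)  # 1645963980000 1645963996000
```

Keeping this conversion in a small pure function makes it easy to unit test the mapping without connecting to an MQTT broker or to CDF.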

A demo is provided in the example.py file.

Contributing

See the contribution guide for extractor-utils for details on contributing.
