Skip to main content

为lsyiot_adapter_hub提供适配器插件

Project description

lsyiot_adapter_plugin

一个用于 lsyiot_adpter_hub 数据适配器的插件系统,支持动态加载和注册各种设备数据适配器。

项目简介

本项目提供了一个灵活的适配器插件框架,允许开发者创建自定义的数据适配器来解析和转换来自不同 IoT 设备的数据格式。

核心组件

  • AdapterABC: 抽象基类,定义了适配器的基本接口
  • AdapterNegotiation: 适配器协商器,负责管理和选择适配器
  • PluginFactory: 插件工厂,负责动态加载适配器插件
  • AdapterParseError: 适配器解析异常类
  • AdapterPluginError: 适配器插件异常类

安装

pip install lsyiot_adpter_plugin

快速开始

1. 创建自定义适配器

要创建一个自定义适配器,您需要继承 AdapterABC 类并实现必要的方法:

# coding: utf-8
"""
倾角监测适配器
"""
import json
from datetime import datetime
from typing import Generator

from lsyiot_adapter_plugin.negotiation import AdapterABC, AdapterParseError


class MonitorTiltAdapter(AdapterABC):
    """倾角适配器"""

    def __init__(self):
        super().__init__()

    @property
    def adapter_code(self) -> str:
        return "MonitorTilt"

    @property
    def adapter_name(self) -> str:
        return "倾斜监测"

    def parse(self, msg, userdata=None) -> Generator[dict, None, None]:
        """
        解析消息并生成数据

        :param msg: 消息字符串
        :param userdata: 用户数据
        :return: 生成器,yield 解析后的数据字典
        """
        try:
            data = json.loads(msg)
            params = data.get("params", {})
            device_id = params.get("id", None)
            r_data = params.get("r_data", [])

            tm = str(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
            send_data = {}

            # 构建发送数据结构
            for item in r_data:
                name = item.get("name", "")
                device = name[0]
                value_type = name[1:]
                value = float(item.get("value", 0))

                device_code = f"{device_id}-{device}"
                if device_code not in send_data:
                    send_data[device_code] = {
                        "device_code": device_code,
                        "x_displacement": 0.0,
                        "y_displacement": 0.0,
                        "z_displacement": 0.0,
                        "height": 0.0,
                    }

                if value_type == "X-CZ":
                    send_data[device_code]["x_displacement"] = value
                elif value_type == "Y-CZ":
                    send_data[device_code]["y_displacement"] = value
                elif value_type == "Z-CZ":
                    send_data[device_code]["z_displacement"] = value
                elif value_type == "Z":
                    send_data[device_code]["height"] = value

            for item in send_data.values():
                device_code = item["device_code"]
                row = {
                    "station_id": device_code,
                    "device_id": device_code,
                    "device_sn": device_code,
                    "collection_time": tm,
                    "x_displacement": item["x_displacement"],
                    "y_displacement": item["y_displacement"],
                    "z_displacement": item["z_displacement"],
                    "height": item["height"],
                    "sensor": device_code,
                }
                yield {"topic": userdata.get("topic_name") if userdata else None, "payload": row}
        except Exception as ex:
            raise AdapterParseError(ex)

2. 使用插件工厂加载适配器

from lsyiot_adpter_plugin.plugin_factory import PluginFactory

# 初始化插件工厂
plugin_factory = PluginFactory("plugins")

# 加载所有适配器
plugin_factory.load_adapters()

# 获取协商器
negotiation = plugin_factory.negotiation

# 选择适配器
adapter = negotiation.select_adapter("MonitorTilt")

if adapter:
    # 解析数据
    sample_data = {
        "params": {
            "id": "device001",
            "r_data": [
                {"name": "AX-CZ", "value": "1.23"},
                {"name": "AY-CZ", "value": "2.34"},
                {"name": "AZ-CZ", "value": "3.45"},
                {"name": "AZ", "value": "4.56"}
            ]
        }
    }
    
    userdata = {"topic_name": "tilt_monitoring"}
    
    for result in adapter.parse(json.dumps(sample_data), userdata):
        print(result)

3. 目录结构

推荐的插件目录结构:

plugins/
├── monitor_tilt/
│   └── tilt_adapter.py
├── temperature/
│   └── temp_adapter.py
└── pressure/
    └── pressure_adapter.py

API 参考

AdapterABC

抽象基类,所有适配器都必须继承此类。

属性

  • adapter_code (str): 适配器唯一标识符
  • adapter_name (str): 适配器显示名称

方法

  • parse(msg, userdata=None): 解析消息的抽象方法

AdapterNegotiation

适配器协商器,管理已注册的适配器。

方法

  • register_adapter(adapter_instance): 注册适配器实例
  • select_adapter(adapter_code): 根据编码选择适配器

PluginFactory

插件工厂,负责动态加载适配器。

方法

  • __init__(plugins_dir): 初始化,指定插件目录
  • load_adapters(): 加载插件目录下的所有适配器

异常处理

  • AdapterParseError: 适配器解析数据时发生的错误
  • AdapterPluginError: 插件加载或管理时发生的错误

许可证

MIT License

贡献

欢迎提交 Issue 和 Pull Request!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

lsyiot_adapter_plugin-1.0.1.tar.gz (4.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

lsyiot_adapter_plugin-1.0.1-py3-none-any.whl (5.5 kB view details)

Uploaded Python 3

File details

Details for the file lsyiot_adapter_plugin-1.0.1.tar.gz.

File metadata

  • Download URL: lsyiot_adapter_plugin-1.0.1.tar.gz
  • Upload date:
  • Size: 4.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.11

File hashes

Hashes for lsyiot_adapter_plugin-1.0.1.tar.gz
Algorithm Hash digest
SHA256 fdaa95342f6c6348f2b01b478564a1d890975336a0fa1c6bbfc66c625b2f0435
MD5 5ffc5f8f92ec4ebe43d99424c1d02f00
BLAKE2b-256 03f63c15e862705381dcf1c3b2890d6d54e2b364d1f5f7039b5c6b0ffbb118c0

See more details on using hashes here.

File details

Details for the file lsyiot_adapter_plugin-1.0.1-py3-none-any.whl.

File metadata

File hashes

Hashes for lsyiot_adapter_plugin-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 e01c1629dba52f0f81579e14e9ffb6154d9d951597d5c1e96306474ef0f969fc
MD5 b65254077c817e6ca5ca709537195d45
BLAKE2b-256 5dfc5b054be8a1e951fea2fae67e296b3ccfba3ffaeeefb1870035a2a75d3c32

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page