Skip to main content

A Python library for receiving and processing data from Murata wireless sensor units

Project description

Murata Sensor Receiver

村田製作所製無線センサユニットからのUDPデータを受信・解析するPythonライブラリ

License: MIT Python 3.8+

概要

村田製作所の無線センサユニット(電文仕様I対応)は、UDPパケットでセンサーデータを送信します。 このライブラリではUDPパケットを受信・解析します。

特徴

  • リアルタイム処理: UDPパケットの連続受信と即座の解析
  • コールバックアーキテクチャ: ユーザー定義コールバックによるデータ処理
  • テキスト解析対応: ログファイルやCSVからのセンサーデータ解析
  • データ検証: チェックサム検証
  • 依存関係なし: Python標準ライブラリのみを使用

インストール

pip install murata-sensor-receiver

クイックスタート

基本的な使用方法

from murata_sensor import MurataReceiver
import logging

# ログ設定
logging.basicConfig(level=logging.INFO)

def data_handler(sensor_data, addr):
    """受信したセンサーデータのコールバック関数"""
    print(f"Received from {addr[0]}:{addr[1]}")
    print(f"Sensor Type: {sensor_data['sensor_type']}")
    print(f"Sensor Type Code: {sensor_data['sensor_type_code']}")  # 例: '01', '09'
    print(f"Values: {sensor_data['values']}")
    print(f"RSSI: {sensor_data['info']['RSSI']} dBm")
    print("-" * 40)

def error_handler(error, raw_data, addr):
    """エラー処理のコールバック関数"""
    print(f"Error from {addr}: {error}")

# レシーバー作成
receiver = MurataReceiver(
    port=55039,
    data_callback=data_handler,
    error_callback=error_handler
)

# 受信開始
try:
    receiver.recv()
except KeyboardInterrupt:
    print("Stopping receiver...")

高度な使用方法

from murata_sensor import MurataReceiver
import logging
import json

# カスタムロガー
logger = logging.getLogger('my_app.sensor')
logger.setLevel(logging.DEBUG)

class SensorDataProcessor:
    def __init__(self):
        self.data_buffer = []
    
    def handle_sensor_data(self, sensor_data, addr):
        """センサーデータの処理と保存"""
        # タイムスタンプと送信元情報を追加
        sensor_data.update({
            'source_ip': addr[0],
            'source_port': addr[1],
            'processed_at': datetime.now().isoformat()
        })
        
        # バッファに保存
        self.data_buffer.append(sensor_data)
        
        # センサータイプに応じた処理
        sensor_type = sensor_data['sensor_type']
        
        if sensor_type == 'vibration':
            self.process_vibration_data(sensor_data)
        elif sensor_type == 'temperature_and_humidity':
            self.process_temp_humidity_data(sensor_data)
        
        # データベース保存、API送信など
        self.save_to_database(sensor_data)
    
    def handle_error(self, error, raw_data, addr):
        """エラー処理"""
        logger.error(f"Sensor error from {addr}: {error}")
        # アラート送信、ファイルログなど
    
    def process_vibration_data(self, data):
        """振動データの固有処理"""
        values = data['values']
        # 異常検知、FFT計算など
        pass
    
    def process_temp_humidity_data(self, data):
        """温湿度データの固有処理"""
        values = data['values']
        # 範囲チェック、トレンド計算など
        pass
    
    def save_to_database(self, data):
        """データベースへの保存"""
        # 使用するデータベースに応じた実装
        pass

# プロセッサの初期化
processor = SensorDataProcessor()

# カスタムプロセッサを使用したレシーバーの作成
receiver = MurataReceiver(
    port=55039,
    data_callback=processor.handle_sensor_data,
    error_callback=processor.handle_error,
    logger=logger
)

# 受信開始
receiver.recv()

非同期(asyncio)での使用

FastAPI や aiohttp など非同期フレームワークと組み合わせる場合は AsyncMurataReceiver を使用する。

import asyncio
from murata_sensor import AsyncMurataReceiver

async def main():
    receiver = AsyncMurataReceiver(port=55039)
    await receiver.start()

    try:
        async for sensor_data, addr in receiver:
            print(f"{sensor_data['sensor_type']} ({sensor_data['sensor_type_code']}): {sensor_data['values']}")
    finally:
        await receiver.stop()

asyncio.run(main())

既存の MurataReceiver をスレッドで動かす場合は run_in_thread() を使う。

from murata_sensor import MurataReceiver

def on_data(sensor_data, addr):
    print(f"受信: {sensor_data['sensor_type']}")

receiver = MurataReceiver(port=55039, data_callback=on_data)
thread = receiver.run_in_thread()

# メインスレッドで別の処理を継続できる

対応センサータイプ

センサーコード センサータイプ 説明
030301FF temperature_and_humidity 温湿度センサー
03030900 vibration 振動センサー
030310FF current_pulse 電流・パルスセンサー
030313FF voltage_pulse 電圧・パルスセンサー
030312FF CT CTセンサー
030307FF thermocouple 熱電対センサー
03031800 vibration_speed 振動(速度)センサー
03031BFF 3_current 3電流センサー
03031CFF 3_voltage 3電圧センサー
03031DFF 3_contacts 3接点センサー
0303FEFF waterproof_repeater 防水中継機
030338FF waterproof_contact_pulse 防水防塵接点パルスユニット
030339FF waterproof_analog_output 防水防塵アナログ出力無線化ユニット

解析結果のデータ構造

このライブラリは、UDP受信時およびテキスト解析時に、Pythonの辞書形式で解析結果を返します。
開発者はこの辞書をそのままDB保存・JSON変換・メッセージキュー送信などに利用できます。

UDP受信(MurataReceiver / AsyncMurataReceiver)の例

MurataReceiverdata_callback に渡される sensor_data の典型的な例(温湿度センサー):

{
    "sensor_type": "temperature_and_humidity",  # センサータイプ名
    "sensor_type_code": "01",                   # センサ種別コード [tt](16進2桁)
    "timestamp": "2024-03-17T10:30:00.123456",  # 受信時刻(ISO8601文字列)
    "values": {                                 # センサー固有の測定値
        "power-supply-voltage": {
            "value": 3.0,
            "unit": "V",
            "unit_name": "電位差(電圧)",
        },
        "temperature": {
            "value": 25.3,
            "unit": "℃",
            "unit_name": "セルシウス温度",
        },
        "humidity": {
            "value": 60.2,
            "unit": "%RH",
            "unit_name": "相対湿度",
        },
    },
    "info": {                                   # センサー情報メタデータ
        "addr": ("192.168.1.100", 55039),       # 送信元アドレス
        "unit_id": "0002",                      # ユニットID
        "message_id": "62BE",                   # メッセージID
        "RSSI": -45,                            # 受信信号強度[dBm]
        "sensor_type_code": "01",               # センサ種別コード(冗長だが info 側にも格納)
        "status": {
            "code": "FF",
            "description": "固定値(FF)",
        },
        "route": ["7FFF"],                      # 経路情報(最後の要素がゲートウェイID)
    },
    "addr": ("192.168.1.100", 55039),           # MurataReceiver が受信したアドレス
}

振動センサー(1LZ)の例(FFT有効時):

{
    "sensor_type": "vibration",
    "sensor_type_code": "09",
    "timestamp": "2024-03-17T10:31:00.456789",
    "values": {
        "power-supply-voltage": {"value": 3.34, "unit": "V", "unit_name": "電位差(電圧)"},
        "peak-frequency-1": {"value": 12, "unit": "Hz", "unit_name": "周波数"},
        "peak-acceleration-1": {"value": 0.22, "unit": "m/s2", "unit_name": "加速度"},
        "peak-frequency-2": {"value": 25, "unit": "Hz", "unit_name": "周波数"},
        "peak-acceleration-2": {"value": 0.22, "unit": "m/s2", "unit_name": "加速度"},
        # ... (ピーク3-5も同様)
        "acceleration-RMS": {"value": 0.12, "unit": "m/s2", "unit_name": "加速度実効値"},
        "kurtosis": {"value": 1.53, "unit": "-", "unit_name": "尖度"},
        "temperature": {"value": 26.52, "unit": "℃", "unit_name": "セルシウス温度"},
    },
    "info": {
        "addr": ("192.168.1.101", 55039),
        "unit_id": "1115",
        "message_id": "0464",
        "RSSI": -58,
        "sensor_type_code": "09",
        "status": {"code": "00", "description": "正常"},
        "route": ["7FFF"],
    },
    "addr": ("192.168.1.101", 55039),
}

無効値の処理:

センサーデータに無効値(ペイロード内でFFFFFF##)が含まれる場合、valueフィールドにNoneが設定されます。

# FFT無効時の振動センサー例(ピーク値が無効)
{
    "sensor_type": "vibration",
    "values": {
        "power-supply-voltage": {"value": 3.03, "unit": "V", "unit_name": "電位差(電圧)"},
        "peak-frequency-1": {"value": None, "unit": "Hz", "unit_name": "周波数"},  # 無効
        "peak-acceleration-1": {"value": None, "unit": "m/s2", "unit_name": "加速度"},  # 無効
        # ... (ピーク2-5も無効)
        "acceleration-RMS": {"value": 0.0, "unit": "m/s2", "unit_name": "加速度実効値"},
        "kurtosis": {"value": 0.0, "unit": "-", "unit_name": "尖度"},
        "temperature": {"value": 25, "unit": "℃", "unit_name": "セルシウス温度"}
    }
}

テキスト解析(parse_text_line)の例

parse_text_line() はテキスト1行から同様の情報を辞書で返します:

from murata_sensor import parse_text_line

line = "2024/09/20 16:26:11 192.168.1.100/55061:ERXDATA 1115 0000 0464 F000 4D 7A 03030900..."
result = parse_text_line(line)

print(result["sensor_type"])       # 例: 'vibration'
print(result["sensor_type_code"])  # 例: '09'
print(result["timestamp"])         # datetime(2024, 9, 20, 16, 26, 11)
print(result["source_ip"])         # '192.168.1.100'
print(result["values"])            # センサー固有の値辞書
print(result["info"])              # MurataSensorBase.info と同等の情報

戻り値の構造(概要):

{
    "timestamp": datetime | None,       # 受信タイムスタンプ(なければ None)
    "source_ip": str | None,            # 送信元IPアドレス
    "source_port": int | None,          # 送信元ポート番号
    "sensor": MurataSensorBase,         # 解析済みセンサーオブジェクト
    "sensor_type": str,                 # センサータイプ名
    "sensor_type_code": str | None,     # センサ種別コード [tt](16進2桁)
    "values": dict,                     # センサー値
    "info": dict,                       # センサー情報
    "raw_data": bytes,                  # 元のERXDATA文字列のバイト列
}

APIリファレンス

MurataReceiver

村田製作所製センサーデータの受信と処理を行うメインクラス

コンストラクタ

MurataReceiver(port, buffer_size=1024, data_callback=None, error_callback=None, logger=None)

パラメータ:

  • port (int): リッスンするUDPポート番号
  • buffer_size (int, optional): 受信バッファサイズ(デフォルト: 1024)
  • data_callback (callable, optional): センサーデータのコールバック関数
  • error_callback (callable, optional): エラー処理のコールバック関数
  • logger (logging.Logger, optional): カスタムロガーインスタンス

メソッド

recv()

センサーデータの連続受信を開始します(ブロッキング)。

run_in_thread(daemon=True)

別スレッドで受信を開始し、スレッドオブジェクトを返します。メインスレッドで他の処理を継続したい場合に使用します。

get_latest_data(addr)

指定したアドレスからの最新のセンサーデータを取得します。

get_sensor_history(addr)

指定したアドレスからのセンサーデータ履歴を取得します。

コールバック関数

データコールバック

def data_callback(sensor_data: dict, addr: tuple) -> None:
    """
    Args:
        sensor_data (dict): 処理済みセンサーデータ(以下の構造):
            {
                'sensor_type': str,      # センサータイプ名
                'timestamp': str,        # ISO形式のタイムスタンプ
                'values': dict,          # センサー固有の値
                'info': dict,           # センサー情報(RSSI、ユニットIDなど)
                'addr': tuple           # 送信元アドレス(ip, port)
            }
        addr (tuple): 送信元アドレス(ip_address, port)
    """

エラーコールバック

def error_callback(error: Exception, raw_data: bytes, addr: tuple) -> None:
    """
    Args:
        error (Exception): 発生した例外
        raw_data (bytes): 生のUDPパケットデータ
        addr (tuple): 送信元アドレス(ip_address, port)
    """

AsyncMurataReceiver

asyncio 対応の非同期 UDP 受信クラス。async for でセンサーデータを取得できる。

コンストラクタ

AsyncMurataReceiver(port, buffer_size=1024, logger=None)

メソッド

async start()

受信を開始する。UDP ソケットをバインドし、バックグラウンドで受信を開始する。

async stop()

受信を停止する。

async for での利用

async for sensor_data, addr in receiver: で受信データをイテレートできる。start() 呼び出し後に使用する。

使用例

詳細な使用例は examples/ ディレクトリを参照してください:

ファイル 説明
simple_stdout.py UDP受信→標準出力(最もシンプルな例)
sqlite_storage.py UDP受信→SQLiteデータベース保存
file_output.py UDP受信→CSV/JSONファイル出力
mqtt_publish.py UDP受信→MQTTブローカーへ送信
parse_text.py テキスト行からセンサーデータを解析
callback_closure.py コールバック: クロージャパターン
callback_class.py コールバック: クラスメソッドパターン
callback_partial.py コールバック: functools.partialパターン
callback_lambda.py コールバック: ラムダ式パターン
async_receiver.py 非同期受信(AsyncMurataReceiver と async for)
threaded_receiver.py スレッドで受信(run_in_thread)
async_durable_processing.py 長時間稼働・大量受信・DB保存・欠損なし(キュー+ワーカー)
async_fastapi_receiver.py FastAPI と同時に裏で UDP 受信(要: pip install fastapi uvicorn)
debug_sender.py テスト用UDPパケット送信ツール

エラーハンドリング

このライブラリは、カスタム例外にエラー処理を提供します:

  • FailedCheckSum: メッセージのチェックサム検証失敗
  • FailedCheckSumPayload: ペイロードのチェックサム検証失敗
  • MurataExceptionBase: すべての村田センサー関連エラーの基底例外
from murata_sensor import MurataReceiver, FailedCheckSum, FailedCheckSumPayload

def error_handler(error, raw_data, addr):
    if isinstance(error, FailedCheckSum):
        print(f"Checksum error from {addr}")
    elif isinstance(error, FailedCheckSumPayload):
        print(f"Payload checksum error from {addr}")
    else:
        print(f"Unknown error from {addr}: {error}")

receiver = MurataReceiver(port=55039, error_callback=error_handler)

ドキュメント

設計書・API仕様

開発

開発環境セットアップ

# リポジトリのクローン
git clone https://github.com/wazyc/murata-sensor-receiver.git
cd murata-sensor-receiver

# 開発用依存関係のインストール
pip install -e ".[dev]"

# テスト実行
pytest

# コードフォーマット
black src/murata_sensor/

# 型チェック
mypy src/murata_sensor/

テスト

# カバレッジ付きですべてのテストを実行
pytest --cov=murata_sensor

# 特定のテストファイルを実行
pytest tests/test_murata_receiver.py

# HTMLカバレッジレポートの生成
pytest --cov=murata_sensor --cov-report=html

ライセンス

MIT LICENSE

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

murata_sensor_receiver-0.3.0-py3-none-any.whl (25.0 kB view details)

Uploaded Python 3

File details

Details for the file murata_sensor_receiver-0.3.0-py3-none-any.whl.

File metadata

File hashes

Hashes for murata_sensor_receiver-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 1ffb5cc96bf34418d8a8890aa71d133222028f10ca277b6a4774ef77eacfd2c3
MD5 5a1243d3dfcc2114a9292be484899005
BLAKE2b-256 278fd32aef241907d6a2de7d9062bdd2160f21e8521f4aa332777e6a182c2ad3

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page