Skip to main content

MQTT Endpoint Library

Project description

Endpointlib

MQTT Endpoint library

Links

How to install

From your virtual environment run:

pip install endpointlib

How to use

Basic example

This example just creates an enpoint to publish some data to the mqtt broker:

import asyncio
import random

from endpointlib.endpoint_factory import EndpointFactory
from endpointlib.helpers.loggers.logger_level import LoggerLevel
from endpointlib.helpers.loggers.logger_manager import LoggerManager
from endpointlib.helpers.loggers.logger_settings import LoggerSettings

LoggerManager.create(LoggerSettings.get_debug_settings(level=LoggerLevel.INFO))
logger = LoggerManager.get_async_logger(name='demo')

async def demo():
    endpoint = EndpointFactory.basic_endpoint(('localhost', 1883), main_callback=main_entry_point)
    await endpoint.run_forever()

async def main_entry_point(client):

    await logger.info('[main_entry_point]')

    #async loop
    while True:
        #call other async services
        data = random.randrange(10, 52, 1)
        data_topic = 'endpoint/data/sample'
        await client.publish(data_topic, data, qos=1, retain=True)
        await logger.info('Data published')
        await asyncio.sleep(5)

asyncio.run(demo(), debug=True)

Advanced example

This example creates an endpoint that monitors a tcp socket device. This implementation uses a simple echo tcp server to simulate the responses, this server is not included in the package.

import asyncio
import sys

import inspect, os, sys
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(os.path.dirname(currentdir))
sys.path.insert(0, parentdir)
from utils.demo_helper import DemoHelper

from endpointlib.endpoint_factory import EndpointFactory
from endpointlib.helpers.loggers.logger_level import LoggerLevel
from endpointlib.helpers.loggers.logger_manager import LoggerManager
from endpointlib.helpers.loggers.logger_settings import LoggerSettings

LoggerManager.create(LoggerSettings.get_debug_settings(level=LoggerLevel.INFO))
logger = LoggerManager.get_async_logger(name='demo')

async def demo():
    myCallbacks = dict()
    myCallbacks['endpoint/control/device1/operation01'] = on_operation_01
    myCallbacks['endpoint/control/device1/operation02'] = on_operation_02

    broker = ('localhost', 1883)
    cmd = ':MONITOR1234!'
    # ('host_to_monitor', 'port', 'monitor_interval', 'command_to_monitor', 'on_monitor_callback')
    monitor = ('localhost', 10001, 10, cmd, on_monitor)

    global endpoint
    endpoint = EndpointFactory.socket_monitor_endpoint(mqtt_connection=broker, socket_monitor=monitor, handlers=myCallbacks)
    await endpoint.run_forever()

async def on_monitor(status):
    await logger.info('[on_monitor]: ' + status)
    await endpoint.publish('endpoint/control/device1/response/01', '1234')

async def on_operation_01(topic, payload):
    response = await endpoint.send_to_device(':M11!')
    await logger.info('[on_operation_01]Response from device: ' + response)

async def on_operation_02(topic, payload):
    response = await endpoint.send_to_device(':R00!')
    await logger.info('[on_operation_02]Response from device: ' + response)

async def main():
    await DemoHelper.run_coro(server_port=10001, coro=demo())

asyncio.run(main(), debug=True)

Development setup

[TODO: Development setup instructions]

Built With

  • Pyhton

Author

Alfonso Franco

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

endpointlib-0.0.12.tar.gz (9.9 kB view hashes)

Uploaded Source

Built Distribution

endpointlib-0.0.12-py3-none-any.whl (15.5 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page