MQTT Endpoint Library
Endpointlib
How to install
From your virtual environment run:
pip install endpointlib
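To confirm that the install landed in the active environment, a small check with the standard library's importlib.metadata can help. This is only a sketch; the helper name installed_version is made up for illustration and is not part of endpointlib.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Prints the version string if endpointlib is installed here, otherwise None
print(installed_version('endpointlib'))
```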
How to use
Basic example
This example creates an endpoint that publishes some data to the MQTT broker:
import asyncio
import random

from endpointlib.endpoint_factory import EndpointFactory
from endpointlib.helpers.loggers.logger_level import LoggerLevel
from endpointlib.helpers.loggers.logger_manager import LoggerManager
from endpointlib.helpers.loggers.logger_settings import LoggerSettings

LoggerManager.create(LoggerSettings.get_debug_settings(level=LoggerLevel.INFO))
logger = LoggerManager.get_async_logger(name='demo')


async def demo():
    endpoint = EndpointFactory.basic_endpoint(('localhost', 1883), main_callback=main_entry_point)
    await endpoint.run_forever()


async def main_entry_point(client):
    await logger.info('[main_entry_point]')
    # Async loop: publish a random sample every 5 seconds
    while True:
        # Call other async services here
        data = random.randrange(10, 52, 1)
        data_topic = 'endpoint/data/sample'
        await client.publish(data_topic, data, qos=1, retain=True)
        await logger.info('Data published')
        await asyncio.sleep(5)


asyncio.run(demo(), debug=True)
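The publishing loop in the example above can be tried without a broker by passing in a stand-in client. The sketch below assumes only that the client exposes an awaitable publish(topic, payload, qos, retain) method, as used in the example. RecordingClient and publish_samples are hypothetical names for illustration, not part of endpointlib.

```python
import asyncio
import random


class RecordingClient:
    """Hypothetical stand-in for the MQTT client; it only records publish calls."""

    def __init__(self):
        self.published = []

    async def publish(self, topic, payload, qos=0, retain=False):
        self.published.append((topic, payload, qos, retain))


async def publish_samples(client, count, interval=0.0):
    """Publish `count` random samples, pausing `interval` seconds between them."""
    for _ in range(count):
        data = random.randrange(10, 52, 1)  # integer from 10 to 51 inclusive
        await client.publish('endpoint/data/sample', data, qos=1, retain=True)
        await asyncio.sleep(interval)


client = RecordingClient()
asyncio.run(publish_samples(client, count=3))
print(client.published)
```

Bounding the loop with count, instead of while True, keeps the sketch easy to run in a test; a real endpoint would keep the endless loop shown in the example.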
Advanced example
This example creates an endpoint that monitors a TCP socket device. It uses a simple TCP echo server to simulate the device's responses; this server is not included in the package.
import asyncio
import inspect
import os
import sys

# Add the directory two levels above this script's directory to sys.path
# so that utils.demo_helper can be imported
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(os.path.dirname(currentdir))
sys.path.insert(0, parentdir)

from utils.demo_helper import DemoHelper
from endpointlib.endpoint_factory import EndpointFactory
from endpointlib.helpers.loggers.logger_level import LoggerLevel
from endpointlib.helpers.loggers.logger_manager import LoggerManager
from endpointlib.helpers.loggers.logger_settings import LoggerSettings

LoggerManager.create(LoggerSettings.get_debug_settings(level=LoggerLevel.INFO))
logger = LoggerManager.get_async_logger(name='demo')


async def demo():
    # Map control topics to their handler coroutines
    myCallbacks = dict()
    myCallbacks['endpoint/control/device1/operation01'] = on_operation_01
    myCallbacks['endpoint/control/device1/operation02'] = on_operation_02
    broker = ('localhost', 1883)
    cmd = ':MONITOR1234!'
    # Monitor tuple layout:
    # ('host_to_monitor', 'port', 'monitor_interval', 'command_to_monitor', 'on_monitor_callback')
    monitor = ('localhost', 10001, 10, cmd, on_monitor)
    # The handlers below reach the endpoint through this module-level name
    global endpoint
    endpoint = EndpointFactory.socket_monitor_endpoint(mqtt_connection=broker, socket_monitor=monitor, handlers=myCallbacks)
    await endpoint.run_forever()


async def on_monitor(status):
    await logger.info('[on_monitor]: ' + status)
    await endpoint.publish('endpoint/control/device1/response/01', '1234')


async def on_operation_01(topic, payload):
    response = await endpoint.send_to_device(':M11!')
    await logger.info('[on_operation_01]Response from device: ' + response)


async def on_operation_02(topic, payload):
    response = await endpoint.send_to_device(':R00!')
    await logger.info('[on_operation_02]Response from device: ' + response)


async def main():
    await DemoHelper.run_coro(server_port=10001, coro=demo())


asyncio.run(main(), debug=True)
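Because the echo server used by this example does not ship with the package, a stand-in may be useful when trying it out. The following is a minimal sketch built on the standard library's asyncio streams. It is not the DemoHelper implementation, and the names handle_echo and start_echo_server are assumptions made for illustration. The default port 10001 matches the monitor tuple in the example.

```python
import asyncio


async def handle_echo(reader, writer):
    """Send every chunk received from the client straight back to it."""
    try:
        while True:
            data = await reader.read(1024)
            if not data:  # empty read means the client closed the connection
                break
            writer.write(data)
            await writer.drain()
    finally:
        writer.close()
        await writer.wait_closed()


async def start_echo_server(host='localhost', port=10001):
    """Start listening and return the server; the caller decides how long it lives."""
    return await asyncio.start_server(handle_echo, host, port)


async def main():
    server = await start_echo_server()
    async with server:
        await server.serve_forever()


# To run the server standalone in its own terminal, call: asyncio.run(main())
```

With this sketch, a command such as ':MONITOR1234!' sent to the socket comes back unchanged, which is enough to exercise the monitor and operation callbacks without real hardware.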
Development setup
[TODO: Development setup instructions]
Built With
- Python
Author
Alfonso Franco