MATRIX Labs MALOS libraries
A simple Python coroutine-based driver for communicating with MATRIX-MALOS services.
Installing
The package is available on PyPI, so you can install it with pip:
pip install matrix-io-malos
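To confirm from Python that the install succeeded, one option is to query the installed distribution's metadata. This is a minimal sketch using the standard library's `importlib.metadata` (Python 3.8+); the helper name `installed_version` is hypothetical and not part of this package.

```python
from importlib import metadata


def installed_version(dist_name="matrix-io-malos"):
    """Return the installed version string, or None if the distribution is missing."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    print(installed_version() or "matrix-io-malos is not installed")
```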
Running the CLI client
The library includes a simple command line client to start reading data from your MALOS service right away.
# Get the malosclient help screen
malosclient --help
# Get IMU data to STDOUT from a locally running MALOS service
malosclient IMU
# Get HUMIDITY data to STDOUT from a remotely running MALOS service
malosclient -h 192.168.0.100 HUMIDITY
# Get FACE detection data using a serialized driver config file
malosclient --driver-config-file ~/driver_config.proto VISION
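If you want to launch the client from a Python script rather than a shell, the invocations above can be assembled as argument lists. The sketch below uses only the flags shown in these examples (`-h` and `--driver-config-file`); the helper names `build_malosclient_command` and `run_malosclient` are hypothetical, and the sketch assumes `malosclient` is on your `PATH`.

```python
import os
import shutil
import subprocess


def build_malosclient_command(sensor, host=None, driver_config_file=None):
    """Build an argv list using only the flags shown in the examples above."""
    cmd = ["malosclient"]
    if host is not None:
        cmd += ["-h", host]
    if driver_config_file is not None:
        # No shell is involved, so expand "~" ourselves.
        cmd += ["--driver-config-file", os.path.expanduser(driver_config_file)]
    cmd.append(sensor)
    return cmd


def run_malosclient(sensor, **kwargs):
    """Start malosclient as a child process, or return None if it is not on PATH."""
    if shutil.which("malosclient") is None:
        return None
    return subprocess.Popen(
        build_malosclient_command(sensor, **kwargs),
        stdout=subprocess.PIPE,
        text=True,
    )


if __name__ == "__main__":
    print(build_malosclient_command("HUMIDITY", host="192.168.0.100"))
```

Passing the arguments as a list avoids shell quoting issues; the caller reads sensor output from the returned process's `stdout`.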
Using the MalosDriver
The MALOS driver works as an async generator, so in your code you can do the following:
import asyncio
import sys

from matrix_io.malos.driver import IMU_PORT, UV_PORT, MalosDriver
from matrix_io.proto.malos.v1 import driver_pb2
from matrix_io.proto.malos.v1 import sense_pb2


async def imu_data(imu_driver):
    # Decode and print each IMU message as it arrives
    async for msg in imu_driver.get_data():
        print(sense_pb2.Imu().FromString(msg))
        await asyncio.sleep(1.0)


async def uv_data(uv_driver):
    # Decode and print each UV message as it arrives
    async for msg in uv_driver.get_data():
        print(sense_pb2.UV().FromString(msg))
        await asyncio.sleep(1.0)


async def error_handler(driver):
    # Report driver errors on STDERR
    async for msg in driver.get_error():
        print('Error: %s' % msg, file=sys.stderr)
        await asyncio.sleep(1.0)


# Driver configuration
driver_config = driver_pb2.DriverConfig()

# Create the drivers
imu_driver = MalosDriver('localhost', IMU_PORT)
uv_driver = MalosDriver('localhost', UV_PORT)
imu_driver.configure(driver_config)
uv_driver.configure(driver_config)

# Create loop and initialize keep-alive
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.create_task(imu_driver.start_keep_alive())
loop.create_task(uv_driver.start_keep_alive())

# Initialize data and error handlers
loop.create_task(imu_data(imu_driver))
loop.create_task(uv_data(uv_driver))
loop.create_task(error_handler(imu_driver))
loop.create_task(error_handler(uv_driver))

try:
    loop.run_forever()
except KeyboardInterrupt:
    print('Shutting down. Bye, bye !', file=sys.stderr)
finally:
    # Cancel pending tasks and let them finish before closing the loop.
    # asyncio.all_tasks() needs Python 3.7+; Task.all_tasks() was removed in 3.9.
    tasks = asyncio.all_tasks(loop)
    for task in tasks:
        task.cancel()
    loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
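To see the async-generator pattern in isolation, without a running MALOS service, the sketch below substitutes a stand-in driver. `FakeDriver` is hypothetical and is not part of this library: it only mimics the `get_data()` method used above and yields canned byte payloads instead of sensor messages.

```python
import asyncio


class FakeDriver:
    """Hypothetical stand-in for MalosDriver; yields canned byte payloads."""

    def __init__(self, payloads):
        self._payloads = list(payloads)

    async def get_data(self):
        # An async generator: each `yield` hands one message to `async for`.
        for payload in self._payloads:
            await asyncio.sleep(0)  # let other tasks run between messages
            yield payload


async def collect(driver):
    """Consume the driver's data stream the same way imu_data() does."""
    received = []
    async for msg in driver.get_data():
        received.append(msg)
    return received


async def main():
    imu = FakeDriver([b"imu-1", b"imu-2"])
    uv = FakeDriver([b"uv-1"])
    # Run both consumers concurrently and wait for both to finish.
    return await asyncio.gather(collect(imu), collect(uv))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Unlike the real driver, this stand-in's stream is finite, so `asyncio.run()` returns on its own. With a live service the stream presumably runs until cancelled, which is why the example above keeps the loop running and cancels its tasks on shutdown.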
Download files
Source Distribution
matrix_io-malos-0.1.1.tar.gz (6.6 kB)
File details
Details for the file matrix_io-malos-0.1.1.tar.gz.
File metadata
- Download URL: matrix_io-malos-0.1.1.tar.gz
- Upload date:
- Size: 6.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: Python-urllib/3.6
File hashes
Algorithm | Hash digest
---|---
SHA256 | 4f0b74f540233228bfa51e6ced80c6cdd7499268ccf06f198d0095ac819fc38e
MD5 | f968223d29b26397436b839643135f05
BLAKE2b-256 | 2917c6eb6aa01ee4b908b1b8329eb9c536c9a4cd2d048a1076cbf6eb631631d1