MATRIX Labs MALOS libraries
Project description
A simple Python coroutine based driver for communicating with MATRIX-MALOS services.
Installing
The package is available on Pypi, so you can easily install via pip:
pip install matrix-io-malos
Running the CLI client
The library includes a simple command line client to start reading data from your MALOS service right away.
# Get the malosclient help screen malosclient --help # Get IMU data to STDOUT from a locally running MALOS service malosclient IMU # Get HUMIDITY data to STDOUT from a remotely running MALOS service malosclient -h 192.168.0.100 HUMIDITY # Get FACE detection data using a serialized driver config file malosclient --driver-config-file ~/driver_config.proto VISION
Using the MalosDriver
To use the MALOS driver works as an async generator so in your code you can do the following:
import asyncio import sys from matrix_io.malos.driver import IMU_PORT, UV_PORT from matrix_io.proto.malos.v1 import driver_pb2 from matrix_io.proto.malos.v1 import sense_pb2 from matrix_io.malos.driver import MalosDriver async def imu_data(imu_driver): async for msg in imu_driver.get_data(): print(sense_pb2.Imu().FromString(msg)) await asyncio.sleep(1.0) async def uv_data(uv_driver): async for msg in uv_driver.get_data(): print(sense_pb2.UV().FromString(msg)) await asyncio.sleep(1.0) async def error_handler(driver): async for msg in driver.get_error(): print('Error: %s' % msg, file=sys.stderr) await asyncio.sleep(1.0) # Driver configuration driver_config = driver_pb2.DriverConfig() # Create the drivers imu_driver = MalosDriver('localhost', IMU_PORT) uv_driver = MalosDriver('localhost', UV_PORT) imu_driver.configure(driver_config) uv_driver.configure(driver_config) # Create loop and initialize keep-alive loop = asyncio.get_event_loop() loop.create_task(imu_driver.start_keep_alive()) loop.create_task(uv_driver.start_keep_alive()) # Initialize data and error handlers loop.create_task(imu_data(imu_driver)) loop.create_task(uv_data(uv_driver)) loop.create_task(error_handler(imu_driver)) loop.create_task(error_handler(uv_driver)) try: loop.run_forever() except KeyboardInterrupt: print('Shutting down. Bye, bye !', file=sys.stderr) finally: loop.stop() asyncio.gather(*asyncio.Task.all_tasks()).cancel() loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Filename, size | File type | Python version | Upload date | Hashes |
---|---|---|---|---|
Filename, size matrix_io-malos-0.1.1.tar.gz (6.6 kB) | File type Source | Python version None | Upload date | Hashes View |