Skip to main content

MATRIX Labs MALOS libraries

Project description

A simple Python coroutine based driver for communicating with MATRIX-MALOS services.

Installing

The package is available on Pypi, so you can easily install via pip:

pip install matrix-io-malos

Running the CLI client

The library includes a simple command line client to start reading data from your MALOS service right away.

# Get the malosclient help screen
malosclient --help

# Get IMU data to STDOUT from a locally running MALOS service
malosclient IMU

# Get HUMIDITY data to STDOUT from a remotely running MALOS service
malosclient -h 192.168.0.100 HUMIDITY

# Get FACE detection data using a serialized driver config file
malosclient --driver-config-file ~/driver_config.proto VISION

Using the MalosDriver

To use the MALOS driver works as an async generator so in your code you can do the following:

import asyncio
import sys

from matrix_io.malos.driver import IMU_PORT, UV_PORT
from matrix_io.proto.malos.v1 import driver_pb2
from matrix_io.proto.malos.v1 import sense_pb2

from matrix_io.malos.driver import MalosDriver


async def imu_data(imu_driver):
    async for msg in imu_driver.get_data():
        print(sense_pb2.Imu().FromString(msg))
        await asyncio.sleep(1.0)


async def uv_data(uv_driver):
    async for msg in uv_driver.get_data():
        print(sense_pb2.UV().FromString(msg))
        await asyncio.sleep(1.0)


async def error_handler(driver):
    async for msg in driver.get_error():
        print('Error: %s' % msg, file=sys.stderr)
        await asyncio.sleep(1.0)


# Driver configuration
driver_config = driver_pb2.DriverConfig()

# Create the drivers
imu_driver = MalosDriver('localhost', IMU_PORT)
uv_driver = MalosDriver('localhost', UV_PORT)
imu_driver.configure(driver_config)
uv_driver.configure(driver_config)

# Create loop and initialize keep-alive
loop = asyncio.get_event_loop()
loop.create_task(imu_driver.start_keep_alive())
loop.create_task(uv_driver.start_keep_alive())

# Initialize data and error handlers
loop.create_task(imu_data(imu_driver))
loop.create_task(uv_data(uv_driver))
loop.create_task(error_handler(imu_driver))
loop.create_task(error_handler(uv_driver))

try:
    loop.run_forever()
except KeyboardInterrupt:
    print('Shutting down. Bye, bye !', file=sys.stderr)
finally:
    loop.stop()
    asyncio.gather(*asyncio.Task.all_tasks()).cancel()

    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

matrix_io-malos-0.1.1.tar.gz (6.6 kB view details)

Uploaded Source

File details

Details for the file matrix_io-malos-0.1.1.tar.gz.

File metadata

  • Download URL: matrix_io-malos-0.1.1.tar.gz
  • Upload date:
  • Size: 6.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: Python-urllib/3.6

File hashes

Hashes for matrix_io-malos-0.1.1.tar.gz
Algorithm Hash digest
SHA256 4f0b74f540233228bfa51e6ced80c6cdd7499268ccf06f198d0095ac819fc38e
MD5 f968223d29b26397436b839643135f05
BLAKE2b-256 2917c6eb6aa01ee4b908b1b8329eb9c536c9a4cd2d048a1076cbf6eb631631d1

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page