Skip to main content

Ocellus API Python Client

Project description

ocellus-python-client

Autogenerated Python client, generated from the protobuf definitions in byte-motion/ProtobufDefs.

Installation

pip install ocellus-python-client

Build and check locally (note: the final `twine upload` command publishes the package)

$ python3 setup.py sdist bdist_wheel
$ twine check dist/*
$ twine upload dist/*

Example Usage

from __future__ import print_function

import cv2
import sys
import datetime
import imageio
import logging
import grpc
import numpy as np
import threading
import time
import queue
import ocellus_module_service_pb2
import ocellus_module_service_pb2_grpc
import ocellus_types_pb2


def handle(parentCamera):
    """Return a mocked ``OutputData`` message containing two fixed items.

    Nothing is computed from sensor data here: every value is a
    hard-coded placeholder.  Each item is tagged with ``parentCamera``
    and carries a position, a quaternion and a one-point path plan.

    Args:
        parentCamera: value stored in the ``parentCamera`` field of both
            mocked items.

    Returns:
        An ``ocellus_module_service_pb2.OutputData`` with two ``Item``
        entries ("CoffeCup1" and "CoffeCup2").
    """
    types = ocellus_types_pb2

    def local_position(x, y, z):
        # Positions are wrapped: the outer message holds the inner one
        # in its ``local`` field.
        return types.Position(local=types.Position(x=x, y=y, z=z))

    def mock_quaternion():
        # The same placeholder rotation is used everywhere in the mock.
        return types.Quaternion(
            local=types.Quaternion(
                x=0.3996381,
                y=0.3996381,
                z=0.3996381,
                w=-0.721712,
            )
        )

    def mock_euler_angles():
        return types.EulerAngles(
            local=types.EulerAngles(yaw=0.1, pitch=0.1, roll=0.1)
        )

    def path_point(delta_time, x, y, z):
        # A path point's orientation may be given as eulerAngles or as
        # a quaternion; only one of the two is required.  The mock
        # fills in both.
        return types.PathPoint(
            deltaTime=delta_time,
            position=local_position(x, y, z),
            eulerAngles=mock_euler_angles(),
            quaternion=mock_quaternion(),
        )

    first_item = types.Item(
        id=1,
        name="CoffeCup1",
        parentCamera=parentCamera,
        position=local_position(1.0, 0.5, 0.4),
        quaternion=mock_quaternion(),
        pathPlan=[path_point(0, 0.1, 0.1, 0.1)],
    )
    second_item = types.Item(
        id=2,
        name="CoffeCup2",
        parentCamera=parentCamera,
        position=local_position(1.1, 0.6, 0.5),
        quaternion=mock_quaternion(),
        pathPlan=[path_point(3000, 10.0, 10.0, 10.0)],
    )

    return ocellus_module_service_pb2.OutputData(
        items=[first_item, second_item]
    )


def event_stream(output_queue):
    """Yield the outgoing ``ModuleData`` messages for a bind stream.

    The first message is a bind request for the module name
    'RemoteTestModule'.  After that the generator loops forever: it
    sleeps 0.1 s, takes the next entry from ``output_queue`` (blocking
    until one is available) and yields it wrapped in ``ModuleData``.
    Entries that are ``None`` are dropped without sending anything.

    Args:
        output_queue: queue-like object whose ``get()`` returns the
            next payload to send.

    Yields:
        ``ocellus_module_service_pb2.ModuleData`` messages.
    """
    # The stream is opened by an initial bind request.
    bind_request = ocellus_module_service_pb2.BindRequest(
        name='RemoteTestModule'
    )
    print("Sending bindRequest")
    yield ocellus_module_service_pb2.ModuleData(bindRequest=bind_request)

    while True:
        time.sleep(0.1)
        payload = output_queue.get()
        if payload is None:
            # Nothing to send for this entry; wait for the next one.
            continue
        print("Sending data")
        yield ocellus_module_service_pb2.ModuleData(data=payload)


# Number of bytes in one mebibyte.
MB = 1024 * 1024

# Channel arguments passed to grpc.insecure_channel().  gRPC configures
# the outbound and inbound message size limits with two separate keys:
# 'grpc.max_send_message_length' and 'grpc.max_receive_message_length'.
# The previous 'grpc.max_message_length' entry is not one of those keys,
# so the send limit was never actually configured.
GRPC_CHANNEL_OPTIONS = [
    ('grpc.max_send_message_length', 128 * MB),
    ('grpc.max_receive_message_length', 128 * MB),
]


def _download_ref(stub, urn):
    """Call ``stub.DownloadRef`` for ``urn`` and return the response."""
    request = ocellus_module_service_pb2.DownloadRefRequest(urn=urn)
    return stub.DownloadRef(request)


def _save_depth_image(point_cloud, path='./depth.jpg'):
    """Write the float32 depth data of ``point_cloud`` to ``path`` as 8-bit."""
    depth = np.frombuffer(point_cloud.mat32FC1Data, dtype=np.float32)
    depth = np.reshape(depth, (point_cloud.height, point_cloud.width))
    # np.frombuffer gives a read-only array when the buffer is immutable
    # bytes, so it must not be reused as the destination.  Passing None
    # lets OpenCV allocate the 8-bit output itself.
    depth_8u = cv2.normalize(
        depth, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8UC1)
    cv2.imwrite(path, depth_8u)


def run(host):
    """Bind to the Ocellus module service on ``host`` and answer events.

    Opens an insecure channel to port 50052 and starts the ``Bind``
    stream, using ``event_stream`` as the request iterator.  For every
    event received:

    * an event without a ``data`` field (the acknowledgement that
      follows the bind request) is answered with an empty ``OutputData``;
    * otherwise the depth rendering of the 'rs/POINTCLOUD' reference is
      downloaded and written to ``./depth.jpg``, the 'rs/RGBA_IMAGE'
      reference is downloaded and written to ``./rbga.png``, and the
      mocked result of ``handle`` is queued as the reply.

    Any ``grpc.RpcError`` raised by the stream or by a download is
    printed, after which the function returns so that the caller can
    reconnect.

    Args:
        host: host name or address of the service, without the port.
    """
    target = '{}:50052'.format(host)
    with grpc.insecure_channel(target, options=GRPC_CHANNEL_OPTIONS) as channel:
        stub = ocellus_module_service_pb2_grpc.OcellusModuleServiceStub(
            channel)
        # Replies are handed to the request generator through this queue.
        output_queue = queue.Queue()
        try:
            for event in stub.Bind(event_stream(output_queue)):
                print(f"Received event: {event}")
                # First event after bind will not contain data, it will be an ack
                if not event.HasField("data"):
                    output_queue.put(ocellus_module_service_pb2.OutputData())
                    continue
                # Note that the 'rs' would be, for example, a Realsense module
                # named "rs" configured before the Remote module
                point_cloud_urn = event.data.pointClouds['rs/POINTCLOUD']
                print(point_cloud_urn)

                depth_response = _download_ref(
                    stub, point_cloud_urn + "?format=depth")
                _save_depth_image(depth_response.pointCloud)
                print(len(depth_response.pointCloud.vertices))

                image_response = _download_ref(
                    stub, event.data.rgbaImages['rs/RGBA_IMAGE'])
                print(len(image_response.data))
                with open('./rbga.png', 'wb') as f:
                    f.write(image_response.data)

                parentCamera = event.data.cameras['rs/CAMERA']
                output_queue.put(handle(parentCamera))

        # grpc.RpcError is the public base class for RPC failures.  The
        # private grpc._channel._Rendezvous is an implementation detail
        # and does not cover every error a unary call can raise.
        except grpc.RpcError as err:
            print(err)


if __name__ == '__main__':
    logging.basicConfig()
    # The service host is the first command-line argument, if given.
    if len(sys.argv) > 1:
        host = sys.argv[1]
    else:
        host = '0.0.0.0'
    # run() returns when the stream ends or fails; wait 5 s and reconnect.
    while True:
        run(host)
        time.sleep(5)

Project details


Release history Release notifications | RSS feed

This version

0.1.2

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ocellus-0.1.2.tar.gz (13.5 kB view hashes)

Uploaded Source

Built Distribution

ocellus-0.1.2-py3-none-any.whl (14.8 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page