Ocellus API Python Client
Project description
ocellus-python-client
Autogenerated Python client built from byte-motion/ProtobufDefs
Installation
pip install ocellus-python-client
Build and test locally
$ python3 setup.py sdist bdist_wheel
$ twine check dist/*
$ twine upload dist/*
Example Usage
from __future__ import print_function
import cv2
import sys
import datetime
import imageio
import logging
import grpc
import numpy as np
import threading
import time
import queue
import ocellus_module_service_pb2
import ocellus_module_service_pb2_grpc
import ocellus_types_pb2
def handle(parentCamera):
    """Return a mocked ``OutputData`` holding two hard-coded items.

    Each item is attached to ``parentCamera`` and carries a position, a
    quaternion and a single-point path plan. The values are fixed sample
    data; nothing is computed from the camera.

    Args:
        parentCamera: Camera value copied into every item's
            ``parentCamera`` field.

    Returns:
        An ``ocellus_module_service_pb2.OutputData`` with two items.
    """
    types = ocellus_types_pb2

    def local_position(x, y, z):
        # Position expressed through the message's ``local`` field.
        return types.Position(local=types.Position(x=x, y=y, z=z))

    def local_quaternion():
        # Every orientation in the mock uses this same quaternion.
        return types.Quaternion(
            local=types.Quaternion(
                x=0.3996381,
                y=0.3996381,
                z=0.3996381,
                w=-0.721712,
            )
        )

    def local_euler_angles():
        return types.EulerAngles(
            local=types.EulerAngles(yaw=0.1, pitch=0.1, roll=0.1)
        )

    def path_point(delta_time, x, y, z):
        # eulerAngles is not required if quaternion is set, and quaternion
        # is not required if eulerAngles is set; the mock fills in both.
        return types.PathPoint(
            deltaTime=delta_time,
            position=local_position(x, y, z),
            eulerAngles=local_euler_angles(),
            quaternion=local_quaternion(),
        )

    first_cup = types.Item(
        id=1,
        name="CoffeCup1",
        parentCamera=parentCamera,
        position=local_position(1.0, 0.5, 0.4),
        quaternion=local_quaternion(),
        pathPlan=[path_point(0, 0.1, 0.1, 0.1)],
    )
    second_cup = types.Item(
        id=2,
        name="CoffeCup2",
        parentCamera=parentCamera,
        position=local_position(1.1, 0.6, 0.5),
        quaternion=local_quaternion(),
        pathPlan=[path_point(3000, 10.0, 10.0, 10.0)],
    )

    # mocked outputs
    return ocellus_module_service_pb2.OutputData(items=[first_cup, second_cup])
def event_stream(output_queue):
    """Generate the outgoing ``ModuleData`` messages for a stream.

    The first message yielded is a bind request named
    ``'RemoteTestModule'``. After that the generator loops forever: it
    sleeps 0.1 s, blocks on ``output_queue.get()``, and yields the dequeued
    value wrapped in ``ModuleData(data=...)``. ``None`` entries are skipped.

    Args:
        output_queue: Queue-like object whose ``get()`` supplies the
            payloads to send.

    Yields:
        ``ocellus_module_service_pb2.ModuleData`` messages.
    """
    # Initial bind request to open stream
    bind_request = ocellus_module_service_pb2.BindRequest(
        name='RemoteTestModule'
    )
    print("Sending bindRequest")
    yield ocellus_module_service_pb2.ModuleData(bindRequest=bind_request)

    while True:
        time.sleep(.1)
        payload = output_queue.get()
        if payload is None:
            # Nothing to send for this entry; wait for the next one.
            continue
        print("Sending data")
        yield ocellus_module_service_pb2.ModuleData(data=payload)
# Number of bytes in one mebibyte.
MB = 1024 * 1024

# Channel options handed to grpc.insecure_channel(); both size-limit keys are
# set to 128 MB.
GRPC_CHANNEL_OPTIONS = [
    (option_name, 128 * MB)
    for option_name in (
        'grpc.max_message_length',
        'grpc.max_receive_message_length',
    )
]
def run(host):
    """Open a channel to ``host`` on port 50052 and serve one Bind stream.

    For every event received on the stream:

    * an event without a ``data`` field (the ack that follows the bind) is
      answered with an empty ``OutputData``;
    * otherwise the point cloud is downloaded as a depth image and written
      to ``./depth.jpg``, the RGBA image is downloaded and written to
      ``./rbga.png``, and the mocked result of ``handle()`` is queued as
      the reply.

    gRPC errors raised while streaming are printed and end the call; the
    channel is closed when the function returns.

    Args:
        host: Host name or address of the service to connect to.
    """
    with grpc.insecure_channel('{}:50052'.format(host), options=GRPC_CHANNEL_OPTIONS) as channel:
        stub = ocellus_module_service_pb2_grpc.OcellusModuleServiceStub(
            channel)
        try:
            # Replies are handed to the request generator through this queue.
            output_queue = queue.Queue()
            gen = event_stream(output_queue)
            for event in stub.Bind(gen):
                print(f"Received event: {event}")

                # First event after bind will not contain data, it will be an ack
                if not event.HasField("data"):
                    output_queue.put(ocellus_module_service_pb2.OutputData())
                    continue

                # Note that the 'rs' would be, for example, a Realsense module
                # named "rs" configured before the Remote module
                print(event.data.pointClouds['rs/POINTCLOUD'])
                refReq = ocellus_module_service_pb2.DownloadRefRequest(
                    urn=event.data.pointClouds['rs/POINTCLOUD'] +
                    "?format=depth"
                )
                refResponse = stub.DownloadRef(refReq)

                # Reinterpret the raw bytes as a (height, width) float32 image.
                data = refResponse.pointCloud.mat32FC1Data
                rows = refResponse.pointCloud.height
                cols = refResponse.pointCloud.width
                arr = np.frombuffer(data, dtype=np.float32)
                arr = np.reshape(arr, (rows, cols))
                # np.frombuffer returns a read-only view, so let OpenCV
                # allocate the 8-bit output instead of passing ``arr`` as dst.
                arr = cv2.normalize(
                    arr, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8UC1)
                cv2.imwrite('./depth.jpg', arr)
                print(len(refResponse.pointCloud.vertices))

                refReq = ocellus_module_service_pb2.DownloadRefRequest(
                    urn=event.data.rgbaImages['rs/RGBA_IMAGE']
                )
                refResponse = stub.DownloadRef(refReq)
                print(len(refResponse.data))
                with open('./rbga.png', 'wb') as f:
                    f.write(refResponse.data)

                parentCamera = event.data.cameras['rs/CAMERA']
                response = handle(parentCamera)
                output_queue.put(response)
        except grpc.RpcError as err:
            # grpc.RpcError is the public base class for RPC failures; the
            # private grpc._channel._Rendezvous name is not a stable API.
            print(err)
if __name__ == '__main__':
    logging.basicConfig()

    # The first command-line argument, when given, is the host to connect to.
    cli_args = sys.argv[1:]
    host = cli_args[0] if cli_args else '0.0.0.0'

    # Whenever run() returns, wait five seconds and connect again.
    while True:
        run(host)
        time.sleep(5)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
ocellus-0.1.2.tar.gz
(13.5 kB
view hashes)
Built Distribution
ocellus-0.1.2-py3-none-any.whl
(14.8 kB
view hashes)