Skip to main content

Lib to handle ease of use of pointclouds

Project description

#+title: Utility lib for naps
#+author: Virgile Daugé
#+EMAIL:virgile.dauge@loria.fr

Paramètre d'export de code :
#+begin_src emacs-lisp :tangle no :results silent
(setq org-src-preserve-indentation t)
#+end_src


* Construction du Module

#+begin_src python :tangle setup.py
# -*- coding: utf-8 -*-

from setuptools import setup, find_packages

with open('readme.org', 'r') as fh:
long_description = fh.read()

setup(
name='naps_utilities',
packages=find_packages(exclude=["examples/*"]),
version='0.2.0',
description='Lib to handle ease of use of pointclouds ',
author=u'Virgile Daugé',
author_email='virgile.dauge@loria.fr',
url='https://github.com/virgileTN/naps_utilities',
keywords=['pointclouds', 'filtering'],
install_requires=['numpy',
'numpy-quaternion'],
long_description=long_description,
long_description_content_type='text/plain',
classifiers=[
'Development Status :: 4 - Beta',
'Environment :: Console',
'Intended Audience :: Developers',
'License :: OSI Approved :: GNU General Public License (GPL)',
'Operating System :: MacOS :: MacOS X',
'Operating System :: POSIX',
'Programming Language :: Python :: 3.6',
],
)
#+end_src

#+begin_src bash :results value verbatim :exports both
mkdir naps_utilities
touch naps_utilities/__init__.py
#+end_src

* Classe transform

** Mise à dispo de la classe

#+begin_src python :tangle naps_utilities/__init__.py
from .transform import Transform
#+end_src

** Imports

#+begin_src python :tangle naps_utilities/pointcloud.py
class MultipleInputData(Exception):
"""Raised when data used to populate is not valid"""
pass
#+end_src

#+begin_src python :tangle naps_utilities/transform.py
#!/usr/bin/python
# -*- coding: utf-8 -*-

import numpy as np
import quaternion
import math

from geometry_msgs.msg import TransformStamped

#+end_src

** Corps de la fonction
#+begin_src python :tangle naps_utilities/transform.py
class Transform:
# self.matrix = None # matrix de transfo

# Constructeur
def __init__(self, mat=None, quat=None, pos=None, ros_msg=None):
u""" Constructeur depuis une matrice OU un quaternion et une position."""

# Ensure that only one populate method is selected:
conditions = [mat is not None, quat is not None and pos is not None, ros_msg is not None]
if sum(conditions) > 1:
raise MultipleInputData

if mat is not None:
self.matrix = np.copy(mat)

elif quat is not None and pos is not None:
self.from_quatpos(quat, pos)

elif ros_msg is not None:
self.from_msg(ros_msg)

else:
self.matrix = np.identity(4)

#+end_src

** Fonction de peuplement
#+begin_src python :tangle naps_utilities/transform.py
def from_quatpos(self, quat, pos):
self.matrix = np.identity(4)
# (w, x, y, z)
quat = np.asarray(quat)
npquat = quaternion.quaternion(quat[0], quat[1],
quat[2], quat[3])
self.matrix[:3, :3] = quaternion.as_rotation_matrix(npquat)
self.matrix[:3, 3] = pos
#+end_src

#+begin_src python :tangle naps_utilities/transform.py
def from_msg(self, msg):
self.from_quatpos(pos=[
msg.transform.translation.x,
msg.transform.translation.y,
msg.transform.translation.z,
], quat=[
msg.transform.rotation.w,
msg.transform.rotation.x,
msg.transform.rotation.y,
msg.transform.rotation.z,
])
#+end_src

** Vers ROS msg
#+begin_src python :tangle naps_utilities/transform.py
def to_msg(self, child_frame_id, frame_id='map'):
msg = TransformStamped()
quaternion = self.quaternion()
position = self.position()
msg.header.frame_id = frame_id
msg.child_frame_id = child_frame_id
msg.transform.translation.x = position[0]
msg.transform.translation.y = position[1]
msg.transform.translation.z = position[2]
msg.transform.rotation.x = quaternion.x
msg.transform.rotation.y = quaternion.y
msg.transform.rotation.z = quaternion.z
msg.transform.rotation.w = quaternion.w
return msg
#+end_src

** Fcontions internes
Fonctions d'affichage
#+begin_src python :tangle naps_utilities/transform.py
def __str__(self):
u"""Affichage de la transformation."""
return self.matrix.__str__()

def __repr__(self):
u"""Représentation interne de la classe."""
return self.matrix.__repr__()

#+end_src

Conversion quaternion vers matrice de rotation, ici on utilise le
module numpy-quaternion.
#+begin_src python :tangle naps_utilities/transform.py
def quat_2_mat(self, quat, pos):
u"""Conversion quaternion vers matrix."""
self.matrix[:3, :3] = quaternion.as_rotation_matrix(quat)
self.matrix[:3, 3] = pos
#+end_src

Opérations sur la matrice de tranformation :
#+begin_src python :tangle naps_utilities/transform.py
def inverse(self):
u"""Inverse de la transformation."""
return Transform(np.linalg.inv(self.matrix))

def __invert__(self):
u"""Inverse de la transformation inplace."""
return Transform(np.linalg.inv(self.matrix))

def __sub__(self, other):
u"""Renvoie la transformation dans self du repère à l'origine de la transformation other."""
return self.composition(~other)

def __isub__(self, other):
u"""Version 'inplace' de sub."""
self = self.composition(~other)
return self

def composition(self, tr):
u"""Composition de transformations."""
return Transform(mat=np.dot(self.matrix, tr.matrix))

def __mul__(self, other):
u"""Composition de la transformation de other dans self."""
return self.composition(other)

def __imul__(self, other):
u""""Version 'inplace' de mul."""
self.matrix = self.matrix.dot(other.matrix)
return self

def relative_transform(self, other):
u"""Transformation de self dans le repère other."""
return ~other.composition(self)

def projection(self, pt):
u"""Transformation d'un point."""
if (len(pt) == 3):
return self.matrix.dot(pt + [1])
else:
return self.matrix.dot(pt)

#+end_src

Accès à la position et au quaternion indépendamment :
#+begin_src python :tangle naps_utilities/transform.py
def position(self):
u"""Extraction de la position depuis matrix."""
return self.matrix[:3, 3]

def quaternion(self):
u"""Extraction du quaternion depuis matrix."""
return quaternion.from_rotation_matrix(self.matrix)
#+end_src

** Tests

#+begin_src ipython :session testTransform :file :exports both
from naps_utilities import Transform
t = Transform(pos=[1,2,3], quat=[1,0,0,0])
t
#+end_src

#+RESULTS:
: # Out[3]:
: #+BEGIN_EXAMPLE
: array([[1., 0., 0., 1.],
: [0., 1., 0., 2.],
: [0., 0., 1., 3.],
: [0., 0., 0., 1.]])
: #+END_EXAMPLE

#+begin_src ipython :session testTransform :file :exports both
t.to_msg('truc')
#+end_src

#+RESULTS:
: # Out[5]:
: : geometry_msgs.msg.TransformStamped(header=std_msgs.msg.Header(stamp=builtin_interfaces.msg.Time(sec=0, nanosec=0), frame_id='map'), child_frame_id='truc', transform=geometry_msgs.msg.Transform(translation=geometry_msgs.msg.Vector3(x=1.0, y=2.0, z=3.0), rotation=geometry_msgs.msg.Quaternion(x=-0.0, y=-0.0, z=-0.0, w=1.0)))

* Classe Pointcloud
** Mise à dispo de la classe

#+begin_src python :tangle naps_utilities/__init__.py
from .pointcloud import Pointcloud
#+end_src
** Dependences

#+begin_src python :tangle naps_utilities/pointcloud.py
# Nécessaires pour la lecture/écriture de fichiers
import os
import json
import pickle

#Les données sont stockées sous forme de numpy ndarray
import numpy as np

#Nécessaire pour la conversion vers/depuis ROS2
from builtin_interfaces.msg import Time
from sensor_msgs.msg import PointCloud2
from sensor_msgs.msg import PointField
from std_msgs.msg import Header
from array import array
#+end_src

#+begin_src python :tangle naps_utilities/pointcloud.py
class TransformWhileEmpty(Exception):
"""Raised when transform method is called and the poincloud is not yet
populated"""
pass
#+end_src

#+begin_src python :tangle naps_utilities/pointcloud.py
class InvalidInputData(Exception):
"""Raised when data used to populate is not valid"""
pass
#+end_src

#+begin_src python :tangle naps_utilities/pointcloud.py
class MultipleInputData(Exception):
"""Raised when data used to populate is not valid"""
pass
#+end_src
** Corps de la classe
#+begin_src python :tangle naps_utilities/pointcloud.py
class Pointcloud():
def __init__(self, ros_msg=None, points=None, keep_ring=True,
matrix=None, procrastinate=False, inpath=None):
# PoinCloud Metadata
self.metadata = {'header': None,
'height': None,
'width': None,
'fields': None,
'is_bigendian': None,
'point_step': None,
'row_step': None,
'is_dense': None,
'keep_ring': None,
'is_populated': False,
'procrastinated': True,}
# Pointcloud DATA
self.points = None
self.rings = None
self.matrix = matrix

# Ensure that only one populate method is selected:
conditions = [ros_msg is not None, points is not None, inpath is not None]

if sum(conditions) > 1:
raise MultipleInputData

else:
if ros_msg is not None:
self.from_msg(ros_msg)

elif points is not None:
self.from_list(points)

elif inpath is not None:
self.load(inpath)

if self.metadata['is_populated']:
if not procrastinate:
self.filter()
if matrix is None:
self.matrix = np.identity(4)
self.metadata['procrastinated'] = False
else:
self.transform(matrix)
#+end_src
** Populate from list
#+begin_src python :tangle naps_utilities/pointcloud.py
def from_list(self, data):
self.metadata['keep_ring'] = False

self.points = np.ascontiguousarray(data, dtype=np.float32)
if self.points.shape[1] != 3 and self.points.shape[1] != 4:
raise InvalidInputData

self.metadata['nb_points'] = len(self.points)
self.metadata['height'] = 1
self.metadata['width'] = self.metadata['nb_points']

self.metadata['is_bigendian'] = False
self.metadata['point_step'] = 3 * 4
self.metadata['row_step'] = self.metadata['point_step']

self.metadata['is_dense'] = False

self.metadata['is_populated'] = True
#+end_src
** populate from ROS msg
Un certain nombre de données ne nécessitent pas de conversion :
#+begin_src python :tangle naps_utilities/pointcloud.py
def from_msg(self, msg):
#Données conservées "telles quelles"

self.metadata['height'] = msg.height
self.metadata['width'] = msg.width

self.metadata['is_bigendian'] = msg.is_bigendian
self.metadata['point_step'] = msg.point_step
self.metadata['row_step'] = msg.row_step

self.metadata['is_dense'] = msg.is_dense
#+end_src
L'atribut Header est du type std_msgs/Header:
#+begin_src python :tangle naps_utilities/pointcloud.py
def from_header(header):
return {'time': {'sec': header.stamp.sec, 'nanosec': header.stamp.nanosec},
'frame_id': header.frame_id}
self.metadata['header'] = from_header(msg.header)
#+end_src
L'attribut fields du msg ROS est une liste d'objets PointFields. Il
convient également de supprimer le fields ring, si l'on choisit de ne
pas les garder.
#+begin_src python :tangle naps_utilities/pointcloud.py
def from_pointfields(fields):
return [{'name': field.name,
'offset': field.offset,
'datatype': field.datatype,
'count': field.count}
for field in fields]

self.metadata['fields'] = from_pointfields(msg.fields)
#+end_src
Afin de préparer l'extraction, on initialise des numpy ndarray afin
que tous les points soient dans un espace contigu de la mémoire. Ici
on sépare les points en un tableau de float32 (x, y, z) et un tableau
de (ring). Cela pour faciliter l'encodage décodage (c'est plus
difficile avec des types différents imbriqués)

#+begin_src python :tangle naps_utilities/pointcloud.py
# Données converties
self.metadata['nb_points'] = msg.height * msg.width

data = np.reshape(msg.data, (-1, self.metadata['point_step']))

self.points = np.ndarray(
(self.metadata['nb_points'], 4), dtype=np.float32,
buffer=np.ascontiguousarray(data[:, :16]))

if self.metadata['keep_ring']:
self.metadata['rings'] = np.zeros(
self.metadata['nb_points'], dtype=np.uint16)

pointcloud['rings'] = np.ndarray(
(self.metadata['nb_points']), dtype=np.uint16,
buffer=np.ascontiguousarray(data[:, 16:]))
#+end_src
Mise à jour dé métadonnées si nécessaire :
#+begin_src python :tangle naps_utilities/pointcloud.py
if not self.metadata['keep_ring']:
self.metadata['fields'] = [field for field in self.metadata['fields'] if field['name'] != 'ring']
self.metadata['point_step'] = 16
self.metadata['row_step'] = self.metadata['point_step'] * len(self.metadata['fields'])
self.metadata['is_populated'] = True
#+end_src
** convert to msg
Beaucoup de symétrie avec la fonction précedante.
#+begin_src python :tangle naps_utilities/pointcloud.py
def to_msg(self):
msg = PointCloud2()
#Données conservées "telles quelles"

msg.height = self.metadata['height']
msg.width = self.metadata['width']

msg.is_bigendian = self.metadata['is_bigendian']
msg.point_step = self.metadata['point_step']
msg.row_step = self.metadata['row_step']

msg.is_dense = self.metadata['is_dense']
#+end_src

Conversion vers Header ROS:
#+begin_src python :tangle naps_utilities/pointcloud.py
def to_header(header_data):
return Header(stamp=Time(
sec=header_data['time']['sec'],
nanosec=header_data['time']['nanosec']),
frame_id=header_data['frame_id'])
msg.header = to_header(self.metadata['header'])
#+end_src
Conversion vers Pointfield:
#+begin_src python :tangle naps_utilities/pointcloud.py
def to_pointfields(pointfields_data):
return [PointField(name=field['name'],
offset=field['offset'],
datatype=field['datatype'],
count=field['count']) for field in pointfields_data]
msg.fields = to_pointfields(self.metadata['fields'])
#+end_src
Deux cas, selon la valeur de 'keep_ring':

Si on garde les rings, il faut concatener les deux tableaux et en
faire un array de uint8.
#+begin_src python :tangle naps_utilities/pointcloud.py
if self.metadata['keep_ring']:
msg.data = array('B', np.concatenate(
(self.points.view(dtype=np.uint8),
self.rings.reshape((self.metadata['nb_points'], -1)).view(dtype=np.uint8)),
axis=1).ravel().tolist())
#+end_src
Sinon, il suffi de créer une liste de uint8 à partir des points au
niveau des données.
#+begin_src python :tangle naps_utilities/pointcloud.py
else:
msg.data = array('B', self.points.view(dtype=np.uint8).ravel().tolist())
return msg
#+end_src
** filter pointcloud

Il y a deux cas a traiter, si l'on garde les rings auquel cas il faut
les filter aussi.
#+begin_src python :tangle naps_utilities/pointcloud.py
def filter(self, threshold=10):
if self.metadata['keep_ring']:
concat = np.concatenate((self.points, self.rings.reshape((len(points), 1))), axis=1)
concat = concat[np.logical_and(
np.logical_not(np.isnan(concat).any(axis=1)),
concat[:,3]>=threshold)]
self.points = np.ascontiguousarray(concat[:,:4], dtype=np.float32)
self.rings = np.ascontiguousarray(concat[:,4:], dtype=np.uint16)
#+end_src
Après avoir été filtré, le poincloud ne peut plus être structuré dans
un tableau 2D.
#+begin_src python :tangle naps_utilities/pointcloud.py
else:
self.points = self.points[np.logical_and(
np.logical_not(np.isnan(self.points).any(axis=1)),
self.points[:,3]>=threshold)]
self.metadata['nb_points'] = len(self.points)
self.metadata['height'] = 1
self.metadata['width'] = self.metadata['nb_points']
#+end_src

** transform pointcloud
#+begin_src python :tangle naps_utilities/pointcloud.py
def transform(self, matrix):
if self.metadata['is_populated']:
self.points[:,:3] = np.transpose(
matrix @ np.concatenate((self.points[:,:3].transpose(),
np.ones((1, self.metadata['nb_points'])))))[:,:3]
self.matrix = matrix
self.metadata['procrastinated'] = False
else:
raise TransformWhileEmpty("Populate pointcloud before applying transform to it")
#+end_src
** add points
#+begin_src python :tangle naps_utilities/pointcloud.py
def update(self, pointcloud):
if self.metadata['keep_ring']:
if pointcloud.metadata['keep_ring']:
self.rings = np.ascontiguousarray(np.concatenate((self.rings, pointcloud.rings)))
else:
return False
self.points = np.ascontiguousarray(np.concatenate((self.points, pointcloud.points)))
self.metadata['nb_points'] = len(self.points)
self.metadata['height'] = 1
self.metadata['width'] = self.metadata['nb_points']
return True
#+end_src

** Export/Import
#+begin_src python :tangle naps_utilities/pointcloud.py
def save(self, path):
self.save_npz(path)
#+end_src
*** npz
#+begin_src python :tangle naps_utilities/pointcloud.py
def save_npz(self, path):
save_path = os.path.expanduser(path)
# with open('{}_meta.json'.format(save_path), 'w') as outfile:
# json.dump(self.metadata, outfile, indent=4)

np.savez_compressed('{}'.format(save_path), meta=[self.metadata], points=self.points, rings=self.rings, matrix=self.matrix)
#+end_src

#+begin_src python :tangle naps_utilities/pointcloud.py
def load(self, path):
load_path = os.path.expanduser(path)
# with open('{}_meta.json'.format(load_path), 'r') as infile:
# self.metadata = json.load(infile)

with np.load(load_path, allow_pickle=True) as data:
self.metadata = data['meta'][0]
if 'matrix' in data:
self.matrix = data['matrix']
if 'points' in data:
self.points = data['points']
if 'rings' in data:
self.rings = data['rings']
#+end_src
*** xyz
#+begin_src python :tangle naps_utilities/pointcloud.py
def save_xyz(self, path):
save_path = os.path.expanduser(path)
np.savetxt('{}.xyz'.format(save_path), self.points)
#with open('{}.xyz'.format(save_path), 'w') as outfile:
# json.dump(self.metadata, outfile, indent=4)
#+end_src

* Tests
#+begin_src ipython :session session01 :file :exports both
import numpy as np
from naps_utilities import Pointcloud
points = [[0, 0, 0, 4],
[1, 0, 0, 5],
[0, 1, 0, 105],
[0, 0, 1, 452],]
p = Pointcloud(points=points)
#+end_src

#+RESULTS:
: # Out[9]:

#+begin_src ipython :session session01 :file :exports both
p = Pointcloud(points=points)
#+end_src
* Build et distribution
#+begin_src bash :results value verbatim :exports both
python setup.py bdist_wheel sdist
#+end_src

#+RESULTS:
#+begin_example
running bdist_wheel
running build
running build_py
installing to build/bdist.linux-x86_64/wheel
running install
running install_lib
creating build/bdist.linux-x86_64/wheel
creating build/bdist.linux-x86_64/wheel/naps_utilities
copying build/lib/naps_utilities/pointcloud.py -> build/bdist.linux-x86_64/wheel/naps_utilities
copying build/lib/naps_utilities/__init__.py -> build/bdist.linux-x86_64/wheel/naps_utilities
running install_egg_info
running egg_info
writing naps_utilities.egg-info/PKG-INFO
writing dependency_links to naps_utilities.egg-info/dependency_links.txt
writing requirements to naps_utilities.egg-info/requires.txt
writing top-level names to naps_utilities.egg-info/top_level.txt
reading manifest file 'naps_utilities.egg-info/SOURCES.txt'
writing manifest file 'naps_utilities.egg-info/SOURCES.txt'
Copying naps_utilities.egg-info to build/bdist.linux-x86_64/wheel/naps_utilities-0.1.1-py3.7.egg-info
running install_scripts
creating build/bdist.linux-x86_64/wheel/naps_utilities-0.1.1.dist-info/WHEEL
creating 'dist/naps_utilities-0.1.1-py3-none-any.whl' and adding 'build/bdist.linux-x86_64/wheel' to it
adding 'naps_utilities/__init__.py'
adding 'naps_utilities/pointcloud.py'
adding 'naps_utilities-0.1.1.dist-info/METADATA'
adding 'naps_utilities-0.1.1.dist-info/WHEEL'
adding 'naps_utilities-0.1.1.dist-info/top_level.txt'
adding 'naps_utilities-0.1.1.dist-info/RECORD'
removing build/bdist.linux-x86_64/wheel
running sdist
running check
creating naps_utilities-0.1.1
creating naps_utilities-0.1.1/naps_utilities
creating naps_utilities-0.1.1/naps_utilities.egg-info
copying files to naps_utilities-0.1.1...
copying setup.py -> naps_utilities-0.1.1
copying naps_utilities/__init__.py -> naps_utilities-0.1.1/naps_utilities
copying naps_utilities/pointcloud.py -> naps_utilities-0.1.1/naps_utilities
copying naps_utilities.egg-info/PKG-INFO -> naps_utilities-0.1.1/naps_utilities.egg-info
copying naps_utilities.egg-info/SOURCES.txt -> naps_utilities-0.1.1/naps_utilities.egg-info
copying naps_utilities.egg-info/dependency_links.txt -> naps_utilities-0.1.1/naps_utilities.egg-info
copying naps_utilities.egg-info/requires.txt -> naps_utilities-0.1.1/naps_utilities.egg-info
copying naps_utilities.egg-info/top_level.txt -> naps_utilities-0.1.1/naps_utilities.egg-info
Writing naps_utilities-0.1.1/setup.cfg
Creating tar archive
removing 'naps_utilities-0.1.1' (and everything under it)
#+end_example


#+begin_src bash :results value verbatim :exports both
twine upload dist/*
#+end_src

#+RESULTS:


Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for naps-utilities, version 0.2.0
Filename, size File type Python version Upload date Hashes
Filename, size naps_utilities-0.2.0-py3-none-any.whl (10.6 kB) File type Wheel Python version py3 Upload date Hashes View
Filename, size naps_utilities-0.2.0.tar.gz (12.1 kB) File type Source Python version None Upload date Hashes View

Supported by

Pingdom Pingdom Monitoring Google Google Object Storage and Download Analytics Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN DigiCert DigiCert EV certificate StatusPage StatusPage Status page