conversion_utils.py
Go to the documentation of this file.
1 # Copyright 2024 Ekumen, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import numpy as np
16 import h5py
17 import yaml
18 from typing import Dict, Optional
19 from pathlib import Path
20 from scipy.stats import multivariate_normal
21 
22 from dataclasses import dataclass
23 
24 # Remove annoying matplotlib on import warnings.
25 import warnings
26 
27 warnings.filterwarnings(
28  "ignore",
29  message="Unable to import Axes3D. This may be due to multiple " "versions of",
30 )
31 import matplotlib.pyplot as plt # noqa: E402: Disable import not at top level.
32 
33 
34 @dataclass(frozen=True)
36  x: int
37  y: int
38 
39 
40 @dataclass(frozen=True)
42  """
43  Wrapper around scipy's multivariate normal distribution implementation.
44 
45  Allows for equality checks and typing hints.
46  """
47 
48  mean: np.ndarray
49  covariance: np.ndarray
50 
51  def to_scipy(self):
52  """Get its scipy representation."""
53  return multivariate_normal(mean=self.mean, cov=self.covariance)
54 
55  def is_close(self, other: "NormalDistribution", abs_tol: float = 1e-8) -> bool:
56  """Element-wise equality with tolerance."""
57  return np.allclose(self.mean, other.mean, atol=abs_tol) and np.allclose(
58  self.covariance, other.covariance, atol=abs_tol
59  )
60 
61 
62 class NDTMap:
63  def __init__(self, resolution: float) -> None:
64  """Create a new NDT map with cell size of 'resolution'."""
65  self._resolution = resolution
66  self.grid: Dict[DiscreteCell, NormalDistribution] = {}
67 
68  def add_distribution(self, cell: DiscreteCell, ndt: NormalDistribution):
69  """Add a new cell with its distribution."""
70  self.grid[cell] = ndt
71 
72  def _get_plot_bounds(self):
73  min_x = float("inf")
74  min_y = float("inf")
75 
76  max_x = -float("inf")
77  max_y = -float("inf")
78  for discrete_cell in self.grid.keys():
79  x = discrete_cell.x * self._resolution + self._resolution / 2
80  y = discrete_cell.y * self._resolution + self._resolution / 2
81  min_x = min(min_x, x)
82  max_x = max(max_x, x)
83 
84  min_y = min(min_y, y)
85  max_y = max(max_y, y)
86 
87  PADDING = 1 # [m]
88 
89  min_x -= PADDING
90  max_x += PADDING
91  min_y -= PADDING
92  max_y += PADDING
93 
94  step_x = (max_x - min_x) / 100.0
95  step_y = (max_y - min_y) / 100.0
96 
97  xx, yy = np.mgrid[min_x:max_x:step_x, min_y:max_y:step_y]
98  return (xx, yy, np.dstack((xx, yy)))
99 
100  def is_close(self, other: "NDTMap", abs_tol: float = 1e-8) -> bool:
101  """Equality with tolerance between two NDT maps."""
102  is_resolution_close = np.allclose(
103  self._resolution, other._resolution, atol=abs_tol
104  )
105  if not is_resolution_close:
106  return False
107  if len(self.grid) != len(other.grid):
108  return False
109  for cell, distribution in self.grid.items():
110  if cell not in other.grid:
111  return False
112  if not distribution.is_close(other=other.grid[cell], abs_tol=abs_tol):
113  return False
114  return True
115 
116  def plot(self, show: bool = False) -> plt.figure:
117  """
118  Plot the map using contour plots into the current figure and returns it.
119 
120  Optionally show the plot based on the 'show' parameter.
121  """
122  xx, yy, pos = self._get_plot_bounds()
123 
124  for ndt in self.grid.values():
125  plt.contour(
126  xx,
127  yy,
128  ndt.to_scipy().pdf(pos),
129  [0.1, 0.5],
130  alpha=0.2,
131  linewidths=3,
132  extend="max",
133  )
134  if show:
135  plt.show()
136  return plt.gcf()
137 
138  def to_hdf5(self, output_file_path: Path):
139  """
140  Serialize the NDT map into an HDF5 format.
141 
142  See https://docs.hdfgroup.org/hdf5/develop/_intro_h_d_f5.html for details on the format.
143  It'll create 4 datasets:
144  - "resolution": () resolution for the discrete grid (cells are resolution x
145  resolution m^2 squares).
146  - "cells": (NUM_CELLS, 2) that contains the cell coordinates.
147  - "means": (NUM_CELLS, 2) that contains the 2d mean of the normal distribution
148  of the cell.
149  - "covariances": (NUM_CELLS, 2, 2) Contains the covariance for each cell.
150  """
151  assert output_file_path.suffix in (".hdf5", ".h5")
152  output_file_path.parent.mkdir(parents=True, exist_ok=True)
153 
154  num_cells = len(self.grid.keys())
155 
156  with h5py.File(output_file_path.absolute(), "w") as fp:
157  cells_dataset = fp.create_dataset("cells", (num_cells, 2), chunks=True)
158  for idx, cell in enumerate(self.grid.keys()):
159  cells_dataset[idx] = np.asarray([cell.x, cell.y])
160 
161  means_dataset = fp.create_dataset("means", (num_cells, 2), chunks=True)
162  covariances_dataset = fp.create_dataset("covariances", (num_cells, 2, 2))
163 
164  for idx, distribution in enumerate(self.grid.values()):
165  means_dataset[idx] = distribution.mean
166  covariances_dataset[idx] = distribution.covariance
167  fp.create_dataset("resolution", data=np.asarray(self._resolution))
168 
169  @classmethod
170  def from_hdf5(cls, hdf5_file: Path):
171  """
172  Create an NDTMap instance from a path to a hdf5 file.
173 
174  See 'to_hdf5' docstring for details on the hdf5 format.
175  """
176  with h5py.File(hdf5_file.absolute(), "r") as fp:
177  resolution: float = fp["resolution"][()]
178  means: np.ndarray = fp["means"]
179  cells: np.ndarray = fp["cells"]
180  covs: np.ndarray = fp["covariances"]
181  total_cells = covs.shape[0]
182 
183  assert covs.shape[1] == 2
184  assert covs.shape[2] == 2
185  assert means.shape[1] == 2
186  assert cells.shape[0] == total_cells
187  assert cells.shape[1] == 2
188  assert means.shape[0] == total_cells
189 
190  ret = NDTMap(resolution=resolution)
191  for cell, mean, cov in zip(cells, means, covs):
192  ret.add_distribution(
193  cell=DiscreteCell(*cell),
194  ndt=NormalDistribution(mean=mean, covariance=cov),
195  )
196  return ret
197 
198 
199 @dataclass
201  resolution: float
202  origin: np.ndarray
203  grid: np.ndarray
204 
205  @staticmethod
206  def load_from_file(yaml_path: Path) -> "OccupancyGrid":
207  """Create an grid from a yaml file describing a ROS style occupancy grid."""
208  assert yaml_path.is_file(), f"file {yaml_path} doesn't exist."
209  assert yaml_path.suffix == ".yaml"
210 
211  with open(yaml_path, "rb") as fp:
212  data = yaml.safe_load(fp)
213 
214  pgm_path: Path = yaml_path.parent / data["image"]
215  origin = np.asarray(data["origin"])
216  assert np.allclose(origin[2], 0), "This tool doesn't support rotated maps."
217  assert pgm_path.is_file(), f".pgm file at {pgm_path} does not exist"
218 
219  with open(yaml_path.parent / data["image"], "rb") as pgmf:
220  grid = plt.imread(pgmf)
221 
222  return OccupancyGrid(resolution=data["resolution"], origin=origin, grid=grid)
223 
224 
225 def grid_to_point_cloud(occupancy_grid: OccupancyGrid) -> np.ndarray:
226  """
227  Convert an OccupancyGrid's occupied cells to a 2D point cloud.
228 
229  Uses the center of the cell for the conversion to reduce max error.
230  """
231  occupied_cells_indices = np.asarray(
232  np.where(occupancy_grid.grid == 0)
233  ) # in ROS maps 0 == occupied.
234  res = occupancy_grid.resolution
235 
236  # Discretized occupied cells using the center of the cell.
237  points = occupied_cells_indices * res + (res / 2)
238  # Compensate for origin
239  points[0] += occupancy_grid.origin[0]
240  points[1] += occupancy_grid.origin[1]
241  return points
242 
243 
245  points: np.ndarray, min_variance: float = 5e-3
246 ) -> Optional[NormalDistribution]:
247  """
248  Fit a normal distribution to a set of 2D points.
249 
250  A minimum variance in each dimension will be enforced to avoid singularities.
251  """
252  assert points.shape[0] == 2
253  # Literature suggests doing this check to avoid singularities.
254  # See The Three-Dimensional Normal-Distributions Transform– an Efficient
255  # Representation for Registration, Surface Analysis, and Loop Detection
256  # by Martin Magnusson, 2009 chapter 6.
257  if points.shape[1] <= 5:
258  return None
259  mean = points.mean(axis=1)
260  covariance = np.cov(points)
261 
262  # avoid singularities by enforcing a minimum variance in both dimensions.
263  covariance[0, 0] = max(covariance[0, 0], min_variance)
264  covariance[1, 1] = max(covariance[1, 1], min_variance)
265 
266  return NormalDistribution(mean=mean, covariance=covariance)
267 
268 
269 def point_cloud_to_ndt(pc: np.ndarray, cell_size=1.0) -> NDTMap:
270  """
271  Convert a 2D point cloud into a NDT map representation.
272 
273  Does so by clustering points in 2D cells of {cell_size} * {cell_size} meters^2
274  and fitting a normal distribution when applicable.
275  """
276  assert pc.shape[0] == 2
277  points_clusters: Dict[DiscreteCell, np.ndarray] = {}
278  discretized_points = np.floor(pc / cell_size).astype(np.int64)
279  cells = np.unique(discretized_points, axis=-1).T
280 
281  for cell in cells:
282  pts_in_cell = np.all(discretized_points.T == cell, axis=1)
283  points_clusters[DiscreteCell(x=cell[0], y=cell[1])] = pc[:, pts_in_cell]
284 
285  ret = NDTMap(resolution=cell_size)
286 
287  for cell, points in points_clusters.items():
288  dist = fit_normal_distribution(points)
289  if dist is not None:
290  ret.add_distribution(cell=cell, ndt=dist)
291  return ret
beluga_ros.conversion_utils.NDTMap.to_hdf5
def to_hdf5(self, Path output_file_path)
Definition: conversion_utils.py:138
beluga_ros.conversion_utils.grid_to_point_cloud
np.ndarray grid_to_point_cloud(OccupancyGrid occupancy_grid)
Definition: conversion_utils.py:225
beluga_ros.conversion_utils.NDTMap._get_plot_bounds
def _get_plot_bounds(self)
Definition: conversion_utils.py:72
beluga_ros.conversion_utils.NDTMap.is_close
bool is_close(self, "NDTMap" other, float abs_tol=1e-8)
Definition: conversion_utils.py:100
beluga_ros.conversion_utils.OccupancyGrid.load_from_file
"OccupancyGrid" load_from_file(Path yaml_path)
Definition: conversion_utils.py:206
beluga_ros.conversion_utils.point_cloud_to_ndt
NDTMap point_cloud_to_ndt(np.ndarray pc, cell_size=1.0)
Definition: conversion_utils.py:269
beluga_ros.conversion_utils.NDTMap.add_distribution
def add_distribution(self, DiscreteCell cell, NormalDistribution ndt)
Definition: conversion_utils.py:68
beluga_ros.conversion_utils.NDTMap
Definition: conversion_utils.py:62
beluga_ros.conversion_utils.NormalDistribution.to_scipy
def to_scipy(self)
Definition: conversion_utils.py:51
beluga_ros.conversion_utils.NDTMap.__init__
None __init__(self, float resolution)
Definition: conversion_utils.py:63
beluga_ros.conversion_utils.DiscreteCell
Definition: conversion_utils.py:35
beluga_ros.conversion_utils.NormalDistribution.is_close
bool is_close(self, "NormalDistribution" other, float abs_tol=1e-8)
Definition: conversion_utils.py:55
beluga_ros.conversion_utils.NDTMap.plot
plt.figure plot(self, bool show=False)
Definition: conversion_utils.py:116
beluga_ros.conversion_utils.OccupancyGrid
Definition: conversion_utils.py:200
beluga_ros.conversion_utils.fit_normal_distribution
Optional[NormalDistribution] fit_normal_distribution(np.ndarray points, float min_variance=5e-3)
Definition: conversion_utils.py:244
beluga_ros.conversion_utils.NormalDistribution
Definition: conversion_utils.py:41
beluga_ros.conversion_utils.NDTMap._resolution
_resolution
Definition: conversion_utils.py:65
beluga_ros.conversion_utils.NDTMap.from_hdf5
def from_hdf5(cls, Path hdf5_file)
Definition: conversion_utils.py:170


beluga_ros
Author(s):
autogenerated on Tue Jul 16 2024 03:00:02