18 from typing
import Dict, Optional
19 from pathlib
import Path
20 from scipy.stats
import multivariate_normal
22 from dataclasses
import dataclass
27 warnings.filterwarnings(
29 message=
"Unable to import Axes3D. This may be due to multiple " "versions of",
31 import matplotlib.pyplot
as plt
34 @dataclass(frozen=
True)
40 @dataclass(frozen=
True)
43 Wrapper around scipy's multivariate normal distribution implementation.
45 Allows for equality checks and typing hints.
49 covariance: np.ndarray
52 """Get its scipy representation."""
53 return multivariate_normal(mean=self.mean, cov=self.covariance)
def is_close(self, other: "NormalDistribution", abs_tol: float = 1e-8) -> bool:
    """
    Check whether two distributions match within a tolerance.

    The means are compared first and the covariances second, both
    element-wise through np.allclose with 'abs_tol' passed as its absolute
    tolerance (np.allclose's default relative tolerance still applies).
    """
    # Bail out early on a mean mismatch so the covariances are only
    # compared when needed.
    if not np.allclose(self.mean, other.mean, atol=abs_tol):
        return False
    return np.allclose(self.covariance, other.covariance, atol=abs_tol)
64 """Create a new NDT map with cell size of 'resolution'."""
66 self.grid: Dict[DiscreteCell, NormalDistribution] = {}
69 """Add a new cell with its distribution."""
78 for discrete_cell
in self.grid.keys():
94 step_x = (max_x - min_x) / 100.0
95 step_y = (max_y - min_y) / 100.0
97 xx, yy = np.mgrid[min_x:max_x:step_x, min_y:max_y:step_y]
98 return (xx, yy, np.dstack((xx, yy)))
100 def is_close(self, other:
"NDTMap", abs_tol: float = 1e-8) -> bool:
101 """Equality with tolerance between two NDT maps."""
102 is_resolution_close = np.allclose(
105 if not is_resolution_close:
107 if len(self.grid) != len(other.grid):
109 for cell, distribution
in self.grid.items():
110 if cell
not in other.grid:
112 if not distribution.is_close(other=other.grid[cell], abs_tol=abs_tol):
116 def plot(self, show: bool =
False) -> plt.figure:
118 Plot the map as contour plots into the current figure and return it.
120 Optionally show the plot based on the 'show' parameter.
124 for ndt
in self.grid.values():
128 ndt.to_scipy().pdf(pos),
140 Serialize the NDT map into an HDF5 format.
142 See https://docs.hdfgroup.org/hdf5/develop/_intro_h_d_f5.html for details on the format.
143 It'll create 4 datasets:
144 - "resolution": () resolution for the discrete grid (cells are resolution x
145 resolution m^2 squares).
146 - "cells": (NUM_CELLS, 2) that contains the cell coordinates.
147 - "means": (NUM_CELLS, 2) that contains the 2d mean of the normal distribution
149 - "covariances": (NUM_CELLS, 2, 2) Contains the covariance for each cell.
151 assert output_file_path.suffix
in (
".hdf5",
".h5")
152 output_file_path.parent.mkdir(parents=
True, exist_ok=
True)
154 num_cells = len(self.grid.keys())
156 with h5py.File(output_file_path.absolute(),
"w")
as fp:
157 cells_dataset = fp.create_dataset(
"cells", (num_cells, 2), chunks=
True)
158 for idx, cell
in enumerate(self.grid.keys()):
159 cells_dataset[idx] = np.asarray([cell.x, cell.y])
161 means_dataset = fp.create_dataset(
"means", (num_cells, 2), chunks=
True)
162 covariances_dataset = fp.create_dataset(
"covariances", (num_cells, 2, 2))
164 for idx, distribution
in enumerate(self.grid.values()):
165 means_dataset[idx] = distribution.mean
166 covariances_dataset[idx] = distribution.covariance
167 fp.create_dataset(
"resolution", data=np.asarray(self.
_resolution))
172 Create an NDTMap instance from a path to a hdf5 file.
174 See 'to_hdf5' docstring for details on the hdf5 format.
176 with h5py.File(hdf5_file.absolute(),
"r")
as fp:
177 resolution: float = fp[
"resolution"][()]
178 means: np.ndarray = fp[
"means"]
179 cells: np.ndarray = fp[
"cells"]
180 covs: np.ndarray = fp[
"covariances"]
181 total_cells = covs.shape[0]
183 assert covs.shape[1] == 2
184 assert covs.shape[2] == 2
185 assert means.shape[1] == 2
186 assert cells.shape[0] == total_cells
187 assert cells.shape[1] == 2
188 assert means.shape[0] == total_cells
190 ret =
NDTMap(resolution=resolution)
191 for cell, mean, cov
in zip(cells, means, covs):
192 ret.add_distribution(
207 """Create a grid from a yaml file describing a ROS style occupancy grid."""
208 assert yaml_path.is_file(), f
"file {yaml_path} doesn't exist."
209 assert yaml_path.suffix ==
".yaml"
211 with open(yaml_path,
"rb")
as fp:
212 data = yaml.safe_load(fp)
214 pgm_path: Path = yaml_path.parent / data[
"image"]
215 origin = np.asarray(data[
"origin"])
216 assert np.allclose(origin[2], 0),
"This tool doesn't support rotated maps."
217 assert pgm_path.is_file(), f
".pgm file at {pgm_path} does not exist"
219 with open(yaml_path.parent / data[
"image"],
"rb")
as pgmf:
220 grid = plt.imread(pgmf)
222 return OccupancyGrid(resolution=data[
"resolution"], origin=origin, grid=grid)
227 Convert an OccupancyGrid's occupied cells to a 2D point cloud.
229 Uses the center of the cell for the conversion to reduce max error.
231 occupied_cells_indices = np.asarray(
232 np.where(occupancy_grid.grid == 0)
234 res = occupancy_grid.resolution
237 points = occupied_cells_indices * res + (res / 2)
239 points[0] += occupancy_grid.origin[0]
240 points[1] += occupancy_grid.origin[1]
245 points: np.ndarray, min_variance: float = 5e-3
246 ) -> Optional[NormalDistribution]:
248 Fit a normal distribution to a set of 2D points.
250 A minimum variance in each dimension will be enforced to avoid singularities.
252 assert points.shape[0] == 2
257 if points.shape[1] <= 5:
259 mean = points.mean(axis=1)
260 covariance = np.cov(points)
263 covariance[0, 0] = max(covariance[0, 0], min_variance)
264 covariance[1, 1] = max(covariance[1, 1], min_variance)
271 Convert a 2D point cloud into a NDT map representation.
273 Does so by clustering points in 2D cells of {cell_size} * {cell_size} meters^2
274 and fitting a normal distribution when applicable.
276 assert pc.shape[0] == 2
277 points_clusters: Dict[DiscreteCell, np.ndarray] = {}
278 discretized_points = np.floor(pc / cell_size).astype(np.int64)
279 cells = np.unique(discretized_points, axis=-1).T
282 pts_in_cell = np.all(discretized_points.T == cell, axis=1)
283 points_clusters[
DiscreteCell(x=cell[0], y=cell[1])] = pc[:, pts_in_cell]
285 ret =
NDTMap(resolution=cell_size)
287 for cell, points
in points_clusters.items():
290 ret.add_distribution(cell=cell, ndt=dist)