point_cloud2.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Software License Agreement (BSD License)
4 #
5 # Copyright (c) 2008, Willow Garage, Inc.
6 # All rights reserved.
7 #
8 # Redistribution and use in source and binary forms, with or without
9 # modification, are permitted provided that the following conditions
10 # are met:
11 #
12 # * Redistributions of source code must retain the above copyright
13 # notice, this list of conditions and the following disclaimer.
14 # * Redistributions in binary form must reproduce the above
15 # copyright notice, this list of conditions and the following
16 # disclaimer in the documentation and/or other materials provided
17 # with the distribution.
18 # * Neither the name of Willow Garage, Inc. nor the names of its
19 # contributors may be used to endorse or promote products derived
20 # from this software without specific prior written permission.
21 #
22 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
30 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
32 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 # POSSIBILITY OF SUCH DAMAGE.
34 
35 from __future__ import print_function
36 
37 """
38 Serialization of sensor_msgs.PointCloud2 messages.
39 
40 Author: Tim Field
41 """
42 
43 from collections import namedtuple
44 import ctypes
45 import math
46 import struct
47 
48 import roslib.message
49 from sensor_msgs.msg import PointCloud2, PointField
50 
51 _DATATYPES = {}
52 _DATATYPES[PointField.INT8] = ('b', 1)
53 _DATATYPES[PointField.UINT8] = ('B', 1)
54 _DATATYPES[PointField.INT16] = ('h', 2)
55 _DATATYPES[PointField.UINT16] = ('H', 2)
56 _DATATYPES[PointField.INT32] = ('i', 4)
57 _DATATYPES[PointField.UINT32] = ('I', 4)
58 _DATATYPES[PointField.FLOAT32] = ('f', 4)
59 _DATATYPES[PointField.FLOAT64] = ('d', 8)
60 
61 def read_points(cloud, field_names=None, skip_nans=False, uvs=[]):
62  """
63  Read points from a L{sensor_msgs.PointCloud2} message.
64 
65  @param cloud: The point cloud to read from.
66  @type cloud: L{sensor_msgs.PointCloud2}
67  @param field_names: The names of fields to read. If None, read all fields. [default: None]
68  @type field_names: iterable
69  @param skip_nans: If True, then don't return any point with a NaN value.
70  @type skip_nans: bool [default: False]
71  @param uvs: If specified, then only return the points at the given coordinates. [default: empty list]
72  @type uvs: iterable
73  @return: Generator which yields a list of values for each point.
74  @rtype: generator
75  """
76  assert isinstance(cloud, roslib.message.Message) and cloud._type == 'sensor_msgs/PointCloud2', 'cloud is not a sensor_msgs.msg.PointCloud2'
77  fmt = _get_struct_fmt(cloud.is_bigendian, cloud.fields, field_names)
78  width, height, point_step, row_step, data, isnan = cloud.width, cloud.height, cloud.point_step, cloud.row_step, cloud.data, math.isnan
79  unpack_from = struct.Struct(fmt).unpack_from
80 
81  if skip_nans:
82  if uvs:
83  for u, v in uvs:
84  p = unpack_from(data, (row_step * v) + (point_step * u))
85  has_nan = False
86  for pv in p:
87  if isnan(pv):
88  has_nan = True
89  break
90  if not has_nan:
91  yield p
92  else:
93  for v in range(height):
94  offset = row_step * v
95  for u in range(width):
96  p = unpack_from(data, offset)
97  has_nan = False
98  for pv in p:
99  if isnan(pv):
100  has_nan = True
101  break
102  if not has_nan:
103  yield p
104  offset += point_step
105  else:
106  if uvs:
107  for u, v in uvs:
108  yield unpack_from(data, (row_step * v) + (point_step * u))
109  else:
110  for v in range(height):
111  offset = row_step * v
112  for u in range(width):
113  yield unpack_from(data, offset)
114  offset += point_step
115 
116 def read_points_list(cloud, field_names=None, skip_nans=False, uvs=[]):
117  """
118  Read points from a L{sensor_msgs.PointCloud2} message. This function returns a list of namedtuples.
119  It operates on top of the read_points method. For more efficient access use read_points directly.
120 
121  @param cloud: The point cloud to read from.
122  @type cloud: L{sensor_msgs.PointCloud2}
123  @param field_names: The names of fields to read. If None, read all fields. [default: None]
124  @type field_names: iterable
125  @param skip_nans: If True, then don't return any point with a NaN value.
126  @type skip_nans: bool [default: False]
127  @param uvs: If specified, then only return the points at the given coordinates. [default: empty list]
128  @type uvs: iterable
129  @return: List of namedtuples containing the values for each point
130  @rtype: list
131  """
132  assert isinstance(cloud, roslib.message.Message) and cloud._type == 'sensor_msgs/PointCloud2', 'cloud is not a sensor_msgs.msg.PointCloud2'
133 
134  if field_names is None:
135  field_names = [f.name for f in cloud.fields]
136 
137  Point = namedtuple("Point", field_names)
138 
139  return [Point._make(l) for l in read_points(cloud, field_names, skip_nans, uvs)]
140 
141 def create_cloud(header, fields, points):
142  """
143  Create a L{sensor_msgs.msg.PointCloud2} message.
144 
145  @param header: The point cloud header.
146  @type header: L{std_msgs.msg.Header}
147  @param fields: The point cloud fields.
148  @type fields: iterable of L{sensor_msgs.msg.PointField}
149  @param points: The point cloud points.
150  @type points: list of iterables, i.e. one iterable for each point, with the
151  elements of each iterable being the values of the fields for
152  that point (in the same order as the fields parameter)
153  @return: The point cloud.
154  @rtype: L{sensor_msgs.msg.PointCloud2}
155  """
156 
157  cloud_struct = struct.Struct(_get_struct_fmt(False, fields))
158 
159  buff = ctypes.create_string_buffer(cloud_struct.size * len(points))
160 
161  point_step, pack_into = cloud_struct.size, cloud_struct.pack_into
162  offset = 0
163  for p in points:
164  pack_into(buff, offset, *p)
165  offset += point_step
166 
167  return PointCloud2(header=header,
168  height=1,
169  width=len(points),
170  is_dense=False,
171  is_bigendian=False,
172  fields=fields,
173  point_step=cloud_struct.size,
174  row_step=cloud_struct.size * len(points),
175  data=buff.raw)
176 
177 def create_cloud_xyz32(header, points):
178  """
179  Create a L{sensor_msgs.msg.PointCloud2} message with 3 float32 fields (x, y, z).
180 
181  @param header: The point cloud header.
182  @type header: L{std_msgs.msg.Header}
183  @param points: The point cloud points.
184  @type points: iterable
185  @return: The point cloud.
186  @rtype: L{sensor_msgs.msg.PointCloud2}
187  """
188  fields = [PointField('x', 0, PointField.FLOAT32, 1),
189  PointField('y', 4, PointField.FLOAT32, 1),
190  PointField('z', 8, PointField.FLOAT32, 1)]
191  return create_cloud(header, fields, points)
192 
193 def _get_struct_fmt(is_bigendian, fields, field_names=None):
194  fmt = '>' if is_bigendian else '<'
195 
196  offset = 0
197  for field in (f for f in sorted(fields, key=lambda f: f.offset) if field_names is None or f.name in field_names):
198  if offset < field.offset:
199  fmt += 'x' * (field.offset - offset)
200  offset = field.offset
201  if field.datatype not in _DATATYPES:
202  print('Skipping unknown PointField datatype [%d]' % field.datatype, file=sys.stderr)
203  else:
204  datatype_fmt, datatype_length = _DATATYPES[field.datatype]
205  fmt += field.count * datatype_fmt
206  offset += field.count * datatype_length
207 
208  return fmt
def read_points_list(cloud, field_names=None, skip_nans=False, uvs=[])
def create_cloud_xyz32(header, points)
def _get_struct_fmt(is_bigendian, fields, field_names=None)
def create_cloud(header, fields, points)
def read_points(cloud, field_names=None, skip_nans=False, uvs=[])
Definition: point_cloud2.py:61


sensor_msgs
Author(s): Tully Foote
autogenerated on Mon Feb 28 2022 22:12:14