35 from __future__
import print_function
38 Serialization of sensor_msgs.PointCloud2 messages.
43 from collections
import namedtuple
49 from sensor_msgs.msg
import PointCloud2, PointField
52 _DATATYPES[PointField.INT8] = (
'b', 1)
53 _DATATYPES[PointField.UINT8] = (
'B', 1)
54 _DATATYPES[PointField.INT16] = (
'h', 2)
55 _DATATYPES[PointField.UINT16] = (
'H', 2)
56 _DATATYPES[PointField.INT32] = (
'i', 4)
57 _DATATYPES[PointField.UINT32] = (
'I', 4)
58 _DATATYPES[PointField.FLOAT32] = (
'f', 4)
59 _DATATYPES[PointField.FLOAT64] = (
'd', 8)
61 def read_points(cloud, field_names=None, skip_nans=False, uvs=[]):
63 Read points from a L{sensor_msgs.PointCloud2} message.
65 @param cloud: The point cloud to read from.
66 @type cloud: L{sensor_msgs.PointCloud2}
67 @param field_names: The names of fields to read. If None, read all fields. [default: None]
68 @type field_names: iterable
69 @param skip_nans: If True, then don't return any point with a NaN value.
70 @type skip_nans: bool [default: False]
71 @param uvs: If specified, then only return the points at the given coordinates. [default: empty list]
73 @return: Generator which yields a list of values for each point.
76 assert isinstance(cloud, roslib.message.Message)
and cloud._type ==
'sensor_msgs/PointCloud2',
'cloud is not a sensor_msgs.msg.PointCloud2'
78 width, height, point_step, row_step, data, isnan = cloud.width, cloud.height, cloud.point_step, cloud.row_step, cloud.data, math.isnan
79 unpack_from = struct.Struct(fmt).unpack_from
84 p = unpack_from(data, (row_step * v) + (point_step * u))
93 for v
in range(height):
95 for u
in range(width):
96 p = unpack_from(data, offset)
108 yield unpack_from(data, (row_step * v) + (point_step * u))
110 for v
in range(height):
111 offset = row_step * v
112 for u
in range(width):
113 yield unpack_from(data, offset)
118 Read points from a L{sensor_msgs.PointCloud2} message. This function returns a list of namedtuples.
119 It operates on top of the read_points method. For more efficient access use read_points directly.
121 @param cloud: The point cloud to read from.
122 @type cloud: L{sensor_msgs.PointCloud2}
123 @param field_names: The names of fields to read. If None, read all fields. [default: None]
124 @type field_names: iterable
125 @param skip_nans: If True, then don't return any point with a NaN value.
126 @type skip_nans: bool [default: False]
127 @param uvs: If specified, then only return the points at the given coordinates. [default: empty list]
129 @return: List of namedtuples containing the values for each point
132 assert isinstance(cloud, roslib.message.Message)
and cloud._type ==
'sensor_msgs/PointCloud2',
'cloud is not a sensor_msgs.msg.PointCloud2'
134 if field_names
is None:
135 field_names = [f.name
for f
in cloud.fields]
137 Point = namedtuple(
"Point", field_names)
139 return [Point._make(l)
for l
in read_points(cloud, field_names, skip_nans, uvs)]
143 Create a L{sensor_msgs.msg.PointCloud2} message.
145 @param header: The point cloud header.
146 @type header: L{std_msgs.msg.Header}
147 @param fields: The point cloud fields.
148 @type fields: iterable of L{sensor_msgs.msg.PointField}
149 @param points: The point cloud points.
150 @type points: list of iterables, i.e. one iterable for each point, with the
151 elements of each iterable being the values of the fields for
152 that point (in the same order as the fields parameter)
153 @return: The point cloud.
154 @rtype: L{sensor_msgs.msg.PointCloud2}
159 buff = ctypes.create_string_buffer(cloud_struct.size * len(points))
161 point_step, pack_into = cloud_struct.size, cloud_struct.pack_into
164 pack_into(buff, offset, *p)
167 return PointCloud2(header=header,
173 point_step=cloud_struct.size,
174 row_step=cloud_struct.size * len(points),
179 Create a L{sensor_msgs.msg.PointCloud2} message with 3 float32 fields (x, y, z).
181 @param header: The point cloud header.
182 @type header: L{std_msgs.msg.Header}
183 @param points: The point cloud points.
184 @type points: iterable
185 @return: The point cloud.
186 @rtype: L{sensor_msgs.msg.PointCloud2}
188 fields = [PointField(
'x', 0, PointField.FLOAT32, 1),
189 PointField(
'y', 4, PointField.FLOAT32, 1),
190 PointField(
'z', 8, PointField.FLOAT32, 1)]
194 fmt =
'>' if is_bigendian
else '<'
197 for field
in (f
for f
in sorted(fields, key=
lambda f: f.offset)
if field_names
is None or f.name
in field_names):
198 if offset < field.offset:
199 fmt +=
'x' * (field.offset - offset)
200 offset = field.offset
201 if field.datatype
not in _DATATYPES:
202 print(
'Skipping unknown PointField datatype [%d]' % field.datatype, file=sys.stderr)
204 datatype_fmt, datatype_length = _DATATYPES[field.datatype]
205 fmt += field.count * datatype_fmt
206 offset += field.count * datatype_length