$search
00001 #!/usr/bin/env python 00002 00003 """ 00004 Serialization of sensor_msgs.PointCloud2 messages. 00005 00006 Author: Tim Field 00007 """ 00008 00009 import roslib; roslib.load_manifest('sensor_msgs') 00010 00011 import ctypes 00012 import math 00013 import struct 00014 00015 from sensor_msgs.msg import PointCloud2, PointField 00016 00017 _DATATYPES = {} 00018 _DATATYPES[PointField.INT8] = ('b', 1) 00019 _DATATYPES[PointField.UINT8] = ('B', 1) 00020 _DATATYPES[PointField.INT16] = ('h', 2) 00021 _DATATYPES[PointField.UINT16] = ('H', 2) 00022 _DATATYPES[PointField.INT32] = ('i', 4) 00023 _DATATYPES[PointField.UINT32] = ('I', 4) 00024 _DATATYPES[PointField.FLOAT32] = ('f', 4) 00025 _DATATYPES[PointField.FLOAT64] = ('d', 8) 00026 00027 def read_points(cloud, field_names=None, skip_nans=False, uvs=[]): 00028 """ 00029 Read points from a L{sensor_msgs.PointCloud2} message. 00030 00031 @param cloud: The point cloud to read from. 00032 @type cloud: L{sensor_msgs.PointCloud2} 00033 @param field_names: The names of fields to read. If None, read all fields. [default: None] 00034 @type field_names: iterable 00035 @param skip_nans: If True, then don't return any point with a NaN value. 00036 @type skip_nans: bool [default: False] 00037 @param uvs: If specified, then only return the points at the given coordinates. [default: empty list] 00038 @type uvs: iterable 00039 @return: Generator which yields a list of values for each point. 00040 @rtype: generator 00041 """ 00042 assert isinstance(cloud, roslib.message.Message) and cloud._type == 'sensor_msgs/PointCloud2', 'cloud is not a sensor_msgs.msg.PointCloud2' 00043 fmt = _get_struct_fmt(cloud.is_bigendian, cloud.fields, field_names) 00044 width, height, point_step, row_step, data, isnan = cloud.width, cloud.height, cloud.point_step, cloud.row_step, cloud.data, math.isnan 00045 unpack_from = struct.Struct(fmt).unpack_from 00046 00047 if skip_nans: 00048 if uvs: 00049 for u, v in uvs: 00050 p = unpack_from(data, (row_step * v) + (point_step * u)) 00051 has_nan = False 00052 for pv in p: 00053 if isnan(pv): 00054 has_nan = True 00055 break 00056 if not has_nan: 00057 yield p 00058 else: 00059 for v in xrange(height): 00060 offset = row_step * v 00061 for u in xrange(width): 00062 p = unpack_from(data, offset) 00063 has_nan = False 00064 for pv in p: 00065 if isnan(pv): 00066 has_nan = True 00067 break 00068 if not has_nan: 00069 yield p 00070 offset += point_step 00071 else: 00072 if uvs: 00073 for u, v in uvs: 00074 yield unpack_from(data, (row_step * v) + (point_step * u)) 00075 else: 00076 for v in xrange(height): 00077 offset = row_step * v 00078 for u in xrange(width): 00079 yield unpack_from(data, offset) 00080 offset += point_step 00081 00082 def create_cloud(header, fields, points): 00083 """ 00084 Create a L{sensor_msgs.msg.PointCloud2} message. 00085 00086 @param header: The point cloud header. 00087 @type header: L{std_msgs.msg.Header} 00088 @param fields: The point cloud fields. 00089 @type fields: iterable of L{sensor_msgs.msg.PointField} 00090 @param points: The point cloud points. 00091 @type points: list 00092 @return: The point cloud. 00093 @rtype: L{sensor_msgs.msg.PointCloud2} 00094 """ 00095 00096 cloud_struct = struct.Struct(_get_struct_fmt(False, fields)) 00097 00098 buff = ctypes.create_string_buffer(cloud_struct.size * len(points)) 00099 00100 point_step, pack_into = cloud_struct.size, cloud_struct.pack_into 00101 offset = 0 00102 for p in points: 00103 pack_into(buff, offset, *p) 00104 offset += point_step 00105 00106 return PointCloud2(header=header, 00107 height=1, 00108 width=len(points), 00109 is_dense=False, 00110 is_bigendian=False, 00111 fields=fields, 00112 point_step=cloud_struct.size, 00113 row_step=cloud_struct.size * len(points), 00114 data=buff.raw) 00115 00116 def create_cloud_xyz32(header, points): 00117 """ 00118 Create a L{sensor_msgs.msg.PointCloud2} message with 3 float32 fields (x, y, z). 00119 00120 @param header: The point cloud header. 00121 @type header: L{std_msgs.msg.Header} 00122 @param points: The point cloud points. 00123 @type points: iterable 00124 @return: The point cloud. 00125 @rtype: L{sensor_msgs.msg.PointCloud2} 00126 """ 00127 fields = [PointField('x', 0, PointField.FLOAT32, 1), 00128 PointField('y', 4, PointField.FLOAT32, 1), 00129 PointField('z', 8, PointField.FLOAT32, 1)] 00130 return create_cloud(header, fields, points) 00131 00132 def _get_struct_fmt(is_bigendian, fields, field_names=None): 00133 fmt = '>' if is_bigendian else '<' 00134 00135 offset = 0 00136 for field in (f for f in sorted(fields, key=lambda f: f.offset) if field_names is None or f.name in field_names): 00137 if offset < field.offset: 00138 fmt += 'x' * (field.offset - offset) 00139 offset = field.offset 00140 if field.datatype not in _DATATYPES: 00141 print >> sys.stderr, 'Skipping unknown PointField datatype [%d]' % field.datatype 00142 else: 00143 datatype_fmt, datatype_length = _DATATYPES[field.datatype] 00144 fmt += field.count * datatype_fmt 00145 offset += field.count * datatype_length 00146 00147 return fmt