sick_scan_api_converter.py
Go to the documentation of this file.
1 #
2 # Data conversion utilities for sick_scan_api
3 #
4 import ctypes
5 import math
6 import numpy as np
7 import os
8 import sys
9 import sick_scan_api
10 from sick_scan_api import *
11 
12 # set __ROS_VERSION to 0 (no ROS), 1 (publish ROS-1 pointclouds), or 2 (ROS-2)
13 __ROS_VERSION = os.getenv("ROS_VERSION")
14 if __ROS_VERSION is None:
15  __ROS_VERSION = 0
16 else:
17  __ROS_VERSION = int(__ROS_VERSION)
18 if __ROS_VERSION == 1:
19  import rospy
20  from rospy import Duration
21  from sensor_msgs import point_cloud2
22 elif __ROS_VERSION == 2:
23  from rclpy.duration import Duration
24 from std_msgs.msg import ColorRGBA
25 from geometry_msgs.msg import Point
26 from sensor_msgs.msg import PointCloud2, PointField
27 from visualization_msgs.msg import Marker, MarkerArray
28 
29 # Convert a cartesian SickScanPointCloudMsg to ros sensor_msgs.msg.PointCloud2
31 
32  if __ROS_VERSION == 0:
33  print("## ERROR: SickScanApiConvertPointCloudToROS not implemented (ROS_VERSION = {})".format(__ROS_VERSION))
34  return None
35  # Copy pointcloud header and dimensions
36  ros_pointcloud = PointCloud2()
37  if __ROS_VERSION == 1:
38  ros_pointcloud.header.seq = api_pointcloud.header.seq
39  ros_pointcloud.header.stamp.secs = api_pointcloud.header.timestamp_sec
40  ros_pointcloud.header.stamp.nsecs = api_pointcloud.header.timestamp_nsec
41  elif __ROS_VERSION == 2:
42  ros_pointcloud.header.stamp.sec = api_pointcloud.header.timestamp_sec
43  ros_pointcloud.header.stamp.nanosec = api_pointcloud.header.timestamp_nsec
44  ros_pointcloud.header.frame_id = ctypesCharArrayToString(api_pointcloud.header.frame_id)
45  ros_pointcloud.width = api_pointcloud.width
46  ros_pointcloud.height = api_pointcloud.height
47  ros_pointcloud.is_bigendian = (api_pointcloud.is_bigendian > 0)
48  ros_pointcloud.is_dense = (api_pointcloud.is_dense > 0)
49  ros_pointcloud.point_step = api_pointcloud.point_step
50  ros_pointcloud.row_step = api_pointcloud.row_step
51 
52  # Copy pointcloud fields
53  num_fields = api_pointcloud.fields.size
54  msg_fields_buffer = api_pointcloud.fields.buffer
55  ros_pointcloud.fields = [PointField()] * num_fields
56  for n in range(num_fields):
57  ros_pointcloud.fields[n] = PointField(name = ctypesCharArrayToString(msg_fields_buffer[n].name), offset = msg_fields_buffer[n].offset, datatype = msg_fields_buffer[n].datatype, count = msg_fields_buffer[n].count)
58 
59  # Copy pointcloud data
60  cloud_data_buffer_len = (ros_pointcloud.row_step * ros_pointcloud.height) # length of cloud data in byte
61  assert(api_pointcloud.data.size == cloud_data_buffer_len)
62  cloud_data_buffer = bytearray(cloud_data_buffer_len)
63  for n in range(cloud_data_buffer_len):
64  cloud_data_buffer[n] = api_pointcloud.data.buffer[n]
65  cloud_data = np.frombuffer(cloud_data_buffer, dtype = np.uint8, count = cloud_data_buffer_len)
66 
67  ros_pointcloud.data = cloud_data.tostring()
68  return ros_pointcloud
69 
70 # Convert a polar SickScanPointCloudMsg to ros sensor_msgs.msg.PointCloud2
72 
73  if __ROS_VERSION != 1:
74  print("## ERROR: SickScanApiConvertPointCloudToROS not implemented (ROS_VERSION = {})".format(__ROS_VERSION))
75  return None
76  # Copy pointcloud header
77  ros_pointcloud = PointCloud2()
78  ros_pointcloud.header.seq = api_pointcloud.header.seq
79  ros_pointcloud.header.stamp.secs = api_pointcloud.header.timestamp_sec
80  ros_pointcloud.header.stamp.nsecs = api_pointcloud.header.timestamp_nsec
81  ros_pointcloud.header.frame_id = ctypesCharArrayToString(api_pointcloud.header.frame_id)
82  ros_pointcloud.width = api_pointcloud.width
83  ros_pointcloud.height = api_pointcloud.height
84  ros_pointcloud.is_bigendian = api_pointcloud.is_bigendian
85  ros_pointcloud.is_dense = api_pointcloud.is_dense
86 
87  # Create pointcloud fields
88  num_fields = api_pointcloud.fields.size
89  msg_fields_buffer = api_pointcloud.fields.buffer
90  field_offset_range = -1
91  field_offset_azimuth = -1
92  field_offset_elevation = -1
93  field_offset_intensity = -1
94  for n in range(num_fields):
95  point_field = PointField(ctypesCharArrayToString(msg_fields_buffer[n].name), msg_fields_buffer[n].offset, msg_fields_buffer[n].datatype, msg_fields_buffer[n].count)
96  if point_field.name == "range" and point_field.datatype == PointField.FLOAT32:
97  field_offset_range = msg_fields_buffer[n].offset
98  elif point_field.name == "azimuth" and point_field.datatype == PointField.FLOAT32:
99  field_offset_azimuth = msg_fields_buffer[n].offset
100  elif point_field.name == "elevation" and point_field.datatype == PointField.FLOAT32:
101  field_offset_elevation = msg_fields_buffer[n].offset
102  elif point_field.name == "intensity" and point_field.datatype == PointField.FLOAT32:
103  field_offset_intensity = msg_fields_buffer[n].offset
104  ros_pointcloud.fields = [ PointField("x", 0, PointField.FLOAT32, 1), PointField("y", 4, PointField.FLOAT32, 1), PointField("z", 8, PointField.FLOAT32, 1), PointField("intensity", 12, PointField.FLOAT32, 1) ]
105  ros_pointcloud.point_step = 16
106  ros_pointcloud.row_step = ros_pointcloud.point_step * ros_pointcloud.width
107 
108  # Copy pointcloud data
109  polar_cloud_data_buffer_len = (api_pointcloud.row_step * api_pointcloud.height) # length of polar cloud data in byte
110  assert(api_pointcloud.data.size == polar_cloud_data_buffer_len and field_offset_range >= 0 and field_offset_azimuth >= 0 and field_offset_elevation >= 0)
111  polar_cloud_data_buffer = bytearray(polar_cloud_data_buffer_len)
112  for n in range(polar_cloud_data_buffer_len):
113  polar_cloud_data_buffer[n] = api_pointcloud.data.buffer[n]
114  cartesian_point_cloud_buffer = np.zeros(4 * ros_pointcloud.width * ros_pointcloud.height, dtype = np.float32)
115  cartesian_point_cloud_offset = 0
116  for row_idx in range(api_pointcloud.height):
117  for col_idx in range(api_pointcloud.width):
118  # Get lidar point in polar coordinates (range, azimuth and elevation)
119  polar_point_offset = row_idx * api_pointcloud.row_step + col_idx * api_pointcloud.point_step
120  point_range = np.frombuffer(polar_cloud_data_buffer, dtype = np.float32, count = 1, offset = polar_point_offset + field_offset_range)[0]
121  point_azimuth = np.frombuffer(polar_cloud_data_buffer, dtype = np.float32, count = 1, offset = polar_point_offset + field_offset_azimuth)[0]
122  point_elevation = np.frombuffer(polar_cloud_data_buffer, dtype = np.float32, count = 1, offset = polar_point_offset + field_offset_elevation)[0]
123  point_intensity = 0
124  if field_offset_intensity >= 0:
125  point_intensity = np.frombuffer(polar_cloud_data_buffer, dtype = np.float32, count = 1, offset = polar_point_offset + field_offset_intensity)[0]
126  # Convert from polar to cartesian coordinates
127  point_x = point_range * math.cos(point_elevation) * math.cos(point_azimuth)
128  point_y = point_range * math.cos(point_elevation) * math.sin(point_azimuth)
129  point_z = point_range * math.sin(point_elevation)
130  # print("point {},{}: offset={}, range={}, azimuth={}, elevation={}, intensity={}, x={}, y={}, z={}".format(col_idx, row_idx, polar_point_offset,
131  # point_range, point_azimuth * 180 / math.pi, point_elevation * 180 / math.pi, point_intensity, point_x, point_y, point_z))
132  cartesian_point_cloud_buffer[cartesian_point_cloud_offset + 0] = point_x
133  cartesian_point_cloud_buffer[cartesian_point_cloud_offset + 1] = point_y
134  cartesian_point_cloud_buffer[cartesian_point_cloud_offset + 2] = point_z
135  cartesian_point_cloud_buffer[cartesian_point_cloud_offset + 3] = point_intensity
136  cartesian_point_cloud_offset = cartesian_point_cloud_offset + 4
137 
138  ros_pointcloud.data = cartesian_point_cloud_buffer.tostring()
139  return ros_pointcloud
140 
141 # Convert radar objects to ros sensor_msgs.msg.PointCloud2
142 def SickScanApiConvertRadarObjectsToROS(header, radar_objects):
143 
144  if __ROS_VERSION != 1:
145  print("## ERROR: SickScanApiConvertPointCloudToROS not implemented (ROS_VERSION = {})".format(__ROS_VERSION))
146  return None
147  # Copy pointcloud header
148  ros_pointcloud = PointCloud2()
149  ros_pointcloud.header.seq = header.seq
150  ros_pointcloud.header.stamp.secs = header.timestamp_sec
151  ros_pointcloud.header.stamp.nsecs = header.timestamp_nsec
152  ros_pointcloud.header.frame_id = ctypesCharArrayToString(header.frame_id)
153  ros_pointcloud.width = radar_objects.size
154  ros_pointcloud.height = 1
155  ros_pointcloud.is_bigendian = False
156  ros_pointcloud.is_dense = True
157 
158  # Set field description
159  ros_pointcloud.fields = [
160  PointField("x", 0, PointField.FLOAT32, 1), PointField("y", 4, PointField.FLOAT32, 1), PointField("z", 8, PointField.FLOAT32, 1),
161  PointField("vx", 12, PointField.FLOAT32, 1), PointField("vy", 16, PointField.FLOAT32, 1), PointField("vz", 20, PointField.FLOAT32, 1) ]
162  ros_pointcloud.point_step = 24
163  ros_pointcloud.row_step = ros_pointcloud.point_step * ros_pointcloud.width
164 
165  # Copy radar object data
166  ros_point_cloud_buffer = np.zeros(6 * ros_pointcloud.width * ros_pointcloud.height, dtype = np.float32)
167  ros_point_cloud_offset = 0
168  for n in range(radar_objects.size):
169  ros_point_cloud_buffer[ros_point_cloud_offset + 0] = radar_objects.buffer[n].object_box_center_position.x
170  ros_point_cloud_buffer[ros_point_cloud_offset + 1] = radar_objects.buffer[n].object_box_center_position.y
171  ros_point_cloud_buffer[ros_point_cloud_offset + 2] = radar_objects.buffer[n].object_box_center_position.z
172  ros_point_cloud_buffer[ros_point_cloud_offset + 3] = radar_objects.buffer[n].velocity_linear.x
173  ros_point_cloud_buffer[ros_point_cloud_offset + 4] = radar_objects.buffer[n].velocity_linear.y
174  ros_point_cloud_buffer[ros_point_cloud_offset + 5] = radar_objects.buffer[n].velocity_linear.z
175 
176  ros_pointcloud.data = ros_point_cloud_buffer.tostring()
177  return ros_pointcloud
178 
179 # Convert SickScanVisualizationMarkerBuffer to ros visualization_msgs.msg.MarkerArray
181 
182  if __ROS_VERSION == 0:
183  print("## ERROR: SickScanApiConvertPointCloudToROS not implemented (ROS_VERSION = {})".format(__ROS_VERSION))
184  return None
185  ros_marker = MarkerArray()
186  for n in range(sick_markers.size):
187  sick_marker = sick_markers.buffer[n]
188  marker = Marker()
189  # Copy marker
190  if __ROS_VERSION == 1:
191  marker.header.seq = sick_marker.header.seq
192  marker.header.stamp.secs = sick_marker.header.timestamp_sec
193  marker.header.stamp.nsecs = sick_marker.header.timestamp_nsec
194  marker.lifetime = rospy.Duration(sick_marker.lifetime_sec, sick_marker.lifetime_nsec)
195  elif __ROS_VERSION == 2:
196  marker.header.stamp.sec = sick_marker.header.timestamp_sec
197  marker.header.stamp.nanosec = sick_marker.header.timestamp_nsec
198  # marker.lifetime = Duration(seconds = sick_marker.lifetime_sec, nanoseconds = sick_marker.lifetime_nsec)
199  marker.header.frame_id = ctypesCharArrayToString(sick_marker.header.frame_id)
200  marker.ns= ctypesCharArrayToString(sick_marker.ns)
201  marker.id = sick_marker.id
202  marker.type = sick_marker.type
203  marker.action = sick_marker.action
204  marker.pose.position.x = sick_marker.pose_position.x
205  marker.pose.position.y = sick_marker.pose_position.y
206  marker.pose.position.z = sick_marker.pose_position.z
207  marker.pose.orientation.x = sick_marker.pose_orientation.x
208  marker.pose.orientation.y = sick_marker.pose_orientation.y
209  marker.pose.orientation.z = sick_marker.pose_orientation.z
210  marker.pose.orientation.w = sick_marker.pose_orientation.w
211  marker.scale.x = sick_marker.scale.x
212  marker.scale.y = sick_marker.scale.y
213  marker.scale.z = sick_marker.scale.z
214  marker.color.r = sick_marker.color.r
215  marker.color.g = sick_marker.color.g
216  marker.color.b = sick_marker.color.b
217  marker.color.a = sick_marker.color.a
218  marker.frame_locked = (sick_marker.frame_locked > 0)
219  marker.text = ctypesCharArrayToString(sick_marker.text)
220  marker.mesh_resource = ctypesCharArrayToString(sick_marker.mesh_resource)
221  marker.mesh_use_embedded_materials = (sick_marker.mesh_use_embedded_materials > 0)
222  for m in range(sick_marker.points.size):
223  sick_point = sick_marker.points.buffer[m]
224  ros_point = Point(x = sick_point.x, y = sick_point.y, z = sick_point.z)
225  marker.points.append(ros_point)
226  for m in range(sick_marker.colors.size):
227  sick_color = sick_marker.colors.buffer[m]
228  ros_color = ColorRGBA(r = sick_color.r, g = sick_color.g, b = sick_color.b, a = sick_color.a)
229  marker.colors.append(ros_color)
230  ros_marker.markers.append(marker)
231  return ros_marker
sensor_msgs::PointCloud2
::sensor_msgs::PointCloud2_< std::allocator< void > > PointCloud2
Definition: PointCloud2.h:90
roswrap::console::print
ROSCONSOLE_DECL void print(FilterBase *filter, void *logger, Level level, const char *file, int line, const char *function, const char *fmt,...) ROSCONSOLE_PRINTF_ATTRIBUTE(7
Don't call this directly. Use the ROS_LOG() macro instead.
api.sick_scan_api.ctypesCharArrayToString
def ctypesCharArrayToString(char_array)
Definition: sick_scan_api.py:882
sick_scan_api_converter.SickScanApiConvertPointCloudToROS
def SickScanApiConvertPointCloudToROS(api_pointcloud)
Definition: sick_scan_api_converter.py:30
visualization_msgs::Marker
::visualization_msgs::Marker_< std::allocator< void > > Marker
Definition: Marker.h:193
visualization_msgs::MarkerArray
::visualization_msgs::MarkerArray_< std::allocator< void > > MarkerArray
Definition: MarkerArray.h:50
sick_scan_api_converter.SickScanApiConvertRadarObjectsToROS
def SickScanApiConvertRadarObjectsToROS(header, radar_objects)
Definition: sick_scan_api_converter.py:142
sick_scan_api_converter.SickScanApiConvertPolarPointCloudToROS
def SickScanApiConvertPolarPointCloudToROS(api_pointcloud)
Definition: sick_scan_api_converter.py:71
sick_scan_api_converter.SickScanApiConvertMarkerArrayToROS
def SickScanApiConvertMarkerArrayToROS(sick_markers)
Definition: sick_scan_api_converter.py:180
std_msgs::ColorRGBA
::std_msgs::ColorRGBA_< std::allocator< void > > ColorRGBA
Definition: ColorRGBA.h:63
PointField
ros_sensor_msgs::PointField PointField
Definition: include/sick_scansegment_xd/common.h:127


sick_scan_xd
Author(s): Michael Lehning , Jochen Sprickerhof , Martin Günther
autogenerated on Fri Oct 25 2024 02:47:10