13 from sick_scan_api
import *
16 __ROS_VERSION = os.getenv(
"ROS_VERSION")
17 if __ROS_VERSION
is None:
20 __ROS_VERSION = int(__ROS_VERSION)
23 if __ROS_VERSION == 0:
25 from mpl_toolkits
import mplot3d
26 import matplotlib.pyplot
as plt
28 except ModuleNotFoundError:
29 print(
"import matplotlib failed, visualization disabled")
30 elif __ROS_VERSION == 1:
32 from sick_scan_api_converter
import *
33 elif __ROS_VERSION == 2:
35 from rclpy.node
import Node
36 from sick_scan_api_converter
import *
68 num_fields = pointcloud_msg.fields.size
69 msg_fields_buffer = pointcloud_msg.fields.buffer
73 for n
in range(num_fields):
75 field_offset = msg_fields_buffer[n].offset
77 field_offset_x = msg_fields_buffer[n].offset
78 elif field_name ==
"y":
79 field_offset_y = msg_fields_buffer[n].offset
80 elif field_name ==
"z":
81 field_offset_z = msg_fields_buffer[n].offset
83 cloud_data_buffer_len = (pointcloud_msg.row_step * pointcloud_msg.height)
84 assert(pointcloud_msg.data.size == cloud_data_buffer_len
and field_offset_x >= 0
and field_offset_y >= 0
and field_offset_z >= 0)
85 cloud_data_buffer = bytearray(cloud_data_buffer_len)
86 for n
in range(cloud_data_buffer_len):
87 cloud_data_buffer[n] = pointcloud_msg.data.buffer[n]
88 points_x = np.zeros(pointcloud_msg.width * pointcloud_msg.height, dtype = np.float32)
89 points_y = np.zeros(pointcloud_msg.width * pointcloud_msg.height, dtype = np.float32)
90 points_z = np.zeros(pointcloud_msg.width * pointcloud_msg.height, dtype = np.float32)
92 for row_idx
in range(pointcloud_msg.height):
93 for col_idx
in range(pointcloud_msg.width):
95 pointcloud_offset = row_idx * pointcloud_msg.row_step + col_idx * pointcloud_msg.point_step
96 points_x[point_idx] = np.frombuffer(cloud_data_buffer, dtype = np.float32, count = 1, offset = pointcloud_offset + field_offset_x)[0]
97 points_y[point_idx] = np.frombuffer(cloud_data_buffer, dtype = np.float32, count = 1, offset = pointcloud_offset + field_offset_y)[0]
98 points_z[point_idx] = np.frombuffer(cloud_data_buffer, dtype = np.float32, count = 1, offset = pointcloud_offset + field_offset_z)[0]
99 point_idx = point_idx + 1
100 return points_x, points_y, points_z
108 pointcloud_msg = pointcloud_msg.contents
109 print(
"pySickScanCartesianPointCloudMsgCallback (ROS-{}): api_handle={}, {}x{} pointcloud, {} echo(s), segment {}, {} fields, frame_id \"{}\", topic {}, timestamp {}.{:06d}".format(
110 __ROS_VERSION, api_handle, pointcloud_msg.width, pointcloud_msg.height, pointcloud_msg.num_echos , pointcloud_msg.segment_idx, pointcloud_msg.fields.size,
111 pointcloud_msg.header.frame_id, pointcloud_msg.topic, pointcloud_msg.header.timestamp_sec, pointcloud_msg.header.timestamp_nsec))
112 global api_test_settings
114 cur_timestamp = datetime.datetime.now()
115 if __ROS_VERSION == 0
and cur_timestamp >= api_test_settings.cartesian_pointcloud_timestamp_published + datetime.timedelta(seconds=0.5):
117 api_test_settings.cartesian_pointcloud_timestamp_published = cur_timestamp
118 elif __ROS_VERSION > 0
and api_test_settings.ros_pointcloud_publisher
is not None and pointcloud_msg.width > 0
and pointcloud_msg.height > 0
and cur_timestamp >= api_test_settings.cartesian_pointcloud_timestamp_published + datetime.timedelta(seconds=0.2):
121 api_test_settings.ros_pointcloud_publisher.publish(ros_pointcloud)
122 api_test_settings.cartesian_pointcloud_timestamp_published = cur_timestamp
126 pointcloud_msg = pointcloud_msg.contents
127 print(
"pySickScanPolarPointCloudMsgCallback (ROS-{}): api_handle={}, {}x{} pointcloud, {} echo(s), segment {}, {} fields, frame_id \"{}\", topic {}, timestamp {}.{:06d}".format(
128 __ROS_VERSION, api_handle, pointcloud_msg.width, pointcloud_msg.height, pointcloud_msg.num_echos , pointcloud_msg.segment_idx, pointcloud_msg.fields.size,
129 pointcloud_msg.header.frame_id, pointcloud_msg.topic, pointcloud_msg.header.timestamp_sec, pointcloud_msg.header.timestamp_nsec))
130 if __ROS_VERSION == 1:
134 global api_test_settings
135 if pointcloud_msg.segment_idx < 0:
136 api_test_settings.ros_polar_pointcloud_is_multi_segment_scanner =
True
137 if api_test_settings.ros_polar_pointcloud_publisher
is not None and pointcloud_msg.width > 0
and pointcloud_msg.height > 0:
138 cur_timestamp = datetime.datetime.now()
139 publish_polar_pointcloud =
False
140 if api_test_settings.ros_polar_pointcloud_is_multi_segment_scanner ==
False and cur_timestamp >= api_test_settings.polar_pointcloud_timestamp_published + datetime.timedelta(seconds=1.0):
141 publish_polar_pointcloud =
True
142 elif pointcloud_msg.segment_idx < 0
and cur_timestamp >= api_test_settings.polar_pointcloud_timestamp_published + datetime.timedelta(seconds=1.0):
143 publish_polar_pointcloud =
True
144 if publish_polar_pointcloud:
146 api_test_settings.ros_polar_pointcloud_publisher.publish(ros_pointcloud)
147 api_test_settings.polar_pointcloud_timestamp_published = cur_timestamp
151 imu_msg = imu_msg.contents
152 print(
"pySickScanImuMsgCallback: api_handle={}, imu message: orientation=({:.8},{:.8},{:.8},{:.8}), angular_velocity=({:.8},{:.8},{:.8}), linear_acceleration=({:.8},{:.8},{:.8})".format(
153 api_handle, imu_msg.orientation.x, imu_msg.orientation.y, imu_msg.orientation.z, imu_msg.orientation.w,
154 imu_msg.angular_velocity.x, imu_msg.angular_velocity.y, imu_msg.angular_velocity.y,
155 imu_msg.linear_acceleration.x, imu_msg.linear_acceleration.y, imu_msg.linear_acceleration.z))
159 lferec_msg = lferec_msg.contents
161 for field
in lferec_msg.fields:
162 if field.field_result_mrs == 1:
164 elif field.field_result_mrs == 2:
168 field_info = field_info +
", field {}: ({},{},{},{},{})".format(field.field_index, status, field.dist_scale_factor, field.dist_scale_offset, field.angle_scale_factor, field.angle_scale_offset)
169 print(
"pySickScanLFErecMsgCallback: api_handle={}, lferec message: {} fields{}".format(api_handle, lferec_msg.fields_number, field_info))
173 lidoutputstate_msg = lidoutputstate_msg.contents
174 state_info =
"({},{},{},{},{},{},{},{})".format(lidoutputstate_msg.output_state[0], lidoutputstate_msg.output_state[1], lidoutputstate_msg.output_state[2], lidoutputstate_msg.output_state[3],
175 lidoutputstate_msg.output_state[4], lidoutputstate_msg.output_state[5], lidoutputstate_msg.output_state[6], lidoutputstate_msg.output_state[7])
176 count_info =
"({},{},{},{},{},{},{},{})".format(lidoutputstate_msg.output_count[0], lidoutputstate_msg.output_count[1], lidoutputstate_msg.output_count[2], lidoutputstate_msg.output_count[3],
177 lidoutputstate_msg.output_count[4], lidoutputstate_msg.output_count[5], lidoutputstate_msg.output_count[6], lidoutputstate_msg.output_count[7])
178 print(
"pySickScanLIDoutputstateMsgCallback: api_handle={}, lidoutputstate message, outputstate={}, outputcount={}".format(api_handle, state_info, count_info))
182 radarscan_msg = radarscan_msg.contents
183 print(
"pySickScanRadarScanCallback: api_handle={}, radarscan message: {} targets, {} objects".format(api_handle, radarscan_msg.targets.width, radarscan_msg.objects.size))
184 if __ROS_VERSION == 1:
186 global api_test_settings
187 if api_test_settings.ros_pointcloud_publisher
is not None and radarscan_msg.targets.width > 0
and radarscan_msg.targets.height > 0:
189 api_test_settings.ros_pointcloud_publisher.publish(ros_pointcloud)
190 if api_test_settings.ros_polar_pointcloud_publisher
is not None and radarscan_msg.objects.size > 0:
192 api_test_settings.ros_polar_pointcloud_publisher.publish(ros_pointcloud)
196 ldmrsobjectarray_msg = ldmrsobjectarray_msg.contents
197 print(
"pySickScanLdmrsObjectArrayCallback: api_handle={}, ldmrsobjectarray message: {} objects".format(api_handle, ldmrsobjectarray_msg.objects.size))
201 visualizationmarker_msg = visualizationmarker_msg.contents
203 for n
in range(visualizationmarker_msg.markers.size):
204 marker = visualizationmarker_msg.markers.buffer[n]
205 marker_info = marker_info +
", marker {}: pos=({},{},{})".format(marker.id, marker.pose_position.x, marker.pose_position.y, marker.pose_position.z)
206 print(
"pySickScanVisualizationMarkerCallback: api_handle={}, visualizationmarker message: {} marker{}".format(api_handle, visualizationmarker_msg.markers.size, marker_info))
207 if __ROS_VERSION > 0:
209 global api_test_settings
210 if api_test_settings.ros_visualizationmarker_publisher
is not None and visualizationmarker_msg.markers.size > 0:
212 api_test_settings.ros_visualizationmarker_publisher.publish(ros_markers)
216 navposelandmark_msg = navposelandmark_msg.contents
217 print(
"pySickScanNavPoseLandmarkCallback: api_handle={}, NavPoseLandmark message: x={:.3}, y={:.3}, yaw={:.4}, {} reflectors".format(api_handle, navposelandmark_msg.pose_x, navposelandmark_msg.pose_y, navposelandmark_msg.pose_yaw, navposelandmark_msg.reflectors.size))
221 diagnostic_msg = diagnostic_msg.contents
222 print(
"pySickScanDiagnosticMsgCallback: api_handle={}, status_code={}, status_message={}".format(api_handle, diagnostic_msg.status_code, diagnostic_msg.status_message))
226 log_msg = log_msg.contents
227 print(
"pySickScanLogMsgCallback: api_handle={}, log_level={}, log_message={}".format(api_handle, log_msg.log_level, log_msg.log_message))
235 wait_next_message_timeout = 0.1
245 odom_msg.vel_x = +1.0
246 odom_msg.vel_y = -1.0
248 odom_msg.timestamp_sec = 12345
249 odom_msg.timestamp_nsec = 6789
251 navodom_msg.vel_x = +1.0
252 navodom_msg.vel_y = -1.0
253 navodom_msg.omega = 0.5
254 navodom_msg.timestamp = 123456789
255 navodom_msg.coordbase = 0
256 global api_test_settings
257 while api_test_settings.polling:
258 if api_test_settings.polling_functions[
"SickScanApiWaitNextCartesianPointCloudMsg"]:
260 if ret == int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS):
262 if ret == int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS):
263 print(
"pyrunSickScanApiTestWaitNext: success")
264 if ret == int(SickScanApiErrorCodes.SICK_SCAN_API_TIMEOUT):
265 print(
"## ERROR pyrunSickScanApiTestWaitNext: SICK_SCAN_API_TIMEOUT, SickScanApiWaitNextCartesianPointCloudMsg failed, error code {} ({})".format(ret, int(ret)))
266 elif ret != int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS)
and ret != int(SickScanApiErrorCodes.SICK_SCAN_API_TIMEOUT):
267 print(
"## ERROR pyrunSickScanApiTestWaitNext: SickScanApiWaitNextCartesianPointCloudMsg failed, error code {} ({})".format(ret, int(ret)))
269 if api_test_settings.polling_functions[
"SickScanApiWaitNextPolarPointCloudMsg"]:
271 if ret == int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS):
273 elif ret != int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS)
and ret != int(SickScanApiErrorCodes.SICK_SCAN_API_TIMEOUT):
274 print(
"## ERROR pyrunSickScanApiTestWaitNext: SickScanApiWaitNextPolarPointCloudMsg failed, error code {} ({})".format(ret, int(ret)))
276 if api_test_settings.polling_functions[
"SickScanApiWaitNextImuMsg"]:
278 if ret == int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS):
280 elif ret != int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS)
and ret != int(SickScanApiErrorCodes.SICK_SCAN_API_TIMEOUT):
281 print(
"## ERROR pyrunSickScanApiTestWaitNext: SickScanApiWaitNextImuMsg failed, error code {} ({})".format(ret, int(ret)))
283 if api_test_settings.polling_functions[
"SickScanApiWaitNextLFErecMsg"]:
285 if ret == int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS):
287 elif ret != int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS)
and ret != int(SickScanApiErrorCodes.SICK_SCAN_API_TIMEOUT):
288 print(
"## ERROR pyrunSickScanApiTestWaitNext: SickScanApiWaitNextLFErecMsg failed, error code {} ({})".format(ret, int(ret)))
290 if api_test_settings.polling_functions[
"SickScanApiWaitNextLIDoutputstateMsg"]:
292 if ret == int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS):
294 elif ret != int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS)
and ret != int(SickScanApiErrorCodes.SICK_SCAN_API_TIMEOUT):
295 print(
"## ERROR pyrunSickScanApiTestWaitNext: SickScanApiWaitNextLIDoutputstateMsg failed, error code {} ({})".format(ret, int(ret)))
297 if api_test_settings.polling_functions[
"SickScanApiWaitNextRadarScanMsg"]:
299 if ret == int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS):
301 elif ret != int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS)
and ret != int(SickScanApiErrorCodes.SICK_SCAN_API_TIMEOUT):
302 print(
"## ERROR pyrunSickScanApiTestWaitNext: SickScanApiWaitNextRadarScanMsg failed, error code {} ({})".format(ret, int(ret)))
304 if api_test_settings.polling_functions[
"SickScanApiWaitNextLdmrsObjectArrayMsg"]:
306 if ret == int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS):
308 elif ret != int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS)
and ret != int(SickScanApiErrorCodes.SICK_SCAN_API_TIMEOUT):
309 print(
"## ERROR pyrunSickScanApiTestWaitNext: SickScanApiWaitNextLdmrsObjectArrayMsg failed, error code {} ({})".format(ret, int(ret)))
311 if api_test_settings.polling_functions[
"SickScanApiWaitNextVisualizationMarkerMsg"]:
313 if ret == int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS):
315 elif ret != int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS)
and ret != int(SickScanApiErrorCodes.SICK_SCAN_API_TIMEOUT):
316 print(
"## ERROR pyrunSickScanApiTestWaitNext: SickScanApiWaitNextVisualizationMarkerMsg failed, error code {} ({})".format(ret, int(ret)))
318 if api_test_settings.polling_functions[
"SickScanApiWaitNextNavPoseLandmarkMsg"]:
320 if ret == int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS):
322 elif ret != int(SickScanApiErrorCodes.SICK_SCAN_API_SUCCESS)
and ret != int(SickScanApiErrorCodes.SICK_SCAN_API_TIMEOUT):
323 print(
"## ERROR pyrunSickScanApiTestWaitNext: SickScanApiWaitNextNavPoseLandmarkMsg failed, error code {} ({})".format(ret, int(ret)))
333 if __name__ ==
"__main__":
338 cli_arg_start_idx = 1
339 for n, cli_arg
in enumerate(sys.argv):
340 if cli_arg.startswith(
"_polling:="):
341 cli_arg_start_idx = max(n + 1, cli_arg_start_idx)
342 if int(cli_arg[10:]) > 0:
343 api_test_settings.polling =
True
344 if cli_arg.startswith(
"_verbose:="):
345 cli_arg_start_idx = max(n + 1, cli_arg_start_idx)
346 api_test_settings.verbose_level = int(cli_arg[10:])
347 cli_args =
" ".join(sys.argv[cli_arg_start_idx:])
348 if __ROS_VERSION == 0
and plt_enabled:
349 api_test_settings.plot_figure = plt.figure()
350 api_test_settings.plot_axes = plt.axes(projection=
"3d")
351 elif __ROS_VERSION == 1:
352 rospy.init_node(
"sick_scan_api_test_py")
353 api_test_settings.ros_pointcloud_publisher = rospy.Publisher(
"/sick_scan_xd_api_test/api_cloud", PointCloud2, queue_size=10)
354 api_test_settings.ros_polar_pointcloud_publisher = rospy.Publisher(
"/sick_scan_xd_api_test/api_cloud_polar", PointCloud2, queue_size=10)
355 api_test_settings.ros_visualizationmarker_publisher = rospy.Publisher(
"/sick_scan_xd_api_test/marker", MarkerArray, queue_size=10)
356 elif __ROS_VERSION == 2:
358 ros_node = Node(
"sick_scan_api_test_py")
359 api_test_settings.ros_pointcloud_publisher = ros_node.create_publisher(PointCloud2,
"/sick_scan_xd_api_test/api_cloud", 10)
360 api_test_settings.ros_visualizationmarker_publisher = ros_node.create_publisher(MarkerArray,
"/sick_scan_xd_api_test/marker", 10)
364 sick_scan_library =
SickScanApiLoadLibrary([
"build/Debug/",
"build_win64/Debug/",
"src/build/Debug/",
"src/build_win64/Debug/",
"src/sick_scan_xd/build/Debug/",
"src/sick_scan_xd/build_win64/Debug/",
"./",
"../"],
"sick_scan_xd_shared_lib.dll")
366 sick_scan_library =
SickScanApiLoadLibrary([
"build/",
"build_linux/",
"src/build/",
"src/build_linux/",
"src/sick_scan_xd/build/",
"src/sick_scan_xd/build_linux/",
"./",
"../"],
"libsick_scan_xd_shared_lib.so")
371 print(
"sick_scan_xd_api_test.py: initializing lidar, commandline arguments = \"{}\"".format(cli_args))
374 api_test_wait_next_thread =
None
375 if api_test_settings.polling:
376 api_test_wait_next_thread = threading.Thread(target=pyrunSickScanApiTestWaitNext, args=(sick_scan_library, api_handle))
377 api_test_wait_next_thread.start()
426 if api_test_settings.verbose_level != verbose_level:
427 print(f
"## ERROR sick_scan_xd_api_test.py: SickScanApiSetVerboseLevel(verbose_level={api_test_settings.verbose_level}) failed, running with verbose_level={verbose_level}")
429 print(f
"sick_scan_xd_api_test.py running with verbose_level={verbose_level}")
432 if __ROS_VERSION == 0:
435 if plt_enabled
and len(api_test_settings.plot_points_x) > 0
and len(api_test_settings.plot_points_y) > 0
and len(api_test_settings.plot_points_z) > 0:
436 print(
"sick_scan_xd_api_test.py plotting pointcloud by matplotlib")
437 plot_points_x = np.copy(api_test_settings.plot_points_x)
438 plot_points_y = np.copy(api_test_settings.plot_points_y)
439 plot_points_z = np.copy(api_test_settings.plot_points_z)
442 api_test_settings.plot_figure = plt.figure()
443 api_test_settings.plot_axes = plt.axes(projection=
"3d")
445 api_test_settings.plot_axes.scatter(plot_points_x, plot_points_y, plot_points_z, c=
'r', marker=
'.')
446 api_test_settings.plot_axes.set_xlabel(
"x")
447 api_test_settings.plot_axes.set_ylabel(
"y")
454 print(f
"sick_scan_xd_api_test.py: sopas_request=\"sRN SCdevicestate\", sopas_response=\"{sopas_response}\"")
457 elif __ROS_VERSION == 1:
459 while not rospy.is_shutdown():
463 while user_key !=
"\n":
465 print(
"sick_scan_xd_api_test.py running. Press ENTER to exit")
466 user_key = sys.stdin.read(1)
469 print(
"sick_scan_xd_api_test.py finishing...")
470 if api_test_settings.polling:
471 api_test_settings.polling =
False
472 if api_test_wait_next_thread
is not None:
473 api_test_wait_next_thread.join()
485 if not api_test_settings.polling:
490 print(
"sick_scan_xd_api_test.py finished.")