5 from scipy.spatial.transform
import Rotation
7 import ensenso_camera.ros2
as ros2py
9 from ensenso_camera_msgs.msg
import Parameter
13 from sensor_msgs.msg
import PointCloud2, PointField
16 pc2 = ros2py_testing.import_point_cloud2()
18 RequestData = ros2py.import_action(
"ensenso_camera_msgs",
"RequestData")
19 SetParameter = ros2py.import_action(
"ensenso_camera_msgs",
"SetParameter")
27 TARGET_FRAME =
"Workspace"
32 self.
node = ros2py.create_node(
"test_scale_disparity_map")
41 success = ros2py.wait_for_servers(self.
node, clients, timeout_sec=ros2py_testing.TEST_TIMEOUT, exit=
False)
42 self.assertTrue(success, msg=
"Timeout reached for servers.")
50 self.
node, PointCloud2,
"point_cloud_dip", self.
callback_dip, ros2py.get_qos_profile(
"point_cloud_dip")
54 request_data_goal = RequestData.Goal()
55 request_data_goal.request_rectified_images =
True
56 request_data_goal.request_disparity_map =
True
57 request_data_goal.request_depth_image =
True
58 request_data_goal.request_point_cloud =
True
59 request_data_goal.publish_results =
True
62 result = response.get_result()
64 self.assertFalse(response.timeout(), msg=
"Requesting data action timed out.")
65 self.assertTrue(response.successful(), msg=
"Requesting data action not successful.")
66 self.assertEqual(result.error.code, 0, msg=
"Requesting data not successful.")
78 point_cloud.header.frame_id,
79 point_cloud.header.stamp,
81 except Exception
as e:
82 self.
node.get_logger().error(e)
86 q = tf.transform.rotation
87 x = tf.transform.translation
88 t[:3, :3] = Rotation.from_quat([q.x, q.y, q.z, q.w]).as_dcm()
89 t[:3, 3] = [x.x, x.y, x.z]
91 points_msg = list(pc2.read_points(point_cloud))
92 points_map = np.ones((len(points_msg), 4))
93 points_map[:, :3] = points_msg
94 points_map = np.dot(t, points_map.T).T
97 PointField(name=
"x", offset=0, datatype=PointField.FLOAT32, count=1),
98 PointField(name=
"y", offset=4, datatype=PointField.FLOAT32, count=1),
99 PointField(name=
"z", offset=8, datatype=PointField.FLOAT32, count=1),
102 pcl_map_header = point_cloud.header
103 pcl_map_header.frame_id =
"map"
104 return pc2.create_cloud(pcl_map_header, fields, points_map[:, :3])
107 goal = SetParameter.Goal(parameters=parameters)
110 result = response.get_result()
111 self.assertTrue(response.successful())
112 self.assertEqual(result.error.code, 0)
116 self.
set_parameters([Parameter(key=Parameter.SCALING, float_value=0.5)])
122 ros2py.sleep(self.
node, 2)
125 self.assertIsNotNone(self.
point_cloud_ens, msg=
"Received no Ensenso point cloud")
126 self.assertIsNotNone(self.
point_cloud_dip, msg=
"Received no reference point cloud")
132 msg=f
"Expected Ensenso point cloud in frame '{TARGET_FRAME}', got '{frame}'!",
139 points_ens = list(pc2.read_points(self.
point_cloud_ens, skip_nans=
True))
140 points_dip = list(pc2.read_points(self.
point_cloud_dip, skip_nans=
True))
141 self.assertGreater(len(points_ens), 0, msg=
"The Ensenso point cloud is empty")
142 self.assertGreater(len(points_dip), 0, msg=
"The reference point cloud is empty")
144 def average_values(points, axis=0):
145 return np.average(np.array(points)[:, axis : axis + 1]) * 1000.0
147 self.assertAlmostEqual(
148 average_values(points_ens, axis=0),
149 average_values(points_dip, axis=0),
151 msg=
"x values differ",
154 self.assertAlmostEqual(
155 average_values(points_ens, axis=1),
156 average_values(points_dip, axis=1),
158 msg=
"y values differ",
161 self.assertAlmostEqual(
162 average_values(points_ens, axis=2),
163 average_values(points_dip, axis=2),
165 msg=
"z values differ",
170 ros2py_testing.run_ros1_test(
"test_scale_disparity_map", TestScaleDisparityMap)
173 if __name__ ==
"__main__":