7 import ensenso_camera.ros2
as ros2py
11 from geometry_msgs.msg
import Transform
12 from cv_bridge
import CvBridge
13 from sensor_msgs.msg
import PointCloud2
15 pc2 = ros2py_testing.import_point_cloud2()
17 TelecentricProjection = ros2py.import_action(
"ensenso_camera_msgs",
"TelecentricProjection")
18 RequestData = ros2py.import_action(
"ensenso_camera_msgs",
"RequestData")
20 IMAGE_PATH = ros2py_testing.get_test_data_path(
"telecentric_projection/")
21 IMAGE_PATH_MONO8 = IMAGE_PATH +
"mono8.jpg"
22 IMAGE_PATH_BINARY = IMAGE_PATH +
"binary.jpg"
29 self.
node = ros2py.create_node(
"test_telecentric_projection")
31 self.
node,
"project_telecentric", TelecentricProjection
36 success = ros2py.wait_for_servers(self.
node, clients, timeout_sec=ros2py_testing.TEST_TIMEOUT, exit=
False)
37 self.assertTrue(success, msg=
"Timeout reached for servers.")
44 self.
trafo.rotation.x = 0.0
45 self.
trafo.rotation.y = -0.7071068
46 self.
trafo.rotation.z = 0.0
47 self.
trafo.rotation.w = 0.7071068
48 self.
trafo.translation.x = 0.0
49 self.
trafo.translation.y = 0.0
50 self.
trafo.translation.z = 0.0
52 self.
timeout = ros2py.Duration(ros2py_testing.TEST_TIMEOUT)
66 if os.path.isfile(IMAGE_PATH_MONO8):
67 os.remove(IMAGE_PATH_MONO8)
68 if os.path.isfile(IMAGE_PATH_BINARY):
69 os.remove(IMAGE_PATH_BINARY)
71 ros2py.shutdown(self.
node)
74 request_data_goal = RequestData.Goal()
75 request_data_goal.request_point_cloud =
True
78 result = response.get_result()
80 self.assertFalse(response.timeout(), msg=
"Requesting point cloud action timed out.")
81 self.assertTrue(response.successful(), msg=
"Requesting point cloud action not successful.")
82 self.assertEqual(result.error.code, 0, msg=
"Requesting point cloud not successful.")
86 goal = TelecentricProjection.Goal()
87 goal.view_pose = self.
trafo
89 goal.request_depth_image =
True
90 goal.include_results_in_response =
True
92 response = ros2py.send_action_goal(
95 result = response.get_result()
97 self.assertFalse(response.timeout(), msg=
"Requesting projected depth map timed out.")
98 self.assertTrue(response.successful(), msg=
"Requesting projected depth map action not successful.")
99 self.assertEqual(result.error.code, 0, msg=
"Requesting projected depth map not successful.")
110 mono8 = np.uint8(cv_image * 1000.0)
111 cv.imwrite(IMAGE_PATH_MONO8, mono8)
114 _, binary_img = cv.threshold(mono8, 10, 255, cv.THRESH_BINARY)
115 cv.imwrite(IMAGE_PATH_BINARY, binary_img)
119 height, width = binary_img.shape
120 for i
in range(0, height):
121 for j
in range(0, (width)):
122 if binary_img[i, j] != 0:
124 xyPoints.append(point)
127 vx, vy, cx, cy = cv.fitLine(np.array(xyPoints), cv.DIST_L2, 0, 0.01, 0.01)
132 goal = TelecentricProjection.Goal()
133 goal.view_pose = self.
trafo
135 goal.include_results_in_response =
True
137 response = ros2py.send_action_goal(
140 result = response.get_result()
142 self.assertFalse(response.timeout(), msg=
"Requesting projected point cloud timed out.")
143 self.assertTrue(response.successful(), msg=
"Requesting projected point cloud action not successful.")
144 self.assertEqual(result.error.code, 0, msg=
"Requesting projected point cloud not successful.")
153 goal = TelecentricProjection.Goal()
154 goal.view_pose = self.
trafo
156 goal.publish_results =
True
158 response = ros2py.send_action_goal(
161 result = response.get_result()
163 self.assertFalse(response.timeout(), msg=
"Requesting publishing of point cloud timed out.")
164 self.assertTrue(response.successful(), msg=
"Requesting publishing of point cloud action not successful.")
165 self.assertEqual(result.error.code, 0, msg=
"Requesting publishing of point cloud not successful.")
172 cloud_points = list(pc2.read_points(self.
projected_pc, skip_nans=
True))
173 self.assertTrue(len(cloud_points) != 0, msg=
" The recieved point cloud is empty.")
177 ros2py.sleep(self.
node, 4)
180 self.assertTrue(len(cloud_points) != 0, msg=
"The recieved point cloud is empty.")
184 ros2py_testing.run_ros1_test(
"test_telecentric_projection", TestTelecentricProjection)
187 if __name__ ==
"__main__":