8 from actionlib_msgs.msg
import GoalStatus
9 from ensenso_camera_msgs.msg
import TelecentricProjectionAction, TelecentricProjectionGoal, TelecentricProjectionResult
10 from ensenso_camera_msgs.msg
import RequestDataAction, RequestDataGoal
12 from geometry_msgs.msg
import Transform
13 from cv_bridge
import CvBridge
21 PATH =
"../data/telecentric_projection" 22 IMAGE = PATH +
"/image/reference_image.jpg" 30 TelecentricProjectionAction)
33 if not client.wait_for_server(rospy.Duration(TIMEOUT)):
34 self.fail(msg=
"Request_data action servers timed out!")
41 self.trafo.rotation.x = 0
42 self.trafo.rotation.y = -0.7071068
43 self.trafo.rotation.z = 0
44 self.trafo.rotation.w = 0.7071068
45 self.trafo.translation.x = 0
46 self.trafo.translation.y = 0
47 self.trafo.translation.z = 0
62 request_data_goal = RequestDataGoal()
63 request_data_goal.request_point_cloud =
True 64 self.request_data_client.send_goal(request_data_goal)
65 self.request_data_client.wait_for_result()
67 result = self.request_data_client.get_result()
68 self.assertEqual(result.error.code, 0, msg=
"Requesting point cloud not successful")
72 goal = TelecentricProjectionGoal()
73 goal.view_pose = self.
trafo 75 goal.request_depth_image =
True 76 goal.include_results_in_response =
True 77 self.telecentric_projection_client.send_goal(goal)
80 goal = TelecentricProjectionGoal()
81 goal.view_pose = self.
trafo 83 goal.include_results_in_response =
True 84 self.telecentric_projection_client.send_goal(goal)
89 result = self.telecentric_projection_client.get_result()
90 image = result.projected_depth_map
94 cv_image = bridge.imgmsg_to_cv2(image, desired_encoding=
"passthrough")
97 mono8 = np.uint8(cv_image * 1000.0)
98 cv.imwrite(PATH +
"/image/mono8.jpg", mono8)
101 ret, binary_img = cv.threshold(mono8, 10, 255, cv.THRESH_BINARY)
102 cv.imwrite(PATH +
"/image/binary.jpg", binary_img)
106 height, width = binary_img.shape
107 for i
in range(0, height):
108 for j
in range(0, (width)):
109 if binary_img[i, j] != 0:
111 xyPoints.append(point)
114 vx, vy, cx, cy = cv.fitLine(np.array(xyPoints), cv.DIST_L2, 0, 0.01, 0.01)
120 result = self.telecentric_projection_client.get_result()
124 if not self.telecentric_projection_client.wait_for_result(rospy.Duration(TIMEOUT)):
125 self.fail(msg=
"Waiting for result times out.")
126 if self.telecentric_projection_client.get_state() != GoalStatus.SUCCEEDED:
127 self.fail(msg=
"Action did not succeed.")
134 goal = TelecentricProjectionGoal()
135 goal.view_pose = self.
trafo 137 goal.publish_results =
True 138 self.telecentric_projection_client.send_goal(goal)
145 cloud_points = list(pc2.read_points(self.
projected_pc, skip_nans=
True))
146 self.assertTrue(len(cloud_points) != 0, msg=
" The recieved point cloud is empty.")
153 self.fail(
"Recieved no published point cloud!")
155 self.assertTrue(len(cloud_points) != 0, msg=
" The recieved point cloud is empty.")
158 if __name__ ==
"__main__":
160 rospy.init_node(
"test_telecentric_projection")
161 rostest.rosrun(
"ensenso_camera_test",
"test_telecentric_projection", TestTelecentricProjection)
162 except rospy.ROSInterruptException:
def generate_line_estimate(self)
def test_projected_point_cloud(self)
def get_projected_point_cloud(self)
def send_goal_depth_image(self)
def send_goal_with_publishing_point_cloud(self)
def send_goal_point_cloud(self)
def wait_for_server_succeed(self)
def request_point_cloud(self)
telecentric_projection_client
def test_subscribed_point_cloud(self)
def test_found_line_paramters(self)