telecentric_projection.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 import rostest
4 
5 import unittest
6 
7 import actionlib
8 from actionlib_msgs.msg import GoalStatus
9 from ensenso_camera_msgs.msg import TelecentricProjectionAction, TelecentricProjectionGoal, TelecentricProjectionResult
10 from ensenso_camera_msgs.msg import RequestDataAction, RequestDataGoal
11 
12 from geometry_msgs.msg import Transform
13 from cv_bridge import CvBridge
14 import cv2 as cv
15 import numpy as np
16 import time
17 
18 import sensor_msgs.point_cloud2 as pc2
19 from sensor_msgs.point_cloud2 import PointCloud2
20 
21 PATH = "../data/telecentric_projection"
22 IMAGE = PATH + "/image/reference_image.jpg"
23 TIMEOUT = 20
24 FRAME = "Workspace"
25 
26 
27 class TestTelecentricProjection(unittest.TestCase):
28  def setUp(self):
30  TelecentricProjectionAction)
31  self.request_data_client = actionlib.SimpleActionClient("request_data", RequestDataAction)
32  for client in [self.telecentric_projection_client, self.request_data_client]:
33  if not client.wait_for_server(rospy.Duration(TIMEOUT)):
34  self.fail(msg="Request_data action servers timed out!")
35 
36  self.pc_subscriber = rospy.Subscriber("/projected_point_cloud", PointCloud2, self.callback)
37  self.got_subscribed_cloud = False
38 
39  # Rotation, that is 90 degrees rotated to the original camera in the test
40  self.trafo = Transform()
41  self.trafo.rotation.x = 0
42  self.trafo.rotation.y = -0.7071068
43  self.trafo.rotation.z = 0
44  self.trafo.rotation.w = 0.7071068
45  self.trafo.translation.x = 0
46  self.trafo.translation.y = 0
47  self.trafo.translation.z = 0
48 
49  # Test the projected depth image
50  self.request_point_cloud()
53 
54  # Test the point cloud
57 
58  # Test the subscribed cloud
60 
62  request_data_goal = RequestDataGoal()
63  request_data_goal.request_point_cloud = True
64  self.request_data_client.send_goal(request_data_goal)
65  self.request_data_client.wait_for_result()
66 
67  result = self.request_data_client.get_result()
68  self.assertEqual(result.error.code, 0, msg="Requesting point cloud not successful")
69 
71  # Setup the view pose to be 90 degrees rotated w.r.t the original camera
72  goal = TelecentricProjectionGoal()
73  goal.view_pose = self.trafo
74  goal.frame = FRAME
75  goal.request_depth_image = True
76  goal.include_results_in_response = True
77  self.telecentric_projection_client.send_goal(goal)
78 
80  goal = TelecentricProjectionGoal()
81  goal.view_pose = self.trafo
82  goal.frame = FRAME
83  goal.include_results_in_response = True
84  self.telecentric_projection_client.send_goal(goal)
85 
88 
89  result = self.telecentric_projection_client.get_result()
90  image = result.projected_depth_map
91  bridge = CvBridge()
92 
93  # Message to cv image
94  cv_image = bridge.imgmsg_to_cv2(image, desired_encoding="passthrough")
95 
96  # Convert it to a mono image (expressed in millimeters) via numpy
97  mono8 = np.uint8(cv_image * 1000.0)
98  cv.imwrite(PATH + "/image/mono8.jpg", mono8)
99 
100  # Grey image to binary image
101  ret, binary_img = cv.threshold(mono8, 10, 255, cv.THRESH_BINARY)
102  cv.imwrite(PATH + "/image/binary.jpg", binary_img)
103 
104  # Filter out non null pixels
105  xyPoints = []
106  height, width = binary_img.shape
107  for i in range(0, height):
108  for j in range(0, (width)):
109  if binary_img[i, j] != 0:
110  point = (i, j)
111  xyPoints.append(point)
112 
113  # v is the normalized vector that is colinear to the found line estimate
114  vx, vy, cx, cy = cv.fitLine(np.array(xyPoints), cv.DIST_L2, 0, 0.01, 0.01)
115 
116  self.line_parameters = [vx, vy, cx, cy]
117 
120  result = self.telecentric_projection_client.get_result()
121  self.projected_pc = result.projected_point_cloud
122 
124  if not self.telecentric_projection_client.wait_for_result(rospy.Duration(TIMEOUT)):
125  self.fail(msg="Waiting for result times out.")
126  if self.telecentric_projection_client.get_state() != GoalStatus.SUCCEEDED:
127  self.fail(msg="Action did not succeed.")
128 
129  def callback(self, data):
130  self.got_subscribed_cloud = True
131  self.subscribed_cloud = data
132 
134  goal = TelecentricProjectionGoal()
135  goal.view_pose = self.trafo
136  goal.frame = FRAME
137  goal.publish_results = True
138  self.telecentric_projection_client.send_goal(goal)
139 
141  self.assertAlmostEqual(self.line_parameters[0], 1.0, delta=0.05)
142  self.assertAlmostEqual(self.line_parameters[1], 0.0, delta=0.05)
143 
145  cloud_points = list(pc2.read_points(self.projected_pc, skip_nans=True))
146  self.assertTrue(len(cloud_points) != 0, msg=" The recieved point cloud is empty.")
147 
149  if not self.got_subscribed_cloud:
150  # Wait 4 seconds to receive a point cloud from the subscriber.
151  time.sleep(4)
152  if not self.got_subscribed_cloud:
153  self.fail("Recieved no published point cloud!")
154  cloud_points = list(pc2.read_points(self.subscribed_cloud, skip_nans=True))
155  self.assertTrue(len(cloud_points) != 0, msg=" The recieved point cloud is empty.")
156 
157 
158 if __name__ == "__main__":
159  try:
160  rospy.init_node("test_telecentric_projection")
161  rostest.rosrun("ensenso_camera_test", "test_telecentric_projection", TestTelecentricProjection)
162  except rospy.ROSInterruptException:
163  pass


ensenso_camera_test
Author(s): Ensenso
autogenerated on Thu May 6 2021 02:50:10