telecentric_projection.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import cv2 as cv
3 import numpy as np
4 import os
5 import unittest
6 
7 import ensenso_camera.ros2 as ros2py
8 
9 import ensenso_camera_test.ros2_testing as ros2py_testing
10 
11 from geometry_msgs.msg import Transform
12 from cv_bridge import CvBridge
13 from sensor_msgs.msg import PointCloud2
14 
15 pc2 = ros2py_testing.import_point_cloud2()
16 
17 TelecentricProjection = ros2py.import_action("ensenso_camera_msgs", "TelecentricProjection")
18 RequestData = ros2py.import_action("ensenso_camera_msgs", "RequestData")
19 
20 IMAGE_PATH = ros2py_testing.get_test_data_path("telecentric_projection/")
21 IMAGE_PATH_MONO8 = IMAGE_PATH + "mono8.jpg"
22 IMAGE_PATH_BINARY = IMAGE_PATH + "binary.jpg"
23 
24 FRAME = "Workspace"
25 
26 
27 class TestTelecentricProjection(unittest.TestCase):
28  def setUp(self):
29  self.node = ros2py.create_node("test_telecentric_projection")
30  self.telecentric_projection_client = ros2py.create_action_client(
31  self.node, "project_telecentric", TelecentricProjection
32  )
33  self.request_data_client = ros2py.create_action_client(self.node, "request_data", RequestData)
34 
36  success = ros2py.wait_for_servers(self.node, clients, timeout_sec=ros2py_testing.TEST_TIMEOUT, exit=False)
37  self.assertTrue(success, msg="Timeout reached for servers.")
38 
39  self.pc_subscriber = ros2py.create_subscription(self.node, PointCloud2, "/projected_point_cloud", self.callback)
40  self.got_subscribed_cloud = False
41 
42  # Rotation, that is 90 degrees rotated to the original camera in the test
43  self.trafo = Transform()
44  self.trafo.rotation.x = 0.0
45  self.trafo.rotation.y = -0.7071068
46  self.trafo.rotation.z = 0.0
47  self.trafo.rotation.w = 0.7071068
48  self.trafo.translation.x = 0.0
49  self.trafo.translation.y = 0.0
50  self.trafo.translation.z = 0.0
51 
52  self.timeout = ros2py.Duration(ros2py_testing.TEST_TIMEOUT)
53 
54  # Prepare the projected depth image
55  self.request_point_cloud()
57  self.generate_line_estimate() # --> gets/needs projected_depth_map
58 
59  # Prepare the projected point cloud
61 
62  # Test the subscribed cloud
64 
65  def tearDown(self):
66  if os.path.isfile(IMAGE_PATH_MONO8):
67  os.remove(IMAGE_PATH_MONO8)
68  if os.path.isfile(IMAGE_PATH_BINARY):
69  os.remove(IMAGE_PATH_BINARY)
70  # Unittests with several test cases require the node to be shut down before the next test case is started
71  ros2py.shutdown(self.node)
72 
74  request_data_goal = RequestData.Goal()
75  request_data_goal.request_point_cloud = True
76 
77  response = ros2py.send_action_goal(self.node, self.request_data_client, request_data_goal)
78  result = response.get_result()
79 
80  self.assertFalse(response.timeout(), msg="Requesting point cloud action timed out.")
81  self.assertTrue(response.successful(), msg="Requesting point cloud action not successful.")
82  self.assertEqual(result.error.code, 0, msg="Requesting point cloud not successful.")
83 
85  # Setup the view pose to be 90 degrees rotated w.r.t the original camera
86  goal = TelecentricProjection.Goal()
87  goal.view_pose = self.trafo
88  goal.frame = FRAME
89  goal.request_depth_image = True
90  goal.include_results_in_response = True
91 
92  response = ros2py.send_action_goal(
93  self.node, self.telecentric_projection_client, goal, timeout_sec=self.timeout
94  )
95  result = response.get_result()
96 
97  self.assertFalse(response.timeout(), msg="Requesting projected depth map timed out.")
98  self.assertTrue(response.successful(), msg="Requesting projected depth map action not successful.")
99  self.assertEqual(result.error.code, 0, msg="Requesting projected depth map not successful.")
100 
101  self.projected_depth_map = result.projected_depth_map
102 
104  bridge = CvBridge()
105 
106  # Message to cv image
107  cv_image = bridge.imgmsg_to_cv2(self.projected_depth_map, desired_encoding="passthrough")
108 
109  # Convert it to a mono image (expressed in millimeters) via numpy
110  mono8 = np.uint8(cv_image * 1000.0)
111  cv.imwrite(IMAGE_PATH_MONO8, mono8)
112 
113  # Grey image to binary image
114  _, binary_img = cv.threshold(mono8, 10, 255, cv.THRESH_BINARY)
115  cv.imwrite(IMAGE_PATH_BINARY, binary_img)
116 
117  # Filter out non null pixels
118  xyPoints = []
119  height, width = binary_img.shape
120  for i in range(0, height):
121  for j in range(0, (width)):
122  if binary_img[i, j] != 0:
123  point = (i, j)
124  xyPoints.append(point)
125 
126  # v is the normalized vector that is colinear to the found line estimate
127  vx, vy, cx, cy = cv.fitLine(np.array(xyPoints), cv.DIST_L2, 0, 0.01, 0.01)
128 
129  self.line_parameters = [vx, vy, cx, cy]
130 
132  goal = TelecentricProjection.Goal()
133  goal.view_pose = self.trafo
134  goal.frame = FRAME
135  goal.include_results_in_response = True
136 
137  response = ros2py.send_action_goal(
138  self.node, self.telecentric_projection_client, goal, timeout_sec=self.timeout
139  )
140  result = response.get_result()
141 
142  self.assertFalse(response.timeout(), msg="Requesting projected point cloud timed out.")
143  self.assertTrue(response.successful(), msg="Requesting projected point cloud action not successful.")
144  self.assertEqual(result.error.code, 0, msg="Requesting projected point cloud not successful.")
145 
146  self.projected_pc = result.projected_point_cloud
147 
148  def callback(self, msg):
149  self.got_subscribed_cloud = True
150  self.subscribed_cloud = msg
151 
153  goal = TelecentricProjection.Goal()
154  goal.view_pose = self.trafo
155  goal.frame = FRAME
156  goal.publish_results = True
157 
158  response = ros2py.send_action_goal(
159  self.node, self.telecentric_projection_client, goal, timeout_sec=self.timeout
160  )
161  result = response.get_result()
162 
163  self.assertFalse(response.timeout(), msg="Requesting publishing of point cloud timed out.")
164  self.assertTrue(response.successful(), msg="Requesting publishing of point cloud action not successful.")
165  self.assertEqual(result.error.code, 0, msg="Requesting publishing of point cloud not successful.")
166 
168  self.assertAlmostEqual(self.line_parameters[0], 1.0, delta=0.05)
169  self.assertAlmostEqual(self.line_parameters[1], 0.0, delta=0.05)
170 
172  cloud_points = list(pc2.read_points(self.projected_pc, skip_nans=True))
173  self.assertTrue(len(cloud_points) != 0, msg=" The recieved point cloud is empty.")
174 
176  if not self.got_subscribed_cloud:
177  ros2py.sleep(self.node, 4)
178  self.assertTrue(self.got_subscribed_cloud, msg="Recieved no published point cloud!")
179  cloud_points = list(pc2.read_points(self.subscribed_cloud, skip_nans=True))
180  self.assertTrue(len(cloud_points) != 0, msg="The recieved point cloud is empty.")
181 
182 
183 def main():
184  ros2py_testing.run_ros1_test("test_telecentric_projection", TestTelecentricProjection)
185 
186 
187 if __name__ == "__main__":
188  main()
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.node
node
Definition: telecentric_projection.py:29
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.request_data_client
request_data_client
Definition: telecentric_projection.py:33
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.retrieve_projected_point_cloud
def retrieve_projected_point_cloud(self)
Definition: telecentric_projection.py:131
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.test_projected_point_cloud
def test_projected_point_cloud(self)
Definition: telecentric_projection.py:171
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.callback
def callback(self, msg)
Definition: telecentric_projection.py:148
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.test_found_line_paramters
def test_found_line_paramters(self)
Definition: telecentric_projection.py:167
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.trafo
trafo
Definition: telecentric_projection.py:43
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.pc_subscriber
pc_subscriber
Definition: telecentric_projection.py:39
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.retrieve_projected_depth_map
def retrieve_projected_depth_map(self)
Definition: telecentric_projection.py:84
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.got_subscribed_cloud
got_subscribed_cloud
Definition: telecentric_projection.py:40
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.projected_depth_map
projected_depth_map
Definition: telecentric_projection.py:101
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.telecentric_projection_client
telecentric_projection_client
Definition: telecentric_projection.py:30
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.generate_line_estimate
def generate_line_estimate(self)
Definition: telecentric_projection.py:103
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.subscribed_cloud
subscribed_cloud
Definition: telecentric_projection.py:150
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.send_goal_with_publishing_point_cloud
def send_goal_with_publishing_point_cloud(self)
Definition: telecentric_projection.py:152
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.line_parameters
line_parameters
Definition: telecentric_projection.py:129
ensenso_camera_test.telecentric_projection.TestTelecentricProjection
Definition: telecentric_projection.py:27
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.request_point_cloud
def request_point_cloud(self)
Definition: telecentric_projection.py:73
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.test_subscribed_point_cloud
def test_subscribed_point_cloud(self)
Definition: telecentric_projection.py:175
ensenso_camera_test.telecentric_projection.main
def main()
Definition: telecentric_projection.py:183
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.setUp
def setUp(self)
Definition: telecentric_projection.py:28
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.projected_pc
projected_pc
Definition: telecentric_projection.py:146
ensenso_camera_test.ros2_testing
Definition: ros2_testing.py:1
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.tearDown
def tearDown(self)
Definition: telecentric_projection.py:65
ensenso_camera_test.telecentric_projection.TestTelecentricProjection.timeout
timeout
Definition: telecentric_projection.py:52


ensenso_camera_test
Author(s): Ensenso
autogenerated on Wed Apr 2 2025 02:37:52