scale_disparity_map.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import numpy as np
3 import unittest
4 
5 from scipy.spatial.transform import Rotation
6 
7 import ensenso_camera.ros2 as ros2py
8 
9 from ensenso_camera_msgs.msg import Parameter
10 
11 import ensenso_camera_test.ros2_testing as ros2py_testing
12 
13 from sensor_msgs.msg import PointCloud2, PointField
14 
15 
16 pc2 = ros2py_testing.import_point_cloud2()
17 
18 RequestData = ros2py.import_action("ensenso_camera_msgs", "RequestData")
19 SetParameter = ros2py.import_action("ensenso_camera_msgs", "SetParameter")
20 
21 
22 # In ROS2 the depth_image_proc/point_cloud_xyz node has a fixed QOS profile and the message is not received
23 # after a single request, hence we request the data until we received a point cloud from the node.
24 MAX_TRIES = 10
25 
26 # The corresponding test launch file opens a camera that is linked to "Workspace".
27 TARGET_FRAME = "Workspace"
28 
29 
30 class TestScaleDisparityMap(unittest.TestCase):
31  def setUp(self):
32  self.node = ros2py.create_node("test_scale_disparity_map")
33 
34  self.tf_buffer = ros2py_testing.create_tf_buffer()
35  self.tf_listener = ros2py_testing.create_tf_listener(self.tf_buffer, self.node)
36 
37  self.request_data_client = ros2py.create_action_client(self.node, "request_data", RequestData)
38  self.set_parameter_client = ros2py.create_action_client(self.node, "set_parameter", SetParameter)
39 
40  clients = [self.request_data_client, self.set_parameter_client]
41  success = ros2py.wait_for_servers(self.node, clients, timeout_sec=ros2py_testing.TEST_TIMEOUT, exit=False)
42  self.assertTrue(success, msg="Timeout reached for servers.")
43 
44  self.point_cloud_ens = None
45  self.point_cloud_dip = None
46 
47  # The Ensenso point cloud and the reference point cloud calculated by depth_image_proc/point_cloud_xyz
48  self.pc_subscriber1 = ros2py.create_subscription(self.node, PointCloud2, "point_cloud", self.callback_ens)
49  self.pc_subscriber2 = ros2py.create_subscription(
50  self.node, PointCloud2, "point_cloud_dip", self.callback_dip, ros2py.get_qos_profile("point_cloud_dip")
51  )
52 
53  def request_data(self):
54  request_data_goal = RequestData.Goal()
55  request_data_goal.request_rectified_images = True
56  request_data_goal.request_disparity_map = True
57  request_data_goal.request_depth_image = True
58  request_data_goal.request_point_cloud = True
59  request_data_goal.publish_results = True
60 
61  response = ros2py.send_action_goal(self.node, self.request_data_client, request_data_goal)
62  result = response.get_result()
63 
64  self.assertFalse(response.timeout(), msg="Requesting data action timed out.")
65  self.assertTrue(response.successful(), msg="Requesting data action not successful.")
66  self.assertEqual(result.error.code, 0, msg="Requesting data not successful.")
67 
68  def callback_ens(self, msg):
69  self.point_cloud_ens = msg
70 
71  def callback_dip(self, msg):
72  self.point_cloud_dip = msg
73 
74  def transform_point_cloud_to_target_frame(self, point_cloud, target_frame):
75  try:
76  tf = self.tf_buffer.lookup_transform(
77  target_frame,
78  point_cloud.header.frame_id,
79  point_cloud.header.stamp,
80  )
81  except Exception as e:
82  self.node.get_logger().error(e)
83 
84  # See https://robotics.stackexchange.com/questions/109924/
85  t = np.eye(4)
86  q = tf.transform.rotation
87  x = tf.transform.translation
88  t[:3, :3] = Rotation.from_quat([q.x, q.y, q.z, q.w]).as_dcm()
89  t[:3, 3] = [x.x, x.y, x.z]
90 
91  points_msg = list(pc2.read_points(point_cloud))
92  points_map = np.ones((len(points_msg), 4))
93  points_map[:, :3] = points_msg
94  points_map = np.dot(t, points_map.T).T
95 
96  fields = [
97  PointField(name="x", offset=0, datatype=PointField.FLOAT32, count=1),
98  PointField(name="y", offset=4, datatype=PointField.FLOAT32, count=1),
99  PointField(name="z", offset=8, datatype=PointField.FLOAT32, count=1),
100  ]
101 
102  pcl_map_header = point_cloud.header
103  pcl_map_header.frame_id = "map"
104  return pc2.create_cloud(pcl_map_header, fields, points_map[:, :3])
105 
106  def set_parameters(self, parameters=[]):
107  goal = SetParameter.Goal(parameters=parameters)
108  response = ros2py.send_action_goal(self.node, self.set_parameter_client, goal)
109 
110  result = response.get_result()
111  self.assertTrue(response.successful())
112  self.assertEqual(result.error.code, 0)
113 
115  # Enable disparity map scaling.
116  self.set_parameters([Parameter(key=Parameter.SCALING, float_value=0.5)])
117 
118  # Wait until we received a reference point cloud.
119  tries = 0
120  while self.point_cloud_dip is None and tries < MAX_TRIES:
121  self.request_data()
122  ros2py.sleep(self.node, 2)
123  tries += 1
124 
125  self.assertIsNotNone(self.point_cloud_ens, msg="Received no Ensenso point cloud")
126  self.assertIsNotNone(self.point_cloud_dip, msg="Received no reference point cloud")
127 
128  frame = self.point_cloud_ens.header.frame_id
129  self.assertEqual(
130  frame,
131  TARGET_FRAME,
132  msg=f"Expected Ensenso point cloud in frame '{TARGET_FRAME}', got '{frame}'!",
133  )
134 
135  # Since cmdComputeDisparityMap returns the point cloud in the camera's linked coordinate system, the reference
136  # point cloud has to be transformed to that coordinate system as well.
138 
139  points_ens = list(pc2.read_points(self.point_cloud_ens, skip_nans=True))
140  points_dip = list(pc2.read_points(self.point_cloud_dip, skip_nans=True))
141  self.assertGreater(len(points_ens), 0, msg="The Ensenso point cloud is empty")
142  self.assertGreater(len(points_dip), 0, msg="The reference point cloud is empty")
143 
144  def average_values(points, axis=0):
145  return np.average(np.array(points)[:, axis : axis + 1]) * 1000.0
146 
147  self.assertAlmostEqual(
148  average_values(points_ens, axis=0),
149  average_values(points_dip, axis=0),
150  places=3,
151  msg="x values differ",
152  )
153 
154  self.assertAlmostEqual(
155  average_values(points_ens, axis=1),
156  average_values(points_dip, axis=1),
157  places=3,
158  msg="y values differ",
159  )
160 
161  self.assertAlmostEqual(
162  average_values(points_ens, axis=2),
163  average_values(points_dip, axis=2),
164  places=3,
165  msg="z values differ",
166  )
167 
168 
169 def main():
170  ros2py_testing.run_ros1_test("test_scale_disparity_map", TestScaleDisparityMap)
171 
172 
173 if __name__ == "__main__":
174  main()
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.request_data
def request_data(self)
Definition: scale_disparity_map.py:53
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.setUp
def setUp(self)
Definition: scale_disparity_map.py:31
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.point_cloud_dip
point_cloud_dip
Definition: scale_disparity_map.py:45
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.pc_subscriber2
pc_subscriber2
Definition: scale_disparity_map.py:49
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.tf_listener
tf_listener
Definition: scale_disparity_map.py:35
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.callback_dip
def callback_dip(self, msg)
Definition: scale_disparity_map.py:71
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.callback_ens
def callback_ens(self, msg)
Definition: scale_disparity_map.py:68
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.pc_subscriber1
pc_subscriber1
Definition: scale_disparity_map.py:48
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.transform_point_cloud_to_target_frame
def transform_point_cloud_to_target_frame(self, point_cloud, target_frame)
Definition: scale_disparity_map.py:74
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.request_data_client
request_data_client
Definition: scale_disparity_map.py:37
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap
Definition: scale_disparity_map.py:30
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.set_parameter_client
set_parameter_client
Definition: scale_disparity_map.py:38
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.set_parameters
def set_parameters(self, parameters=[])
Definition: scale_disparity_map.py:106
ensenso_camera_test.ros2_testing
Definition: ros2_testing.py:1
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.point_cloud_ens
point_cloud_ens
Definition: scale_disparity_map.py:44
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.test_scale_disparity_map
def test_scale_disparity_map(self)
Definition: scale_disparity_map.py:114
ensenso_camera_test.scale_disparity_map.main
def main()
Definition: scale_disparity_map.py:169
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.node
node
Definition: scale_disparity_map.py:32
ensenso_camera_test.scale_disparity_map.TestScaleDisparityMap.tf_buffer
tf_buffer
Definition: scale_disparity_map.py:34


ensenso_camera_test
Author(s): Ensenso
autogenerated on Wed Apr 2 2025 02:37:52