import sys
import time
import unittest
import rospy
import rostest
import numpy as np
import tf2_ros
import tf2_geometry_msgs
from sensor_msgs.msg import Image
from geometry_msgs.msg import PointStamped
from cv_bridge import CvBridge, CvBridgeError
from o3d3xx.msg import Extrinsics

class TestCamera(unittest.TestCase):
    """
    This class tests the following:

    - Getting data from the camera
    - Computing the Cartesian data and comparing it to ground truth
    - Transforming the computed Cartesian data into the camera frame via
      the tf2 API, which also exercises the transform we publish from the
      optical frame to the camera frame

    NOTE: The camera h/w is needed to run this test.
    """

    def __init__(self, *args):
        super(TestCamera, self).__init__(*args)
        self.success = False

        self.cloud_ = None
        self.rdis_ = None
        self.uvec_ = None
        self.extrinsics_ = None

    def test_camera(self):
        time.sleep(2.0)
        rospy.init_node('test_camera')

        self.bridge_ = CvBridge()
        self.rdis_sub_ = \
            rospy.Subscriber("/depth", Image, self.image_cb,
                             queue_size=None, callback_args="rdis")
        self.cloud_sub_ = \
            rospy.Subscriber("/xyz_image", Image, self.image_cb,
                             queue_size=None, callback_args="cloud")
        self.uvec_sub_ = \
            rospy.Subscriber("/unit_vectors", Image, self.image_cb,
                             queue_size=None, callback_args="uvec")
        self.extrinsics_sub_ = \
            rospy.Subscriber("/extrinsics", Extrinsics, self.extrinsics_cb,
                             queue_size=None)

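        # Wait for one set of messages to arrive: a radial distance image,
        # an xyz (ground truth) image, the unit vectors, and the extrinsics.
        # Give up after roughly 10 seconds.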
        timeout_t = time.time() + 10.0
        rate = rospy.Rate(10.0)
        while ((not rospy.is_shutdown()) and
               (not self.success) and
               (time.time() < timeout_t)):

            if ((self.rdis_ is not None) and
                (self.cloud_ is not None) and
                (self.uvec_ is not None) and
                (self.extrinsics_ is not None)):

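                # Only compare data captured from the same camera frame: the
                # radial distance image and the ground-truth cloud must carry
                # identical timestamps.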
                d = self.rdis_.header.stamp - self.cloud_.header.stamp
                if d.to_sec() == 0:
                    self.success = self.compute_cartesian()
                    break
                else:
                    # Stamps do not match; discard and wait for a new pair.
                    self.rdis_ = None
                    self.cloud_ = None

            rate.sleep()

        self.assertTrue(self.success)

    def extrinsics_cb(self, data, *args):
        if self.extrinsics_ is None:
            self.extrinsics_ = data

    def image_cb(self, data, *args):
        im_type = args[0]
        if im_type == "rdis":
            if self.rdis_ is None:
                self.rdis_ = data
        elif im_type == "cloud":
            if self.cloud_ is None:
                self.cloud_ = data
        elif im_type == "uvec":
            if self.uvec_ is None:
                self.uvec_ = data

    def compute_cartesian(self):
        """
        Computes the Cartesian data from the radial distance image, unit
        vectors, and extrinsics, then transforms it to the camera frame
        using the tf2 API. Once transformed, it does a pixel-by-pixel
        comparison against the ground truth point cloud.

        Returns a bool indicating the success of the computation.
        """
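        # The per-pixel computation is:
        #
        #   cartesian = unit_vector * radial_distance + (tx, ty, tz)
        #
        # with the result expressed in the frame of the radial distance image
        # and then transformed into the frame of the ground-truth cloud for
        # the comparison.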
        rdis = np.array(self.bridge_.imgmsg_to_cv2(self.rdis_))
        uvec = np.array(self.bridge_.imgmsg_to_cv2(self.uvec_))
        cloud = np.array(self.bridge_.imgmsg_to_cv2(self.cloud_))

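        # Each pixel of the unit vector image holds the (x, y, z) components
        # of the unit vector for that pixel.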
        ex = uvec[:, :, 0]
        ey = uvec[:, :, 1]
        ez = uvec[:, :, 2]

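        # Cast the radial distance image to float before scaling the unit
        # vectors.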
        rdis_f = rdis.astype(np.float32)

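        # Compute the Cartesian coordinates in the optical frame by scaling
        # each unit vector by the radial distance and applying the extrinsic
        # translation.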
        x_ = (ex * rdis_f) + self.extrinsics_.tx
        y_ = (ey * rdis_f) + self.extrinsics_.ty
        z_ = (ez * rdis_f) + self.extrinsics_.tz

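        # Transform each point from the frame of the radial distance image
        # into the frame of the ground-truth cloud using tf2. This also
        # exercises the optical-frame-to-camera-frame transform that we
        # publish.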
        tf_buffer = tf2_ros.Buffer()
        tf_listener = tf2_ros.TransformListener(tf_buffer)
        n_rows = x_.shape[0]
        n_cols = x_.shape[1]
        x_f = np.zeros((n_rows, n_cols), dtype=np.float64)
        y_f = np.zeros((n_rows, n_cols), dtype=np.float64)
        z_f = np.zeros((n_rows, n_cols), dtype=np.float64)
        for i in range(n_rows):
            for j in range(n_cols):
                p = PointStamped()
                p.header = self.rdis_.header
                p.point.x = x_[i, j]
                p.point.y = y_[i, j]
                p.point.z = z_[i, j]
                pt = tf_buffer.transform(p, self.cloud_.header.frame_id,
                                         rospy.Duration(2.0))
                x_f[i, j] = pt.point.x
                y_f[i, j] = pt.point.y
                z_f[i, j] = pt.point.z

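        # Cast the transformed coordinates to int16 so they can be compared
        # directly (to within one unit) against the ground-truth cloud.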
        x_i = x_f.astype(np.int16)
        y_i = y_f.astype(np.int16)
        z_i = z_f.astype(np.int16)

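        # Zero out pixels with no valid radial distance measurement so they
        # do not contribute to the comparison.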
        mask = rdis == 0
        x_i[mask] = 0
        y_i[mask] = 0
        z_i[mask] = 0

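        # Pixel-by-pixel comparison against the ground truth: every
        # coordinate must match to within one unit on each axis.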
        x_mask = np.fabs(x_i - cloud[:, :, 0]) > 1
        y_mask = np.fabs(y_i - cloud[:, :, 1]) > 1
        z_mask = np.fabs(z_i - cloud[:, :, 2]) > 1

        self.assertTrue(x_mask.sum() == 0)
        self.assertTrue(y_mask.sum() == 0)
        self.assertTrue(z_mask.sum() == 0)

        return True

def main():
    rostest.rosrun('o3d3xx', 'test_camera', TestCamera, sys.argv)
    return 0

if __name__ == '__main__':
    sys.exit(main())