test_camera.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- python -*-
00003 
00004 import sys
00005 import time
00006 import unittest
00007 import rospy
00008 import rostest
00009 import numpy as np
00010 import tf2_ros
00011 import tf2_geometry_msgs
00012 from sensor_msgs.msg import Image
00013 from geometry_msgs.msg import PointStamped
00014 from cv_bridge import CvBridge, CvBridgeError
00015 from o3d3xx.msg import Extrinsics
00016 
00017 class TestCamera(unittest.TestCase):
00018     """
00019     This class tests the following:
00020       - Getting data from the camera
00021       - Computing the Cartesian data and comparing it to ground truth
00022       - To do the comparison to ground truther, the computed cartesian data
00023         must also be transformed, to do that we use the tf2 API and hence The
00024         transform from the optical frame to the camera frame that we are
00025         publishing is also tested.
00026 
00027     NOTE: The camera h/w is needed to run this test.
00028     """
00029 
00030     def __init__(self, *args):
00031         super(TestCamera, self).__init__(*args)
00032         self.success = False
00033 
00034         self.cloud_ = None # ground truth for cartesian computation
00035         self.rdis_ = None
00036         self.uvec_ = None
00037         self.extrinsics_ = None
00038 
00039     def test_camera(self):
00040         time.sleep(2.0) # <-- let rosmaster and camera node start
00041         rospy.init_node('test_camera')
00042 
00043         self.bridge_ = CvBridge()
00044         self.rdis_sub_ = \
00045           rospy.Subscriber("/depth", Image, self.image_cb,
00046                            queue_size=None, callback_args="rdis")
00047         self.cloud_sub_ = \
00048           rospy.Subscriber("/xyz_image", Image, self.image_cb,
00049                            queue_size=None, callback_args="cloud")
00050         self.uvec_sub_ = \
00051           rospy.Subscriber("/unit_vectors", Image, self.image_cb,
00052                            queue_size=None, callback_args="uvec")
00053         self.extrinsics_sub_ = \
00054           rospy.Subscriber("/extrinsics", Extrinsics, self.extrinsics_cb,
00055                            queue_size=None)
00056 
00057         # If it takes more than 10 secs to run our test, something is wrong.
00058         timeout_t = time.time() + 10.0
00059         rate = rospy.Rate(10.0)
00060         while ((not rospy.is_shutdown()) and
00061                (not self.success) and
00062                (time.time() < timeout_t)):
00063             # Make sure we have all the data we need to compute
00064             if ((self.rdis_ is not None) and
00065                 (self.cloud_ is not None) and
00066                 (self.uvec_ is not None) and
00067                 (self.extrinsics_ is not None)):
00068 
00069                 # Make sure the cloud and rdis are from the same image
00070                 # acquisition.
00071                 d = self.rdis_.header.stamp - self.cloud_.header.stamp
00072                 if d.to_sec() == 0:
00073                     self.success = self.compute_cartesian()
00074                     break
00075                 else:
00076                     # get new data
00077                     self.rdis_ = None
00078                     self.cloud_ = None
00079 
00080             rate.sleep()
00081 
00082         self.assertTrue(self.success)
00083 
00084     def extrinsics_cb(self, data, *args):
00085         if self.extrinsics_ is None:
00086             self.extrinsics_ = data
00087 
00088     def image_cb(self, data, *args):
00089         im_type = args[0]
00090         if im_type == "rdis":
00091             if self.rdis_ is None:
00092                 self.rdis_ = data
00093         elif im_type == "cloud":
00094             if self.cloud_ is None:
00095                 self.cloud_ = data
00096         elif im_type == "uvec":
00097             if self.uvec_ is None:
00098                 self.uvec_ = data
00099 
00100     def compute_cartesian(self):
00101         """
00102         Computes the Cartesian data from the radial distance image, unit
00103         vectors, and extrinsics. Then transforms it to the camera frame using
00104         the tf2 api. Once transformed, it will do a pixel-by-pixel comparision
00105         to ground truth.
00106 
00107         Returns a bool indicating the the success of the computation.
00108         """
00109         rdis = np.array(self.bridge_.imgmsg_to_cv2(self.rdis_))
00110         uvec = np.array(self.bridge_.imgmsg_to_cv2(self.uvec_))
00111         cloud = np.array(self.bridge_.imgmsg_to_cv2(self.cloud_))
00112 
00113         # Get the unit vectors
00114         ex = uvec[:,:,0]
00115         ey = uvec[:,:,1]
00116         ez = uvec[:,:,2]
00117 
00118         # Cast the radial distance image to float
00119         rdis_f = rdis.astype(np.float32)
00120 
00121         # Compute Cartesian
00122         x_ = (ex * rdis_f) + self.extrinsics_.tx
00123         y_ = (ey * rdis_f) + self.extrinsics_.ty
00124         z_ = (ez * rdis_f) + self.extrinsics_.tz
00125 
00126         # Transform to target frame
00127         #
00128         # NOTE: This could obviously be vectorized if we exploit our apriori
00129         # knowledge of the transform, but we want to test the transform we are
00130         # broadcasting via tf and the tf2 api for doing the transformation.
00131         #
00132         # I agree, this loop is slow and ugly :-( -- mostly b/c using the tf2
00133         # interface with our data structures here is kind of wonky
00134         #
00135         tf_buffer = tf2_ros.Buffer()
00136         tf_listener = tf2_ros.TransformListener(tf_buffer)
00137         n_rows = x_.shape[0]
00138         n_cols = x_.shape[1]
00139         x_f = np.zeros((n_rows, n_cols), dtype=np.float64)
00140         y_f = np.zeros((n_rows, n_cols), dtype=np.float64)
00141         z_f = np.zeros((n_rows, n_cols), dtype=np.float64)
00142         for i in range(n_rows):
00143             for j in range(n_cols):
00144                 p = PointStamped()
00145                 p.header = self.rdis_.header
00146                 p.point.x = x_[i,j]
00147                 p.point.y = y_[i,j]
00148                 p.point.z = z_[i,j]
00149                 pt = tf_buffer.transform(p, self.cloud_.header.frame_id,
00150                                          rospy.Duration(2.0))
00151                 x_f[i,j] = pt.point.x
00152                 y_f[i,j] = pt.point.y
00153                 z_f[i,j] = pt.point.z
00154 
00155         # Cast to int16_t
00156         x_i = x_f.astype(np.int16)
00157         y_i = y_f.astype(np.int16)
00158         z_i = z_f.astype(np.int16)
00159 
00160         # Explicitly set to zero any bad pixels
00161         mask = rdis == 0
00162         x_i[mask] = 0
00163         y_i[mask] = 0
00164         z_i[mask] = 0
00165 
00166         # Compare to ground truth -- its subtle, but we are testing here that
00167         # we are accurate to w/in 1mm
00168         x_mask = np.fabs(x_i - cloud[:,:,0]) > 1
00169         y_mask = np.fabs(y_i - cloud[:,:,1]) > 1
00170         z_mask = np.fabs(z_i - cloud[:,:,2]) > 1
00171 
00172         self.assertTrue(x_mask.sum() == 0)
00173         self.assertTrue(y_mask.sum() == 0)
00174         self.assertTrue(z_mask.sum() == 0)
00175 
00176         # if any of the above asserts fail, the test will error out,
00177         # so, we return True here carte blanche
00178         return True
00179 
00180 def main():
00181     rostest.rosrun('o3d3xx', 'test_camera', TestCamera, sys.argv)
00182     return 0
00183 
00184 if __name__ == '__main__':
00185     sys.exit(main())


o3d3xx
Author(s): Tom Panzarella
autogenerated on Thu Jun 6 2019 18:17:40