00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043 import numpy
00044 from numpy import matrix, reshape, array, zeros, diag, real
00045
00046 import roslib; roslib.load_manifest('calibration_estimation')
00047 import rospy
00048 from calibration_estimation.full_chain import FullChainRobotParams
00049 from sensor_msgs.msg import JointState
00050
00051 class CameraChainBundler:
00052 """
00053 Tool used to generate a list of CameraChain sensors from a single calibration sample message
00054 """
00055 def __init__(self, valid_configs):
00056 self._valid_configs = valid_configs
00057
00058
00059 def build_blocks(self, M_robot):
00060 """
00061 Input:
00062 - M_robot: A calibration sample of type calibration_msgs/RobotMeasurement
00063 Returns:
00064 - sensors: A list of CameraChainSensor objects that corresponds to all camera chain sensors found
00065 in the calibration message that was passed in.
00066 """
00067 sensors = []
00068 for cur_config in self._valid_configs:
00069 if cur_config["sensor_id"] in [ x.camera_id for x in M_robot.M_cam ]:
00070 if "chain_id" in cur_config.keys() and cur_config["chain_id"] != None and cur_config["chain_id"] != "NONE":
00071 if cur_config["chain_id"] in [ x.chain_id for x in M_robot.M_chain ] :
00072 M_cam = M_robot.M_cam [ [ x.camera_id for x in M_robot.M_cam ].index(cur_config["sensor_id"])]
00073 M_chain = M_robot.M_chain[ [ x.chain_id for x in M_robot.M_chain ].index(cur_config["chain_id"]) ]
00074 if cur_config["frame_id"] != M_cam.cam_info.header.frame_id:
00075 rospy.logwarn('WARNING frame_id of cam_info.header.frame_id({}) from bag file'.format(M_cam.cam_info.header.frame_id))
00076 rospy.logwarn(' and sensors.rectified_cams.{}.frame_id({}) of system.yaml differ'.format(cur_config["chain_id"],cur_config["frame_id"]))
00077 else:
00078 continue
00079 else:
00080 M_cam = M_robot.M_cam [ [ x.camera_id for x in M_robot.M_cam ].index(cur_config["sensor_id"])]
00081 M_chain = None
00082 cur_config["chain_id"] = None
00083 cur_sensor = CameraChainSensor(cur_config, M_cam, M_chain)
00084 sensors.append(cur_sensor)
00085 else:
00086 rospy.logdebug(" Didn't find block")
00087 return sensors
00088
00089 class CameraChainSensor:
00090 def __init__(self, config_dict, M_cam, M_chain):
00091 """
00092 Generates a single sensor block for a single configuration
00093 Inputs:
00094 - config_dict: The configuration for this specific sensor. This looks exactly like
00095 a single element from the valid_configs list passed into CameraChainBundler.__init__
00096 - M_cam: The camera measurement of type calibration_msgs/CameraMeasurement
00097 - M_chain: The chain measurement of type calibration_msgs/ChainMeasurement
00098 """
00099 self.sensor_type = "camera"
00100 self.sensor_id = config_dict["sensor_id"]
00101
00102 self._config_dict = config_dict
00103 self._M_cam = M_cam
00104 self._M_chain = M_chain
00105
00106 self._full_chain = FullChainRobotParams(config_dict["chain_id"], config_dict["frame_id"])
00107
00108 self.terms_per_sample = 2
00109 if "baseline_rgbd" in config_dict.keys():
00110 self.terms_per_sample = 3
00111
00112 def update_config(self, robot_params):
00113 """
00114 On each optimization cycle the set of system parameters that we're optimizing over will change. Thus,
00115 after each change in parameters we must call update_config to ensure that our calculated residual reflects
00116 the newest set of system parameters.
00117 """
00118 self._camera = robot_params.rectified_cams[ self.sensor_id ]
00119 if self._full_chain is not None:
00120 self._full_chain.update_config(robot_params)
00121
00122 def compute_residual(self, target_pts):
00123 """
00124 Computes the measurement residual for the current set of system parameters and target points
00125 Input:
00126 - target_pts: 4XN matrix, storing features point locations in world cartesian homogenous coordinates.
00127 Output:
00128 - r: terms_per_sample*N long vector, storing pixel residuals for the target points in the form of
00129 [u1, v1, u2, v2, ..., uN, vN] or [u1, v1, u'1, u2....]
00130 """
00131 z_mat = self.get_measurement()
00132 h_mat = self.compute_expected(target_pts)
00133 r = array(reshape(h_mat - z_mat, [-1,1]))[:,0]
00134 return r
00135
00136 def compute_residual_scaled(self, target_pts):
00137 """
00138 Computes the residual, and then scales it by sqrt(Gamma), where Gamma
00139 is the information matrix for this measurement (Cov^-1).
00140 """
00141 r = self.compute_residual(target_pts)
00142 gamma_sqrt = self.compute_marginal_gamma_sqrt(target_pts)
00143 r_scaled = gamma_sqrt * matrix(r).T
00144 return array(r_scaled.T)[0]
00145
00146 def compute_marginal_gamma_sqrt(self, target_pts):
00147 """
00148 Calculates the square root of the information matrix for the measurement of the
00149 current set of system parameters at the passed in set of target points.
00150 """
00151 import scipy.linalg
00152 cov = self.compute_cov(target_pts)
00153 gamma = matrix(zeros(cov.shape))
00154 num_pts = self.get_residual_length()/self.terms_per_sample
00155
00156 for k in range(num_pts):
00157 first = self.terms_per_sample*k
00158 last = self.terms_per_sample*k+self.terms_per_sample
00159 sub_cov = matrix(cov[first:last, first:last])
00160 sub_gamma_sqrt_full = matrix(scipy.linalg.sqrtm(sub_cov.I))
00161 sub_gamma_sqrt = real(sub_gamma_sqrt_full)
00162 assert(scipy.linalg.norm(sub_gamma_sqrt_full - sub_gamma_sqrt) < 1e-6)
00163 gamma[first:last, first:last] = sub_gamma_sqrt
00164 return gamma
00165
00166 def get_residual_length(self):
00167 return self.terms_per_sample*len(self._M_cam.image_points)
00168
00169 def get_measurement(self):
00170 """
00171 Get the target's pixel coordinates as measured by the actual sensor
00172 """
00173 if self.terms_per_sample == 2:
00174 return numpy.matrix([[pt.x, pt.y] for pt in self._M_cam.image_points])
00175 elif self.terms_per_sample == 3:
00176 return numpy.matrix([[pt.x, pt.y, self._camera.get_disparity(self._M_cam.cam_info.P, pt.z)] for pt in self._M_cam.image_points])
00177
00178 def compute_expected(self, target_pts):
00179 """
00180 Compute the expected pixel coordinates for a set of target points.
00181 target_pts: 4xN matrix, storing feature points of the target, in homogeneous coords
00182 Returns: target points projected into pixel coordinates, in a Nx2 matrix
00183 """
00184 if self._M_chain is not None:
00185 return self._compute_expected(self._M_chain.chain_state, target_pts)
00186 else:
00187 return self._compute_expected(None, target_pts)
00188 def _compute_expected(self, chain_state, target_pts):
00189 """
00190 Compute what measurement we would expect to see, based on the current system parameters
00191 and the specified target point locations.
00192
00193 Inputs:
00194 - chain_state: The joint angles of this sensor's chain of type sensor_msgs/JointState.
00195 - target_pts: 4XN matrix, storing features point locations in world cartesian homogenous coordinates.
00196
00197 Returns: target points projected into pixel coordinates, in a Nx2 matrix
00198 """
00199
00200 camera_pose_root = self._full_chain.calc_block.fk(chain_state)
00201 cam_frame_pts = camera_pose_root.I * target_pts
00202
00203 pixel_pts = self._camera.project(self._M_cam.cam_info.P, cam_frame_pts)
00204 return pixel_pts.T
00205
00206 def compute_expected_J(self, target_pts):
00207 """
00208 The output Jacobian J shows how moving target_pts in cartesian space affects
00209 the expected measurement in (u,v) camera coordinates.
00210 For n points in target_pts, J is a 2nx3n matrix
00211 Note: This doesn't seem to be used anywhere, except maybe in some drawing code
00212 """
00213 epsilon = 1e-8
00214 N = len(self._M_cam.image_points)
00215 Jt = zeros([N*3, N*self.terms_per_sample])
00216 for k in range(N):
00217
00218 sub_Jt = zeros([3,self.terms_per_sample])
00219 x = target_pts[:,k].copy()
00220 f0 = self.compute_expected(x)
00221 for i in [0,1,2]:
00222 x[i,0] += epsilon
00223 fTest = self.compute_expected(x)
00224 sub_Jt[i,:] = array((fTest - f0) / epsilon)
00225 x[i,0] -= epsilon
00226 Jt[k*3:(k+1)*3, k*self.terms_per_sample:(k+1)*self.terms_per_sample] = sub_Jt
00227 return Jt.T
00228
00229 def compute_cov(self, target_pts):
00230 '''
00231 Computes the measurement covariance in pixel coordinates for the given
00232 set of target points (target_pts)
00233 Input:
00234 - target_pts: 4xN matrix, storing N feature points of the target, in homogeneous coords
00235 '''
00236 cam_cov = matrix(zeros([self.get_residual_length(), self.get_residual_length()]))
00237
00238
00239 var_u = self._camera._cov_dict['u'] * self._camera._cov_dict['u']
00240 var_v = self._camera._cov_dict['v'] * self._camera._cov_dict['v']
00241 for k in range(cam_cov.shape[0]/self.terms_per_sample):
00242 cam_cov[self.terms_per_sample*k , self.terms_per_sample*k] = var_u
00243 cam_cov[self.terms_per_sample*k+1, self.terms_per_sample*k+1] = var_v
00244 if self.terms_per_sample == 3:
00245 cam_cov[self.terms_per_sample*k+2, self.terms_per_sample*k+2] = self._camera._cov_dict['x'] * self._camera._cov_dict['x']
00246
00247
00248 if self._M_chain is not None:
00249 return self.get_chain_cov(target_pts) + cam_cov
00250 else:
00251 return cam_cov
00252
00253 def get_chain_cov(self, target_pts, epsilon=1e-8):
00254 num_joints = len(self._M_chain.chain_state.position)
00255 Jt = zeros([num_joints, self.get_residual_length()])
00256
00257 x = JointState()
00258 x.position = self._M_chain.chain_state.position[:]
00259
00260
00261 f0 = reshape(array(self._compute_expected(x, target_pts)), [-1])
00262 for i in range(num_joints):
00263 x.position = [cur_pos for cur_pos in self._M_chain.chain_state.position]
00264 x.position[i] += epsilon
00265 fTest = reshape(array(self._compute_expected(x, target_pts)), [-1])
00266 Jt[i] = (fTest - f0)/epsilon
00267 cov_angles = [x*x for x in self._full_chain.calc_block._chain._cov_dict['joint_angles']]
00268
00269
00270 return matrix(Jt).T * matrix(diag(cov_angles)) * matrix(Jt)
00271
00272 def build_sparsity_dict(self):
00273 """
00274 Build a dictionary that defines which parameters will in fact affect this measurement.
00275
00276 Returned dictionary should be of the following form:
00277 transforms:
00278 my_transformA: [1, 1, 1, 1, 1, 1]
00279 my_transformB: [1, 1, 1, 1, 1, 1]
00280 chains:
00281 my_chain:
00282 gearing: [1, 1, ---, 1]
00283 rectified_cams:
00284 my_cam:
00285 baseline_shift: 1
00286 f_shift: 1
00287 cx_shift: 1
00288 cy_shift: 1
00289 """
00290 sparsity = self._full_chain.build_sparsity_dict()
00291 sparsity['rectified_cams'] = {}
00292 sparsity['rectified_cams'][self.sensor_id] = dict( [(x,1) for x in self._camera.get_param_names()] )
00293 return sparsity
00294