00001
00002 from __future__ import absolute_import
00003 import numpy as np
00004 from sklearn.utils.linear_assignment_ import linear_assignment
00005 from . import kalman_filter
00006
00007
00008 INFTY_COST = 1e+5
00009
00010
00011 def min_cost_matching(
00012 distance_metric, max_distance, tracks, detections, track_indices=None,
00013 detection_indices=None):
00014 """Solve linear assignment problem.
00015
00016 Parameters
00017 ----------
00018 distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
00019 The distance metric is given a list of tracks and detections as well as
00020 a list of N track indices and M detection indices. The metric should
00021 return the NxM dimensional cost matrix, where element (i, j) is the
00022 association cost between the i-th track in the given track indices and
00023 the j-th detection in the given detection_indices.
00024 max_distance : float
00025 Gating threshold. Associations with cost larger than this value are
00026 disregarded.
00027 tracks : List[track.Track]
00028 A list of predicted tracks at the current time step.
00029 detections : List[detection.Detection]
00030 A list of detections at the current time step.
00031 track_indices : List[int]
00032 List of track indices that maps rows in `cost_matrix` to tracks in
00033 `tracks` (see description above).
00034 detection_indices : List[int]
00035 List of detection indices that maps columns in `cost_matrix` to
00036 detections in `detections` (see description above).
00037
00038 Returns
00039 -------
00040 (List[(int, int)], List[int], List[int])
00041 Returns a tuple with the following three entries:
00042 * A list of matched track and detection indices.
00043 * A list of unmatched track indices.
00044 * A list of unmatched detection indices.
00045
00046 """
00047 if track_indices is None:
00048 track_indices = np.arange(len(tracks))
00049 if detection_indices is None:
00050 detection_indices = np.arange(len(detections))
00051
00052 if len(detection_indices) == 0 or len(track_indices) == 0:
00053 return [], track_indices, detection_indices
00054
00055 cost_matrix = distance_metric(
00056 tracks, detections, track_indices, detection_indices)
00057 cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
00058 indices = linear_assignment(cost_matrix)
00059
00060 matches, unmatched_tracks, unmatched_detections = [], [], []
00061 for col, detection_idx in enumerate(detection_indices):
00062 if col not in indices[:, 1]:
00063 unmatched_detections.append(detection_idx)
00064 for row, track_idx in enumerate(track_indices):
00065 if row not in indices[:, 0]:
00066 unmatched_tracks.append(track_idx)
00067 for row, col in indices:
00068 track_idx = track_indices[row]
00069 detection_idx = detection_indices[col]
00070 if cost_matrix[row, col] > max_distance:
00071 unmatched_tracks.append(track_idx)
00072 unmatched_detections.append(detection_idx)
00073 else:
00074 matches.append((track_idx, detection_idx))
00075 return matches, unmatched_tracks, unmatched_detections
00076
00077
00078 def matching_cascade(
00079 distance_metric, max_distance, cascade_depth, tracks, detections,
00080 track_indices=None, detection_indices=None):
00081 """Run matching cascade.
00082
00083 Parameters
00084 ----------
00085 distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
00086 The distance metric is given a list of tracks and detections as well as
00087 a list of N track indices and M detection indices. The metric should
00088 return the NxM dimensional cost matrix, where element (i, j) is the
00089 association cost between the i-th track in the given track indices and
00090 the j-th detection in the given detection indices.
00091 max_distance : float
00092 Gating threshold. Associations with cost larger than this value are
00093 disregarded.
00094 cascade_depth: int
00095 The cascade depth, should be se to the maximum track age.
00096 tracks : List[track.Track]
00097 A list of predicted tracks at the current time step.
00098 detections : List[detection.Detection]
00099 A list of detections at the current time step.
00100 track_indices : Optional[List[int]]
00101 List of track indices that maps rows in `cost_matrix` to tracks in
00102 `tracks` (see description above). Defaults to all tracks.
00103 detection_indices : Optional[List[int]]
00104 List of detection indices that maps columns in `cost_matrix` to
00105 detections in `detections` (see description above). Defaults to all
00106 detections.
00107
00108 Returns
00109 -------
00110 (List[(int, int)], List[int], List[int])
00111 Returns a tuple with the following three entries:
00112 * A list of matched track and detection indices.
00113 * A list of unmatched track indices.
00114 * A list of unmatched detection indices.
00115
00116 """
00117 if track_indices is None:
00118 track_indices = list(range(len(tracks)))
00119 if detection_indices is None:
00120 detection_indices = list(range(len(detections)))
00121
00122 unmatched_detections = detection_indices
00123 matches = []
00124 for level in range(cascade_depth):
00125 if len(unmatched_detections) == 0:
00126 break
00127
00128 track_indices_l = [
00129 k for k in track_indices
00130 if tracks[k].time_since_update == 1 + level
00131 ]
00132 if len(track_indices_l) == 0:
00133 continue
00134
00135 matches_l, _, unmatched_detections = \
00136 min_cost_matching(
00137 distance_metric, max_distance, tracks, detections,
00138 track_indices_l, unmatched_detections)
00139 matches += matches_l
00140 unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
00141 return matches, unmatched_tracks, unmatched_detections
00142
00143
00144 def gate_cost_matrix(
00145 kf, cost_matrix, tracks, detections, track_indices, detection_indices,
00146 gated_cost=INFTY_COST, only_position=False):
00147 """Invalidate infeasible entries in cost matrix based on the state
00148 distributions obtained by Kalman filtering.
00149
00150 Parameters
00151 ----------
00152 kf : The Kalman filter.
00153 cost_matrix : ndarray
00154 The NxM dimensional cost matrix, where N is the number of track indices
00155 and M is the number of detection indices, such that entry (i, j) is the
00156 association cost between `tracks[track_indices[i]]` and
00157 `detections[detection_indices[j]]`.
00158 tracks : List[track.Track]
00159 A list of predicted tracks at the current time step.
00160 detections : List[detection.Detection]
00161 A list of detections at the current time step.
00162 track_indices : List[int]
00163 List of track indices that maps rows in `cost_matrix` to tracks in
00164 `tracks` (see description above).
00165 detection_indices : List[int]
00166 List of detection indices that maps columns in `cost_matrix` to
00167 detections in `detections` (see description above).
00168 gated_cost : Optional[float]
00169 Entries in the cost matrix corresponding to infeasible associations are
00170 set this value. Defaults to a very large value.
00171 only_position : Optional[bool]
00172 If True, only the x, y position of the state distribution is considered
00173 during gating. Defaults to False.
00174
00175 Returns
00176 -------
00177 ndarray
00178 Returns the modified cost matrix.
00179
00180 """
00181 gating_dim = 2 if only_position else 4
00182 gating_threshold = kalman_filter.chi2inv95[gating_dim]
00183 measurements = np.asarray(
00184 [detections[i].to_xyah() for i in detection_indices])
00185 for row, track_idx in enumerate(track_indices):
00186 track = tracks[track_idx]
00187 gating_distance = kf.gating_distance(
00188 track.mean, track.covariance, measurements, only_position)
00189 cost_matrix[row, gating_distance > gating_threshold] = gated_cost
00190 return cost_matrix