linear_assignment.py
Go to the documentation of this file.
00001 # vim: expandtab:ts=4:sw=4
00002 from __future__ import absolute_import
00003 import numpy as np
00004 from sklearn.utils.linear_assignment_ import linear_assignment
00005 from . import kalman_filter
00006 
00007 
00008 INFTY_COST = 1e+5
00009 
00010 
00011 def min_cost_matching(
00012         distance_metric, max_distance, tracks, detections, track_indices=None,
00013         detection_indices=None):
00014     """Solve linear assignment problem.
00015 
00016     Parameters
00017     ----------
00018     distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
00019         The distance metric is given a list of tracks and detections as well as
00020         a list of N track indices and M detection indices. The metric should
00021         return the NxM dimensional cost matrix, where element (i, j) is the
00022         association cost between the i-th track in the given track indices and
00023         the j-th detection in the given detection_indices.
00024     max_distance : float
00025         Gating threshold. Associations with cost larger than this value are
00026         disregarded.
00027     tracks : List[track.Track]
00028         A list of predicted tracks at the current time step.
00029     detections : List[detection.Detection]
00030         A list of detections at the current time step.
00031     track_indices : List[int]
00032         List of track indices that maps rows in `cost_matrix` to tracks in
00033         `tracks` (see description above).
00034     detection_indices : List[int]
00035         List of detection indices that maps columns in `cost_matrix` to
00036         detections in `detections` (see description above).
00037 
00038     Returns
00039     -------
00040     (List[(int, int)], List[int], List[int])
00041         Returns a tuple with the following three entries:
00042         * A list of matched track and detection indices.
00043         * A list of unmatched track indices.
00044         * A list of unmatched detection indices.
00045 
00046     """
00047     if track_indices is None:
00048         track_indices = np.arange(len(tracks))
00049     if detection_indices is None:
00050         detection_indices = np.arange(len(detections))
00051 
00052     if len(detection_indices) == 0 or len(track_indices) == 0:
00053         return [], track_indices, detection_indices  # Nothing to match.
00054 
00055     cost_matrix = distance_metric(
00056         tracks, detections, track_indices, detection_indices)
00057     cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
00058     indices = linear_assignment(cost_matrix)
00059 
00060     matches, unmatched_tracks, unmatched_detections = [], [], []
00061     for col, detection_idx in enumerate(detection_indices):
00062         if col not in indices[:, 1]:
00063             unmatched_detections.append(detection_idx)
00064     for row, track_idx in enumerate(track_indices):
00065         if row not in indices[:, 0]:
00066             unmatched_tracks.append(track_idx)
00067     for row, col in indices:
00068         track_idx = track_indices[row]
00069         detection_idx = detection_indices[col]
00070         if cost_matrix[row, col] > max_distance:
00071             unmatched_tracks.append(track_idx)
00072             unmatched_detections.append(detection_idx)
00073         else:
00074             matches.append((track_idx, detection_idx))
00075     return matches, unmatched_tracks, unmatched_detections
00076 
00077 
00078 def matching_cascade(
00079         distance_metric, max_distance, cascade_depth, tracks, detections,
00080         track_indices=None, detection_indices=None):
00081     """Run matching cascade.
00082 
00083     Parameters
00084     ----------
00085     distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
00086         The distance metric is given a list of tracks and detections as well as
00087         a list of N track indices and M detection indices. The metric should
00088         return the NxM dimensional cost matrix, where element (i, j) is the
00089         association cost between the i-th track in the given track indices and
00090         the j-th detection in the given detection indices.
00091     max_distance : float
00092         Gating threshold. Associations with cost larger than this value are
00093         disregarded.
00094     cascade_depth: int
00095         The cascade depth, should be se to the maximum track age.
00096     tracks : List[track.Track]
00097         A list of predicted tracks at the current time step.
00098     detections : List[detection.Detection]
00099         A list of detections at the current time step.
00100     track_indices : Optional[List[int]]
00101         List of track indices that maps rows in `cost_matrix` to tracks in
00102         `tracks` (see description above). Defaults to all tracks.
00103     detection_indices : Optional[List[int]]
00104         List of detection indices that maps columns in `cost_matrix` to
00105         detections in `detections` (see description above). Defaults to all
00106         detections.
00107 
00108     Returns
00109     -------
00110     (List[(int, int)], List[int], List[int])
00111         Returns a tuple with the following three entries:
00112         * A list of matched track and detection indices.
00113         * A list of unmatched track indices.
00114         * A list of unmatched detection indices.
00115 
00116     """
00117     if track_indices is None:
00118         track_indices = list(range(len(tracks)))
00119     if detection_indices is None:
00120         detection_indices = list(range(len(detections)))
00121 
00122     unmatched_detections = detection_indices
00123     matches = []
00124     for level in range(cascade_depth):
00125         if len(unmatched_detections) == 0:  # No detections left
00126             break
00127 
00128         track_indices_l = [
00129             k for k in track_indices
00130             if tracks[k].time_since_update == 1 + level
00131         ]
00132         if len(track_indices_l) == 0:  # Nothing to match at this level
00133             continue
00134 
00135         matches_l, _, unmatched_detections = \
00136             min_cost_matching(
00137                 distance_metric, max_distance, tracks, detections,
00138                 track_indices_l, unmatched_detections)
00139         matches += matches_l
00140     unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
00141     return matches, unmatched_tracks, unmatched_detections
00142 
00143 
00144 def gate_cost_matrix(
00145         kf, cost_matrix, tracks, detections, track_indices, detection_indices,
00146         gated_cost=INFTY_COST, only_position=False):
00147     """Invalidate infeasible entries in cost matrix based on the state
00148     distributions obtained by Kalman filtering.
00149 
00150     Parameters
00151     ----------
00152     kf : The Kalman filter.
00153     cost_matrix : ndarray
00154         The NxM dimensional cost matrix, where N is the number of track indices
00155         and M is the number of detection indices, such that entry (i, j) is the
00156         association cost between `tracks[track_indices[i]]` and
00157         `detections[detection_indices[j]]`.
00158     tracks : List[track.Track]
00159         A list of predicted tracks at the current time step.
00160     detections : List[detection.Detection]
00161         A list of detections at the current time step.
00162     track_indices : List[int]
00163         List of track indices that maps rows in `cost_matrix` to tracks in
00164         `tracks` (see description above).
00165     detection_indices : List[int]
00166         List of detection indices that maps columns in `cost_matrix` to
00167         detections in `detections` (see description above).
00168     gated_cost : Optional[float]
00169         Entries in the cost matrix corresponding to infeasible associations are
00170         set this value. Defaults to a very large value.
00171     only_position : Optional[bool]
00172         If True, only the x, y position of the state distribution is considered
00173         during gating. Defaults to False.
00174 
00175     Returns
00176     -------
00177     ndarray
00178         Returns the modified cost matrix.
00179 
00180     """
00181     gating_dim = 2 if only_position else 4
00182     gating_threshold = kalman_filter.chi2inv95[gating_dim]
00183     measurements = np.asarray(
00184         [detections[i].to_xyah() for i in detection_indices])
00185     for row, track_idx in enumerate(track_indices):
00186         track = tracks[track_idx]
00187         gating_distance = kf.gating_distance(
00188             track.mean, track.covariance, measurements, only_position)
00189         cost_matrix[row, gating_distance > gating_threshold] = gated_cost
00190     return cost_matrix


jsk_perception
Author(s): Manabu Saito, Ryohei Ueda
autogenerated on Tue Jul 2 2019 19:41:07