linear_assignment.py
Go to the documentation of this file.
1 # vim: expandtab:ts=4:sw=4
2 from __future__ import absolute_import
3 import numpy as np
4 from sklearn.utils.linear_assignment_ import linear_assignment
5 from . import kalman_filter
6 
7 
# Cost written into cost-matrix entries that are considered infeasible; used
# as the default `gated_cost` of `gate_cost_matrix`.
INFTY_COST = 1e+5
9 
10 
def min_cost_matching(
        distance_metric, max_distance, tracks, detections, track_indices=None,
        detection_indices=None):
    """Solve linear assignment problem.

    Parameters
    ----------
    distance_metric : Callable[[List[Track], List[Detection], List[int], List[int]], ndarray]
        The distance metric is given a list of tracks and detections as well as
        a list of N track indices and M detection indices. The metric should
        return the NxM dimensional cost matrix, where element (i, j) is the
        association cost between the i-th track in the given track indices and
        the j-th detection in the given detection_indices.
    max_distance : float
        Gating threshold. Associations with cost larger than this value are
        disregarded.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : Optional[List[int]]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above). Defaults to all tracks.
    detection_indices : Optional[List[int]]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above). Defaults to all
        detections.

    Returns
    -------
    (List[(int, int)], List[int], List[int])
        Returns a tuple with the following three entries:
        * A list of matched track and detection indices.
        * A list of unmatched track indices.
        * A list of unmatched detection indices.

    Notes
    -----
    The cost matrix returned by `distance_metric` is modified in place:
    entries above `max_distance` are clipped to `max_distance + 1e-5`.

    """
    # Imported locally so this function does not depend on the
    # `sklearn.utils.linear_assignment_` helper, which newer scikit-learn
    # releases no longer ship. SciPy is already a scikit-learn dependency.
    from scipy.optimize import linear_sum_assignment

    if track_indices is None:
        track_indices = list(range(len(tracks)))
    if detection_indices is None:
        detection_indices = list(range(len(detections)))

    if len(detection_indices) == 0 or len(track_indices) == 0:
        # Nothing to match. Return plain lists (copies) so callers can
        # concatenate results regardless of the input container type.
        return [], list(track_indices), list(detection_indices)

    cost_matrix = distance_metric(
        tracks, detections, track_indices, detection_indices)
    # Clip gated-out costs to a value just above the threshold so that they
    # stay finite for the solver but can still be recognised afterwards.
    cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
    row_indices, col_indices = linear_sum_assignment(cost_matrix)
    row_indices = row_indices.tolist()
    col_indices = col_indices.tolist()

    # Rows/columns the solver left unassigned (rectangular cost matrix).
    assigned_rows = set(row_indices)
    assigned_cols = set(col_indices)
    unmatched_tracks = [
        track_idx for row, track_idx in enumerate(track_indices)
        if row not in assigned_rows]
    unmatched_detections = [
        detection_idx for col, detection_idx in enumerate(detection_indices)
        if col not in assigned_cols]

    matches = []
    for row, col in zip(row_indices, col_indices):
        track_idx = track_indices[row]
        detection_idx = detection_indices[col]
        if cost_matrix[row, col] > max_distance:
            # Assigned by the solver, but the cost exceeds the gate.
            unmatched_tracks.append(track_idx)
            unmatched_detections.append(detection_idx)
        else:
            matches.append((track_idx, detection_idx))
    return matches, unmatched_tracks, unmatched_detections
76 
77 
def matching_cascade(
        distance_metric, max_distance, cascade_depth, tracks, detections,
        track_indices=None, detection_indices=None):
    """Run matching cascade.

    Tracks are matched level by level: level `l` handles exactly those tracks
    whose `time_since_update` equals `1 + l`, and each level only sees the
    detections left unmatched by the previous levels.

    Parameters
    ----------
    distance_metric : Callable[[List[Track], List[Detection], List[int], List[int]], ndarray]
        The distance metric is given a list of tracks and detections as well as
        a list of N track indices and M detection indices. The metric should
        return the NxM dimensional cost matrix, where element (i, j) is the
        association cost between the i-th track in the given track indices and
        the j-th detection in the given detection indices.
    max_distance : float
        Gating threshold. Associations with cost larger than this value are
        disregarded.
    cascade_depth : int
        The cascade depth, should be set to the maximum track age.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : Optional[List[int]]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above). Defaults to all tracks.
    detection_indices : Optional[List[int]]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above). Defaults to all
        detections.

    Returns
    -------
    (List[(int, int)], List[int], List[int])
        Returns a tuple with the following three entries:
        * A list of matched track and detection indices.
        * A list of unmatched track indices.
        * A list of unmatched detection indices.

    """
    if track_indices is None:
        track_indices = list(range(len(tracks)))
    if detection_indices is None:
        detection_indices = list(range(len(detections)))

    unmatched_detections = detection_indices
    matches = []
    for level in range(cascade_depth):
        if len(unmatched_detections) == 0:  # No detections left
            break

        # Only tracks last updated exactly `1 + level` steps ago take part
        # in this level.
        track_indices_l = [
            k for k in track_indices
            if tracks[k].time_since_update == 1 + level
        ]
        if len(track_indices_l) == 0:  # Nothing to match at this level
            continue

        # Detections that stay unmatched are handed on to the next level.
        matches_l, _, unmatched_detections = \
            min_cost_matching(
                distance_metric, max_distance, tracks, detections,
                track_indices_l, unmatched_detections)
        matches += matches_l
    # A track is unmatched if no level produced a match for it.
    unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
    return matches, unmatched_tracks, unmatched_detections
142 
143 
def gate_cost_matrix(
        kf, cost_matrix, tracks, detections, track_indices, detection_indices,
        gated_cost=INFTY_COST, only_position=False):
    """Invalidate infeasible entries in cost matrix based on the state
    distributions obtained by Kalman filtering.

    Parameters
    ----------
    kf : The Kalman filter.
    cost_matrix : ndarray
        The NxM dimensional cost matrix, where N is the number of track indices
        and M is the number of detection indices, such that entry (i, j) is the
        association cost between `tracks[track_indices[i]]` and
        `detections[detection_indices[j]]`. It is modified in place.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : List[int]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above).
    detection_indices : List[int]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above).
    gated_cost : Optional[float]
        Entries in the cost matrix corresponding to infeasible associations are
        set to this value. Defaults to a very large value.
    only_position : Optional[bool]
        If True, only the x, y position of the state distribution is considered
        during gating. Defaults to False.

    Returns
    -------
    ndarray
        Returns the modified cost matrix (the same object as `cost_matrix`).

    """
    if len(track_indices) == 0 or len(detection_indices) == 0:
        # No rows or no columns: nothing to gate, and this avoids handing an
        # empty measurement array to the Kalman filter.
        return cost_matrix

    # The threshold is looked up by the number of gated dimensions
    # (2 for position only, otherwise 4).
    gating_dim = 2 if only_position else 4
    gating_threshold = kalman_filter.chi2inv95[gating_dim]
    measurements = np.asarray(
        [detections[i].to_xyah() for i in detection_indices])
    for row, track_idx in enumerate(track_indices):
        track = tracks[track_idx]
        gating_distance = kf.gating_distance(
            track.mean, track.covariance, measurements, only_position)
        # Overwrite every column whose gating distance exceeds the threshold.
        cost_matrix[row, gating_distance > gating_threshold] = gated_cost
    return cost_matrix
def min_cost_matching(distance_metric, max_distance, tracks, detections, track_indices=None, detection_indices=None)
def matching_cascade(distance_metric, max_distance, cascade_depth, tracks, detections, track_indices=None, detection_indices=None)
def gate_cost_matrix(kf, cost_matrix, tracks, detections, track_indices, detection_indices, gated_cost=INFTY_COST, only_position=False)


jsk_perception
Author(s): Manabu Saito, Ryohei Ueda
autogenerated on Mon May 3 2021 03:03:27