00001
00002 import numpy as np
00003
00004
00005 def _pdist(a, b):
00006 """Compute pair-wise squared distance between points in `a` and `b`.
00007
00008 Parameters
00009 ----------
00010 a : array_like
00011 An NxM matrix of N samples of dimensionality M.
00012 b : array_like
00013 An LxM matrix of L samples of dimensionality M.
00014
00015 Returns
00016 -------
00017 ndarray
00018 Returns a matrix of size len(a), len(b) such that eleement (i, j)
00019 contains the squared distance between `a[i]` and `b[j]`.
00020
00021 """
00022 a, b = np.asarray(a), np.asarray(b)
00023 if len(a) == 0 or len(b) == 0:
00024 return np.zeros((len(a), len(b)))
00025 a2, b2 = np.square(a).sum(axis=1), np.square(b).sum(axis=1)
00026 r2 = -2. * np.dot(a, b.T) + a2[:, None] + b2[None, :]
00027 r2 = np.clip(r2, 0., float(np.inf))
00028 return r2
00029
00030
00031 def _cosine_distance(a, b, data_is_normalized=False):
00032 """Compute pair-wise cosine distance between points in `a` and `b`.
00033
00034 Parameters
00035 ----------
00036 a : array_like
00037 An NxM matrix of N samples of dimensionality M.
00038 b : array_like
00039 An LxM matrix of L samples of dimensionality M.
00040 data_is_normalized : Optional[bool]
00041 If True, assumes rows in a and b are unit length vectors.
00042 Otherwise, a and b are explicitly normalized to lenght 1.
00043
00044 Returns
00045 -------
00046 ndarray
00047 Returns a matrix of size len(a), len(b) such that eleement (i, j)
00048 contains the squared distance between `a[i]` and `b[j]`.
00049
00050 """
00051 if not data_is_normalized:
00052 a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True)
00053 b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True)
00054 return 1. - np.dot(a, b.T)
00055
00056
00057 def _nn_euclidean_distance(x, y):
00058 """ Helper function for nearest neighbor distance metric (Euclidean).
00059
00060 Parameters
00061 ----------
00062 x : ndarray
00063 A matrix of N row-vectors (sample points).
00064 y : ndarray
00065 A matrix of M row-vectors (query points).
00066
00067 Returns
00068 -------
00069 ndarray
00070 A vector of length M that contains for each entry in `y` the
00071 smallest Euclidean distance to a sample in `x`.
00072
00073 """
00074 distances = _pdist(x, y)
00075 return np.maximum(0.0, distances.min(axis=0))
00076
00077
00078 def _nn_cosine_distance(x, y):
00079 """ Helper function for nearest neighbor distance metric (cosine).
00080
00081 Parameters
00082 ----------
00083 x : ndarray
00084 A matrix of N row-vectors (sample points).
00085 y : ndarray
00086 A matrix of M row-vectors (query points).
00087
00088 Returns
00089 -------
00090 ndarray
00091 A vector of length M that contains for each entry in `y` the
00092 smallest cosine distance to a sample in `x`.
00093
00094 """
00095 distances = _cosine_distance(x, y)
00096 return distances.min(axis=0)
00097
00098
00099 class NearestNeighborDistanceMetric(object):
00100 """
00101 A nearest neighbor distance metric that, for each target, returns
00102 the closest distance to any sample that has been observed so far.
00103
00104 Parameters
00105 ----------
00106 metric : str
00107 Either "euclidean" or "cosine".
00108 matching_threshold: float
00109 The matching threshold. Samples with larger distance are considered an
00110 invalid match.
00111 budget : Optional[int]
00112 If not None, fix samples per class to at most this number. Removes
00113 the oldest samples when the budget is reached.
00114
00115 Attributes
00116 ----------
00117 samples : Dict[int -> List[ndarray]]
00118 A dictionary that maps from target identities to the list of samples
00119 that have been observed so far.
00120
00121 """
00122
00123 def __init__(self, metric, matching_threshold, budget=None):
00124
00125
00126 if metric == "euclidean":
00127 self._metric = _nn_euclidean_distance
00128 elif metric == "cosine":
00129 self._metric = _nn_cosine_distance
00130 else:
00131 raise ValueError(
00132 "Invalid metric; must be either 'euclidean' or 'cosine'")
00133 self.matching_threshold = matching_threshold
00134 self.budget = budget
00135 self.samples = {}
00136
00137 def partial_fit(self, features, targets, active_targets):
00138 """Update the distance metric with new data.
00139
00140 Parameters
00141 ----------
00142 features : ndarray
00143 An NxM matrix of N features of dimensionality M.
00144 targets : ndarray
00145 An integer array of associated target identities.
00146 active_targets : List[int]
00147 A list of targets that are currently present in the scene.
00148
00149 """
00150 for feature, target in zip(features, targets):
00151 self.samples.setdefault(target, []).append(feature)
00152 if self.budget is not None:
00153 self.samples[target] = self.samples[target][-self.budget:]
00154 self.samples = {k: self.samples[k] for k in active_targets}
00155
00156 def distance(self, features, targets):
00157 """Compute distance between features and targets.
00158
00159 Parameters
00160 ----------
00161 features : ndarray
00162 An NxM matrix of N features of dimensionality M.
00163 targets : List[int]
00164 A list of targets to match the given `features` against.
00165
00166 Returns
00167 -------
00168 ndarray
00169 Returns a cost matrix of shape len(targets), len(features), where
00170 element (i, j) contains the closest squared distance between
00171 `targets[i]` and `features[j]`.
00172
00173 """
00174 cost_matrix = np.zeros((len(targets), len(features)))
00175 for i, target in enumerate(targets):
00176 cost_matrix[i, :] = self._metric(self.samples[target], features)
00177 return cost_matrix