Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.time;
00018
00019 import com.google.common.annotations.VisibleForTesting;
00020 import com.google.common.base.Preconditions;
00021 import com.google.common.collect.Lists;
00022
00023 import org.apache.commons.logging.Log;
00024 import org.apache.commons.logging.LogFactory;
00025 import org.ros.exception.RosRuntimeException;
00026
00027 import java.util.Collections;
00028 import java.util.List;
00029 import java.util.Queue;
00030 import java.util.concurrent.Callable;
00031
00035 public class RemoteUptimeClock {
00036
00037 private static final boolean DEBUG = false;
00038 private static final Log log = LogFactory.getLog(RemoteUptimeClock.class);
00039
00040 private final LocalUptimeProvider localUptimeProvider;
00041 private final Callable<Double> callable;
00042 private final LatencyOutlierFilter latencyOutlierFilter;
00043
00051 private final double driftSensitivity;
00052
00056 private final double errorReductionCoefficientSensitivity;
00057
00058 private double localUptime;
00059
00064 private double measuredRemoteUptime;
00065
00069 private double predictedRemoteUptime;
00070
00076 private double drift;
00077
00083 private double errorReductionCoefficient;
00084
00089 private final class UptimeCalculationResult {
00090
00091 final double newLocalUptime;
00092 final double newRemoteUptime;
00093 final double latency;
00094
00095 public UptimeCalculationResult(double newLocalUptime, double newRemoteUptime, double latency) {
00096 this.newLocalUptime = newLocalUptime;
00097 this.newRemoteUptime = newRemoteUptime;
00098 this.latency = latency;
00099 }
00100 }
00101
00117 private final class LatencyOutlierFilter {
00118
00119 private final int sampleSize;
00120 private final double threshold;
00121 private final Queue<Double> latencies;
00122
00123 public LatencyOutlierFilter(int sampleSize, double threshold) {
00124 Preconditions.checkArgument(sampleSize > 0);
00125 Preconditions.checkArgument(threshold > 1);
00126 this.threshold = threshold;
00127 this.sampleSize = sampleSize;
00128 latencies = Lists.newLinkedList();
00129 }
00130
00136 public boolean add(double latency) {
00137 latencies.add(latency);
00138 if (latencies.size() > sampleSize) {
00139 latencies.remove();
00140 } else {
00141
00142 return false;
00143 }
00144 double medianLatency = getMedian();
00145 if (latency < medianLatency * threshold) {
00146 return false;
00147 }
00148 return true;
00149 }
00150
00151 public double getMedian() {
00152 List<Double> ordered = Lists.newArrayList(latencies);
00153 Collections.sort(ordered);
00154 return ordered.get(latencies.size() / 2);
00155 }
00156 }
00157
00158 @VisibleForTesting
00159 interface LocalUptimeProvider {
00160 double getSeconds();
00161 }
00162
00180 public static RemoteUptimeClock newDefault(final TimeProvider timeProvider,
00181 Callable<Double> callable, double driftSensitivity,
00182 double errorReductionCoefficientSensitivity, int latencyOutlierFilterSampleSize,
00183 double latencyOutlierFilterThreshold) {
00184 return new RemoteUptimeClock(new LocalUptimeProvider() {
00185 @Override
00186 public double getSeconds() {
00187 return timeProvider.getCurrentTime().toSeconds();
00188 }
00189 }, callable, driftSensitivity, errorReductionCoefficientSensitivity,
00190 latencyOutlierFilterSampleSize, latencyOutlierFilterThreshold);
00191 }
00192
00193 @VisibleForTesting
00194 RemoteUptimeClock(LocalUptimeProvider localUptimeProvider, Callable<Double> callable,
00195 double driftSensitivity, double errorReductionCoefficientSensitivity,
00196 int latencyOutlierFilterSampleSize, double latencyOutlierFilterThreshold) {
00197 Preconditions.checkArgument(driftSensitivity >= 0 && driftSensitivity <= 1);
00198 Preconditions.checkArgument(errorReductionCoefficientSensitivity >= 0
00199 && errorReductionCoefficientSensitivity <= 1);
00200 this.localUptimeProvider = localUptimeProvider;
00201 this.callable = callable;
00202 this.driftSensitivity = driftSensitivity;
00203 this.errorReductionCoefficientSensitivity = errorReductionCoefficientSensitivity;
00204 latencyOutlierFilter =
00205 new LatencyOutlierFilter(latencyOutlierFilterSampleSize, latencyOutlierFilterThreshold);
00206 errorReductionCoefficient = 0;
00207 }
00208
00219 public void calibrate(int sampleSize, double samplingDelayMillis) {
00220 log.info("Starting calibration...");
00221 double remoteUptimeSum = 0;
00222 double localUptimeSum = 0;
00223 double driftSum = 0;
00224 for (int i = 0; i < sampleSize; i++) {
00225 UptimeCalculationResult result = calculateNewUptime(callable);
00226 latencyOutlierFilter.add(result.latency);
00227 if (i > 0) {
00228 double localUptimeDelta = result.newLocalUptime - localUptime;
00229 double remoteUptimeDelta = result.newRemoteUptime - measuredRemoteUptime;
00230 driftSum += calculateDrift(localUptimeDelta, remoteUptimeDelta);
00231 }
00232 measuredRemoteUptime = result.newRemoteUptime;
00233 localUptime = result.newLocalUptime;
00234 remoteUptimeSum += measuredRemoteUptime;
00235 localUptimeSum += localUptime;
00236 try {
00237 Thread.sleep((long) samplingDelayMillis);
00238 } catch (InterruptedException e) {
00239 throw new RosRuntimeException(e);
00240 }
00241 }
00242
00243
00244 drift = driftSum / (sampleSize - 1);
00245
00246
00247
00248 double offset = (drift * remoteUptimeSum - localUptimeSum) / sampleSize;
00249 predictedRemoteUptime = (localUptime + offset) / drift;
00250 log.info(String.format("Calibration complete. Drift: %.4g, Offset: %.4f s", drift, offset));
00251 }
00252
00264 private double calculateDrift(double localUptimeDelta, double remoteUptimeDelta) {
00265 Preconditions.checkState(remoteUptimeDelta > 1e-9);
00266 return localUptimeDelta / remoteUptimeDelta;
00267 }
00268
00278 public void update() {
00279 UptimeCalculationResult result = calculateNewUptime(callable);
00280 double newLocalUptime = result.newLocalUptime;
00281 double newRemoteUptime = result.newRemoteUptime;
00282 double latency = result.latency;
00283
00284 if (latencyOutlierFilter.add(latency)) {
00285 log.warn(String.format(
00286 "Measurement latency marked as outlier. Latency: %.4f s, Median: %.4f s", latency,
00287 latencyOutlierFilter.getMedian()));
00288 return;
00289 }
00290
00291 double localUptimeDelta = newLocalUptime - localUptime;
00292 double remoteUptimeDelta = newRemoteUptime - measuredRemoteUptime;
00293 Preconditions.checkState(localUptimeDelta > 1e-9);
00294 Preconditions.checkState(remoteUptimeDelta > 1e-9);
00295 if (DEBUG) {
00296 log.info(String.format("localUptimeDelta: %.4g, remoteUptimeDelta: %.4g", localUptimeDelta,
00297 remoteUptimeDelta));
00298 }
00299
00300 double newDrift =
00301 driftSensitivity * (localUptimeDelta / remoteUptimeDelta) + (1 - driftSensitivity) * drift;
00302
00303
00304
00305 double newPredictedRemoteUptime =
00306 predictedRemoteUptime + (localUptimeDelta / (drift + errorReductionCoefficient));
00307 double nextPredictedRemoteUptime = newRemoteUptime + remoteUptimeDelta;
00308 double newCombinedDriftAndError =
00309 localUptimeDelta / (nextPredictedRemoteUptime - newPredictedRemoteUptime);
00310 double newErrorReductionCoefficient =
00311 errorReductionCoefficientSensitivity * (newCombinedDriftAndError - newDrift);
00312 double deltaRatio = remoteUptimeDelta / localUptimeDelta;
00313 double error = newLocalUptime - toLocalUptime(newRemoteUptime);
00314 log.info(String.format("Latency: %.4f s, Delta ratio: %.4f, Drift: %.4g, "
00315 + "Error reduction coefficient: %.4g, Error: %.4f s", latency, deltaRatio, newDrift,
00316 newErrorReductionCoefficient, error));
00317
00318 measuredRemoteUptime = newRemoteUptime;
00319 predictedRemoteUptime = newPredictedRemoteUptime;
00320 localUptime = newLocalUptime;
00321 drift = newDrift;
00322 errorReductionCoefficient = newErrorReductionCoefficient;
00323 }
00324
00333 private UptimeCalculationResult calculateNewUptime(Callable<Double> callable) {
00334 double newLocalUptime = localUptimeProvider.getSeconds();
00335 double newRemoteUptime;
00336 try {
00337 newRemoteUptime = callable.call();
00338 } catch (Exception e) {
00339 log.error(e);
00340 throw new RosRuntimeException(e);
00341 }
00342 double latency = localUptimeProvider.getSeconds() - newLocalUptime;
00343 double latencyOffset = latency / 2;
00344 newLocalUptime += latencyOffset;
00345 return new UptimeCalculationResult(newLocalUptime, newRemoteUptime, latency);
00346 }
00347
00355 public double toLocalUptime(double remoteUptime) {
00356 double localOffset =
00357 (drift + errorReductionCoefficient) * (remoteUptime - predictedRemoteUptime);
00358 return localUptime + localOffset;
00359 }
00360
00361 @VisibleForTesting
00362 double getDrift() {
00363 return drift;
00364 }
00365
00366 @VisibleForTesting
00367 double getErrorReductionCoefficient() {
00368 return errorReductionCoefficient;
00369 }
00370 }