RemoteUptimeClock.java
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2011 Google Inc.
00003  * 
00004  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
00005  * use this file except in compliance with the License. You may obtain a copy of
00006  * the License at
00007  * 
00008  * http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013  * License for the specific language governing permissions and limitations under
00014  * the License.
00015  */
00016 
00017 package org.ros.time;
00018 
00019 import com.google.common.annotations.VisibleForTesting;
00020 import com.google.common.base.Preconditions;
00021 import com.google.common.collect.Lists;
00022 
00023 import org.apache.commons.logging.Log;
00024 import org.apache.commons.logging.LogFactory;
00025 import org.ros.exception.RosRuntimeException;
00026 
00027 import java.util.Collections;
00028 import java.util.List;
00029 import java.util.Queue;
00030 import java.util.concurrent.Callable;
00031 
00035 public class RemoteUptimeClock {
00036 
00037   private static final boolean DEBUG = false;
00038   private static final Log log = LogFactory.getLog(RemoteUptimeClock.class);
00039 
00040   private final LocalUptimeProvider localUptimeProvider;
00041   private final Callable<Double> callable;
00042   private final LatencyOutlierFilter latencyOutlierFilter;
00043 
00051   private final double driftSensitivity;
00052 
00056   private final double errorReductionCoefficientSensitivity;
00057 
00058   private double localUptime;
00059 
00064   private double measuredRemoteUptime;
00065 
00069   private double predictedRemoteUptime;
00070 
00076   private double drift;
00077 
00083   private double errorReductionCoefficient;
00084 
00089   private final class UptimeCalculationResult {
00090 
00091     final double newLocalUptime;
00092     final double newRemoteUptime;
00093     final double latency;
00094 
00095     public UptimeCalculationResult(double newLocalUptime, double newRemoteUptime, double latency) {
00096       this.newLocalUptime = newLocalUptime;
00097       this.newRemoteUptime = newRemoteUptime;
00098       this.latency = latency;
00099     }
00100   }
00101 
00117   private final class LatencyOutlierFilter {
00118 
00119     private final int sampleSize;
00120     private final double threshold;
00121     private final Queue<Double> latencies;
00122 
00123     public LatencyOutlierFilter(int sampleSize, double threshold) {
00124       Preconditions.checkArgument(sampleSize > 0);
00125       Preconditions.checkArgument(threshold > 1);
00126       this.threshold = threshold;
00127       this.sampleSize = sampleSize;
00128       latencies = Lists.newLinkedList();
00129     }
00130 
00136     public boolean add(double latency) {
00137       latencies.add(latency);
00138       if (latencies.size() > sampleSize) {
00139         latencies.remove();
00140       } else {
00141         // Until the sliding window is full, we cannot reliably detect outliers.
00142         return false;
00143       }
00144       double medianLatency = getMedian();
00145       if (latency < medianLatency * threshold) {
00146         return false;
00147       }
00148       return true;
00149     }
00150 
00151     public double getMedian() {
00152       List<Double> ordered = Lists.newArrayList(latencies);
00153       Collections.sort(ordered);
00154       return ordered.get(latencies.size() / 2);
00155     }
00156   }
00157 
00158   @VisibleForTesting
00159   interface LocalUptimeProvider {
00160     double getSeconds();
00161   }
00162 
00180   public static RemoteUptimeClock newDefault(final TimeProvider timeProvider,
00181       Callable<Double> callable, double driftSensitivity,
00182       double errorReductionCoefficientSensitivity, int latencyOutlierFilterSampleSize,
00183       double latencyOutlierFilterThreshold) {
00184     return new RemoteUptimeClock(new LocalUptimeProvider() {
00185       @Override
00186       public double getSeconds() {
00187         return timeProvider.getCurrentTime().toSeconds();
00188       }
00189     }, callable, driftSensitivity, errorReductionCoefficientSensitivity,
00190         latencyOutlierFilterSampleSize, latencyOutlierFilterThreshold);
00191   }
00192 
00193   @VisibleForTesting
00194   RemoteUptimeClock(LocalUptimeProvider localUptimeProvider, Callable<Double> callable,
00195       double driftSensitivity, double errorReductionCoefficientSensitivity,
00196       int latencyOutlierFilterSampleSize, double latencyOutlierFilterThreshold) {
00197     Preconditions.checkArgument(driftSensitivity >= 0 && driftSensitivity <= 1);
00198     Preconditions.checkArgument(errorReductionCoefficientSensitivity >= 0
00199         && errorReductionCoefficientSensitivity <= 1);
00200     this.localUptimeProvider = localUptimeProvider;
00201     this.callable = callable;
00202     this.driftSensitivity = driftSensitivity;
00203     this.errorReductionCoefficientSensitivity = errorReductionCoefficientSensitivity;
00204     latencyOutlierFilter =
00205         new LatencyOutlierFilter(latencyOutlierFilterSampleSize, latencyOutlierFilterThreshold);
00206     errorReductionCoefficient = 0;
00207   }
00208 
00219   public void calibrate(int sampleSize, double samplingDelayMillis) {
00220     log.info("Starting calibration...");
00221     double remoteUptimeSum = 0;
00222     double localUptimeSum = 0;
00223     double driftSum = 0;
00224     for (int i = 0; i < sampleSize; i++) {
00225       UptimeCalculationResult result = calculateNewUptime(callable);
00226       latencyOutlierFilter.add(result.latency);
00227       if (i > 0) {
00228         double localUptimeDelta = result.newLocalUptime - localUptime;
00229         double remoteUptimeDelta = result.newRemoteUptime - measuredRemoteUptime;
00230         driftSum += calculateDrift(localUptimeDelta, remoteUptimeDelta);
00231       }
00232       measuredRemoteUptime = result.newRemoteUptime;
00233       localUptime = result.newLocalUptime;
00234       remoteUptimeSum += measuredRemoteUptime;
00235       localUptimeSum += localUptime;
00236       try {
00237         Thread.sleep((long) samplingDelayMillis);
00238       } catch (InterruptedException e) {
00239         throw new RosRuntimeException(e);
00240       }
00241     }
00242     // We have n samples, but n - 1 intervals. errorReductionCoefficient is the
00243     // average interval magnitude.
00244     drift = driftSum / (sampleSize - 1);
00245     // If localUptime == -offset then measuredRemoteUptime == 0 (e.g. if
00246     // localUptime is 10s and measuredRemoteUptime is 5s, then offset should be
00247     // -5s since the localUptime started 5s earlier than measuredRemoteUptime).
00248     double offset = (drift * remoteUptimeSum - localUptimeSum) / sampleSize;
00249     predictedRemoteUptime = (localUptime + offset) / drift;
00250     log.info(String.format("Calibration complete. Drift: %.4g, Offset: %.4f s", drift, offset));
00251   }
00252 
00264   private double calculateDrift(double localUptimeDelta, double remoteUptimeDelta) {
00265     Preconditions.checkState(remoteUptimeDelta > 1e-9);
00266     return localUptimeDelta / remoteUptimeDelta;
00267   }
00268 
00278   public void update() {
00279     UptimeCalculationResult result = calculateNewUptime(callable);
00280     double newLocalUptime = result.newLocalUptime;
00281     double newRemoteUptime = result.newRemoteUptime;
00282     double latency = result.latency;
00283 
00284     if (latencyOutlierFilter.add(latency)) {
00285       log.warn(String.format(
00286           "Measurement latency marked as outlier. Latency: %.4f s, Median: %.4f s", latency,
00287           latencyOutlierFilter.getMedian()));
00288       return;
00289     }
00290 
00291     double localUptimeDelta = newLocalUptime - localUptime;
00292     double remoteUptimeDelta = newRemoteUptime - measuredRemoteUptime;
00293     Preconditions.checkState(localUptimeDelta > 1e-9);
00294     Preconditions.checkState(remoteUptimeDelta > 1e-9);
00295     if (DEBUG) {
00296       log.info(String.format("localUptimeDelta: %.4g, remoteUptimeDelta: %.4g", localUptimeDelta,
00297           remoteUptimeDelta));
00298     }
00299 
00300     double newDrift =
00301         driftSensitivity * (localUptimeDelta / remoteUptimeDelta) + (1 - driftSensitivity) * drift;
00302     // Non-jumping behavior from (localUptime, predictedRemoteUptime) to
00303     // (newLocalUptime, newAdjustedRemoteUptime). Note that it does not depend
00304     // directly on measuredRemoteUptime or newRemoteUptime.
00305     double newPredictedRemoteUptime =
00306         predictedRemoteUptime + (localUptimeDelta / (drift + errorReductionCoefficient));
00307     double nextPredictedRemoteUptime = newRemoteUptime + remoteUptimeDelta;
00308     double newCombinedDriftAndError =
00309         localUptimeDelta / (nextPredictedRemoteUptime - newPredictedRemoteUptime);
00310     double newErrorReductionCoefficient =
00311         errorReductionCoefficientSensitivity * (newCombinedDriftAndError - newDrift);
00312     double deltaRatio = remoteUptimeDelta / localUptimeDelta;
00313     double error = newLocalUptime - toLocalUptime(newRemoteUptime);
00314     log.info(String.format("Latency: %.4f s, Delta ratio: %.4f, Drift: %.4g, "
00315         + "Error reduction coefficient: %.4g, Error: %.4f s", latency, deltaRatio, newDrift,
00316         newErrorReductionCoefficient, error));
00317 
00318     measuredRemoteUptime = newRemoteUptime;
00319     predictedRemoteUptime = newPredictedRemoteUptime;
00320     localUptime = newLocalUptime;
00321     drift = newDrift;
00322     errorReductionCoefficient = newErrorReductionCoefficient;
00323   }
00324 
00333   private UptimeCalculationResult calculateNewUptime(Callable<Double> callable) {
00334     double newLocalUptime = localUptimeProvider.getSeconds();
00335     double newRemoteUptime;
00336     try {
00337       newRemoteUptime = callable.call();
00338     } catch (Exception e) {
00339       log.error(e);
00340       throw new RosRuntimeException(e);
00341     }
00342     double latency = localUptimeProvider.getSeconds() - newLocalUptime;
00343     double latencyOffset = latency / 2;
00344     newLocalUptime += latencyOffset;
00345     return new UptimeCalculationResult(newLocalUptime, newRemoteUptime, latency);
00346   }
00347 
00355   public double toLocalUptime(double remoteUptime) {
00356     double localOffset =
00357         (drift + errorReductionCoefficient) * (remoteUptime - predictedRemoteUptime);
00358     return localUptime + localOffset;
00359   }
00360 
00361   @VisibleForTesting
00362   double getDrift() {
00363     return drift;
00364   }
00365 
00366   @VisibleForTesting
00367   double getErrorReductionCoefficient() {
00368     return errorReductionCoefficient;
00369   }
00370 }


rosjava_core
Author(s):
autogenerated on Wed Aug 26 2015 16:06:49