RetryingExecutorService.java
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2011 Google Inc.
00003  * 
00004  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
00005  * use this file except in compliance with the License. You may obtain a copy of
00006  * the License at
00007  * 
00008  * http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013  * License for the specific language governing permissions and limitations under
00014  * the License.
00015  */
00016 
00017 package org.ros.concurrent;
00018 
00019 import com.google.common.collect.Maps;
00020 
00021 import org.apache.commons.logging.Log;
00022 import org.apache.commons.logging.LogFactory;
00023 import org.ros.exception.RosRuntimeException;
00024 
00025 import java.util.Map;
00026 import java.util.concurrent.Callable;
00027 import java.util.concurrent.CompletionService;
00028 import java.util.concurrent.CountDownLatch;
00029 import java.util.concurrent.ExecutionException;
00030 import java.util.concurrent.ExecutorCompletionService;
00031 import java.util.concurrent.ExecutorService;
00032 import java.util.concurrent.Future;
00033 import java.util.concurrent.RejectedExecutionException;
00034 import java.util.concurrent.ScheduledExecutorService;
00035 import java.util.concurrent.TimeUnit;
00036 
00043 public class RetryingExecutorService {
00044 
00045   private static final boolean DEBUG = false;
00046   private static final Log log = LogFactory.getLog(RetryingExecutorService.class);
00047 
00048   private static final long DEFAULT_RETRY_DELAY = 5;
00049   private static final TimeUnit DEFAULT_RETRY_TIME_UNIT = TimeUnit.SECONDS;
00050 
00051   private final ScheduledExecutorService scheduledExecutorService;
00052   private final RetryLoop retryLoop;
00053   private final Map<Callable<Boolean>, CountDownLatch> latches;
00054   private final Map<Future<Boolean>, Callable<Boolean>> callables;
00055   private final CompletionService<Boolean> completionService;
00056   private final Object mutex;
00057 
00058   private long retryDelay;
00059   private TimeUnit retryTimeUnit;
00060   private boolean running;
00061 
00062   private class RetryLoop extends CancellableLoop {
00063     @Override
00064     public void loop() throws InterruptedException {
00065       Future<Boolean> future = completionService.take();
00066       final Callable<Boolean> callable = callables.remove(future);
00067       boolean retry;
00068       try {
00069         retry = future.get();
00070       } catch (ExecutionException e) {
00071         throw new RosRuntimeException(e.getCause());
00072       }
00073       if (retry) {
00074         if (DEBUG) {
00075           log.info("Retry requested.");
00076         }
00077         scheduledExecutorService.schedule(new Runnable() {
00078           @Override
00079           public void run() {
00080             submit(callable);
00081           }
00082         }, retryDelay, retryTimeUnit);
00083       } else {
00084         latches.get(callable).countDown();
00085       }
00086     }
00087   }
00088 
00093   public RetryingExecutorService(ScheduledExecutorService scheduledExecutorService) {
00094     this.scheduledExecutorService = scheduledExecutorService;
00095     retryLoop = new RetryLoop();
00096     latches = Maps.newConcurrentMap();
00097     callables = Maps.newConcurrentMap();
00098     completionService = new ExecutorCompletionService<Boolean>(scheduledExecutorService);
00099     mutex = new Object();
00100     retryDelay = DEFAULT_RETRY_DELAY;
00101     retryTimeUnit = DEFAULT_RETRY_TIME_UNIT;
00102     running = true;
00103     // TODO(damonkohler): Unify this with the passed in ExecutorService.
00104     scheduledExecutorService.execute(retryLoop);
00105   }
00106 
00117   public void submit(Callable<Boolean> callable) {
00118     synchronized (mutex) {
00119       if (running) {
00120         Future<Boolean> future = completionService.submit(callable);
00121         latches.put(callable, new CountDownLatch(1));
00122         callables.put(future, callable);
00123       } else {
00124         throw new RejectedExecutionException();
00125       }
00126     }
00127   }
00128 
00135   public void setRetryDelay(long delay, TimeUnit unit) {
00136     retryDelay = delay;
00137     retryTimeUnit = unit;
00138   }
00139 
00150   public void shutdown(long timeout, TimeUnit unit) throws InterruptedException {
00151     running = false;
00152     for (CountDownLatch latch : latches.values()) {
00153       latch.await(timeout, unit);
00154     }
00155     retryLoop.cancel();
00156   }
00157 }


rosjava_core
Author(s):
autogenerated on Wed Aug 26 2015 16:06:49