00001 /* 00002 * Copyright (C) 2011 Google Inc. 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not 00005 * use this file except in compliance with the License. You may obtain a copy of 00006 * the License at 00007 * 00008 * http://www.apache.org/licenses/LICENSE-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 00012 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 00013 * License for the specific language governing permissions and limitations under 00014 * the License. 00015 */ 00016 00017 package org.ros.concurrent; 00018 00019 import com.google.common.collect.Maps; 00020 00021 import org.apache.commons.logging.Log; 00022 import org.apache.commons.logging.LogFactory; 00023 import org.ros.exception.RosRuntimeException; 00024 00025 import java.util.Map; 00026 import java.util.concurrent.Callable; 00027 import java.util.concurrent.CompletionService; 00028 import java.util.concurrent.CountDownLatch; 00029 import java.util.concurrent.ExecutionException; 00030 import java.util.concurrent.ExecutorCompletionService; 00031 import java.util.concurrent.ExecutorService; 00032 import java.util.concurrent.Future; 00033 import java.util.concurrent.RejectedExecutionException; 00034 import java.util.concurrent.ScheduledExecutorService; 00035 import java.util.concurrent.TimeUnit; 00036 00043 public class RetryingExecutorService { 00044 00045 private static final boolean DEBUG = false; 00046 private static final Log log = LogFactory.getLog(RetryingExecutorService.class); 00047 00048 private static final long DEFAULT_RETRY_DELAY = 5; 00049 private static final TimeUnit DEFAULT_RETRY_TIME_UNIT = TimeUnit.SECONDS; 00050 00051 private final ScheduledExecutorService scheduledExecutorService; 00052 private final RetryLoop retryLoop; 00053 private final Map<Callable<Boolean>, CountDownLatch> latches; 00054 private final Map<Future<Boolean>, Callable<Boolean>> callables; 00055 private final CompletionService<Boolean> completionService; 00056 private final Object mutex; 00057 00058 private long retryDelay; 00059 private TimeUnit retryTimeUnit; 00060 private boolean running; 00061 00062 private class RetryLoop extends CancellableLoop { 00063 @Override 00064 public void loop() throws InterruptedException { 00065 Future<Boolean> future = completionService.take(); 00066 final Callable<Boolean> callable = callables.remove(future); 00067 boolean retry; 00068 try { 00069 retry = future.get(); 00070 } catch (ExecutionException e) { 00071 throw new RosRuntimeException(e.getCause()); 00072 } 00073 if (retry) { 00074 if (DEBUG) { 00075 log.info("Retry requested."); 00076 } 00077 scheduledExecutorService.schedule(new Runnable() { 00078 @Override 00079 public void run() { 00080 submit(callable); 00081 } 00082 }, retryDelay, retryTimeUnit); 00083 } else { 00084 latches.get(callable).countDown(); 00085 } 00086 } 00087 } 00088 00093 public RetryingExecutorService(ScheduledExecutorService scheduledExecutorService) { 00094 this.scheduledExecutorService = scheduledExecutorService; 00095 retryLoop = new RetryLoop(); 00096 latches = Maps.newConcurrentMap(); 00097 callables = Maps.newConcurrentMap(); 00098 completionService = new ExecutorCompletionService<Boolean>(scheduledExecutorService); 00099 mutex = new Object(); 00100 retryDelay = DEFAULT_RETRY_DELAY; 00101 retryTimeUnit = DEFAULT_RETRY_TIME_UNIT; 00102 running = true; 00103 // TODO(damonkohler): Unify this with the passed in ExecutorService. 00104 scheduledExecutorService.execute(retryLoop); 00105 } 00106 00117 public void submit(Callable<Boolean> callable) { 00118 synchronized (mutex) { 00119 if (running) { 00120 Future<Boolean> future = completionService.submit(callable); 00121 latches.put(callable, new CountDownLatch(1)); 00122 callables.put(future, callable); 00123 } else { 00124 throw new RejectedExecutionException(); 00125 } 00126 } 00127 } 00128 00135 public void setRetryDelay(long delay, TimeUnit unit) { 00136 retryDelay = delay; 00137 retryTimeUnit = unit; 00138 } 00139 00150 public void shutdown(long timeout, TimeUnit unit) throws InterruptedException { 00151 running = false; 00152 for (CountDownLatch latch : latches.values()) { 00153 latch.await(timeout, unit); 00154 } 00155 retryLoop.cancel(); 00156 } 00157 }