Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.concurrent;
00018
00019 import com.google.common.collect.Lists;
00020
00021 import java.util.Collection;
00022 import java.util.concurrent.CountDownLatch;
00023 import java.util.concurrent.ExecutorService;
00024 import java.util.concurrent.TimeUnit;
00025
00031 public class ListenerGroup<T> {
00032
00033 private final static int DEFAULT_QUEUE_CAPACITY = 128;
00034
00035 private final ExecutorService executorService;
00036 private final Collection<EventDispatcher<T>> eventDispatchers;
00037
00038 public ListenerGroup(ExecutorService executorService) {
00039 this.executorService = executorService;
00040 eventDispatchers = Lists.newCopyOnWriteArrayList();
00041 }
00042
00053 public EventDispatcher<T> add(T listener, int queueCapacity) {
00054 EventDispatcher<T> eventDispatcher = new EventDispatcher<T>(listener, queueCapacity);
00055 eventDispatchers.add(eventDispatcher);
00056 executorService.execute(eventDispatcher);
00057 return eventDispatcher;
00058 }
00059
00069 public EventDispatcher<T> add(T listener) {
00070 return add(listener, DEFAULT_QUEUE_CAPACITY);
00071 }
00072
00083 public Collection<EventDispatcher<T>> addAll(Collection<T> listeners, int limit) {
00084 Collection<EventDispatcher<T>> eventDispatchers = Lists.newArrayList();
00085 for (T listener : listeners) {
00086 eventDispatchers.add(add(listener, limit));
00087 }
00088 return eventDispatchers;
00089 }
00090
00100 public Collection<EventDispatcher<T>> addAll(Collection<T> listeners) {
00101 return addAll(listeners, DEFAULT_QUEUE_CAPACITY);
00102 }
00103
00107 public int size() {
00108 return eventDispatchers.size();
00109 }
00110
00116 public void signal(SignalRunnable<T> signalRunnable) {
00117 for (EventDispatcher<T> eventDispatcher : eventDispatchers) {
00118 eventDispatcher.signal(signalRunnable);
00119 }
00120 }
00121
00134 public boolean signal(final SignalRunnable<T> signalRunnable, long timeout, TimeUnit unit)
00135 throws InterruptedException {
00136 Collection<EventDispatcher<T>> copy = Lists.newArrayList(eventDispatchers);
00137 final CountDownLatch latch = new CountDownLatch(copy.size());
00138 for (EventDispatcher<T> eventDispatcher : copy) {
00139 eventDispatcher.signal(new SignalRunnable<T>() {
00140 @Override
00141 public void run(T listener) {
00142 signalRunnable.run(listener);
00143 latch.countDown();
00144 }
00145 });
00146 }
00147 return latch.await(timeout, unit);
00148 }
00149
00150 public void shutdown() {
00151 for (EventDispatcher<T> eventDispatcher : eventDispatchers) {
00152 eventDispatcher.cancel();
00153 }
00154 }
00155 }