ListenerGroup.java
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2012 Google Inc.
00003  * 
00004  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
00005  * use this file except in compliance with the License. You may obtain a copy of
00006  * the License at
00007  * 
00008  * http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013  * License for the specific language governing permissions and limitations under
00014  * the License.
00015  */
00016 
00017 package org.ros.concurrent;
00018 
00019 import com.google.common.collect.Lists;
00020 
00021 import java.util.Collection;
00022 import java.util.concurrent.CountDownLatch;
00023 import java.util.concurrent.ExecutorService;
00024 import java.util.concurrent.TimeUnit;
00025 
00031 public class ListenerGroup<T> {
00032 
00033   private final static int DEFAULT_QUEUE_CAPACITY = 128;
00034 
00035   private final ExecutorService executorService;
00036   private final Collection<EventDispatcher<T>> eventDispatchers;
00037 
00038   public ListenerGroup(ExecutorService executorService) {
00039     this.executorService = executorService;
00040     eventDispatchers = Lists.newCopyOnWriteArrayList();
00041   }
00042 
00053   public EventDispatcher<T> add(T listener, int queueCapacity) {
00054     EventDispatcher<T> eventDispatcher = new EventDispatcher<T>(listener, queueCapacity);
00055     eventDispatchers.add(eventDispatcher);
00056     executorService.execute(eventDispatcher);
00057     return eventDispatcher;
00058   }
00059 
00069   public EventDispatcher<T> add(T listener) {
00070     return add(listener, DEFAULT_QUEUE_CAPACITY);
00071   }
00072 
00083   public Collection<EventDispatcher<T>> addAll(Collection<T> listeners, int limit) {
00084     Collection<EventDispatcher<T>> eventDispatchers = Lists.newArrayList();
00085     for (T listener : listeners) {
00086       eventDispatchers.add(add(listener, limit));
00087     }
00088     return eventDispatchers;
00089   }
00090 
00100   public Collection<EventDispatcher<T>> addAll(Collection<T> listeners) {
00101     return addAll(listeners, DEFAULT_QUEUE_CAPACITY);
00102   }
00103 
00107   public int size() {
00108     return eventDispatchers.size();
00109   }
00110 
00116   public void signal(SignalRunnable<T> signalRunnable) {
00117     for (EventDispatcher<T> eventDispatcher : eventDispatchers) {
00118       eventDispatcher.signal(signalRunnable);
00119     }
00120   }
00121 
00134   public boolean signal(final SignalRunnable<T> signalRunnable, long timeout, TimeUnit unit)
00135       throws InterruptedException {
00136     Collection<EventDispatcher<T>> copy = Lists.newArrayList(eventDispatchers);
00137     final CountDownLatch latch = new CountDownLatch(copy.size());
00138     for (EventDispatcher<T> eventDispatcher : copy) {
00139       eventDispatcher.signal(new SignalRunnable<T>() {
00140         @Override
00141         public void run(T listener) {
00142           signalRunnable.run(listener);
00143           latch.countDown();
00144         }
00145       });
00146     }
00147     return latch.await(timeout, unit);
00148   }
00149 
00150   public void shutdown() {
00151     for (EventDispatcher<T> eventDispatcher : eventDispatchers) {
00152       eventDispatcher.cancel();
00153     }
00154   }
00155 }


rosjava_core
Author(s):
autogenerated on Wed Aug 26 2015 16:06:49