Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.internal.transport.queue;
00018
00019 import org.apache.commons.logging.Log;
00020 import org.apache.commons.logging.LogFactory;
00021 import org.ros.concurrent.CancellableLoop;
00022 import org.ros.concurrent.CircularBlockingDeque;
00023 import org.ros.concurrent.EventDispatcher;
00024 import org.ros.concurrent.ListenerGroup;
00025 import org.ros.concurrent.SignalRunnable;
00026 import org.ros.message.MessageListener;
00027
00028 import java.util.concurrent.ExecutorService;
00029
00036 public class MessageDispatcher<T> extends CancellableLoop {
00037
00038 private static final boolean DEBUG = false;
00039 private static final Log log = LogFactory.getLog(MessageDispatcher.class);
00040
00041 private final CircularBlockingDeque<LazyMessage<T>> lazyMessages;
00042 private final ListenerGroup<MessageListener<T>> messageListeners;
00043
00048 private final Object mutex;
00049
00050 private boolean latchMode;
00051 private LazyMessage<T> latchedMessage;
00052
00053 public MessageDispatcher(CircularBlockingDeque<LazyMessage<T>> lazyMessages,
00054 ExecutorService executorService) {
00055 this.lazyMessages = lazyMessages;
00056 messageListeners = new ListenerGroup<MessageListener<T>>(executorService);
00057 mutex = new Object();
00058 latchMode = false;
00059 }
00060
00069 public void addListener(MessageListener<T> messageListener, int limit) {
00070 if (DEBUG) {
00071 log.info("Adding listener.");
00072 }
00073 synchronized (mutex) {
00074 EventDispatcher<MessageListener<T>> eventDispatcher =
00075 messageListeners.add(messageListener, limit);
00076 if (latchMode && latchedMessage != null) {
00077 eventDispatcher.signal(newSignalRunnable(latchedMessage));
00078 }
00079 }
00080 }
00081
00090 private SignalRunnable<MessageListener<T>> newSignalRunnable(final LazyMessage<T> lazyMessage) {
00091 return new SignalRunnable<MessageListener<T>>() {
00092 @Override
00093 public void run(MessageListener<T> messageListener) {
00094 messageListener.onNewMessage(lazyMessage.get());
00095 }
00096 };
00097 }
00098
00104 public void setLatchMode(boolean enabled) {
00105 latchMode = enabled;
00106 }
00107
00111 public boolean getLatchMode() {
00112 return latchMode;
00113 }
00114
00115 @Override
00116 public void loop() throws InterruptedException {
00117 LazyMessage<T> lazyMessage = lazyMessages.takeFirst();
00118 synchronized (mutex) {
00119 latchedMessage = lazyMessage;
00120 if (DEBUG) {
00121 log.info("Dispatching message: " + latchedMessage.get());
00122 }
00123 messageListeners.signal(newSignalRunnable(latchedMessage));
00124 }
00125 }
00126
00127 @Override
00128 protected void handleInterruptedException(InterruptedException e) {
00129 messageListeners.shutdown();
00130 }
00131 }