Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.internal.transport.queue;
00018
00019 import org.ros.concurrent.CircularBlockingDeque;
00020 import org.ros.internal.transport.tcp.NamedChannelHandler;
00021 import org.ros.message.MessageDeserializer;
00022 import org.ros.message.MessageListener;
00023
00024 import java.util.concurrent.ExecutorService;
00025
00029 public class IncomingMessageQueue<T> {
00030
00040 private static final int DEQUE_CAPACITY = 16;
00041
00042 private final MessageReceiver<T> messageReceiver;
00043 private final MessageDispatcher<T> messageDispatcher;
00044
00045 public IncomingMessageQueue(MessageDeserializer<T> deserializer, ExecutorService executorService) {
00046 CircularBlockingDeque<LazyMessage<T>> lazyMessages =
00047 new CircularBlockingDeque<LazyMessage<T>>(DEQUE_CAPACITY);
00048 messageReceiver = new MessageReceiver<T>(lazyMessages, deserializer);
00049 messageDispatcher = new MessageDispatcher<T>(lazyMessages, executorService);
00050 executorService.execute(messageDispatcher);
00051 }
00052
00056 public void setLatchMode(boolean enabled) {
00057 messageDispatcher.setLatchMode(enabled);
00058 }
00059
00063 public boolean getLatchMode() {
00064 return messageDispatcher.getLatchMode();
00065 }
00066
00070 public void addListener(final MessageListener<T> messageListener, int queueCapacity) {
00071 messageDispatcher.addListener(messageListener, queueCapacity);
00072 }
00073
00074 public void shutdown() {
00075 messageDispatcher.cancel();
00076 }
00077
00082 public NamedChannelHandler getMessageReceiver() {
00083 return messageReceiver;
00084 }
00085 }