Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.internal.transport.queue;
00018
00019 import com.google.common.annotations.VisibleForTesting;
00020
00021 import org.apache.commons.logging.Log;
00022 import org.apache.commons.logging.LogFactory;
00023 import org.jboss.netty.buffer.ChannelBuffer;
00024 import org.jboss.netty.channel.Channel;
00025 import org.jboss.netty.channel.group.ChannelGroup;
00026 import org.jboss.netty.channel.group.ChannelGroupFuture;
00027 import org.jboss.netty.channel.group.ChannelGroupFutureListener;
00028 import org.jboss.netty.channel.group.DefaultChannelGroup;
00029 import org.ros.concurrent.CancellableLoop;
00030 import org.ros.concurrent.CircularBlockingDeque;
00031 import org.ros.internal.message.MessageBufferPool;
00032 import org.ros.internal.message.MessageBuffers;
00033 import org.ros.message.MessageSerializer;
00034
00035 import java.util.concurrent.ExecutorService;
00036
00040 public class OutgoingMessageQueue<T> {
00041
00042 private static final boolean DEBUG = false;
00043 private static final Log log = LogFactory.getLog(OutgoingMessageQueue.class);
00044
00045 private static final int DEQUE_CAPACITY = 16;
00046
00047 private final MessageSerializer<T> serializer;
00048 private final CircularBlockingDeque<T> deque;
00049 private final ChannelGroup channelGroup;
00050 private final Writer writer;
00051 private final MessageBufferPool messageBufferPool;
00052 private final ChannelBuffer latchedBuffer;
00053 private final Object mutex;
00054
00055 private boolean latchMode;
00056 private T latchedMessage;
00057
00058 private final class Writer extends CancellableLoop {
00059 @Override
00060 public void loop() throws InterruptedException {
00061 T message = deque.takeFirst();
00062 final ChannelBuffer buffer = messageBufferPool.acquire();
00063 serializer.serialize(message, buffer);
00064 if (DEBUG) {
00065 log.info(String.format("Writing %d bytes to %d channels.", buffer.readableBytes(),
00066 channelGroup.size()));
00067 }
00068
00069
00070
00071
00072 channelGroup.write(buffer).addListener(new ChannelGroupFutureListener() {
00073 @Override
00074 public void operationComplete(ChannelGroupFuture future) throws Exception {
00075 messageBufferPool.release(buffer);
00076 }
00077 });
00078 }
00079 }
00080
00081 public OutgoingMessageQueue(MessageSerializer<T> serializer, ExecutorService executorService) {
00082 this.serializer = serializer;
00083 deque = new CircularBlockingDeque<T>(DEQUE_CAPACITY);
00084 channelGroup = new DefaultChannelGroup();
00085 writer = new Writer();
00086 messageBufferPool = new MessageBufferPool();
00087 latchedBuffer = MessageBuffers.dynamicBuffer();
00088 mutex = new Object();
00089 latchMode = false;
00090 executorService.execute(writer);
00091 }
00092
00093 public void setLatchMode(boolean enabled) {
00094 latchMode = enabled;
00095 }
00096
00097 public boolean getLatchMode() {
00098 return latchMode;
00099 }
00100
00105 public void add(T message) {
00106 deque.addLast(message);
00107 setLatchedMessage(message);
00108 }
00109
00110 private void setLatchedMessage(T message) {
00111 synchronized (mutex) {
00112 latchedMessage = message;
00113 }
00114 }
00115
00119 public void shutdown() {
00120 writer.cancel();
00121 channelGroup.close().awaitUninterruptibly();
00122 }
00123
00128 public void addChannel(Channel channel) {
00129 if (!writer.isRunning()) {
00130 log.warn("Failed to add channel. Cannot add channels after shutdown.");
00131 return;
00132 }
00133 if (latchMode && latchedMessage != null) {
00134 writeLatchedMessage(channel);
00135 }
00136 channelGroup.add(channel);
00137 }
00138
00139
00140
00141 private void writeLatchedMessage(Channel channel) {
00142 synchronized (mutex) {
00143 latchedBuffer.clear();
00144 serializer.serialize(latchedMessage, latchedBuffer);
00145 channel.write(latchedBuffer);
00146 }
00147 }
00148
00152 public int getNumberOfChannels() {
00153 return channelGroup.size();
00154 }
00155
00156 @VisibleForTesting
00157 public ChannelGroup getChannelGroup() {
00158 return channelGroup;
00159 }
00160 }