Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.internal.transport;
00018
00019 import static org.junit.Assert.assertEquals;
00020 import static org.junit.Assert.assertTrue;
00021
00022 import org.apache.commons.logging.Log;
00023 import org.apache.commons.logging.LogFactory;
00024 import org.jboss.netty.bootstrap.ServerBootstrap;
00025 import org.jboss.netty.buffer.HeapChannelBufferFactory;
00026 import org.jboss.netty.channel.Channel;
00027 import org.jboss.netty.channel.ChannelHandlerContext;
00028 import org.jboss.netty.channel.ChannelPipeline;
00029 import org.jboss.netty.channel.ChannelStateEvent;
00030 import org.jboss.netty.channel.ExceptionEvent;
00031 import org.jboss.netty.channel.SimpleChannelHandler;
00032 import org.jboss.netty.channel.group.ChannelGroup;
00033 import org.jboss.netty.channel.group.DefaultChannelGroup;
00034 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
00035 import org.junit.After;
00036 import org.junit.Before;
00037 import org.junit.Test;
00038 import org.ros.concurrent.CancellableLoop;
00039 import org.ros.internal.message.DefaultMessageDeserializer;
00040 import org.ros.internal.message.DefaultMessageSerializer;
00041 import org.ros.internal.message.Message;
00042 import org.ros.internal.message.definition.MessageDefinitionReflectionProvider;
00043 import org.ros.internal.message.topic.TopicMessageFactory;
00044 import org.ros.internal.node.service.ServiceManager;
00045 import org.ros.internal.node.topic.TopicParticipantManager;
00046 import org.ros.internal.transport.queue.IncomingMessageQueue;
00047 import org.ros.internal.transport.queue.OutgoingMessageQueue;
00048 import org.ros.internal.transport.tcp.TcpClient;
00049 import org.ros.internal.transport.tcp.TcpClientManager;
00050 import org.ros.internal.transport.tcp.TcpServerPipelineFactory;
00051 import org.ros.message.MessageDefinitionProvider;
00052 import org.ros.message.MessageIdentifier;
00053 import org.ros.message.MessageListener;
00054
00055 import java.net.InetSocketAddress;
00056 import java.nio.ByteOrder;
00057 import java.util.concurrent.CountDownLatch;
00058 import java.util.concurrent.ExecutorService;
00059 import java.util.concurrent.Executors;
00060 import java.util.concurrent.TimeUnit;
00061
00065 public class MessageQueueIntegrationTest {
00066
00067 private static final boolean DEBUG = false;
00068 private static final Log log = LogFactory.getLog(MessageQueueIntegrationTest.class);
00069
00070 private static final int QUEUE_CAPACITY = 128;
00071
00072 private ExecutorService executorService;
00073 private TcpClientManager firstTcpClientManager;
00074 private TcpClientManager secondTcpClientManager;
00075 private OutgoingMessageQueue<Message> outgoingMessageQueue;
00076 private IncomingMessageQueue<std_msgs.String> firstIncomingMessageQueue;
00077 private IncomingMessageQueue<std_msgs.String> secondIncomingMessageQueue;
00078 private std_msgs.String expectedMessage;
00079
00080 private class ServerHandler extends SimpleChannelHandler {
00081 @Override
00082 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
00083 if (DEBUG) {
00084 log.info("Channel connected: " + e.getChannel().toString());
00085 }
00086 Channel channel = e.getChannel();
00087 outgoingMessageQueue.addChannel(channel);
00088 super.channelConnected(ctx, e);
00089 }
00090
00091 @Override
00092 public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
00093 throws Exception {
00094 if (DEBUG) {
00095 log.info("Channel disconnected: " + e.getChannel().toString());
00096 }
00097 super.channelDisconnected(ctx, e);
00098 }
00099
00100 @Override
00101 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
00102 if (DEBUG) {
00103 log.info("Channel exception: " + e.getChannel().toString());
00104 }
00105 e.getChannel().close();
00106 throw new RuntimeException(e.getCause());
00107 }
00108 }
00109
00110 @Before
00111 public void setup() {
00112 executorService = Executors.newCachedThreadPool();
00113 MessageDefinitionProvider messageDefinitionProvider = new MessageDefinitionReflectionProvider();
00114 TopicMessageFactory topicMessageFactory = new TopicMessageFactory(messageDefinitionProvider);
00115 expectedMessage = topicMessageFactory.newFromType(std_msgs.String._TYPE);
00116 expectedMessage.setData("Would you like to play a game?");
00117 outgoingMessageQueue =
00118 new OutgoingMessageQueue<Message>(new DefaultMessageSerializer(), executorService);
00119 firstIncomingMessageQueue =
00120 new IncomingMessageQueue<std_msgs.String>(new DefaultMessageDeserializer<std_msgs.String>(
00121 MessageIdentifier.of(std_msgs.String._TYPE), topicMessageFactory), executorService);
00122 secondIncomingMessageQueue =
00123 new IncomingMessageQueue<std_msgs.String>(new DefaultMessageDeserializer<std_msgs.String>(
00124 MessageIdentifier.of(std_msgs.String._TYPE), topicMessageFactory), executorService);
00125 firstTcpClientManager = new TcpClientManager(executorService);
00126 firstTcpClientManager.addNamedChannelHandler(firstIncomingMessageQueue.getMessageReceiver());
00127 secondTcpClientManager = new TcpClientManager(executorService);
00128 secondTcpClientManager.addNamedChannelHandler(secondIncomingMessageQueue.getMessageReceiver());
00129 }
00130
00131 @After
00132 public void tearDown() {
00133 outgoingMessageQueue.shutdown();
00134 executorService.shutdown();
00135 }
00136
00137 private void startRepeatingPublisher() {
00138 executorService.execute(new CancellableLoop() {
00139 @Override
00140 protected void loop() throws InterruptedException {
00141 outgoingMessageQueue.add(expectedMessage);
00142 Thread.sleep(100);
00143 }
00144 });
00145 }
00146
00147 private Channel buildServerChannel() {
00148 TopicParticipantManager topicParticipantManager = new TopicParticipantManager();
00149 ServiceManager serviceManager = new ServiceManager();
00150 NioServerSocketChannelFactory channelFactory =
00151 new NioServerSocketChannelFactory(executorService, executorService);
00152 ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
00153 bootstrap.setOption("child.bufferFactory",
00154 new HeapChannelBufferFactory(ByteOrder.LITTLE_ENDIAN));
00155 bootstrap.setOption("child.keepAlive", true);
00156 ChannelGroup serverChannelGroup = new DefaultChannelGroup();
00157 TcpServerPipelineFactory serverPipelineFactory =
00158 new TcpServerPipelineFactory(serverChannelGroup, topicParticipantManager, serviceManager) {
00159 @Override
00160 public ChannelPipeline getPipeline() {
00161 ChannelPipeline pipeline = super.getPipeline();
00162
00163
00164
00165 pipeline.remove(TcpServerPipelineFactory.HANDSHAKE_HANDLER);
00166 pipeline.addLast("ServerHandler", new ServerHandler());
00167 return pipeline;
00168 }
00169 };
00170 bootstrap.setPipelineFactory(serverPipelineFactory);
00171 Channel serverChannel = bootstrap.bind(new InetSocketAddress(0));
00172 return serverChannel;
00173 }
00174
00175 private TcpClient connect(TcpClientManager TcpClientManager, Channel serverChannel) {
00176 return TcpClientManager.connect("Foo", serverChannel.getLocalAddress());
00177 }
00178
00179 private CountDownLatch expectMessage(IncomingMessageQueue<std_msgs.String> incomingMessageQueue)
00180 throws InterruptedException {
00181 final CountDownLatch latch = new CountDownLatch(1);
00182 incomingMessageQueue.addListener(new MessageListener<std_msgs.String>() {
00183 @Override
00184 public void onNewMessage(std_msgs.String message) {
00185 assertEquals(message, expectedMessage);
00186 latch.countDown();
00187 }
00188 }, QUEUE_CAPACITY);
00189 return latch;
00190 }
00191
00192 private void expectMessages() throws InterruptedException {
00193 CountDownLatch firstLatch = expectMessage(firstIncomingMessageQueue);
00194 CountDownLatch secondLatch = expectMessage(secondIncomingMessageQueue);
00195 assertTrue(firstLatch.await(3, TimeUnit.SECONDS));
00196 assertTrue(secondLatch.await(3, TimeUnit.SECONDS));
00197 }
00198
00199 @Test
00200 public void testSendAndReceiveMessage() throws InterruptedException {
00201 startRepeatingPublisher();
00202 Channel serverChannel = buildServerChannel();
00203 connect(firstTcpClientManager, serverChannel);
00204 connect(secondTcpClientManager, serverChannel);
00205 expectMessages();
00206 }
00207
00208 @Test
00209 public void testSendAndReceiveLatchedMessage() throws InterruptedException {
00210
00211
00212 outgoingMessageQueue.setLatchMode(true);
00213 outgoingMessageQueue.add(expectedMessage);
00214 Channel serverChannel = buildServerChannel();
00215 firstIncomingMessageQueue.setLatchMode(true);
00216 secondIncomingMessageQueue.setLatchMode(true);
00217 connect(firstTcpClientManager, serverChannel);
00218 connect(secondTcpClientManager, serverChannel);
00219
00220
00221
00222
00223 expectMessages();
00224
00225
00226 expectMessages();
00227 }
00228
00229 @Test
00230 public void testSendAfterIncomingQueueShutdown() throws InterruptedException {
00231 startRepeatingPublisher();
00232 Channel serverChannel = buildServerChannel();
00233 connect(firstTcpClientManager, serverChannel);
00234 firstTcpClientManager.shutdown();
00235 outgoingMessageQueue.add(expectedMessage);
00236 }
00237
00238 @Test
00239 public void testSendAfterServerChannelClosed() throws InterruptedException {
00240 startRepeatingPublisher();
00241 Channel serverChannel = buildServerChannel();
00242 connect(firstTcpClientManager, serverChannel);
00243 assertTrue(serverChannel.close().await(1, TimeUnit.SECONDS));
00244 outgoingMessageQueue.add(expectedMessage);
00245 }
00246
00247 @Test
00248 public void testSendAfterOutgoingQueueShutdown() throws InterruptedException {
00249 startRepeatingPublisher();
00250 Channel serverChannel = buildServerChannel();
00251 connect(firstTcpClientManager, serverChannel);
00252 outgoingMessageQueue.shutdown();
00253 outgoingMessageQueue.add(expectedMessage);
00254 }
00255 }