MessageQueueIntegrationTest.java
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2011 Google Inc.
00003  * 
00004  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
00005  * use this file except in compliance with the License. You may obtain a copy of
00006  * the License at
00007  * 
00008  * http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013  * License for the specific language governing permissions and limitations under
00014  * the License.
00015  */
00016 
00017 package org.ros.internal.transport;
00018 
00019 import static org.junit.Assert.assertEquals;
00020 import static org.junit.Assert.assertTrue;
00021 
00022 import org.apache.commons.logging.Log;
00023 import org.apache.commons.logging.LogFactory;
00024 import org.jboss.netty.bootstrap.ServerBootstrap;
00025 import org.jboss.netty.buffer.HeapChannelBufferFactory;
00026 import org.jboss.netty.channel.Channel;
00027 import org.jboss.netty.channel.ChannelHandlerContext;
00028 import org.jboss.netty.channel.ChannelPipeline;
00029 import org.jboss.netty.channel.ChannelStateEvent;
00030 import org.jboss.netty.channel.ExceptionEvent;
00031 import org.jboss.netty.channel.SimpleChannelHandler;
00032 import org.jboss.netty.channel.group.ChannelGroup;
00033 import org.jboss.netty.channel.group.DefaultChannelGroup;
00034 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
00035 import org.junit.After;
00036 import org.junit.Before;
00037 import org.junit.Test;
00038 import org.ros.concurrent.CancellableLoop;
00039 import org.ros.internal.message.DefaultMessageDeserializer;
00040 import org.ros.internal.message.DefaultMessageSerializer;
00041 import org.ros.internal.message.Message;
00042 import org.ros.internal.message.definition.MessageDefinitionReflectionProvider;
00043 import org.ros.internal.message.topic.TopicMessageFactory;
00044 import org.ros.internal.node.service.ServiceManager;
00045 import org.ros.internal.node.topic.TopicParticipantManager;
00046 import org.ros.internal.transport.queue.IncomingMessageQueue;
00047 import org.ros.internal.transport.queue.OutgoingMessageQueue;
00048 import org.ros.internal.transport.tcp.TcpClient;
00049 import org.ros.internal.transport.tcp.TcpClientManager;
00050 import org.ros.internal.transport.tcp.TcpServerPipelineFactory;
00051 import org.ros.message.MessageDefinitionProvider;
00052 import org.ros.message.MessageIdentifier;
00053 import org.ros.message.MessageListener;
00054 
00055 import java.net.InetSocketAddress;
00056 import java.nio.ByteOrder;
00057 import java.util.concurrent.CountDownLatch;
00058 import java.util.concurrent.ExecutorService;
00059 import java.util.concurrent.Executors;
00060 import java.util.concurrent.TimeUnit;
00061 
00065 public class MessageQueueIntegrationTest {
00066 
00067   private static final boolean DEBUG = false;
00068   private static final Log log = LogFactory.getLog(MessageQueueIntegrationTest.class);
00069 
00070   private static final int QUEUE_CAPACITY = 128;
00071 
00072   private ExecutorService executorService;
00073   private TcpClientManager firstTcpClientManager;
00074   private TcpClientManager secondTcpClientManager;
00075   private OutgoingMessageQueue<Message> outgoingMessageQueue;
00076   private IncomingMessageQueue<std_msgs.String> firstIncomingMessageQueue;
00077   private IncomingMessageQueue<std_msgs.String> secondIncomingMessageQueue;
00078   private std_msgs.String expectedMessage;
00079 
00080   private class ServerHandler extends SimpleChannelHandler {
00081     @Override
00082     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
00083       if (DEBUG) {
00084         log.info("Channel connected: " + e.getChannel().toString());
00085       }
00086       Channel channel = e.getChannel();
00087       outgoingMessageQueue.addChannel(channel);
00088       super.channelConnected(ctx, e);
00089     }
00090 
00091     @Override
00092     public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
00093         throws Exception {
00094       if (DEBUG) {
00095         log.info("Channel disconnected: " + e.getChannel().toString());
00096       }
00097       super.channelDisconnected(ctx, e);
00098     }
00099 
00100     @Override
00101     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
00102       if (DEBUG) {
00103         log.info("Channel exception: " + e.getChannel().toString());
00104       }
00105       e.getChannel().close();
00106       throw new RuntimeException(e.getCause());
00107     }
00108   }
00109 
00110   @Before
00111   public void setup() {
00112     executorService = Executors.newCachedThreadPool();
00113     MessageDefinitionProvider messageDefinitionProvider = new MessageDefinitionReflectionProvider();
00114     TopicMessageFactory topicMessageFactory = new TopicMessageFactory(messageDefinitionProvider);
00115     expectedMessage = topicMessageFactory.newFromType(std_msgs.String._TYPE);
00116     expectedMessage.setData("Would you like to play a game?");
00117     outgoingMessageQueue =
00118         new OutgoingMessageQueue<Message>(new DefaultMessageSerializer(), executorService);
00119     firstIncomingMessageQueue =
00120         new IncomingMessageQueue<std_msgs.String>(new DefaultMessageDeserializer<std_msgs.String>(
00121             MessageIdentifier.of(std_msgs.String._TYPE), topicMessageFactory), executorService);
00122     secondIncomingMessageQueue =
00123         new IncomingMessageQueue<std_msgs.String>(new DefaultMessageDeserializer<std_msgs.String>(
00124             MessageIdentifier.of(std_msgs.String._TYPE), topicMessageFactory), executorService);
00125     firstTcpClientManager = new TcpClientManager(executorService);
00126     firstTcpClientManager.addNamedChannelHandler(firstIncomingMessageQueue.getMessageReceiver());
00127     secondTcpClientManager = new TcpClientManager(executorService);
00128     secondTcpClientManager.addNamedChannelHandler(secondIncomingMessageQueue.getMessageReceiver());
00129   }
00130 
00131   @After
00132   public void tearDown() {
00133     outgoingMessageQueue.shutdown();
00134     executorService.shutdown();
00135   }
00136 
00137   private void startRepeatingPublisher() {
00138     executorService.execute(new CancellableLoop() {
00139       @Override
00140       protected void loop() throws InterruptedException {
00141         outgoingMessageQueue.add(expectedMessage);
00142         Thread.sleep(100);
00143       }
00144     });
00145   }
00146 
00147   private Channel buildServerChannel() {
00148     TopicParticipantManager topicParticipantManager = new TopicParticipantManager();
00149     ServiceManager serviceManager = new ServiceManager();
00150     NioServerSocketChannelFactory channelFactory =
00151         new NioServerSocketChannelFactory(executorService, executorService);
00152     ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
00153     bootstrap.setOption("child.bufferFactory",
00154         new HeapChannelBufferFactory(ByteOrder.LITTLE_ENDIAN));
00155     bootstrap.setOption("child.keepAlive", true);
00156     ChannelGroup serverChannelGroup = new DefaultChannelGroup();
00157     TcpServerPipelineFactory serverPipelineFactory =
00158         new TcpServerPipelineFactory(serverChannelGroup, topicParticipantManager, serviceManager) {
00159           @Override
00160           public ChannelPipeline getPipeline() {
00161             ChannelPipeline pipeline = super.getPipeline();
00162             // We're not interested firstIncomingMessageQueue testing the
00163             // handshake here. Removing it means connections are established
00164             // immediately.
00165             pipeline.remove(TcpServerPipelineFactory.HANDSHAKE_HANDLER);
00166             pipeline.addLast("ServerHandler", new ServerHandler());
00167             return pipeline;
00168           }
00169         };
00170     bootstrap.setPipelineFactory(serverPipelineFactory);
00171     Channel serverChannel = bootstrap.bind(new InetSocketAddress(0));
00172     return serverChannel;
00173   }
00174 
00175   private TcpClient connect(TcpClientManager TcpClientManager, Channel serverChannel) {
00176     return TcpClientManager.connect("Foo", serverChannel.getLocalAddress());
00177   }
00178 
00179   private CountDownLatch expectMessage(IncomingMessageQueue<std_msgs.String> incomingMessageQueue)
00180       throws InterruptedException {
00181     final CountDownLatch latch = new CountDownLatch(1);
00182     incomingMessageQueue.addListener(new MessageListener<std_msgs.String>() {
00183       @Override
00184       public void onNewMessage(std_msgs.String message) {
00185         assertEquals(message, expectedMessage);
00186         latch.countDown();
00187       }
00188     }, QUEUE_CAPACITY);
00189     return latch;
00190   }
00191 
00192   private void expectMessages() throws InterruptedException {
00193     CountDownLatch firstLatch = expectMessage(firstIncomingMessageQueue);
00194     CountDownLatch secondLatch = expectMessage(secondIncomingMessageQueue);
00195     assertTrue(firstLatch.await(3, TimeUnit.SECONDS));
00196     assertTrue(secondLatch.await(3, TimeUnit.SECONDS));
00197   }
00198 
00199   @Test
00200   public void testSendAndReceiveMessage() throws InterruptedException {
00201     startRepeatingPublisher();
00202     Channel serverChannel = buildServerChannel();
00203     connect(firstTcpClientManager, serverChannel);
00204     connect(secondTcpClientManager, serverChannel);
00205     expectMessages();
00206   }
00207 
00208   @Test
00209   public void testSendAndReceiveLatchedMessage() throws InterruptedException {
00210     // Setting latched mode and writing a message should cause any
00211     // IncomingMessageQueues that connect in the future to receive the message.
00212     outgoingMessageQueue.setLatchMode(true);
00213     outgoingMessageQueue.add(expectedMessage);
00214     Channel serverChannel = buildServerChannel();
00215     firstIncomingMessageQueue.setLatchMode(true);
00216     secondIncomingMessageQueue.setLatchMode(true);
00217     connect(firstTcpClientManager, serverChannel);
00218     connect(secondTcpClientManager, serverChannel);
00219     // The first set of incoming messages could either be from the
00220     // OutgoingMessageQueue latching or the Subscriber latching. This is
00221     // equivalent to waiting for the message to arrive and ensures that we've
00222     // latched it in.
00223     expectMessages();
00224     // The second set of incoming messages can only be from the
00225     // IncomingMessageQueue latching since we only sent one message.
00226     expectMessages();
00227   }
00228 
00229   @Test
00230   public void testSendAfterIncomingQueueShutdown() throws InterruptedException {
00231     startRepeatingPublisher();
00232     Channel serverChannel = buildServerChannel();
00233     connect(firstTcpClientManager, serverChannel);
00234     firstTcpClientManager.shutdown();
00235     outgoingMessageQueue.add(expectedMessage);
00236   }
00237 
00238   @Test
00239   public void testSendAfterServerChannelClosed() throws InterruptedException {
00240     startRepeatingPublisher();
00241     Channel serverChannel = buildServerChannel();
00242     connect(firstTcpClientManager, serverChannel);
00243     assertTrue(serverChannel.close().await(1, TimeUnit.SECONDS));
00244     outgoingMessageQueue.add(expectedMessage);
00245   }
00246 
00247   @Test
00248   public void testSendAfterOutgoingQueueShutdown() throws InterruptedException {
00249     startRepeatingPublisher();
00250     Channel serverChannel = buildServerChannel();
00251     connect(firstTcpClientManager, serverChannel);
00252     outgoingMessageQueue.shutdown();
00253     outgoingMessageQueue.add(expectedMessage);
00254   }
00255 }


rosjava_core
Author(s):
autogenerated on Wed Aug 26 2015 16:06:49