MessageDispatcherTest.java
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2012 Google Inc.
00003  * 
00004  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
00005  * use this file except in compliance with the License. You may obtain a copy of
00006  * the License at
00007  * 
00008  * http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013  * License for the specific language governing permissions and limitations under
00014  * the License.
00015  */
00016 
00017 package org.ros.internal.transport.queue;
00018 
00019 import static org.junit.Assert.assertTrue;
00020 import static org.junit.Assert.fail;
00021 
00022 import org.junit.Before;
00023 import org.junit.Test;
00024 import org.ros.concurrent.CircularBlockingDeque;
00025 import org.ros.internal.message.DefaultMessageFactory;
00026 import org.ros.internal.message.definition.MessageDefinitionReflectionProvider;
00027 import org.ros.message.MessageFactory;
00028 import org.ros.message.MessageListener;
00029 import std_msgs.Int32;
00030 
00031 import java.util.concurrent.CountDownLatch;
00032 import java.util.concurrent.ExecutorService;
00033 import java.util.concurrent.Executors;
00034 import java.util.concurrent.TimeUnit;
00035 import java.util.concurrent.atomic.AtomicInteger;
00036 
00040 public class MessageDispatcherTest {
00041 
00042   private static final int QUEUE_CAPACITY = 128;
00043 
00044   private ExecutorService executorService;
00045   private CircularBlockingDeque<LazyMessage<std_msgs.Int32>> lazyMessages;
00046   private MessageFactory messageFactory;
00047 
00048   @Before
00049   public void before() {
00050     executorService = Executors.newCachedThreadPool();
00051     lazyMessages = new CircularBlockingDeque<LazyMessage<std_msgs.Int32>>(128);
00052     messageFactory = new DefaultMessageFactory(new MessageDefinitionReflectionProvider());
00053   }
00054 
00055   @Test
00056   public void testMessageOrder() throws InterruptedException {
00057     int numberOfMessages = 100;
00058     final CountDownLatch latch = new CountDownLatch(numberOfMessages);
00059 
00060     MessageDispatcher<std_msgs.Int32> messageDispatcher =
00061         new MessageDispatcher<std_msgs.Int32>(lazyMessages, executorService);
00062     messageDispatcher.addListener(new MessageListener<std_msgs.Int32>() {
00063       private AtomicInteger count = new AtomicInteger();
00064 
00065       @Override
00066       public void onNewMessage(Int32 message) {
00067         if (this.count.compareAndSet(message.getData(), message.getData() + 1)) {
00068           latch.countDown();
00069         } else {
00070           fail(String.format("Expected message data not equal to actual data: %d != %d",
00071               count.get(), message.getData()));
00072         }
00073         try {
00074           // Sleeping allows the queue to fill up a bit by slowing down the
00075           // consumer.
00076           Thread.sleep(5);
00077         } catch (InterruptedException e) {
00078         }
00079       }
00080     }, QUEUE_CAPACITY);
00081     executorService.execute(messageDispatcher);
00082 
00083     for (int i = 0; i < numberOfMessages; i++) {
00084       final int count = i;
00085       std_msgs.Int32 message = messageFactory.newFromType(std_msgs.Int32._TYPE);
00086       message.setData(count);
00087       lazyMessages.addLast(new LazyMessage<std_msgs.Int32>(message));
00088     }
00089 
00090     assertTrue(latch.await(1, TimeUnit.SECONDS));
00091   }
00092 }


rosjava_core
Author(s):
autogenerated on Wed Aug 26 2015 16:06:49