Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.internal.transport.queue;
00018
00019 import static org.junit.Assert.assertTrue;
00020 import static org.junit.Assert.fail;
00021
00022 import org.junit.Before;
00023 import org.junit.Test;
00024 import org.ros.concurrent.CircularBlockingDeque;
00025 import org.ros.internal.message.DefaultMessageFactory;
00026 import org.ros.internal.message.definition.MessageDefinitionReflectionProvider;
00027 import org.ros.message.MessageFactory;
00028 import org.ros.message.MessageListener;
00029 import std_msgs.Int32;
00030
00031 import java.util.concurrent.CountDownLatch;
00032 import java.util.concurrent.ExecutorService;
00033 import java.util.concurrent.Executors;
00034 import java.util.concurrent.TimeUnit;
00035 import java.util.concurrent.atomic.AtomicInteger;
00036
00040 public class MessageDispatcherTest {
00041
00042 private static final int QUEUE_CAPACITY = 128;
00043
00044 private ExecutorService executorService;
00045 private CircularBlockingDeque<LazyMessage<std_msgs.Int32>> lazyMessages;
00046 private MessageFactory messageFactory;
00047
00048 @Before
00049 public void before() {
00050 executorService = Executors.newCachedThreadPool();
00051 lazyMessages = new CircularBlockingDeque<LazyMessage<std_msgs.Int32>>(128);
00052 messageFactory = new DefaultMessageFactory(new MessageDefinitionReflectionProvider());
00053 }
00054
00055 @Test
00056 public void testMessageOrder() throws InterruptedException {
00057 int numberOfMessages = 100;
00058 final CountDownLatch latch = new CountDownLatch(numberOfMessages);
00059
00060 MessageDispatcher<std_msgs.Int32> messageDispatcher =
00061 new MessageDispatcher<std_msgs.Int32>(lazyMessages, executorService);
00062 messageDispatcher.addListener(new MessageListener<std_msgs.Int32>() {
00063 private AtomicInteger count = new AtomicInteger();
00064
00065 @Override
00066 public void onNewMessage(Int32 message) {
00067 if (this.count.compareAndSet(message.getData(), message.getData() + 1)) {
00068 latch.countDown();
00069 } else {
00070 fail(String.format("Expected message data not equal to actual data: %d != %d",
00071 count.get(), message.getData()));
00072 }
00073 try {
00074
00075
00076 Thread.sleep(5);
00077 } catch (InterruptedException e) {
00078 }
00079 }
00080 }, QUEUE_CAPACITY);
00081 executorService.execute(messageDispatcher);
00082
00083 for (int i = 0; i < numberOfMessages; i++) {
00084 final int count = i;
00085 std_msgs.Int32 message = messageFactory.newFromType(std_msgs.Int32._TYPE);
00086 message.setData(count);
00087 lazyMessages.addLast(new LazyMessage<std_msgs.Int32>(message));
00088 }
00089
00090 assertTrue(latch.await(1, TimeUnit.SECONDS));
00091 }
00092 }