Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.node.topic;
00018
00019 import static org.junit.Assert.assertEquals;
00020 import static org.junit.Assert.assertTrue;
00021 import static org.junit.Assert.fail;
00022
00023 import org.junit.Test;
00024 import org.ros.RosTest;
00025 import org.ros.concurrent.CancellableLoop;
00026 import org.ros.internal.message.definition.MessageDefinitionReflectionProvider;
00027 import org.ros.internal.message.topic.TopicMessageFactory;
00028 import org.ros.internal.node.topic.DefaultSubscriber;
00029 import org.ros.internal.node.topic.PublisherIdentifier;
00030 import org.ros.message.MessageDefinitionProvider;
00031 import org.ros.message.MessageListener;
00032 import org.ros.namespace.GraphName;
00033 import org.ros.node.AbstractNodeMain;
00034 import org.ros.node.ConnectedNode;
00035
00036 import java.net.InetSocketAddress;
00037 import java.util.concurrent.CountDownLatch;
00038 import java.util.concurrent.TimeUnit;
00039
00045 public class TopicIntegrationTest extends RosTest {
00046
00047 private static final int QUEUE_CAPACITY = 128;
00048
00049 private final std_msgs.String expectedMessage;
00050
00051 public TopicIntegrationTest() {
00052 MessageDefinitionProvider messageDefinitionProvider = new MessageDefinitionReflectionProvider();
00053 TopicMessageFactory topicMessageFactory = new TopicMessageFactory(messageDefinitionProvider);
00054 expectedMessage = topicMessageFactory.newFromType(std_msgs.String._TYPE);
00055 expectedMessage.setData("Would you like to play a game?");
00056 }
00057
00058 @Test
00059 public void testOnePublisherToOneSubscriber() throws InterruptedException {
00060 nodeMainExecutor.execute(new AbstractNodeMain() {
00061 @Override
00062 public GraphName getDefaultNodeName() {
00063 return GraphName.of("publisher");
00064 }
00065
00066 @Override
00067 public void onStart(ConnectedNode connectedNode) {
00068 Publisher<std_msgs.String> publisher =
00069 connectedNode.newPublisher("foo", std_msgs.String._TYPE);
00070 publisher.setLatchMode(true);
00071 publisher.publish(expectedMessage);
00072 }
00073 }, nodeConfiguration);
00074
00075 final CountDownLatch messageReceived = new CountDownLatch(1);
00076 nodeMainExecutor.execute(new AbstractNodeMain() {
00077 @Override
00078 public GraphName getDefaultNodeName() {
00079 return GraphName.of("subscriber");
00080 }
00081
00082 @Override
00083 public void onStart(ConnectedNode connectedNode) {
00084 Subscriber<std_msgs.String> subscriber =
00085 connectedNode.newSubscriber("foo", std_msgs.String._TYPE);
00086 subscriber.addMessageListener(new MessageListener<std_msgs.String>() {
00087 @Override
00088 public void onNewMessage(std_msgs.String message) {
00089 assertEquals(expectedMessage, message);
00090 messageReceived.countDown();
00091 }
00092 }, QUEUE_CAPACITY);
00093 }
00094 }, nodeConfiguration);
00095
00096 assertTrue(messageReceived.await(1, TimeUnit.SECONDS));
00097 }
00098
00108 @Test
00109 public void testSubscriberStartsBeforePublisher() throws InterruptedException {
00110 final CountDownSubscriberListener<std_msgs.String> subscriberListener =
00111 CountDownSubscriberListener.newDefault();
00112 final CountDownLatch messageReceived = new CountDownLatch(1);
00113 nodeMainExecutor.execute(new AbstractNodeMain() {
00114 @Override
00115 public GraphName getDefaultNodeName() {
00116 return GraphName.of("subscriber");
00117 }
00118
00119 @Override
00120 public void onStart(ConnectedNode connectedNode) {
00121 Subscriber<std_msgs.String> subscriber =
00122 connectedNode.newSubscriber("foo", std_msgs.String._TYPE);
00123 subscriber.addSubscriberListener(subscriberListener);
00124 subscriber.addMessageListener(new MessageListener<std_msgs.String>() {
00125 @Override
00126 public void onNewMessage(std_msgs.String message) {
00127 assertEquals(expectedMessage, message);
00128 messageReceived.countDown();
00129 }
00130 }, QUEUE_CAPACITY);
00131 }
00132 }, nodeConfiguration);
00133
00134 subscriberListener.awaitMasterRegistrationSuccess(1, TimeUnit.SECONDS);
00135
00136 nodeMainExecutor.execute(new AbstractNodeMain() {
00137 @Override
00138 public GraphName getDefaultNodeName() {
00139 return GraphName.of("publisher");
00140 }
00141
00142 @Override
00143 public void onStart(ConnectedNode connectedNode) {
00144 Publisher<std_msgs.String> publisher =
00145 connectedNode.newPublisher("foo", std_msgs.String._TYPE);
00146 publisher.setLatchMode(true);
00147 publisher.publish(expectedMessage);
00148 }
00149 }, nodeConfiguration);
00150
00151 assertTrue(messageReceived.await(1, TimeUnit.SECONDS));
00152 }
00153
00154 @Test
00155 public void testAddDisconnectedPublisher() {
00156 nodeMainExecutor.execute(new AbstractNodeMain() {
00157 @Override
00158 public GraphName getDefaultNodeName() {
00159 return GraphName.of("subscriber");
00160 }
00161
00162 @Override
00163 public void onStart(ConnectedNode connectedNode) {
00164 DefaultSubscriber<std_msgs.String> subscriber =
00165 (DefaultSubscriber<std_msgs.String>) connectedNode.<std_msgs.String>newSubscriber(
00166 "foo", std_msgs.String._TYPE);
00167 try {
00168 subscriber.addPublisher(PublisherIdentifier.newFromStrings("foo", "http://foo", "foo"),
00169 new InetSocketAddress(1234));
00170 fail();
00171 } catch (RuntimeException e) {
00172
00173 }
00174 }
00175 }, nodeConfiguration);
00176 }
00177
00178 private final class Listener implements MessageListener<rosjava_test_msgs.TestHeader> {
00179 private final CountDownLatch latch = new CountDownLatch(10);
00180
00181 private rosjava_test_msgs.TestHeader lastMessage;
00182
00183 @Override
00184 public void onNewMessage(rosjava_test_msgs.TestHeader message) {
00185 int seq = message.getHeader().getSeq();
00186 long stamp = message.getHeader().getStamp().totalNsecs();
00187 if (lastMessage != null) {
00188 int lastSeq = lastMessage.getHeader().getSeq();
00189 long lastStamp = lastMessage.getHeader().getStamp().totalNsecs();
00190 assertTrue(String.format("message seq %d <= previous seq %d", seq, lastSeq), seq > lastSeq);
00191 assertTrue(String.format("message stamp %d <= previous stamp %d", stamp, lastStamp),
00192 stamp > lastStamp);
00193 }
00194 lastMessage = message;
00195 latch.countDown();
00196 }
00197
00198 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
00199 return latch.await(timeout, unit);
00200 }
00201 }
00202
00203 @Test
00204 public void testHeader() throws InterruptedException {
00205 nodeMainExecutor.execute(new AbstractNodeMain() {
00206 @Override
00207 public GraphName getDefaultNodeName() {
00208 return GraphName.of("publisher");
00209 }
00210
00211 @Override
00212 public void onStart(final ConnectedNode connectedNode) {
00213 final Publisher<rosjava_test_msgs.TestHeader> publisher =
00214 connectedNode.newPublisher("foo", rosjava_test_msgs.TestHeader._TYPE);
00215 connectedNode.executeCancellableLoop(new CancellableLoop() {
00216 @Override
00217 public void loop() throws InterruptedException {
00218 rosjava_test_msgs.TestHeader message =
00219 connectedNode.getTopicMessageFactory().newFromType(rosjava_test_msgs.TestHeader._TYPE);
00220 message.getHeader().setStamp(connectedNode.getCurrentTime());
00221 publisher.publish(message);
00222
00223
00224 Thread.sleep(1);
00225 }
00226 });
00227 }
00228 }, nodeConfiguration);
00229
00230 final Listener listener = new Listener();
00231 nodeMainExecutor.execute(new AbstractNodeMain() {
00232 @Override
00233 public GraphName getDefaultNodeName() {
00234 return GraphName.of("subscriber");
00235 }
00236
00237 @Override
00238 public void onStart(ConnectedNode connectedNode) {
00239 Subscriber<rosjava_test_msgs.TestHeader> subscriber =
00240 connectedNode.newSubscriber("foo", rosjava_test_msgs.TestHeader._TYPE);
00241 subscriber.addMessageListener(listener, QUEUE_CAPACITY);
00242 }
00243 }, nodeConfiguration);
00244
00245 assertTrue(listener.await(1, TimeUnit.SECONDS));
00246 }
00247 }