TopicIntegrationTest.java
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2011 Google Inc.
00003  * 
00004  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
00005  * use this file except in compliance with the License. You may obtain a copy of
00006  * the License at
00007  * 
00008  * http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013  * License for the specific language governing permissions and limitations under
00014  * the License.
00015  */
00016 
00017 package org.ros.node.topic;
00018 
00019 import static org.junit.Assert.assertEquals;
00020 import static org.junit.Assert.assertTrue;
00021 import static org.junit.Assert.fail;
00022 
00023 import org.junit.Test;
00024 import org.ros.RosTest;
00025 import org.ros.concurrent.CancellableLoop;
00026 import org.ros.internal.message.definition.MessageDefinitionReflectionProvider;
00027 import org.ros.internal.message.topic.TopicMessageFactory;
00028 import org.ros.internal.node.topic.DefaultSubscriber;
00029 import org.ros.internal.node.topic.PublisherIdentifier;
00030 import org.ros.message.MessageDefinitionProvider;
00031 import org.ros.message.MessageListener;
00032 import org.ros.namespace.GraphName;
00033 import org.ros.node.AbstractNodeMain;
00034 import org.ros.node.ConnectedNode;
00035 
00036 import java.net.InetSocketAddress;
00037 import java.util.concurrent.CountDownLatch;
00038 import java.util.concurrent.TimeUnit;
00039 
00045 public class TopicIntegrationTest extends RosTest {
00046 
00047   private static final int QUEUE_CAPACITY = 128;
00048 
00049   private final std_msgs.String expectedMessage;
00050 
00051   public TopicIntegrationTest() {
00052     MessageDefinitionProvider messageDefinitionProvider = new MessageDefinitionReflectionProvider();
00053     TopicMessageFactory topicMessageFactory = new TopicMessageFactory(messageDefinitionProvider);
00054     expectedMessage = topicMessageFactory.newFromType(std_msgs.String._TYPE);
00055     expectedMessage.setData("Would you like to play a game?");
00056   }
00057 
00058   @Test
00059   public void testOnePublisherToOneSubscriber() throws InterruptedException {
00060     nodeMainExecutor.execute(new AbstractNodeMain() {
00061       @Override
00062       public GraphName getDefaultNodeName() {
00063         return GraphName.of("publisher");
00064       }
00065 
00066       @Override
00067       public void onStart(ConnectedNode connectedNode) {
00068         Publisher<std_msgs.String> publisher =
00069             connectedNode.newPublisher("foo", std_msgs.String._TYPE);
00070         publisher.setLatchMode(true);
00071         publisher.publish(expectedMessage);
00072       }
00073     }, nodeConfiguration);
00074 
00075     final CountDownLatch messageReceived = new CountDownLatch(1);
00076     nodeMainExecutor.execute(new AbstractNodeMain() {
00077       @Override
00078       public GraphName getDefaultNodeName() {
00079         return GraphName.of("subscriber");
00080       }
00081 
00082       @Override
00083       public void onStart(ConnectedNode connectedNode) {
00084         Subscriber<std_msgs.String> subscriber =
00085             connectedNode.newSubscriber("foo", std_msgs.String._TYPE);
00086         subscriber.addMessageListener(new MessageListener<std_msgs.String>() {
00087           @Override
00088           public void onNewMessage(std_msgs.String message) {
00089             assertEquals(expectedMessage, message);
00090             messageReceived.countDown();
00091           }
00092         }, QUEUE_CAPACITY);
00093       }
00094     }, nodeConfiguration);
00095 
00096     assertTrue(messageReceived.await(1, TimeUnit.SECONDS));
00097   }
00098 
00108   @Test
00109   public void testSubscriberStartsBeforePublisher() throws InterruptedException {
00110     final CountDownSubscriberListener<std_msgs.String> subscriberListener =
00111         CountDownSubscriberListener.newDefault();
00112     final CountDownLatch messageReceived = new CountDownLatch(1);
00113     nodeMainExecutor.execute(new AbstractNodeMain() {
00114       @Override
00115       public GraphName getDefaultNodeName() {
00116         return GraphName.of("subscriber");
00117       }
00118 
00119       @Override
00120       public void onStart(ConnectedNode connectedNode) {
00121         Subscriber<std_msgs.String> subscriber =
00122             connectedNode.newSubscriber("foo", std_msgs.String._TYPE);
00123         subscriber.addSubscriberListener(subscriberListener);
00124         subscriber.addMessageListener(new MessageListener<std_msgs.String>() {
00125           @Override
00126           public void onNewMessage(std_msgs.String message) {
00127             assertEquals(expectedMessage, message);
00128             messageReceived.countDown();
00129           }
00130         }, QUEUE_CAPACITY);
00131       }
00132     }, nodeConfiguration);
00133 
00134     subscriberListener.awaitMasterRegistrationSuccess(1, TimeUnit.SECONDS);
00135 
00136     nodeMainExecutor.execute(new AbstractNodeMain() {
00137       @Override
00138       public GraphName getDefaultNodeName() {
00139         return GraphName.of("publisher");
00140       }
00141 
00142       @Override
00143       public void onStart(ConnectedNode connectedNode) {
00144         Publisher<std_msgs.String> publisher =
00145             connectedNode.newPublisher("foo", std_msgs.String._TYPE);
00146         publisher.setLatchMode(true);
00147         publisher.publish(expectedMessage);
00148       }
00149     }, nodeConfiguration);
00150 
00151     assertTrue(messageReceived.await(1, TimeUnit.SECONDS));
00152   }
00153 
00154   @Test
00155   public void testAddDisconnectedPublisher() {
00156     nodeMainExecutor.execute(new AbstractNodeMain() {
00157       @Override
00158       public GraphName getDefaultNodeName() {
00159         return GraphName.of("subscriber");
00160       }
00161 
00162       @Override
00163       public void onStart(ConnectedNode connectedNode) {
00164         DefaultSubscriber<std_msgs.String> subscriber =
00165             (DefaultSubscriber<std_msgs.String>) connectedNode.<std_msgs.String>newSubscriber(
00166                 "foo", std_msgs.String._TYPE);
00167         try {
00168           subscriber.addPublisher(PublisherIdentifier.newFromStrings("foo", "http://foo", "foo"),
00169               new InetSocketAddress(1234));
00170           fail();
00171         } catch (RuntimeException e) {
00172           // Connecting to a disconnected publisher should fail.
00173         }
00174       }
00175     }, nodeConfiguration);
00176   }
00177 
00178   private final class Listener implements MessageListener<rosjava_test_msgs.TestHeader> {
00179     private final CountDownLatch latch = new CountDownLatch(10);
00180 
00181     private rosjava_test_msgs.TestHeader lastMessage;
00182 
00183     @Override
00184     public void onNewMessage(rosjava_test_msgs.TestHeader message) {
00185       int seq = message.getHeader().getSeq();
00186       long stamp = message.getHeader().getStamp().totalNsecs();
00187       if (lastMessage != null) {
00188         int lastSeq = lastMessage.getHeader().getSeq();
00189         long lastStamp = lastMessage.getHeader().getStamp().totalNsecs();
00190         assertTrue(String.format("message seq %d <= previous seq %d", seq, lastSeq), seq > lastSeq);
00191         assertTrue(String.format("message stamp %d <= previous stamp %d", stamp, lastStamp),
00192             stamp > lastStamp);
00193       }
00194       lastMessage = message;
00195       latch.countDown();
00196     }
00197 
00198     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
00199       return latch.await(timeout, unit);
00200     }
00201   }
00202 
00203   @Test
00204   public void testHeader() throws InterruptedException {
00205     nodeMainExecutor.execute(new AbstractNodeMain() {
00206       @Override
00207       public GraphName getDefaultNodeName() {
00208         return GraphName.of("publisher");
00209       }
00210 
00211       @Override
00212       public void onStart(final ConnectedNode connectedNode) {
00213         final Publisher<rosjava_test_msgs.TestHeader> publisher =
00214             connectedNode.newPublisher("foo", rosjava_test_msgs.TestHeader._TYPE);
00215         connectedNode.executeCancellableLoop(new CancellableLoop() {
00216           @Override
00217           public void loop() throws InterruptedException {
00218             rosjava_test_msgs.TestHeader message =
00219                 connectedNode.getTopicMessageFactory().newFromType(rosjava_test_msgs.TestHeader._TYPE);
00220             message.getHeader().setStamp(connectedNode.getCurrentTime());
00221             publisher.publish(message);
00222             // There needs to be some time between messages in order to
00223             // guarantee that the timestamp increases.
00224             Thread.sleep(1);
00225           }
00226         });
00227       }
00228     }, nodeConfiguration);
00229 
00230     final Listener listener = new Listener();
00231     nodeMainExecutor.execute(new AbstractNodeMain() {
00232       @Override
00233       public GraphName getDefaultNodeName() {
00234         return GraphName.of("subscriber");
00235       }
00236 
00237       @Override
00238       public void onStart(ConnectedNode connectedNode) {
00239         Subscriber<rosjava_test_msgs.TestHeader> subscriber =
00240             connectedNode.newSubscriber("foo", rosjava_test_msgs.TestHeader._TYPE);
00241         subscriber.addMessageListener(listener, QUEUE_CAPACITY);
00242       }
00243     }, nodeConfiguration);
00244 
00245     assertTrue(listener.await(1, TimeUnit.SECONDS));
00246   }
00247 }


rosjava_core
Author(s):
autogenerated on Wed Aug 26 2015 16:06:49