Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.rosjava_benchmarks;
00018
00019 import org.ros.concurrent.CancellableLoop;
00020 import org.ros.concurrent.Rate;
00021 import org.ros.concurrent.WallTimeRate;
00022 import org.ros.message.Duration;
00023 import org.ros.message.MessageListener;
00024 import org.ros.message.Time;
00025 import org.ros.namespace.GraphName;
00026 import org.ros.node.AbstractNodeMain;
00027 import org.ros.node.ConnectedNode;
00028 import org.ros.node.topic.Publisher;
00029 import org.ros.node.topic.Subscriber;
00030 import tf2_msgs.TFMessage;
00031
00032 import java.util.concurrent.TimeUnit;
00033 import java.util.concurrent.atomic.AtomicInteger;
00034
00038 public class PubsubBenchmark extends AbstractNodeMain {
00039
00040 private final AtomicInteger counter;
00041
00042 private Publisher<std_msgs.String> statusPublisher;
00043 private Publisher<tf2_msgs.TFMessage> tfPublisher;
00044 private Subscriber<tf2_msgs.TFMessage> tfSubscriber;
00045 private Time time;
00046
00047 public PubsubBenchmark() {
00048 counter = new AtomicInteger();
00049 }
00050
00051 @Override
00052 public GraphName getDefaultNodeName() {
00053 return GraphName.of("pubsub_benchmark");
00054 }
00055
00056 @Override
00057 public void onStart(final ConnectedNode connectedNode) {
00058 tfSubscriber = connectedNode.newSubscriber("tf", tf2_msgs.TFMessage._TYPE);
00059 tfSubscriber.addMessageListener(new MessageListener<tf2_msgs.TFMessage>() {
00060 @Override
00061 public void onNewMessage(tf2_msgs.TFMessage message) {
00062 counter.incrementAndGet();
00063 }
00064 });
00065
00066 tfPublisher = connectedNode.newPublisher("tf", tf2_msgs.TFMessage._TYPE);
00067 final tf2_msgs.TFMessage tfMessage = tfPublisher.newMessage();
00068 geometry_msgs.TransformStamped transformStamped =
00069 connectedNode.getTopicMessageFactory().newFromType(geometry_msgs.TransformStamped._TYPE);
00070 tfMessage.getTransforms().add(transformStamped);
00071 connectedNode.executeCancellableLoop(new CancellableLoop() {
00072 @Override
00073 protected void loop() throws InterruptedException {
00074 tfPublisher.publish(tfMessage);
00075 }
00076 });
00077
00078 time = connectedNode.getCurrentTime();
00079 statusPublisher = connectedNode.newPublisher("status", std_msgs.String._TYPE);
00080 final Rate rate = new WallTimeRate(1);
00081 final std_msgs.String status = statusPublisher.newMessage();
00082 connectedNode.executeCancellableLoop(new CancellableLoop() {
00083 @Override
00084 protected void loop() throws InterruptedException {
00085 Time now = connectedNode.getCurrentTime();
00086 Duration delta = now.subtract(time);
00087 if (delta.totalNsecs() > TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS)) {
00088 double hz = counter.getAndSet(0) * 1e9 / delta.totalNsecs();
00089 status.setData(String.format("%.2f Hz", hz));
00090 statusPublisher.publish(status);
00091 time = now;
00092 }
00093 rate.sleep();
00094 }
00095 });
00096 }
00097 }