intra.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2009, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include "perf_roscpp/intra.h"
00036 #include "test_roscpp/ThroughputMessage.h"
00037 #include "test_roscpp/LatencyMessage.h"
00038 
00039 #include <ros/ros.h>
00040 #include <ros/callback_queue.h>
00041 
00042 #include <boost/thread.hpp>
00043 
00044 #include <vector>
00045 
00046 namespace perf_roscpp
00047 {
00048 namespace intra
00049 {
00050 
00051 class ThroughputTest
00052 {
00053 public:
00054   ThroughputTest(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads);
00055 
00056   ThroughputResult run();
00057 
00058 private:
00059   void sendThread(boost::barrier* all_connected);
00060   void receiveThread(boost::barrier* all_started, boost::barrier* all_start, ros::WallTime* end_time);
00061 
00062   void callback(const test_roscpp::ThroughputMessageConstPtr& msg);
00063 
00064   boost::mutex mutex_;
00065 
00066   struct ReceiveThreadResult
00067   {
00068     uint64_t bytes_received;
00069     uint64_t messages_received;
00070 
00071     ros::WallTime last_recv_time;
00072   };
00073   boost::thread_specific_ptr<ReceiveThreadResult> receive_thread_result_;
00074   std::vector<boost::shared_ptr<ReceiveThreadResult> > receive_results_;
00075 
00076   struct SendThreadResult
00077   {
00078     uint64_t bytes_sent;
00079     uint64_t messages_sent;
00080 
00081     ros::WallTime first_send_time;
00082   };
00083   boost::thread_specific_ptr<SendThreadResult> send_thread_result_;
00084   std::vector<boost::shared_ptr<SendThreadResult> > send_results_;
00085 
00086   ros::CallbackQueue queue_;
00087   std::vector<ros::Publisher> pubs_;
00088 
00089   boost::thread_group receive_threads_;
00090   boost::thread_group send_threads_;
00091 
00092   double test_duration_;
00093   uint32_t streams_;
00094   uint32_t message_size_;
00095   uint32_t sender_threads_;
00096   uint32_t receiver_threads_;
00097 };
00098 
00099 ThroughputTest::ThroughputTest(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
00100 : test_duration_(test_duration)
00101 , streams_(streams)
00102 , message_size_(message_size)
00103 , sender_threads_(sender_threads)
00104 , receiver_threads_(receiver_threads)
00105 {
00106 }
00107 
00108 void ThroughputTest::callback(const test_roscpp::ThroughputMessageConstPtr& msg)
00109 {
00110   ReceiveThreadResult& r = *receive_thread_result_;
00111 
00112   r.bytes_received += ros::serialization::Serializer<test_roscpp::ThroughputMessage>::serializedLength(*msg) + 4; // 4 byte message length field
00113   ++r.messages_received;
00114 
00115   r.last_recv_time = ros::WallTime::now();
00116 
00117   //ROS_INFO_STREAM("Received message " << r.messages_received);
00118 }
00119 
00120 void ThroughputTest::receiveThread(boost::barrier* all_ready, boost::barrier* all_start, ros::WallTime* end_time)
00121 {
00122   receive_thread_result_.reset(new ReceiveThreadResult);
00123 
00124   ReceiveThreadResult& r = *receive_thread_result_;
00125   r.messages_received = 0;
00126   r.bytes_received = 0;
00127 
00128   ROS_INFO_STREAM("Receive thread [" << boost::this_thread::get_id() << "] waiting for all threads to start");
00129 
00130   all_ready->wait();
00131   all_start->wait();
00132   ros::WallTime local_end_time = *end_time;
00133 
00134   ROS_INFO_STREAM("Receive thread [" << boost::this_thread::get_id() << "] running");
00135 
00136   while (ros::WallTime::now() < local_end_time)
00137   {
00138     queue_.callOne(ros::WallDuration(0.01));
00139   }
00140 
00141   ROS_INFO_STREAM("Receive thread [" << boost::this_thread::get_id() << "] adding results and exiting");
00142 
00143   boost::mutex::scoped_lock lock(mutex_);
00144   receive_results_.push_back(boost::shared_ptr<ReceiveThreadResult>(receive_thread_result_.release()));
00145 }
00146 
00147 void ThroughputTest::sendThread(boost::barrier* all_connected)
00148 {
00149   send_thread_result_.reset(new SendThreadResult);
00150   SendThreadResult& r = *send_thread_result_;
00151 
00152   ros::NodeHandle nh;
00153   nh.setCallbackQueue(&queue_);
00154   std::vector<ros::Publisher> pubs;
00155   for (uint32_t i = 0; i < streams_; ++i)
00156   {
00157     std::stringstream ss;
00158     ss << "throughput_perf_test_" << i;
00159     pubs.push_back(nh.advertise<test_roscpp::ThroughputMessage>(ss.str(), 1));
00160   }
00161 
00162   // Need to keep around the publishers so the connections don't go away
00163   {
00164     boost::mutex::scoped_lock lock(mutex_);
00165     pubs_.insert(pubs_.end(), pubs.begin(), pubs.end());
00166   }
00167 
00168   ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] waiting for connections");
00169   bool cont = true;
00170   while (cont)
00171   {
00172     cont = false;
00173     for (uint32_t i = 0; i < streams_; ++i)
00174     {
00175       if (pubs[i].getNumSubscribers() == 0)
00176       {
00177         cont = true;
00178       }
00179     }
00180   }
00181 
00182   test_roscpp::ThroughputMessagePtr msg(boost::make_shared<test_roscpp::ThroughputMessage>());
00183   msg->array.resize(message_size_);
00184 
00185   all_connected->wait();
00186 
00187   ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] all connections established, beginning to publish");
00188 
00189   r.first_send_time = ros::WallTime::now();
00190   r.bytes_sent = 0;
00191   r.messages_sent = 0;
00192 
00193   try
00194   {
00195     const uint32_t streams = streams_;
00196     while (!boost::this_thread::interruption_requested())
00197     {
00198       for (uint32_t j = 0; j < streams; ++j)
00199       {
00200         pubs[j].publish(msg);
00201 
00202         ++r.messages_sent;
00203         r.bytes_sent += ros::serialization::Serializer<test_roscpp::ThroughputMessage>::serializedLength(*msg) + 4;
00204       }
00205 
00206       boost::this_thread::yield();
00207     }
00208   }
00209   catch (boost::thread_interrupted&)
00210   {
00211   }
00212 
00213   ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] exiting");
00214 
00215   boost::mutex::scoped_lock lock(mutex_);
00216   send_results_.push_back(boost::shared_ptr<SendThreadResult>(send_thread_result_.release()));
00217 }
00218 
00219 ThroughputResult ThroughputTest::run()
00220 {
00221   ROS_INFO("Starting receive threads");
00222   ThroughputResult r;
00223   r.test_start = ros::WallTime::now();
00224 
00225   ros::NodeHandle nh;
00226   nh.setCallbackQueue(&queue_);
00227 
00228   std::vector<ros::Subscriber> subs;
00229   for (uint32_t i = 0; i < streams_; ++i)
00230   {
00231     std::stringstream ss;
00232     ss << "throughput_perf_test_" << i;
00233     subs.push_back(nh.subscribe(ss.str(), 0, &ThroughputTest::callback, this, ros::TransportHints().tcpNoDelay()));
00234   }
00235 
00236   boost::barrier sender_all_connected(sender_threads_ + 1);
00237   boost::barrier receiver_all_ready(receiver_threads_ + 1);
00238   boost::barrier receiver_start(receiver_threads_ + 1);
00239   ros::WallTime test_end_time;
00240 
00241   for (uint32_t i = 0; i < receiver_threads_; ++i)
00242   {
00243     receive_threads_.create_thread(boost::bind(&ThroughputTest::receiveThread, this, &receiver_all_ready, &receiver_start, &test_end_time));
00244   }
00245 
00246   for (uint32_t i = 0; i < sender_threads_; ++i)
00247   {
00248     send_threads_.create_thread(boost::bind(&ThroughputTest::sendThread, this, &sender_all_connected));
00249   }
00250 
00251   receiver_all_ready.wait();
00252   test_end_time = ros::WallTime::now() + ros::WallDuration(test_duration_);
00253   receiver_start.wait();
00254 
00255   ros::WallTime pub_start = ros::WallTime::now();
00256   sender_all_connected.wait();
00257 
00258   receive_threads_.join_all();
00259   ROS_INFO("All receive threads done");
00260 
00261   send_threads_.interrupt_all();
00262   send_threads_.join_all();
00263   ROS_INFO("All publish threads done");
00264 
00265   ROS_INFO("Collating results");
00266 
00267   r.test_end = ros::WallTime::now();
00268 
00269   r.bytes_per_second = 0;
00270   r.message_size = message_size_;
00271   r.messages_received = 0;
00272   r.messages_sent = 0;
00273   r.receiver_threads = receiver_threads_;
00274   r.sender_threads = sender_threads_;
00275   r.total_bytes_received = 0;
00276   r.total_bytes_sent = 0;
00277   r.test_duration = test_duration_;
00278   r.streams = streams_;
00279 
00280   ros::WallTime rec_end;
00281   {
00282     std::vector<boost::shared_ptr<ReceiveThreadResult> >::iterator it = receive_results_.begin();
00283     std::vector<boost::shared_ptr<ReceiveThreadResult> >::iterator end = receive_results_.end();
00284     for (; it != end; ++it)
00285     {
00286       ReceiveThreadResult& rr = **it;
00287       r.total_bytes_received += rr.bytes_received;
00288       r.messages_received += rr.messages_received;
00289 
00290       rec_end = std::max(rec_end, rr.last_recv_time);
00291     }
00292   }
00293 
00294   {
00295     std::vector<boost::shared_ptr<SendThreadResult> >::iterator it = send_results_.begin();
00296     std::vector<boost::shared_ptr<SendThreadResult> >::iterator end = send_results_.end();
00297     for (; it != end; ++it)
00298     {
00299       SendThreadResult& sr = **it;
00300       r.total_bytes_sent += sr.bytes_sent;
00301       r.messages_sent += sr.messages_sent;
00302 
00303       pub_start = std::min(pub_start, sr.first_send_time);
00304     }
00305   }
00306 
00307   r.bytes_per_second = (double)r.total_bytes_received / (rec_end - pub_start).toSec();
00308 
00309   ROS_INFO("Done collating results");
00310 
00311   return r;
00312 }
00313 
00314 ThroughputResult throughput(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
00315 {
00316   ROS_INFO_STREAM("*****************************************************");
00317   ROS_INFO_STREAM("Running throughput test: "<< "receiver_threads [" << receiver_threads << "], sender_threads [" << sender_threads << "], streams [" << streams << "], test_duration [" << test_duration << "], message_size [" << message_size << "]");
00318 
00319   ThroughputTest t(test_duration, streams, message_size, sender_threads, receiver_threads);
00320   return t.run();
00321 }
00322 
00332 
00333 class LatencyTest
00334 {
00335 public:
00336   LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads);
00337 
00338   LatencyResult run();
00339 
00340 private:
00341   void sendThread(boost::barrier* b, uint32_t i);
00342   void receiveThread();
00343 
00344   void receiveCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub);
00345   void sendCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub, uint32_t thread_index);
00346 
00347   boost::mutex mutex_;
00348 
00349   struct ThreadResult
00350   {
00351     uint64_t message_count;
00352 
00353     std::vector<double> latencies;
00354   };
00355   boost::thread_specific_ptr<ThreadResult> thread_result_;
00356   std::vector<boost::shared_ptr<ThreadResult> > results_;
00357 
00358   ros::CallbackQueue receive_queue_;
00359 
00360   boost::thread_group send_threads_;
00361 
00362   uint32_t count_per_stream_;
00363   uint32_t streams_;
00364   uint32_t message_size_;
00365   uint32_t sender_threads_;
00366   uint32_t receiver_threads_;
00367 };
00368 
00369 LatencyTest::LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
00370 : count_per_stream_(count_per_stream)
00371 , streams_(streams)
00372 , message_size_(message_size)
00373 , sender_threads_(sender_threads)
00374 , receiver_threads_(receiver_threads)
00375 {
00376 }
00377 
00378 void LatencyTest::receiveCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub)
00379 {
00380   ros::WallTime receipt_time = ros::WallTime::now();
00381   test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
00382   reply->receipt_time = receipt_time.toSec();
00383   pub.publish(reply);
00384   //ROS_INFO("Receiver received message %d", msg->count);
00385 }
00386 
00387 void LatencyTest::sendCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub, uint32_t thread_index)
00388 {
00389   if (msg->thread_index != thread_index)
00390   {
00391     return;
00392   }
00393 
00394   thread_result_->latencies.push_back(msg->receipt_time - msg->publish_time);
00395   ++thread_result_->message_count;
00396 
00397   test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
00398   reply->publish_time = ros::WallTime::now().toSec();
00399   ++reply->count;
00400 
00401   //ROS_INFO("Sender received return message %d", msg->count);
00402 
00403   if (reply->count < count_per_stream_ * streams_)
00404   {
00405     pub.publish(reply);
00406   }
00407 }
00408 
00409 void LatencyTest::sendThread(boost::barrier* all_connected, uint32_t thread_index)
00410 {
00411   thread_result_.reset(new ThreadResult);
00412   ThreadResult& r = *thread_result_;
00413 
00414   ros::NodeHandle nh;
00415   ros::CallbackQueue queue;
00416   nh.setCallbackQueue(&queue);
00417   std::vector<ros::Publisher> pubs;
00418   std::vector<ros::Subscriber> subs;
00419   pubs.reserve(streams_);
00420   for (uint32_t i = 0; i < streams_; ++i)
00421   {
00422     std::stringstream ss;
00423     ss << "latency_perf_test_" << i;
00424     pubs.push_back(nh.advertise<test_roscpp::LatencyMessage>(ss.str(), 0));
00425 
00426     ss << "_return";
00427     subs.push_back(nh.subscribe<test_roscpp::LatencyMessage>(ss.str(), 0, boost::bind(&LatencyTest::sendCallback, this, _1, boost::ref(pubs[i]), thread_index), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay()));
00428   }
00429 
00430   bool cont = true;
00431   while (cont)
00432   {
00433     cont = false;
00434     for (uint32_t i = 0; i < streams_; ++i)
00435     {
00436       if (pubs[i].getNumSubscribers() == 0)
00437       {
00438         cont = true;
00439       }
00440     }
00441   }
00442 
00443   std::vector<test_roscpp::LatencyMessagePtr> messages;
00444   for (uint32_t i = 0; i < streams_; ++i)
00445   {
00446     test_roscpp::LatencyMessagePtr msg(boost::make_shared<test_roscpp::LatencyMessage>());
00447     msg->thread_index = thread_index;
00448     msg->array.resize(message_size_);
00449     messages.push_back(msg);
00450   }
00451 
00452   all_connected->wait();
00453 
00454   r.message_count = 0;
00455 
00456   const uint32_t count = count_per_stream_;
00457   const uint32_t streams = streams_;
00458   const uint32_t total_messages = count * streams;
00459   for (uint32_t j = 0; j < streams; ++j)
00460   {
00461     messages[j]->publish_time = ros::WallTime::now().toSec();
00462     pubs[j].publish(messages[j]);
00463   }
00464 
00465   while (r.latencies.size() < total_messages)
00466   {
00467     queue.callAvailable(ros::WallDuration(0.01));
00468   }
00469 
00470   ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] exiting");
00471 
00472   boost::mutex::scoped_lock lock(mutex_);
00473   results_.push_back(boost::shared_ptr<ThreadResult>(thread_result_.release()));
00474 }
00475 
00476 LatencyResult LatencyTest::run()
00477 {
00478   ROS_INFO("Starting receive threads");
00479   LatencyResult r;
00480   r.test_start = ros::WallTime::now();
00481 
00482   ros::NodeHandle nh;
00483   nh.setCallbackQueue(&receive_queue_);
00484 
00485   std::vector<ros::Subscriber> subs;
00486   std::vector<ros::Publisher> pubs;
00487   pubs.reserve(streams_ * sender_threads_);
00488   for (uint32_t i = 0; i < streams_; ++i)
00489   {
00490     std::stringstream ss;
00491     ss << "latency_perf_test_" << i;
00492     std::string sub_topic = ss.str();
00493     ss << "_return";
00494     std::string pub_topic = ss.str();
00495     pubs.push_back(nh.advertise<test_roscpp::LatencyMessage>(pub_topic, 0));
00496     subs.push_back(nh.subscribe<test_roscpp::LatencyMessage>(sub_topic, 0, boost::bind(&LatencyTest::receiveCallback, this, _1, boost::ref(pubs.back())), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay()));
00497   }
00498 
00499   boost::barrier all_connected(1 + sender_threads_);
00500 
00501   ros::WallTime pub_start = ros::WallTime::now();
00502   ROS_INFO("Starting publish threads");
00503   for (uint32_t i = 0; i < sender_threads_; ++i)
00504   {
00505     send_threads_.create_thread(boost::bind(&LatencyTest::sendThread, this, &all_connected, i));
00506   }
00507 
00508   ROS_INFO("Waiting for all connections to establish");
00509 
00510   bool cont = true;
00511   while (cont)
00512   {
00513     cont = false;
00514     for (uint32_t i = 0; i < streams_; ++i)
00515     {
00516       if (pubs[i].getNumSubscribers() == 0)
00517       {
00518         cont = true;
00519       }
00520     }
00521   }
00522 
00523   all_connected.wait();
00524   ROS_INFO("All connections established");
00525 
00526   ros::AsyncSpinner receive_spinner(receiver_threads_, &receive_queue_);
00527   receive_spinner.start();
00528 
00529   send_threads_.join_all();
00530   ROS_INFO("All publish threads done");
00531 
00532   ROS_INFO("Collating results");
00533 
00534   r.test_end = ros::WallTime::now();
00535 
00536   r.latency_avg = 0;
00537   r.latency_max = 0;
00538   r.latency_min = 9999999999999ULL;
00539   r.total_message_count = 0;
00540   r.message_size = message_size_;
00541   r.receiver_threads = receiver_threads_;
00542   r.sender_threads = sender_threads_;
00543   r.count_per_stream = count_per_stream_;
00544   r.streams = streams_;
00545 
00546   double latency_total = 0.0;
00547   uint32_t latency_count = 0;
00548   {
00549     std::vector<boost::shared_ptr<ThreadResult> >::iterator it = results_.begin();
00550     std::vector<boost::shared_ptr<ThreadResult> >::iterator end = results_.end();
00551     for (; it != end; ++it)
00552     {
00553       ThreadResult& tr = **it;
00554       r.total_message_count += tr.message_count;
00555 
00556       std::vector<double>::iterator lat_it = tr.latencies.begin();
00557       std::vector<double>::iterator lat_end = tr.latencies.end();
00558       for (; lat_it != lat_end; ++lat_it)
00559       {
00560         double latency = *lat_it;
00561         r.latency_min = std::min(r.latency_min, latency);
00562         r.latency_max = std::max(r.latency_max, latency);
00563         ++latency_count;
00564         latency_total += latency;
00565       }
00566     }
00567   }
00568 
00569   r.latency_avg = latency_total / latency_count;
00570 
00571   ROS_INFO("Done collating results");
00572 
00573   return r;
00574 }
00575 
00576 LatencyResult latency(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
00577 {
00578   ROS_INFO_STREAM("*****************************************************");
00579   ROS_INFO_STREAM("Running latency test: "<< "receiver_threads [" << receiver_threads << "], sender_threads [" << sender_threads << "], streams [" << streams << "], count_per_stream [" << count_per_stream << "], message_size [" << message_size << "]");
00580 
00581   LatencyTest t(count_per_stream, streams, message_size, sender_threads, receiver_threads);
00582   return t.run();
00583 }
00584 
00594 
00595 class STLatencyTest
00596 {
00597 public:
00598   STLatencyTest(uint32_t message_count);
00599 
00600   STLatencyResult run();
00601 
00602 private:
00603 
00604   void receiveCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub);
00605   void sendCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub);
00606 
00607   struct Result
00608   {
00609     std::vector<double> latencies;
00610   };
00611   Result result_;
00612 
00613   ros::CallbackQueue receive_queue_;
00614 
00615   uint32_t message_count_;
00616 };
00617 
00618 STLatencyTest::STLatencyTest(uint32_t message_count)
00619 : message_count_(message_count)
00620 {
00621 }
00622 
00623 void STLatencyTest::receiveCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub)
00624 {
00625   ros::WallTime receipt_time = ros::WallTime::now();
00626   test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
00627   reply->receipt_time = receipt_time.toSec();
00628   pub.publish(reply);
00629   //ROS_INFO("Receiver received message %d", msg->count);
00630 }
00631 
00632 void STLatencyTest::sendCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub)
00633 {
00634   result_.latencies.push_back(msg->receipt_time - msg->publish_time);
00635 
00636   test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
00637   reply->publish_time = ros::WallTime::now().toSec();
00638   ++reply->count;
00639 
00640   //ROS_INFO("Sender received return message %d", msg->count);
00641 
00642   if (reply->count < message_count_)
00643   {
00644     pub.publish(reply);
00645   }
00646 }
00647 
00648 STLatencyResult STLatencyTest::run()
00649 {
00650   ROS_INFO("Starting receive threads");
00651   STLatencyResult r;
00652   r.test_start = ros::WallTime::now();
00653 
00654   ros::NodeHandle nh;
00655   nh.setCallbackQueue(&receive_queue_);
00656 
00657   ros::Publisher recv_pub = nh.advertise<test_roscpp::LatencyMessage>("stlatency_perf_test_return", 0);
00658   ros::Subscriber recv_sub = nh.subscribe<test_roscpp::LatencyMessage>("stlatency_perf_test", 0, boost::bind(&STLatencyTest::receiveCallback, this, _1, boost::ref(recv_pub)), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay());
00659   ros::Publisher send_pub = nh.advertise<test_roscpp::LatencyMessage>("stlatency_perf_test", 0);
00660   ros::Subscriber send_sub = nh.subscribe<test_roscpp::LatencyMessage>("stlatency_perf_test_return", 0, boost::bind(&STLatencyTest::sendCallback, this, _1, boost::ref(send_pub)), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay());
00661 
00662   ROS_INFO("Waiting for all connections to establish");
00663 
00664   bool cont = true;
00665   while (cont)
00666   {
00667     cont = recv_pub.getNumSubscribers() == 0 || send_pub.getNumSubscribers() == 0;
00668     ros::WallDuration(0.001).sleep();
00669   }
00670 
00671   ROS_INFO("All connections established");
00672 
00673   test_roscpp::LatencyMessagePtr msg(boost::make_shared<test_roscpp::LatencyMessage>());
00674   msg->publish_time = ros::WallTime::now().toSec();
00675   send_pub.publish(msg);
00676   while (msg->count < message_count_)
00677   {
00678     receive_queue_.callAvailable(ros::WallDuration(0.1));
00679   }
00680 
00681   r.test_end = ros::WallTime::now();
00682 
00683   r.latency_avg = 0;
00684   r.latency_max = 0;
00685   r.latency_min = 9999999999999ULL;
00686   r.total_message_count = message_count_;
00687 
00688   double latency_total = 0.0;
00689   uint32_t latency_count = 0;
00690   {
00691     std::vector<double>::iterator lat_it = result_.latencies.begin();
00692     std::vector<double>::iterator lat_end = result_.latencies.end();
00693     for (; lat_it != lat_end; ++lat_it)
00694     {
00695       double latency = *lat_it;
00696       r.latency_min = std::min(r.latency_min, latency);
00697       r.latency_max = std::max(r.latency_max, latency);
00698       ++latency_count;
00699       latency_total += latency;
00700     }
00701   }
00702 
00703   r.latency_avg = latency_total / latency_count;
00704 
00705   ROS_INFO("Done collating results");
00706 
00707   return r;
00708 }
00709 
00710 STLatencyResult stlatency(uint32_t message_count)
00711 {
00712   ROS_INFO_STREAM("*****************************************************");
00713   ROS_INFO_STREAM("Running single-threaded latency test: message count[" << message_count << "]");
00714 
00715   STLatencyTest t(message_count);
00716   return t.run();
00717 }
00718 
00719 } // namespace intra
00720 } // namespace perf_roscpp


test_roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Tue Mar 7 2017 03:45:23