$search
00001 /* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2009, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of Willow Garage, Inc. nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 */ 00034 00035 #include "perf_roscpp/intra.h" 00036 #include "perf_roscpp/ThroughputMessage.h" 00037 #include "perf_roscpp/LatencyMessage.h" 00038 00039 #include <ros/ros.h> 00040 #include <ros/callback_queue.h> 00041 00042 #include <boost/thread.hpp> 00043 00044 #include <vector> 00045 00046 namespace perf_roscpp 00047 { 00048 namespace intra 00049 { 00050 00051 class ThroughputTest 00052 { 00053 public: 00054 ThroughputTest(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads); 00055 00056 ThroughputResult run(); 00057 00058 private: 00059 void sendThread(boost::barrier* all_connected); 00060 void receiveThread(boost::barrier* all_started, boost::barrier* all_start, ros::WallTime* end_time); 00061 00062 void callback(const ThroughputMessageConstPtr& msg); 00063 00064 boost::mutex mutex_; 00065 00066 struct ReceiveThreadResult 00067 { 00068 uint64_t bytes_received; 00069 uint64_t messages_received; 00070 00071 ros::WallTime last_recv_time; 00072 }; 00073 boost::thread_specific_ptr<ReceiveThreadResult> receive_thread_result_; 00074 std::vector<boost::shared_ptr<ReceiveThreadResult> > receive_results_; 00075 00076 struct SendThreadResult 00077 { 00078 uint64_t bytes_sent; 00079 uint64_t messages_sent; 00080 00081 ros::WallTime first_send_time; 00082 }; 00083 boost::thread_specific_ptr<SendThreadResult> send_thread_result_; 00084 std::vector<boost::shared_ptr<SendThreadResult> > send_results_; 00085 00086 ros::CallbackQueue queue_; 00087 std::vector<ros::Publisher> pubs_; 00088 00089 boost::thread_group receive_threads_; 00090 boost::thread_group send_threads_; 00091 00092 double test_duration_; 00093 uint32_t streams_; 00094 uint32_t message_size_; 00095 uint32_t sender_threads_; 00096 uint32_t receiver_threads_; 00097 }; 00098 00099 ThroughputTest::ThroughputTest(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads) 00100 : test_duration_(test_duration) 00101 , streams_(streams) 00102 , message_size_(message_size) 00103 , sender_threads_(sender_threads) 00104 , receiver_threads_(receiver_threads) 00105 { 00106 } 00107 00108 void ThroughputTest::callback(const ThroughputMessageConstPtr& msg) 00109 { 00110 ReceiveThreadResult& r = *receive_thread_result_; 00111 00112 r.bytes_received += ros::serialization::Serializer<ThroughputMessage>::serializedLength(*msg) + 4; // 4 byte message length field 00113 ++r.messages_received; 00114 00115 r.last_recv_time = ros::WallTime::now(); 00116 00117 //ROS_INFO_STREAM("Received message " << r.messages_received); 00118 } 00119 00120 void ThroughputTest::receiveThread(boost::barrier* all_ready, boost::barrier* all_start, ros::WallTime* end_time) 00121 { 00122 receive_thread_result_.reset(new ReceiveThreadResult); 00123 00124 ReceiveThreadResult& r = *receive_thread_result_; 00125 r.messages_received = 0; 00126 r.bytes_received = 0; 00127 00128 ROS_INFO_STREAM("Receive thread [" << boost::this_thread::get_id() << "] waiting for all threads to start"); 00129 00130 all_ready->wait(); 00131 all_start->wait(); 00132 ros::WallTime local_end_time = *end_time; 00133 00134 ROS_INFO_STREAM("Receive thread [" << boost::this_thread::get_id() << "] running"); 00135 00136 while (ros::WallTime::now() < local_end_time) 00137 { 00138 queue_.callOne(ros::WallDuration(0.01)); 00139 } 00140 00141 ROS_INFO_STREAM("Receive thread [" << boost::this_thread::get_id() << "] adding results and exiting"); 00142 00143 boost::mutex::scoped_lock lock(mutex_); 00144 receive_results_.push_back(boost::shared_ptr<ReceiveThreadResult>(receive_thread_result_.release())); 00145 } 00146 00147 void ThroughputTest::sendThread(boost::barrier* all_connected) 00148 { 00149 send_thread_result_.reset(new SendThreadResult); 00150 SendThreadResult& r = *send_thread_result_; 00151 00152 ros::NodeHandle nh; 00153 nh.setCallbackQueue(&queue_); 00154 std::vector<ros::Publisher> pubs; 00155 for (uint32_t i = 0; i < streams_; ++i) 00156 { 00157 std::stringstream ss; 00158 ss << "throughput_perf_test_" << i; 00159 pubs.push_back(nh.advertise<ThroughputMessage>(ss.str(), 1)); 00160 } 00161 00162 // Need to keep around the publishers so the connections don't go away 00163 { 00164 boost::mutex::scoped_lock lock(mutex_); 00165 pubs_.insert(pubs_.end(), pubs.begin(), pubs.end()); 00166 } 00167 00168 ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] waiting for connections"); 00169 bool cont = true; 00170 while (cont) 00171 { 00172 cont = false; 00173 for (uint32_t i = 0; i < streams_; ++i) 00174 { 00175 if (pubs[i].getNumSubscribers() == 0) 00176 { 00177 cont = true; 00178 } 00179 } 00180 } 00181 00182 ThroughputMessagePtr msg(new ThroughputMessage); 00183 msg->array.resize(message_size_); 00184 00185 all_connected->wait(); 00186 00187 ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] all connections established, beginning to publish"); 00188 00189 r.first_send_time = ros::WallTime::now(); 00190 r.bytes_sent = 0; 00191 r.messages_sent = 0; 00192 00193 try 00194 { 00195 const uint32_t streams = streams_; 00196 while (!boost::this_thread::interruption_requested()) 00197 { 00198 for (uint32_t j = 0; j < streams; ++j) 00199 { 00200 pubs[j].publish(msg); 00201 00202 ++r.messages_sent; 00203 r.bytes_sent += ros::serialization::Serializer<ThroughputMessage>::serializedLength(*msg) + 4; 00204 } 00205 00206 boost::this_thread::yield(); 00207 } 00208 } 00209 catch (boost::thread_interrupted&) 00210 { 00211 } 00212 00213 ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] exiting"); 00214 00215 boost::mutex::scoped_lock lock(mutex_); 00216 send_results_.push_back(boost::shared_ptr<SendThreadResult>(send_thread_result_.release())); 00217 } 00218 00219 ThroughputResult ThroughputTest::run() 00220 { 00221 ROS_INFO("Starting receive threads"); 00222 ThroughputResult r; 00223 r.test_start = ros::WallTime::now(); 00224 00225 ros::NodeHandle nh; 00226 nh.setCallbackQueue(&queue_); 00227 00228 std::vector<ros::Subscriber> subs; 00229 for (uint32_t i = 0; i < streams_; ++i) 00230 { 00231 std::stringstream ss; 00232 ss << "throughput_perf_test_" << i; 00233 subs.push_back(nh.subscribe(ss.str(), 0, &ThroughputTest::callback, this, ros::TransportHints().tcpNoDelay())); 00234 } 00235 00236 boost::barrier sender_all_connected(sender_threads_ + 1); 00237 boost::barrier receiver_all_ready(receiver_threads_ + 1); 00238 boost::barrier receiver_start(receiver_threads_ + 1); 00239 ros::WallTime test_end_time; 00240 00241 for (uint32_t i = 0; i < receiver_threads_; ++i) 00242 { 00243 receive_threads_.create_thread(boost::bind(&ThroughputTest::receiveThread, this, &receiver_all_ready, &receiver_start, &test_end_time)); 00244 } 00245 00246 for (uint32_t i = 0; i < sender_threads_; ++i) 00247 { 00248 send_threads_.create_thread(boost::bind(&ThroughputTest::sendThread, this, &sender_all_connected)); 00249 } 00250 00251 receiver_all_ready.wait(); 00252 test_end_time = ros::WallTime::now() + ros::WallDuration(test_duration_); 00253 receiver_start.wait(); 00254 00255 ros::WallTime pub_start = ros::WallTime::now(); 00256 sender_all_connected.wait(); 00257 00258 receive_threads_.join_all(); 00259 ROS_INFO("All receive threads done"); 00260 00261 send_threads_.interrupt_all(); 00262 send_threads_.join_all(); 00263 ROS_INFO("All publish threads done"); 00264 00265 ROS_INFO("Collating results"); 00266 00267 r.test_end = ros::WallTime::now(); 00268 00269 r.bytes_per_second = 0; 00270 r.message_size = message_size_; 00271 r.messages_received = 0; 00272 r.messages_sent = 0; 00273 r.receiver_threads = receiver_threads_; 00274 r.sender_threads = sender_threads_; 00275 r.total_bytes_received = 0; 00276 r.total_bytes_sent = 0; 00277 r.test_duration = test_duration_; 00278 r.streams = streams_; 00279 00280 ros::WallTime rec_end; 00281 { 00282 std::vector<boost::shared_ptr<ReceiveThreadResult> >::iterator it = receive_results_.begin(); 00283 std::vector<boost::shared_ptr<ReceiveThreadResult> >::iterator end = receive_results_.end(); 00284 for (; it != end; ++it) 00285 { 00286 ReceiveThreadResult& rr = **it; 00287 r.total_bytes_received += rr.bytes_received; 00288 r.messages_received += rr.messages_received; 00289 00290 rec_end = std::max(rec_end, rr.last_recv_time); 00291 } 00292 } 00293 00294 { 00295 std::vector<boost::shared_ptr<SendThreadResult> >::iterator it = send_results_.begin(); 00296 std::vector<boost::shared_ptr<SendThreadResult> >::iterator end = send_results_.end(); 00297 for (; it != end; ++it) 00298 { 00299 SendThreadResult& sr = **it; 00300 r.total_bytes_sent += sr.bytes_sent; 00301 r.messages_sent += sr.messages_sent; 00302 00303 pub_start = std::min(pub_start, sr.first_send_time); 00304 } 00305 } 00306 00307 r.bytes_per_second = (double)r.total_bytes_received / (rec_end - pub_start).toSec(); 00308 00309 ROS_INFO("Done collating results"); 00310 00311 return r; 00312 } 00313 00314 ThroughputResult throughput(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads) 00315 { 00316 ROS_INFO_STREAM("*****************************************************"); 00317 ROS_INFO_STREAM("Running throughput test: "<< "receiver_threads [" << receiver_threads << "], sender_threads [" << sender_threads << "], streams [" << streams << "], test_duration [" << test_duration << "], message_size [" << message_size << "]"); 00318 00319 ThroughputTest t(test_duration, streams, message_size, sender_threads, receiver_threads); 00320 return t.run(); 00321 } 00322 00332 00333 class LatencyTest 00334 { 00335 public: 00336 LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads); 00337 00338 LatencyResult run(); 00339 00340 private: 00341 void sendThread(boost::barrier* b, uint32_t i); 00342 void receiveThread(); 00343 00344 void receiveCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub); 00345 void sendCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub, uint32_t thread_index); 00346 00347 boost::mutex mutex_; 00348 00349 struct ThreadResult 00350 { 00351 uint64_t message_count; 00352 00353 std::vector<double> latencies; 00354 }; 00355 boost::thread_specific_ptr<ThreadResult> thread_result_; 00356 std::vector<boost::shared_ptr<ThreadResult> > results_; 00357 00358 ros::CallbackQueue receive_queue_; 00359 00360 boost::thread_group send_threads_; 00361 00362 uint32_t count_per_stream_; 00363 uint32_t streams_; 00364 uint32_t message_size_; 00365 uint32_t sender_threads_; 00366 uint32_t receiver_threads_; 00367 }; 00368 00369 LatencyTest::LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads) 00370 : count_per_stream_(count_per_stream) 00371 , streams_(streams) 00372 , message_size_(message_size) 00373 , sender_threads_(sender_threads) 00374 , receiver_threads_(receiver_threads) 00375 { 00376 } 00377 00378 void LatencyTest::receiveCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub) 00379 { 00380 ros::WallTime receipt_time = ros::WallTime::now(); 00381 LatencyMessagePtr reply = boost::const_pointer_cast<LatencyMessage>(msg); 00382 reply->receipt_time = receipt_time.toSec(); 00383 pub.publish(reply); 00384 //ROS_INFO("Receiver received message %d", msg->count); 00385 } 00386 00387 void LatencyTest::sendCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub, uint32_t thread_index) 00388 { 00389 if (msg->thread_index != thread_index) 00390 { 00391 return; 00392 } 00393 00394 thread_result_->latencies.push_back(msg->receipt_time - msg->publish_time); 00395 ++thread_result_->message_count; 00396 00397 LatencyMessagePtr reply = boost::const_pointer_cast<LatencyMessage>(msg); 00398 reply->publish_time = ros::WallTime::now().toSec(); 00399 ++reply->count; 00400 00401 //ROS_INFO("Sender received return message %d", msg->count); 00402 00403 if (reply->count < count_per_stream_ * streams_) 00404 { 00405 pub.publish(reply); 00406 } 00407 } 00408 00409 void LatencyTest::sendThread(boost::barrier* all_connected, uint32_t thread_index) 00410 { 00411 thread_result_.reset(new ThreadResult); 00412 ThreadResult& r = *thread_result_; 00413 00414 ros::NodeHandle nh; 00415 ros::CallbackQueue queue; 00416 nh.setCallbackQueue(&queue); 00417 std::vector<ros::Publisher> pubs; 00418 std::vector<ros::Subscriber> subs; 00419 pubs.reserve(streams_); 00420 for (uint32_t i = 0; i < streams_; ++i) 00421 { 00422 std::stringstream ss; 00423 ss << "latency_perf_test_" << i; 00424 pubs.push_back(nh.advertise<LatencyMessage>(ss.str(), 0)); 00425 00426 ss << "_return"; 00427 subs.push_back(nh.subscribe<LatencyMessage>(ss.str(), 0, boost::bind(&LatencyTest::sendCallback, this, _1, boost::ref(pubs[i]), thread_index), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay())); 00428 } 00429 00430 bool cont = true; 00431 while (cont) 00432 { 00433 cont = false; 00434 for (uint32_t i = 0; i < streams_; ++i) 00435 { 00436 if (pubs[i].getNumSubscribers() == 0) 00437 { 00438 cont = true; 00439 } 00440 } 00441 } 00442 00443 std::vector<LatencyMessagePtr> messages; 00444 for (uint32_t i = 0; i < streams_; ++i) 00445 { 00446 LatencyMessagePtr msg(new LatencyMessage); 00447 msg->thread_index = thread_index; 00448 msg->array.resize(message_size_); 00449 messages.push_back(msg); 00450 } 00451 00452 all_connected->wait(); 00453 00454 r.message_count = 0; 00455 00456 const uint32_t count = count_per_stream_; 00457 const uint32_t streams = streams_; 00458 const uint32_t total_messages = count * streams; 00459 for (uint32_t j = 0; j < streams; ++j) 00460 { 00461 messages[j]->publish_time = ros::WallTime::now().toSec(); 00462 pubs[j].publish(messages[j]); 00463 } 00464 00465 while (r.latencies.size() < total_messages) 00466 { 00467 queue.callAvailable(ros::WallDuration(0.01)); 00468 } 00469 00470 ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] exiting"); 00471 00472 boost::mutex::scoped_lock lock(mutex_); 00473 results_.push_back(boost::shared_ptr<ThreadResult>(thread_result_.release())); 00474 } 00475 00476 LatencyResult LatencyTest::run() 00477 { 00478 ROS_INFO("Starting receive threads"); 00479 LatencyResult r; 00480 r.test_start = ros::WallTime::now(); 00481 00482 ros::NodeHandle nh; 00483 nh.setCallbackQueue(&receive_queue_); 00484 00485 std::vector<ros::Subscriber> subs; 00486 std::vector<ros::Publisher> pubs; 00487 pubs.reserve(streams_ * sender_threads_); 00488 for (uint32_t i = 0; i < streams_; ++i) 00489 { 00490 std::stringstream ss; 00491 ss << "latency_perf_test_" << i; 00492 std::string sub_topic = ss.str(); 00493 ss << "_return"; 00494 std::string pub_topic = ss.str(); 00495 pubs.push_back(nh.advertise<LatencyMessage>(pub_topic, 0)); 00496 subs.push_back(nh.subscribe<LatencyMessage>(sub_topic, 0, boost::bind(&LatencyTest::receiveCallback, this, _1, boost::ref(pubs.back())), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay())); 00497 } 00498 00499 boost::barrier all_connected(1 + sender_threads_); 00500 00501 ros::WallTime pub_start = ros::WallTime::now(); 00502 ROS_INFO("Starting publish threads"); 00503 for (uint32_t i = 0; i < sender_threads_; ++i) 00504 { 00505 send_threads_.create_thread(boost::bind(&LatencyTest::sendThread, this, &all_connected, i)); 00506 } 00507 00508 ROS_INFO("Waiting for all connections to establish"); 00509 00510 bool cont = true; 00511 while (cont) 00512 { 00513 cont = false; 00514 for (uint32_t i = 0; i < streams_; ++i) 00515 { 00516 if (pubs[i].getNumSubscribers() == 0) 00517 { 00518 cont = true; 00519 } 00520 } 00521 } 00522 00523 all_connected.wait(); 00524 ROS_INFO("All connections established"); 00525 00526 ros::AsyncSpinner receive_spinner(receiver_threads_, &receive_queue_); 00527 receive_spinner.start(); 00528 00529 send_threads_.join_all(); 00530 ROS_INFO("All publish threads done"); 00531 00532 ROS_INFO("Collating results"); 00533 00534 r.test_end = ros::WallTime::now(); 00535 00536 r.latency_avg = 0; 00537 r.latency_max = 0; 00538 r.latency_min = 9999999999999ULL; 00539 r.total_message_count = 0; 00540 r.message_size = message_size_; 00541 r.receiver_threads = receiver_threads_; 00542 r.sender_threads = sender_threads_; 00543 r.count_per_stream = count_per_stream_; 00544 r.streams = streams_; 00545 00546 double latency_total = 0.0; 00547 uint32_t latency_count = 0; 00548 { 00549 std::vector<boost::shared_ptr<ThreadResult> >::iterator it = results_.begin(); 00550 std::vector<boost::shared_ptr<ThreadResult> >::iterator end = results_.end(); 00551 for (; it != end; ++it) 00552 { 00553 ThreadResult& tr = **it; 00554 r.total_message_count += tr.message_count; 00555 00556 std::vector<double>::iterator lat_it = tr.latencies.begin(); 00557 std::vector<double>::iterator lat_end = tr.latencies.end(); 00558 for (; lat_it != lat_end; ++lat_it) 00559 { 00560 double latency = *lat_it; 00561 r.latency_min = std::min(r.latency_min, latency); 00562 r.latency_max = std::max(r.latency_max, latency); 00563 ++latency_count; 00564 latency_total += latency; 00565 } 00566 } 00567 } 00568 00569 r.latency_avg = latency_total / latency_count; 00570 00571 ROS_INFO("Done collating results"); 00572 00573 return r; 00574 } 00575 00576 LatencyResult latency(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads) 00577 { 00578 ROS_INFO_STREAM("*****************************************************"); 00579 ROS_INFO_STREAM("Running latency test: "<< "receiver_threads [" << receiver_threads << "], sender_threads [" << sender_threads << "], streams [" << streams << "], count_per_stream [" << count_per_stream << "], message_size [" << message_size << "]"); 00580 00581 LatencyTest t(count_per_stream, streams, message_size, sender_threads, receiver_threads); 00582 return t.run(); 00583 } 00584 00594 00595 class STLatencyTest 00596 { 00597 public: 00598 STLatencyTest(uint32_t message_count); 00599 00600 STLatencyResult run(); 00601 00602 private: 00603 00604 void receiveCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub); 00605 void sendCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub); 00606 00607 struct Result 00608 { 00609 std::vector<double> latencies; 00610 }; 00611 Result result_; 00612 00613 ros::CallbackQueue receive_queue_; 00614 00615 uint32_t message_count_; 00616 }; 00617 00618 STLatencyTest::STLatencyTest(uint32_t message_count) 00619 : message_count_(message_count) 00620 { 00621 } 00622 00623 void STLatencyTest::receiveCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub) 00624 { 00625 ros::WallTime receipt_time = ros::WallTime::now(); 00626 LatencyMessagePtr reply = boost::const_pointer_cast<LatencyMessage>(msg); 00627 reply->receipt_time = receipt_time.toSec(); 00628 pub.publish(reply); 00629 //ROS_INFO("Receiver received message %d", msg->count); 00630 } 00631 00632 void STLatencyTest::sendCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub) 00633 { 00634 result_.latencies.push_back(msg->receipt_time - msg->publish_time); 00635 00636 LatencyMessagePtr reply = boost::const_pointer_cast<LatencyMessage>(msg); 00637 reply->publish_time = ros::WallTime::now().toSec(); 00638 ++reply->count; 00639 00640 //ROS_INFO("Sender received return message %d", msg->count); 00641 00642 if (reply->count < message_count_) 00643 { 00644 pub.publish(reply); 00645 } 00646 } 00647 00648 STLatencyResult STLatencyTest::run() 00649 { 00650 ROS_INFO("Starting receive threads"); 00651 STLatencyResult r; 00652 r.test_start = ros::WallTime::now(); 00653 00654 ros::NodeHandle nh; 00655 nh.setCallbackQueue(&receive_queue_); 00656 00657 ros::Publisher recv_pub = nh.advertise<LatencyMessage>("stlatency_perf_test_return", 0); 00658 ros::Subscriber recv_sub = nh.subscribe<LatencyMessage>("stlatency_perf_test", 0, boost::bind(&STLatencyTest::receiveCallback, this, _1, boost::ref(recv_pub)), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay()); 00659 ros::Publisher send_pub = nh.advertise<LatencyMessage>("stlatency_perf_test", 0); 00660 ros::Subscriber send_sub = nh.subscribe<LatencyMessage>("stlatency_perf_test_return", 0, boost::bind(&STLatencyTest::sendCallback, this, _1, boost::ref(send_pub)), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay()); 00661 00662 ROS_INFO("Waiting for all connections to establish"); 00663 00664 bool cont = true; 00665 while (cont) 00666 { 00667 cont = recv_pub.getNumSubscribers() == 0 || send_pub.getNumSubscribers() == 0; 00668 ros::WallDuration(0.001).sleep(); 00669 } 00670 00671 ROS_INFO("All connections established"); 00672 00673 LatencyMessagePtr msg(new LatencyMessage); 00674 msg->publish_time = ros::WallTime::now().toSec(); 00675 send_pub.publish(msg); 00676 while (msg->count < message_count_) 00677 { 00678 receive_queue_.callAvailable(ros::WallDuration(0.1)); 00679 } 00680 00681 r.test_end = ros::WallTime::now(); 00682 00683 r.latency_avg = 0; 00684 r.latency_max = 0; 00685 r.latency_min = 9999999999999ULL; 00686 r.total_message_count = message_count_; 00687 00688 double latency_total = 0.0; 00689 uint32_t latency_count = 0; 00690 { 00691 std::vector<double>::iterator lat_it = result_.latencies.begin(); 00692 std::vector<double>::iterator lat_end = result_.latencies.end(); 00693 for (; lat_it != lat_end; ++lat_it) 00694 { 00695 double latency = *lat_it; 00696 r.latency_min = std::min(r.latency_min, latency); 00697 r.latency_max = std::max(r.latency_max, latency); 00698 ++latency_count; 00699 latency_total += latency; 00700 } 00701 } 00702 00703 r.latency_avg = latency_total / latency_count; 00704 00705 ROS_INFO("Done collating results"); 00706 00707 return r; 00708 } 00709 00710 STLatencyResult stlatency(uint32_t message_count) 00711 { 00712 ROS_INFO_STREAM("*****************************************************"); 00713 ROS_INFO_STREAM("Running single-threaded latency test: message count[" << message_count << "]"); 00714 00715 STLatencyTest t(message_count); 00716 return t.run(); 00717 } 00718 00719 } // namespace intra 00720 } // namespace perf_roscpp