36 #include "test_roscpp/ThroughputMessage.h"
37 #include "test_roscpp/LatencyMessage.h"
42 #include <boost/thread.hpp>
54 ThroughputTest(
double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads);
59 void sendThread(boost::barrier* all_connected);
62 void callback(
const test_roscpp::ThroughputMessageConstPtr& msg);
87 std::vector<ros::Publisher>
pubs_;
100 : test_duration_(test_duration)
102 , message_size_(message_size)
103 , sender_threads_(sender_threads)
104 , receiver_threads_(receiver_threads)
128 ROS_INFO_STREAM(
"Receive thread [" << boost::this_thread::get_id() <<
"] waiting for all threads to start");
134 ROS_INFO_STREAM(
"Receive thread [" << boost::this_thread::get_id() <<
"] running");
141 ROS_INFO_STREAM(
"Receive thread [" << boost::this_thread::get_id() <<
"] adding results and exiting");
143 boost::mutex::scoped_lock lock(
mutex_);
154 std::vector<ros::Publisher> pubs;
155 for (uint32_t i = 0; i <
streams_; ++i)
157 std::stringstream ss;
158 ss <<
"throughput_perf_test_" << i;
159 pubs.push_back(nh.
advertise<test_roscpp::ThroughputMessage>(ss.str(), 1));
164 boost::mutex::scoped_lock lock(
mutex_);
165 pubs_.insert(
pubs_.end(), pubs.begin(), pubs.end());
168 ROS_INFO_STREAM(
"Publish thread [" << boost::this_thread::get_id() <<
"] waiting for connections");
173 for (uint32_t i = 0; i <
streams_; ++i)
175 if (pubs[i].getNumSubscribers() == 0)
182 test_roscpp::ThroughputMessagePtr msg(boost::make_shared<test_roscpp::ThroughputMessage>());
185 all_connected->wait();
187 ROS_INFO_STREAM(
"Publish thread [" << boost::this_thread::get_id() <<
"] all connections established, beginning to publish");
196 while (!boost::this_thread::interruption_requested())
198 for (uint32_t j = 0; j < streams; ++j)
200 pubs[j].publish(msg);
206 boost::this_thread::yield();
209 catch (boost::thread_interrupted&)
213 ROS_INFO_STREAM(
"Publish thread [" << boost::this_thread::get_id() <<
"] exiting");
215 boost::mutex::scoped_lock lock(
mutex_);
221 ROS_INFO(
"Starting receive threads");
228 std::vector<ros::Subscriber> subs;
229 for (uint32_t i = 0; i <
streams_; ++i)
231 std::stringstream ss;
232 ss <<
"throughput_perf_test_" << i;
251 receiver_all_ready.wait();
253 receiver_start.wait();
256 sender_all_connected.wait();
259 ROS_INFO(
"All receive threads done");
263 ROS_INFO(
"All publish threads done");
282 std::vector<boost::shared_ptr<ReceiveThreadResult> >::iterator it =
receive_results_.begin();
283 std::vector<boost::shared_ptr<ReceiveThreadResult> >::iterator end =
receive_results_.end();
284 for (; it != end; ++it)
295 std::vector<boost::shared_ptr<SendThreadResult> >::iterator it =
send_results_.begin();
296 std::vector<boost::shared_ptr<SendThreadResult> >::iterator end =
send_results_.end();
297 for (; it != end; ++it)
314 ThroughputResult throughput(
double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
316 ROS_INFO_STREAM(
"*****************************************************");
317 ROS_INFO_STREAM(
"Running throughput test: "<<
"receiver_threads [" << receiver_threads <<
"], sender_threads [" << sender_threads <<
"], streams [" << streams <<
"], test_duration [" << test_duration <<
"], message_size [" << message_size <<
"]");
319 ThroughputTest t(test_duration, streams, message_size, sender_threads, receiver_threads);
336 LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads);
341 void sendThread(boost::barrier* b, uint32_t i);
356 std::vector<boost::shared_ptr<ThreadResult> >
results_;
369 LatencyTest::LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
370 : count_per_stream_(count_per_stream)
372 , message_size_(message_size)
373 , sender_threads_(sender_threads)
374 , receiver_threads_(receiver_threads)
381 test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
382 reply->receipt_time = receipt_time.
toSec();
389 if (msg->thread_index != thread_index)
394 thread_result_->latencies.push_back(msg->receipt_time - msg->publish_time);
397 test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
417 std::vector<ros::Publisher> pubs;
418 std::vector<ros::Subscriber> subs;
420 for (uint32_t i = 0; i <
streams_; ++i)
422 std::stringstream ss;
423 ss <<
"latency_perf_test_" << i;
424 pubs.push_back(nh.
advertise<test_roscpp::LatencyMessage>(ss.str(), 0));
434 for (uint32_t i = 0; i <
streams_; ++i)
436 if (pubs[i].getNumSubscribers() == 0)
443 std::vector<test_roscpp::LatencyMessagePtr> messages;
444 for (uint32_t i = 0; i <
streams_; ++i)
446 test_roscpp::LatencyMessagePtr msg(boost::make_shared<test_roscpp::LatencyMessage>());
447 msg->thread_index = thread_index;
449 messages.push_back(msg);
452 all_connected->wait();
458 const uint32_t total_messages = count * streams;
459 for (uint32_t j = 0; j < streams; ++j)
462 pubs[j].publish(messages[j]);
465 while (r.
latencies.size() < total_messages)
470 ROS_INFO_STREAM(
"Publish thread [" << boost::this_thread::get_id() <<
"] exiting");
472 boost::mutex::scoped_lock lock(
mutex_);
478 ROS_INFO(
"Starting receive threads");
485 std::vector<ros::Subscriber> subs;
486 std::vector<ros::Publisher> pubs;
488 for (uint32_t i = 0; i <
streams_; ++i)
490 std::stringstream ss;
491 ss <<
"latency_perf_test_" << i;
492 std::string sub_topic = ss.str();
494 std::string pub_topic = ss.str();
495 pubs.push_back(nh.
advertise<test_roscpp::LatencyMessage>(pub_topic, 0));
502 ROS_INFO(
"Starting publish threads");
508 ROS_INFO(
"Waiting for all connections to establish");
514 for (uint32_t i = 0; i <
streams_; ++i)
516 if (pubs[i].getNumSubscribers() == 0)
523 all_connected.wait();
524 ROS_INFO(
"All connections established");
527 receive_spinner.
start();
530 ROS_INFO(
"All publish threads done");
546 double latency_total = 0.0;
547 uint32_t latency_count = 0;
549 std::vector<boost::shared_ptr<ThreadResult> >::iterator it =
results_.begin();
550 std::vector<boost::shared_ptr<ThreadResult> >::iterator end =
results_.end();
551 for (; it != end; ++it)
556 std::vector<double>::iterator lat_it = tr.
latencies.begin();
557 std::vector<double>::iterator lat_end = tr.
latencies.end();
558 for (; lat_it != lat_end; ++lat_it)
576 LatencyResult latency(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
578 ROS_INFO_STREAM(
"*****************************************************");
579 ROS_INFO_STREAM(
"Running latency test: "<<
"receiver_threads [" << receiver_threads <<
"], sender_threads [" << sender_threads <<
"], streams [" << streams <<
"], count_per_stream [" << count_per_stream <<
"], message_size [" << message_size <<
"]");
581 LatencyTest t(count_per_stream, streams, message_size, sender_threads, receiver_threads);
619 : message_count_(message_count)
626 test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
627 reply->receipt_time = receipt_time.
toSec();
636 test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
650 ROS_INFO(
"Starting receive threads");
662 ROS_INFO(
"Waiting for all connections to establish");
671 ROS_INFO(
"All connections established");
673 test_roscpp::LatencyMessagePtr msg(boost::make_shared<test_roscpp::LatencyMessage>());
675 send_pub.publish(msg);
688 double latency_total = 0.0;
689 uint32_t latency_count = 0;
693 for (; lat_it != lat_end; ++lat_it)
712 ROS_INFO_STREAM(
"*****************************************************");
713 ROS_INFO_STREAM(
"Running single-threaded latency test: message count[" << message_count <<
"]");