36 #include "test_roscpp/ThroughputMessage.h" 37 #include "test_roscpp/LatencyMessage.h" 42 #include <boost/thread.hpp> 54 ThroughputTest(
double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads);
59 void sendThread(boost::barrier* all_connected);
62 void callback(
const test_roscpp::ThroughputMessageConstPtr& msg);
87 std::vector<ros::Publisher>
pubs_;
128 ROS_INFO_STREAM(
"Receive thread [" << boost::this_thread::get_id() <<
"] waiting for all threads to start");
134 ROS_INFO_STREAM(
"Receive thread [" << boost::this_thread::get_id() <<
"] running");
141 ROS_INFO_STREAM(
"Receive thread [" << boost::this_thread::get_id() <<
"] adding results and exiting");
143 boost::mutex::scoped_lock lock(
mutex_);
154 std::vector<ros::Publisher> pubs;
155 for (uint32_t i = 0; i <
streams_; ++i)
157 std::stringstream ss;
158 ss <<
"throughput_perf_test_" << i;
159 pubs.push_back(nh.
advertise<test_roscpp::ThroughputMessage>(ss.str(), 1));
164 boost::mutex::scoped_lock lock(
mutex_);
165 pubs_.insert(
pubs_.end(), pubs.begin(), pubs.end());
168 ROS_INFO_STREAM(
"Publish thread [" << boost::this_thread::get_id() <<
"] waiting for connections");
173 for (uint32_t i = 0; i <
streams_; ++i)
175 if (pubs[i].getNumSubscribers() == 0)
182 test_roscpp::ThroughputMessagePtr msg(boost::make_shared<test_roscpp::ThroughputMessage>());
185 all_connected->wait();
187 ROS_INFO_STREAM(
"Publish thread [" << boost::this_thread::get_id() <<
"] all connections established, beginning to publish");
196 while (!boost::this_thread::interruption_requested())
198 for (uint32_t j = 0; j < streams; ++j)
200 pubs[j].publish(msg);
206 boost::this_thread::yield();
209 catch (boost::thread_interrupted&)
213 ROS_INFO_STREAM(
"Publish thread [" << boost::this_thread::get_id() <<
"] exiting");
215 boost::mutex::scoped_lock lock(
mutex_);
221 ROS_INFO(
"Starting receive threads");
228 std::vector<ros::Subscriber> subs;
229 for (uint32_t i = 0; i <
streams_; ++i)
231 std::stringstream ss;
232 ss <<
"throughput_perf_test_" << i;
251 receiver_all_ready.wait();
253 receiver_start.wait();
256 sender_all_connected.wait();
259 ROS_INFO(
"All receive threads done");
263 ROS_INFO(
"All publish threads done");
282 std::vector<boost::shared_ptr<ReceiveThreadResult> >::iterator it =
receive_results_.begin();
283 std::vector<boost::shared_ptr<ReceiveThreadResult> >::iterator end =
receive_results_.end();
284 for (; it != end; ++it)
295 std::vector<boost::shared_ptr<SendThreadResult> >::iterator it =
send_results_.begin();
296 std::vector<boost::shared_ptr<SendThreadResult> >::iterator end =
send_results_.end();
297 for (; it != end; ++it)
314 ThroughputResult throughput(
double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
316 ROS_INFO_STREAM(
"*****************************************************");
317 ROS_INFO_STREAM(
"Running throughput test: "<<
"receiver_threads [" << receiver_threads <<
"], sender_threads [" << sender_threads <<
"], streams [" << streams <<
"], test_duration [" << test_duration <<
"], message_size [" << message_size <<
"]");
319 ThroughputTest t(test_duration, streams, message_size, sender_threads, receiver_threads);
336 LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads);
341 void sendThread(boost::barrier* b, uint32_t i);
344 void receiveCallback(
const test_roscpp::LatencyMessageConstPtr& msg,
ros::Publisher& pub);
345 void sendCallback(
const test_roscpp::LatencyMessageConstPtr& msg,
ros::Publisher& pub, uint32_t thread_index);
356 std::vector<boost::shared_ptr<ThreadResult> >
results_;
369 LatencyTest::LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
370 : count_per_stream_(count_per_stream)
381 test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
382 reply->receipt_time = receipt_time.
toSec();
389 if (msg->thread_index != thread_index)
394 thread_result_->latencies.push_back(msg->receipt_time - msg->publish_time);
397 test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
417 std::vector<ros::Publisher> pubs;
418 std::vector<ros::Subscriber> subs;
420 for (uint32_t i = 0; i <
streams_; ++i)
422 std::stringstream ss;
423 ss <<
"latency_perf_test_" << i;
424 pubs.push_back(nh.
advertise<test_roscpp::LatencyMessage>(ss.str(), 0));
434 for (uint32_t i = 0; i <
streams_; ++i)
436 if (pubs[i].getNumSubscribers() == 0)
443 std::vector<test_roscpp::LatencyMessagePtr> messages;
444 for (uint32_t i = 0; i <
streams_; ++i)
446 test_roscpp::LatencyMessagePtr msg(boost::make_shared<test_roscpp::LatencyMessage>());
447 msg->thread_index = thread_index;
449 messages.push_back(msg);
452 all_connected->wait();
458 const uint32_t total_messages = count * streams;
459 for (uint32_t j = 0; j < streams; ++j)
462 pubs[j].publish(messages[j]);
465 while (r.
latencies.size() < total_messages)
470 ROS_INFO_STREAM(
"Publish thread [" << boost::this_thread::get_id() <<
"] exiting");
472 boost::mutex::scoped_lock lock(
mutex_);
478 ROS_INFO(
"Starting receive threads");
485 std::vector<ros::Subscriber> subs;
486 std::vector<ros::Publisher> pubs;
488 for (uint32_t i = 0; i <
streams_; ++i)
490 std::stringstream ss;
491 ss <<
"latency_perf_test_" << i;
492 std::string sub_topic = ss.str();
494 std::string pub_topic = ss.str();
495 pubs.push_back(nh.
advertise<test_roscpp::LatencyMessage>(pub_topic, 0));
499 boost::barrier all_connected(1 + sender_threads_);
502 ROS_INFO(
"Starting publish threads");
508 ROS_INFO(
"Waiting for all connections to establish");
514 for (uint32_t i = 0; i <
streams_; ++i)
516 if (pubs[i].getNumSubscribers() == 0)
523 all_connected.wait();
524 ROS_INFO(
"All connections established");
527 receive_spinner.
start();
530 ROS_INFO(
"All publish threads done");
546 double latency_total = 0.0;
547 uint32_t latency_count = 0;
549 std::vector<boost::shared_ptr<ThreadResult> >::iterator it =
results_.begin();
550 std::vector<boost::shared_ptr<ThreadResult> >::iterator end =
results_.end();
551 for (; it != end; ++it)
556 std::vector<double>::iterator lat_it = tr.
latencies.begin();
557 std::vector<double>::iterator lat_end = tr.
latencies.end();
558 for (; lat_it != lat_end; ++lat_it)
576 LatencyResult latency(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
578 ROS_INFO_STREAM(
"*****************************************************");
579 ROS_INFO_STREAM(
"Running latency test: "<<
"receiver_threads [" << receiver_threads <<
"], sender_threads [" << sender_threads <<
"], streams [" << streams <<
"], count_per_stream [" << count_per_stream <<
"], message_size [" << message_size <<
"]");
581 LatencyTest t(count_per_stream, streams, message_size, sender_threads, receiver_threads);
619 : message_count_(message_count)
626 test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
627 reply->receipt_time = receipt_time.
toSec();
636 test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
650 ROS_INFO(
"Starting receive threads");
662 ROS_INFO(
"Waiting for all connections to establish");
671 ROS_INFO(
"All connections established");
673 test_roscpp::LatencyMessagePtr msg(boost::make_shared<test_roscpp::LatencyMessage>());
675 send_pub.publish(msg);
688 double latency_total = 0.0;
689 uint32_t latency_count = 0;
693 for (; lat_it != lat_end; ++lat_it)
705 ROS_INFO(
"Done collating results");
712 ROS_INFO_STREAM(
"*****************************************************");
713 ROS_INFO_STREAM(
"Running single-threaded latency test: message count[" << message_count <<
"]");
ros::CallbackQueue queue_
boost::thread_specific_ptr< SendThreadResult > send_thread_result_
boost::shared_ptr< void const > VoidConstPtr
uint32_t count_per_stream_
STLatencyResult stlatency(uint32_t message_count)
void sendCallback(const test_roscpp::LatencyMessageConstPtr &msg, ros::Publisher &pub)
void publish(const boost::shared_ptr< M > &message) const
LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
static uint32_t serializedLength(typename boost::call_traits< T >::param_type t)
boost::thread_group send_threads_
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
void receiveCallback(const test_roscpp::LatencyMessageConstPtr &msg, ros::Publisher &pub)
ros::WallTime first_send_time
std::vector< boost::shared_ptr< ThreadResult > > results_
uint32_t receiver_threads_
uint64_t messages_received
void sendThread(boost::barrier *all_connected)
std::vector< boost::shared_ptr< SendThreadResult > > send_results_
boost::thread_group send_threads_
ThroughputTest(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
uint64_t messages_received
void sendCallback(const test_roscpp::LatencyMessageConstPtr &msg, ros::Publisher &pub, uint32_t thread_index)
boost::thread_specific_ptr< ThreadResult > thread_result_
uint64_t count_per_stream
void callback(const test_roscpp::ThroughputMessageConstPtr &msg)
STLatencyTest(uint32_t message_count)
boost::thread_specific_ptr< ReceiveThreadResult > receive_thread_result_
void receiveCallback(const test_roscpp::LatencyMessageConstPtr &msg, ros::Publisher &pub)
TransportHints & tcpNoDelay(bool nodelay=true)
uint32_t receiver_threads
std::vector< boost::shared_ptr< ReceiveThreadResult > > receive_results_
uint64_t total_bytes_received
std::vector< double > latencies
void setCallbackQueue(CallbackQueueInterface *queue)
ThroughputResult throughput(double duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
uint64_t total_message_count
void sendThread(boost::barrier *b, uint32_t i)
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
std::vector< ros::Publisher > pubs_
std::vector< double > latencies
uint32_t getNumSubscribers() const
ros::CallbackQueue receive_queue_
uint64_t total_message_count
#define ROS_INFO_STREAM(args)
uint32_t receiver_threads
LatencyResult latency(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
ros::CallbackQueue receive_queue_
uint32_t receiver_threads_
void receiveThread(boost::barrier *all_started, boost::barrier *all_start, ros::WallTime *end_time)
ros::WallTime last_recv_time
uint64_t total_bytes_sent
boost::thread_group receive_threads_
uint64_t bytes_per_second