36 #include "test_roscpp/ThroughputMessage.h"    37 #include "test_roscpp/LatencyMessage.h"    42 #include <boost/thread.hpp>    54   ThroughputTest(
double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads);
    59   void sendThread(boost::barrier* all_connected);
    62   void callback(
const test_roscpp::ThroughputMessageConstPtr& msg);
    87   std::vector<ros::Publisher> 
pubs_;
   128   ROS_INFO_STREAM(
"Receive thread [" << boost::this_thread::get_id() << 
"] waiting for all threads to start");
   134   ROS_INFO_STREAM(
"Receive thread [" << boost::this_thread::get_id() << 
"] running");
   141   ROS_INFO_STREAM(
"Receive thread [" << boost::this_thread::get_id() << 
"] adding results and exiting");
   143   boost::mutex::scoped_lock lock(
mutex_);
   154   std::vector<ros::Publisher> pubs;
   155   for (uint32_t i = 0; i < 
streams_; ++i)
   157     std::stringstream ss;
   158     ss << 
"throughput_perf_test_" << i;
   159     pubs.push_back(nh.
advertise<test_roscpp::ThroughputMessage>(ss.str(), 1));
   164     boost::mutex::scoped_lock lock(
mutex_);
   165     pubs_.insert(
pubs_.end(), pubs.begin(), pubs.end());
   168   ROS_INFO_STREAM(
"Publish thread [" << boost::this_thread::get_id() << 
"] waiting for connections");
   173     for (uint32_t i = 0; i < 
streams_; ++i)
   175       if (pubs[i].getNumSubscribers() == 0)
   182   test_roscpp::ThroughputMessagePtr msg(boost::make_shared<test_roscpp::ThroughputMessage>());
   185   all_connected->wait();
   187   ROS_INFO_STREAM(
"Publish thread [" << boost::this_thread::get_id() << 
"] all connections established, beginning to publish");
   196     while (!boost::this_thread::interruption_requested())
   198       for (uint32_t j = 0; j < streams; ++j)
   200         pubs[j].publish(msg);
   206       boost::this_thread::yield();
   209   catch (boost::thread_interrupted&)
   213   ROS_INFO_STREAM(
"Publish thread [" << boost::this_thread::get_id() << 
"] exiting");
   215   boost::mutex::scoped_lock lock(
mutex_);
   221   ROS_INFO(
"Starting receive threads");
   228   std::vector<ros::Subscriber> subs;
   229   for (uint32_t i = 0; i < 
streams_; ++i)
   231     std::stringstream ss;
   232     ss << 
"throughput_perf_test_" << i;
   251   receiver_all_ready.wait();
   253   receiver_start.wait();
   256   sender_all_connected.wait();
   259   ROS_INFO(
"All receive threads done");
   263   ROS_INFO(
"All publish threads done");
   282     std::vector<boost::shared_ptr<ReceiveThreadResult> >::iterator it = 
receive_results_.begin();
   283     std::vector<boost::shared_ptr<ReceiveThreadResult> >::iterator end = 
receive_results_.end();
   284     for (; it != end; ++it)
   295     std::vector<boost::shared_ptr<SendThreadResult> >::iterator it = 
send_results_.begin();
   296     std::vector<boost::shared_ptr<SendThreadResult> >::iterator end = 
send_results_.end();
   297     for (; it != end; ++it)
   314 ThroughputResult throughput(
double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
   316   ROS_INFO_STREAM(
"*****************************************************");
   317   ROS_INFO_STREAM(
"Running throughput test: "<< 
"receiver_threads [" << receiver_threads << 
"], sender_threads [" << sender_threads << 
"], streams [" << streams << 
"], test_duration [" << test_duration << 
"], message_size [" << message_size << 
"]");
   319   ThroughputTest t(test_duration, streams, message_size, sender_threads, receiver_threads);
   336   LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads);
   341   void sendThread(boost::barrier* b, uint32_t i);
   344   void receiveCallback(
const test_roscpp::LatencyMessageConstPtr& msg, 
ros::Publisher& pub);
   345   void sendCallback(
const test_roscpp::LatencyMessageConstPtr& msg, 
ros::Publisher& pub, uint32_t thread_index);
   356   std::vector<boost::shared_ptr<ThreadResult> > 
results_;
   369 LatencyTest::LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
   370 : count_per_stream_(count_per_stream)
   381   test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
   382   reply->receipt_time = receipt_time.
toSec();
   389   if (msg->thread_index != thread_index)
   394   thread_result_->latencies.push_back(msg->receipt_time - msg->publish_time);
   397   test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
   417   std::vector<ros::Publisher> pubs;
   418   std::vector<ros::Subscriber> subs;
   420   for (uint32_t i = 0; i < 
streams_; ++i)
   422     std::stringstream ss;
   423     ss << 
"latency_perf_test_" << i;
   424     pubs.push_back(nh.
advertise<test_roscpp::LatencyMessage>(ss.str(), 0));
   434     for (uint32_t i = 0; i < 
streams_; ++i)
   436       if (pubs[i].getNumSubscribers() == 0)
   443   std::vector<test_roscpp::LatencyMessagePtr> messages;
   444   for (uint32_t i = 0; i < 
streams_; ++i)
   446     test_roscpp::LatencyMessagePtr msg(boost::make_shared<test_roscpp::LatencyMessage>());
   447     msg->thread_index = thread_index;
   449     messages.push_back(msg);
   452   all_connected->wait();
   458   const uint32_t total_messages = count * streams;
   459   for (uint32_t j = 0; j < streams; ++j)
   462     pubs[j].publish(messages[j]);
   465   while (r.
latencies.size() < total_messages)
   470   ROS_INFO_STREAM(
"Publish thread [" << boost::this_thread::get_id() << 
"] exiting");
   472   boost::mutex::scoped_lock lock(
mutex_);
   478   ROS_INFO(
"Starting receive threads");
   485   std::vector<ros::Subscriber> subs;
   486   std::vector<ros::Publisher> pubs;
   488   for (uint32_t i = 0; i < 
streams_; ++i)
   490     std::stringstream ss;
   491     ss << 
"latency_perf_test_" << i;
   492     std::string sub_topic = ss.str();
   494     std::string pub_topic = ss.str();
   495     pubs.push_back(nh.
advertise<test_roscpp::LatencyMessage>(pub_topic, 0));
   499   boost::barrier all_connected(1 + sender_threads_);
   502   ROS_INFO(
"Starting publish threads");
   508   ROS_INFO(
"Waiting for all connections to establish");
   514     for (uint32_t i = 0; i < 
streams_; ++i)
   516       if (pubs[i].getNumSubscribers() == 0)
   523   all_connected.wait();
   524   ROS_INFO(
"All connections established");
   527   receive_spinner.
start();
   530   ROS_INFO(
"All publish threads done");
   546   double latency_total = 0.0;
   547   uint32_t latency_count = 0;
   549     std::vector<boost::shared_ptr<ThreadResult> >::iterator it = 
results_.begin();
   550     std::vector<boost::shared_ptr<ThreadResult> >::iterator end = 
results_.end();
   551     for (; it != end; ++it)
   556       std::vector<double>::iterator lat_it = tr.
latencies.begin();
   557       std::vector<double>::iterator lat_end = tr.
latencies.end();
   558       for (; lat_it != lat_end; ++lat_it)
   576 LatencyResult latency(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
   578   ROS_INFO_STREAM(
"*****************************************************");
   579   ROS_INFO_STREAM(
"Running latency test: "<< 
"receiver_threads [" << receiver_threads << 
"], sender_threads [" << sender_threads << 
"], streams [" << streams << 
"], count_per_stream [" << count_per_stream << 
"], message_size [" << message_size << 
"]");
   581   LatencyTest t(count_per_stream, streams, message_size, sender_threads, receiver_threads);
   619 : message_count_(message_count)
   626   test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
   627   reply->receipt_time = receipt_time.
toSec();
   636   test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
   650   ROS_INFO(
"Starting receive threads");
   662   ROS_INFO(
"Waiting for all connections to establish");
   671   ROS_INFO(
"All connections established");
   673   test_roscpp::LatencyMessagePtr msg(boost::make_shared<test_roscpp::LatencyMessage>());
   675   send_pub.publish(msg);
   688   double latency_total = 0.0;
   689   uint32_t latency_count = 0;
   693     for (; lat_it != lat_end; ++lat_it)
   705   ROS_INFO(
"Done collating results");
   712   ROS_INFO_STREAM(
"*****************************************************");
   713   ROS_INFO_STREAM(
"Running single-threaded latency test: message count[" << message_count << 
"]");
 ros::CallbackQueue queue_
boost::thread_specific_ptr< SendThreadResult > send_thread_result_
boost::shared_ptr< void const > VoidConstPtr
uint32_t count_per_stream_
STLatencyResult stlatency(uint32_t message_count)
void sendCallback(const test_roscpp::LatencyMessageConstPtr &msg, ros::Publisher &pub)
LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
static uint32_t serializedLength(typename boost::call_traits< T >::param_type t)
boost::thread_group send_threads_
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
void receiveCallback(const test_roscpp::LatencyMessageConstPtr &msg, ros::Publisher &pub)
ros::WallTime first_send_time
std::vector< boost::shared_ptr< ThreadResult > > results_
uint32_t receiver_threads_
uint64_t messages_received
void sendThread(boost::barrier *all_connected)
std::vector< boost::shared_ptr< SendThreadResult > > send_results_
boost::thread_group send_threads_
ThroughputTest(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
uint64_t messages_received
void sendCallback(const test_roscpp::LatencyMessageConstPtr &msg, ros::Publisher &pub, uint32_t thread_index)
boost::thread_specific_ptr< ThreadResult > thread_result_
uint64_t count_per_stream
void callback(const test_roscpp::ThroughputMessageConstPtr &msg)
STLatencyTest(uint32_t message_count)
boost::thread_specific_ptr< ReceiveThreadResult > receive_thread_result_
void receiveCallback(const test_roscpp::LatencyMessageConstPtr &msg, ros::Publisher &pub)
TransportHints & tcpNoDelay(bool nodelay=true)
uint32_t receiver_threads
void publish(const boost::shared_ptr< M > &message) const
std::vector< boost::shared_ptr< ReceiveThreadResult > > receive_results_
uint64_t total_bytes_received
std::vector< double > latencies
void setCallbackQueue(CallbackQueueInterface *queue)
ThroughputResult throughput(double duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
uint64_t total_message_count
void sendThread(boost::barrier *b, uint32_t i)
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
std::vector< ros::Publisher > pubs_
std::vector< double > latencies
ros::CallbackQueue receive_queue_
uint64_t total_message_count
#define ROS_INFO_STREAM(args)
uint32_t receiver_threads
LatencyResult latency(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
ros::CallbackQueue receive_queue_
uint32_t receiver_threads_
uint32_t getNumSubscribers() const
void receiveThread(boost::barrier *all_started, boost::barrier *all_start, ros::WallTime *end_time)
ros::WallTime last_recv_time
uint64_t total_bytes_sent
boost::thread_group receive_threads_
uint64_t bytes_per_second