intra.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2009, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "perf_roscpp/intra.h"
36 #include "test_roscpp/ThroughputMessage.h"
37 #include "test_roscpp/LatencyMessage.h"
38 
39 #include <ros/ros.h>
40 #include <ros/callback_queue.h>
41 
42 #include <boost/thread.hpp>
43 
44 #include <vector>
45 
46 namespace perf_roscpp
47 {
48 namespace intra
49 {
50 
52 {
53 public:
54  ThroughputTest(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads);
55 
57 
58 private:
59  void sendThread(boost::barrier* all_connected);
60  void receiveThread(boost::barrier* all_started, boost::barrier* all_start, ros::WallTime* end_time);
61 
62  void callback(const test_roscpp::ThroughputMessageConstPtr& msg);
63 
64  boost::mutex mutex_;
65 
67  {
68  uint64_t bytes_received;
70 
72  };
73  boost::thread_specific_ptr<ReceiveThreadResult> receive_thread_result_;
74  std::vector<boost::shared_ptr<ReceiveThreadResult> > receive_results_;
75 
77  {
78  uint64_t bytes_sent;
79  uint64_t messages_sent;
80 
82  };
83  boost::thread_specific_ptr<SendThreadResult> send_thread_result_;
84  std::vector<boost::shared_ptr<SendThreadResult> > send_results_;
85 
87  std::vector<ros::Publisher> pubs_;
88 
89  boost::thread_group receive_threads_;
90  boost::thread_group send_threads_;
91 
93  uint32_t streams_;
94  uint32_t message_size_;
95  uint32_t sender_threads_;
97 };
98 
99 ThroughputTest::ThroughputTest(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
100 : test_duration_(test_duration)
101 , streams_(streams)
102 , message_size_(message_size)
103 , sender_threads_(sender_threads)
104 , receiver_threads_(receiver_threads)
105 {
106 }
107 
108 void ThroughputTest::callback(const test_roscpp::ThroughputMessageConstPtr& msg)
109 {
111 
113  ++r.messages_received;
114 
116 
117  //ROS_INFO_STREAM("Received message " << r.messages_received);
118 }
119 
120 void ThroughputTest::receiveThread(boost::barrier* all_ready, boost::barrier* all_start, ros::WallTime* end_time)
121 {
123 
125  r.messages_received = 0;
126  r.bytes_received = 0;
127 
128  ROS_INFO_STREAM("Receive thread [" << boost::this_thread::get_id() << "] waiting for all threads to start");
129 
130  all_ready->wait();
131  all_start->wait();
132  ros::WallTime local_end_time = *end_time;
133 
134  ROS_INFO_STREAM("Receive thread [" << boost::this_thread::get_id() << "] running");
135 
136  while (ros::WallTime::now() < local_end_time)
137  {
139  }
140 
141  ROS_INFO_STREAM("Receive thread [" << boost::this_thread::get_id() << "] adding results and exiting");
142 
143  boost::mutex::scoped_lock lock(mutex_);
145 }
146 
147 void ThroughputTest::sendThread(boost::barrier* all_connected)
148 {
151 
152  ros::NodeHandle nh;
154  std::vector<ros::Publisher> pubs;
155  for (uint32_t i = 0; i < streams_; ++i)
156  {
157  std::stringstream ss;
158  ss << "throughput_perf_test_" << i;
159  pubs.push_back(nh.advertise<test_roscpp::ThroughputMessage>(ss.str(), 1));
160  }
161 
162  // Need to keep around the publishers so the connections don't go away
163  {
164  boost::mutex::scoped_lock lock(mutex_);
165  pubs_.insert(pubs_.end(), pubs.begin(), pubs.end());
166  }
167 
168  ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] waiting for connections");
169  bool cont = true;
170  while (cont)
171  {
172  cont = false;
173  for (uint32_t i = 0; i < streams_; ++i)
174  {
175  if (pubs[i].getNumSubscribers() == 0)
176  {
177  cont = true;
178  }
179  }
180  }
181 
182  test_roscpp::ThroughputMessagePtr msg(boost::make_shared<test_roscpp::ThroughputMessage>());
183  msg->array.resize(message_size_);
184 
185  all_connected->wait();
186 
187  ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] all connections established, beginning to publish");
188 
190  r.bytes_sent = 0;
191  r.messages_sent = 0;
192 
193  try
194  {
195  const uint32_t streams = streams_;
196  while (!boost::this_thread::interruption_requested())
197  {
198  for (uint32_t j = 0; j < streams; ++j)
199  {
200  pubs[j].publish(msg);
201 
202  ++r.messages_sent;
204  }
205 
206  boost::this_thread::yield();
207  }
208  }
209  catch (boost::thread_interrupted&)
210  {
211  }
212 
213  ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] exiting");
214 
215  boost::mutex::scoped_lock lock(mutex_);
217 }
218 
220 {
221  ROS_INFO("Starting receive threads");
224 
225  ros::NodeHandle nh;
227 
228  std::vector<ros::Subscriber> subs;
229  for (uint32_t i = 0; i < streams_; ++i)
230  {
231  std::stringstream ss;
232  ss << "throughput_perf_test_" << i;
233  subs.push_back(nh.subscribe(ss.str(), 0, &ThroughputTest::callback, this, ros::TransportHints().tcpNoDelay()));
234  }
235 
236  boost::barrier sender_all_connected(sender_threads_ + 1);
237  boost::barrier receiver_all_ready(receiver_threads_ + 1);
238  boost::barrier receiver_start(receiver_threads_ + 1);
239  ros::WallTime test_end_time;
240 
241  for (uint32_t i = 0; i < receiver_threads_; ++i)
242  {
243  receive_threads_.create_thread(boost::bind(&ThroughputTest::receiveThread, this, &receiver_all_ready, &receiver_start, &test_end_time));
244  }
245 
246  for (uint32_t i = 0; i < sender_threads_; ++i)
247  {
248  send_threads_.create_thread(boost::bind(&ThroughputTest::sendThread, this, &sender_all_connected));
249  }
250 
251  receiver_all_ready.wait();
253  receiver_start.wait();
254 
255  ros::WallTime pub_start = ros::WallTime::now();
256  sender_all_connected.wait();
257 
258  receive_threads_.join_all();
259  ROS_INFO("All receive threads done");
260 
261  send_threads_.interrupt_all();
262  send_threads_.join_all();
263  ROS_INFO("All publish threads done");
264 
265  ROS_INFO("Collating results");
266 
268 
269  r.bytes_per_second = 0;
271  r.messages_received = 0;
272  r.messages_sent = 0;
275  r.total_bytes_received = 0;
276  r.total_bytes_sent = 0;
278  r.streams = streams_;
279 
280  ros::WallTime rec_end;
281  {
282  std::vector<boost::shared_ptr<ReceiveThreadResult> >::iterator it = receive_results_.begin();
283  std::vector<boost::shared_ptr<ReceiveThreadResult> >::iterator end = receive_results_.end();
284  for (; it != end; ++it)
285  {
286  ReceiveThreadResult& rr = **it;
289 
290  rec_end = std::max(rec_end, rr.last_recv_time);
291  }
292  }
293 
294  {
295  std::vector<boost::shared_ptr<SendThreadResult> >::iterator it = send_results_.begin();
296  std::vector<boost::shared_ptr<SendThreadResult> >::iterator end = send_results_.end();
297  for (; it != end; ++it)
298  {
299  SendThreadResult& sr = **it;
302 
303  pub_start = std::min(pub_start, sr.first_send_time);
304  }
305  }
306 
307  r.bytes_per_second = (double)r.total_bytes_received / (rec_end - pub_start).toSec();
308 
309  ROS_INFO("Done collating results");
310 
311  return r;
312 }
313 
314 ThroughputResult throughput(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
315 {
316  ROS_INFO_STREAM("*****************************************************");
317  ROS_INFO_STREAM("Running throughput test: "<< "receiver_threads [" << receiver_threads << "], sender_threads [" << sender_threads << "], streams [" << streams << "], test_duration [" << test_duration << "], message_size [" << message_size << "]");
318 
319  ThroughputTest t(test_duration, streams, message_size, sender_threads, receiver_threads);
320  return t.run();
321 }
322 
332 
334 {
335 public:
336  LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads);
337 
338  LatencyResult run();
339 
340 private:
341  void sendThread(boost::barrier* b, uint32_t i);
342  void receiveThread();
343 
344  void receiveCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub);
345  void sendCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub, uint32_t thread_index);
346 
347  boost::mutex mutex_;
348 
350  {
351  uint64_t message_count;
352 
353  std::vector<double> latencies;
354  };
355  boost::thread_specific_ptr<ThreadResult> thread_result_;
356  std::vector<boost::shared_ptr<ThreadResult> > results_;
357 
359 
360  boost::thread_group send_threads_;
361 
363  uint32_t streams_;
364  uint32_t message_size_;
365  uint32_t sender_threads_;
367 };
368 
369 LatencyTest::LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
370 : count_per_stream_(count_per_stream)
371 , streams_(streams)
372 , message_size_(message_size)
373 , sender_threads_(sender_threads)
374 , receiver_threads_(receiver_threads)
375 {
376 }
377 
378 void LatencyTest::receiveCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub)
379 {
380  ros::WallTime receipt_time = ros::WallTime::now();
381  test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
382  reply->receipt_time = receipt_time.toSec();
383  pub.publish(reply);
384  //ROS_INFO("Receiver received message %d", msg->count);
385 }
386 
387 void LatencyTest::sendCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub, uint32_t thread_index)
388 {
389  if (msg->thread_index != thread_index)
390  {
391  return;
392  }
393 
394  thread_result_->latencies.push_back(msg->receipt_time - msg->publish_time);
395  ++thread_result_->message_count;
396 
397  test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
398  reply->publish_time = ros::WallTime::now().toSec();
399  ++reply->count;
400 
401  //ROS_INFO("Sender received return message %d", msg->count);
402 
403  if (reply->count < count_per_stream_ * streams_)
404  {
405  pub.publish(reply);
406  }
407 }
408 
409 void LatencyTest::sendThread(boost::barrier* all_connected, uint32_t thread_index)
410 {
411  thread_result_.reset(new ThreadResult);
413 
414  ros::NodeHandle nh;
415  ros::CallbackQueue queue;
416  nh.setCallbackQueue(&queue);
417  std::vector<ros::Publisher> pubs;
418  std::vector<ros::Subscriber> subs;
419  pubs.reserve(streams_);
420  for (uint32_t i = 0; i < streams_; ++i)
421  {
422  std::stringstream ss;
423  ss << "latency_perf_test_" << i;
424  pubs.push_back(nh.advertise<test_roscpp::LatencyMessage>(ss.str(), 0));
425 
426  ss << "_return";
427  subs.push_back(nh.subscribe<test_roscpp::LatencyMessage>(ss.str(), 0, boost::bind(&LatencyTest::sendCallback, this, _1, boost::ref(pubs[i]), thread_index), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay()));
428  }
429 
430  bool cont = true;
431  while (cont)
432  {
433  cont = false;
434  for (uint32_t i = 0; i < streams_; ++i)
435  {
436  if (pubs[i].getNumSubscribers() == 0)
437  {
438  cont = true;
439  }
440  }
441  }
442 
443  std::vector<test_roscpp::LatencyMessagePtr> messages;
444  for (uint32_t i = 0; i < streams_; ++i)
445  {
446  test_roscpp::LatencyMessagePtr msg(boost::make_shared<test_roscpp::LatencyMessage>());
447  msg->thread_index = thread_index;
448  msg->array.resize(message_size_);
449  messages.push_back(msg);
450  }
451 
452  all_connected->wait();
453 
454  r.message_count = 0;
455 
456  const uint32_t count = count_per_stream_;
457  const uint32_t streams = streams_;
458  const uint32_t total_messages = count * streams;
459  for (uint32_t j = 0; j < streams; ++j)
460  {
461  messages[j]->publish_time = ros::WallTime::now().toSec();
462  pubs[j].publish(messages[j]);
463  }
464 
465  while (r.latencies.size() < total_messages)
466  {
467  queue.callAvailable(ros::WallDuration(0.01));
468  }
469 
470  ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] exiting");
471 
472  boost::mutex::scoped_lock lock(mutex_);
474 }
475 
477 {
478  ROS_INFO("Starting receive threads");
479  LatencyResult r;
481 
482  ros::NodeHandle nh;
484 
485  std::vector<ros::Subscriber> subs;
486  std::vector<ros::Publisher> pubs;
487  pubs.reserve(streams_ * sender_threads_);
488  for (uint32_t i = 0; i < streams_; ++i)
489  {
490  std::stringstream ss;
491  ss << "latency_perf_test_" << i;
492  std::string sub_topic = ss.str();
493  ss << "_return";
494  std::string pub_topic = ss.str();
495  pubs.push_back(nh.advertise<test_roscpp::LatencyMessage>(pub_topic, 0));
496  subs.push_back(nh.subscribe<test_roscpp::LatencyMessage>(sub_topic, 0, boost::bind(&LatencyTest::receiveCallback, this, _1, boost::ref(pubs.back())), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay()));
497  }
498 
499  boost::barrier all_connected(1 + sender_threads_);
500 
501  ros::WallTime pub_start = ros::WallTime::now();
502  ROS_INFO("Starting publish threads");
503  for (uint32_t i = 0; i < sender_threads_; ++i)
504  {
505  send_threads_.create_thread(boost::bind(&LatencyTest::sendThread, this, &all_connected, i));
506  }
507 
508  ROS_INFO("Waiting for all connections to establish");
509 
510  bool cont = true;
511  while (cont)
512  {
513  cont = false;
514  for (uint32_t i = 0; i < streams_; ++i)
515  {
516  if (pubs[i].getNumSubscribers() == 0)
517  {
518  cont = true;
519  }
520  }
521  }
522 
523  all_connected.wait();
524  ROS_INFO("All connections established");
525 
527  receive_spinner.start();
528 
529  send_threads_.join_all();
530  ROS_INFO("All publish threads done");
531 
532  ROS_INFO("Collating results");
533 
535 
536  r.latency_avg = 0;
537  r.latency_max = 0;
538  r.latency_min = 9999999999999ULL;
539  r.total_message_count = 0;
544  r.streams = streams_;
545 
546  double latency_total = 0.0;
547  uint32_t latency_count = 0;
548  {
549  std::vector<boost::shared_ptr<ThreadResult> >::iterator it = results_.begin();
550  std::vector<boost::shared_ptr<ThreadResult> >::iterator end = results_.end();
551  for (; it != end; ++it)
552  {
553  ThreadResult& tr = **it;
555 
556  std::vector<double>::iterator lat_it = tr.latencies.begin();
557  std::vector<double>::iterator lat_end = tr.latencies.end();
558  for (; lat_it != lat_end; ++lat_it)
559  {
560  double latency = *lat_it;
561  r.latency_min = std::min(r.latency_min, latency);
562  r.latency_max = std::max(r.latency_max, latency);
563  ++latency_count;
564  latency_total += latency;
565  }
566  }
567  }
568 
569  r.latency_avg = latency_total / latency_count;
570 
571  ROS_INFO("Done collating results");
572 
573  return r;
574 }
575 
576 LatencyResult latency(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
577 {
578  ROS_INFO_STREAM("*****************************************************");
579  ROS_INFO_STREAM("Running latency test: "<< "receiver_threads [" << receiver_threads << "], sender_threads [" << sender_threads << "], streams [" << streams << "], count_per_stream [" << count_per_stream << "], message_size [" << message_size << "]");
580 
581  LatencyTest t(count_per_stream, streams, message_size, sender_threads, receiver_threads);
582  return t.run();
583 }
584 
594 
596 {
597 public:
598  STLatencyTest(uint32_t message_count);
599 
601 
602 private:
603 
604  void receiveCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub);
605  void sendCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub);
606 
607  struct Result
608  {
609  std::vector<double> latencies;
610  };
612 
614 
615  uint32_t message_count_;
616 };
617 
618 STLatencyTest::STLatencyTest(uint32_t message_count)
619 : message_count_(message_count)
620 {
621 }
622 
623 void STLatencyTest::receiveCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub)
624 {
625  ros::WallTime receipt_time = ros::WallTime::now();
626  test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
627  reply->receipt_time = receipt_time.toSec();
628  pub.publish(reply);
629  //ROS_INFO("Receiver received message %d", msg->count);
630 }
631 
632 void STLatencyTest::sendCallback(const test_roscpp::LatencyMessageConstPtr& msg, ros::Publisher& pub)
633 {
634  result_.latencies.push_back(msg->receipt_time - msg->publish_time);
635 
636  test_roscpp::LatencyMessagePtr reply = boost::const_pointer_cast<test_roscpp::LatencyMessage>(msg);
637  reply->publish_time = ros::WallTime::now().toSec();
638  ++reply->count;
639 
640  //ROS_INFO("Sender received return message %d", msg->count);
641 
642  if (reply->count < message_count_)
643  {
644  pub.publish(reply);
645  }
646 }
647 
649 {
650  ROS_INFO("Starting receive threads");
651  STLatencyResult r;
653 
654  ros::NodeHandle nh;
656 
657  ros::Publisher recv_pub = nh.advertise<test_roscpp::LatencyMessage>("stlatency_perf_test_return", 0);
658  ros::Subscriber recv_sub = nh.subscribe<test_roscpp::LatencyMessage>("stlatency_perf_test", 0, boost::bind(&STLatencyTest::receiveCallback, this, _1, boost::ref(recv_pub)), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay());
659  ros::Publisher send_pub = nh.advertise<test_roscpp::LatencyMessage>("stlatency_perf_test", 0);
660  ros::Subscriber send_sub = nh.subscribe<test_roscpp::LatencyMessage>("stlatency_perf_test_return", 0, boost::bind(&STLatencyTest::sendCallback, this, _1, boost::ref(send_pub)), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay());
661 
662  ROS_INFO("Waiting for all connections to establish");
663 
664  bool cont = true;
665  while (cont)
666  {
667  cont = recv_pub.getNumSubscribers() == 0 || send_pub.getNumSubscribers() == 0;
668  ros::WallDuration(0.001).sleep();
669  }
670 
671  ROS_INFO("All connections established");
672 
673  test_roscpp::LatencyMessagePtr msg(boost::make_shared<test_roscpp::LatencyMessage>());
674  msg->publish_time = ros::WallTime::now().toSec();
675  send_pub.publish(msg);
676  while (msg->count < message_count_)
677  {
679  }
680 
682 
683  r.latency_avg = 0;
684  r.latency_max = 0;
685  r.latency_min = 9999999999999ULL;
687 
688  double latency_total = 0.0;
689  uint32_t latency_count = 0;
690  {
691  std::vector<double>::iterator lat_it = result_.latencies.begin();
692  std::vector<double>::iterator lat_end = result_.latencies.end();
693  for (; lat_it != lat_end; ++lat_it)
694  {
695  double latency = *lat_it;
696  r.latency_min = std::min(r.latency_min, latency);
697  r.latency_max = std::max(r.latency_max, latency);
698  ++latency_count;
699  latency_total += latency;
700  }
701  }
702 
703  r.latency_avg = latency_total / latency_count;
704 
705  ROS_INFO("Done collating results");
706 
707  return r;
708 }
709 
710 STLatencyResult stlatency(uint32_t message_count)
711 {
712  ROS_INFO_STREAM("*****************************************************");
713  ROS_INFO_STREAM("Running single-threaded latency test: message count[" << message_count << "]");
714 
715  STLatencyTest t(message_count);
716  return t.run();
717 }
718 
719 } // namespace intra
720 } // namespace perf_roscpp
ros::CallbackQueue queue_
Definition: intra.cpp:86
boost::thread_specific_ptr< SendThreadResult > send_thread_result_
Definition: intra.cpp:83
boost::shared_ptr< void const > VoidConstPtr
STLatencyResult stlatency(uint32_t message_count)
Definition: intra.cpp:710
void sendCallback(const test_roscpp::LatencyMessageConstPtr &msg, ros::Publisher &pub)
Definition: intra.cpp:632
void publish(const boost::shared_ptr< M > &message) const
LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
Definition: intra.cpp:369
static uint32_t serializedLength(typename boost::call_traits< T >::param_type t)
boost::thread_group send_threads_
Definition: intra.cpp:90
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
void receiveCallback(const test_roscpp::LatencyMessageConstPtr &msg, ros::Publisher &pub)
Definition: intra.cpp:378
std::vector< boost::shared_ptr< ThreadResult > > results_
Definition: intra.cpp:356
void sendThread(boost::barrier *all_connected)
Definition: intra.cpp:147
std::vector< boost::shared_ptr< SendThreadResult > > send_results_
Definition: intra.cpp:84
boost::thread_group send_threads_
Definition: intra.cpp:360
CallOneResult callOne()
ThroughputTest(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
Definition: intra.cpp:99
void sendCallback(const test_roscpp::LatencyMessageConstPtr &msg, ros::Publisher &pub, uint32_t thread_index)
Definition: intra.cpp:387
boost::thread_specific_ptr< ThreadResult > thread_result_
Definition: intra.cpp:355
void callback(const test_roscpp::ThroughputMessageConstPtr &msg)
Definition: intra.cpp:108
STLatencyTest(uint32_t message_count)
Definition: intra.cpp:618
boost::thread_specific_ptr< ReceiveThreadResult > receive_thread_result_
Definition: intra.cpp:73
void receiveCallback(const test_roscpp::LatencyMessageConstPtr &msg, ros::Publisher &pub)
Definition: intra.cpp:623
TransportHints & tcpNoDelay(bool nodelay=true)
std::vector< boost::shared_ptr< ReceiveThreadResult > > receive_results_
Definition: intra.cpp:74
bool sleep() const
#define ROS_INFO(...)
void setCallbackQueue(CallbackQueueInterface *queue)
ThroughputResult throughput(double duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
Definition: intra.cpp:314
void sendThread(boost::barrier *b, uint32_t i)
Definition: intra.cpp:409
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
std::vector< ros::Publisher > pubs_
Definition: intra.cpp:87
uint32_t getNumSubscribers() const
ros::CallbackQueue receive_queue_
Definition: intra.cpp:613
#define ROS_INFO_STREAM(args)
static WallTime now()
LatencyResult latency(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads)
Definition: intra.cpp:576
ros::CallbackQueue receive_queue_
Definition: intra.cpp:358
void receiveThread(boost::barrier *all_started, boost::barrier *all_start, ros::WallTime *end_time)
Definition: intra.cpp:120
ros::WallTime t
boost::thread_group receive_threads_
Definition: intra.cpp:89


test_roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:46