player.cpp
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2008, Willow Garage, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of Willow Garage, Inc. nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 ********************************************************************/
34 
35 #include "rosbag/player.h"
37 #include "rosbag/view.h"
38 
39 #if !defined(_MSC_VER)
40  #include <sys/select.h>
41 #endif
42 
43 #include <boost/format.hpp>
44 
45 #include "rosgraph_msgs/Clock.h"
46 
47 #include <set>
48 
49 using std::map;
50 using std::pair;
51 using std::string;
52 using std::vector;
53 using boost::shared_ptr;
54 using ros::Exception;
55 
56 namespace rosbag {
57 
58 bool isLatching(const ConnectionInfo* c)
59 {
60  ros::M_string::const_iterator header_iter = c->header->find("latching");
61  return (header_iter != c->header->end() && header_iter->second == "1");
62 }
63 
64 ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo* c, uint32_t queue_size, const std::string& prefix) {
65  ros::AdvertiseOptions opts(prefix + c->topic, queue_size, c->md5sum, c->datatype, c->msg_def);
66  opts.latch = isLatching(c);
67  return opts;
68 }
69 
70 
71 ros::AdvertiseOptions createAdvertiseOptions(MessageInstance const& m, uint32_t queue_size, const std::string& prefix) {
72  return ros::AdvertiseOptions(prefix + m.getTopic(), queue_size, m.getMD5Sum(), m.getDataType(), m.getMessageDefinition());
73 }
74 
75 // PlayerOptions
76 
78  prefix(""),
79  quiet(false),
80  start_paused(false),
81  at_once(false),
82  bag_time(false),
83  bag_time_frequency(0.0),
84  time_scale(1.0),
85  queue_size(0),
86  advertise_sleep(0.2),
87  try_future(false),
88  has_time(false),
89  loop(false),
90  time(0.0f),
91  has_duration(false),
92  duration(0.0f),
93  keep_alive(false),
94  wait_for_subscribers(false),
95  rate_control_topic(""),
96  rate_control_max_delay(1.0f),
97  skip_empty(ros::DURATION_MAX)
98 {
99 }
100 
102  if (bags.size() == 0)
103  throw Exception("You must specify at least one bag file to play from");
104  if (has_duration && duration <= 0.0)
105  throw Exception("Invalid duration, must be > 0.0");
106 }
107 
108 // Player
109 
110 Player::Player(PlayerOptions const& options) :
111  options_(options),
112  paused_(options.start_paused),
113  // If we were given a list of topics to pause on, then go into that mode
114  // by default (it can be toggled later via 't' from the keyboard).
115  pause_for_topics_(options_.pause_topics.size() > 0),
116  pause_change_requested_(false),
117  requested_pause_state_(false),
118  terminal_modified_(false)
119 {
120  ros::NodeHandle private_node_handle("~");
121  pause_service_ = private_node_handle.advertiseService("pause_playback", &Player::pauseCallback, this);
122 }
123 
125  for (shared_ptr<Bag>& bag : bags_)
126  bag->close();
127 
128  restoreTerminal();
129 }
130 
132  options_.check();
133 
134  // Open all the bag files
135  for (string const& filename : options_.bags) {
136  ROS_INFO("Opening %s", filename.c_str());
137 
138  try
139  {
140  shared_ptr<Bag> bag(boost::make_shared<Bag>());
141  bag->open(filename, bagmode::Read);
142  bags_.push_back(bag);
143  }
144  catch (const BagUnindexedException& ex) {
145  std::cerr << "Bag file " << filename << " is unindexed. Run rosbag reindex." << std::endl;
146  return;
147  }
148  }
149 
150  setupTerminal();
151 
152  if (!node_handle_.ok())
153  return;
154 
155  if (!options_.prefix.empty())
156  {
157  ROS_INFO_STREAM("Using prefix '" << options_.prefix << "'' for topics ");
158  }
159 
160  if (!options_.quiet)
161  puts("");
162 
163  // Publish all messages in the bags
164  View full_view;
165  for (shared_ptr<Bag>& bag : bags_)
166  full_view.addQuery(*bag);
167 
168  const auto full_initial_time = full_view.getBeginTime();
169 
170  const auto initial_time = full_initial_time + ros::Duration(options_.time);
171 
172  ros::Time finish_time = ros::TIME_MAX;
174  {
175  finish_time = initial_time + ros::Duration(options_.duration);
176  }
177 
178  View view;
179  TopicQuery topics(options_.topics);
180 
181  if (options_.topics.empty())
182  {
183  for (shared_ptr<Bag>& bag : bags_)
184  view.addQuery(*bag, initial_time, finish_time);
185  } else {
186  for (shared_ptr<Bag>& bag : bags_)
187  view.addQuery(*bag, topics, initial_time, finish_time);
188  }
189 
190  if (view.size() == 0)
191  {
192  std::cerr << "No messages to play on specified topics. Exiting." << std::endl;
193  ros::shutdown();
194  return;
195  }
196 
197  // Advertise all of our messages
198  for (const ConnectionInfo* c : view.getConnections())
199  {
200  advertise(c);
201  }
202 
203  if (options_.rate_control_topic != "")
204  {
205  std::cout << "Creating rate control topic subscriber..." << std::flush;
206 
207  boost::shared_ptr<ros::Subscriber> sub(boost::make_shared<ros::Subscriber>());
210  ops.queue_size = 10;
211  ops.md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
212  ops.datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
213  ops.helper = boost::make_shared<ros::SubscriptionCallbackHelperT<
215  boost::bind(&Player::updateRateTopicTime, this, _1));
216 
218 
219  std::cout << " done." << std::endl;
220  }
221 
222 
223  std::cout << "Waiting " << options_.advertise_sleep.toSec() << " seconds after advertising topics..." << std::flush;
225  std::cout << " done." << std::endl;
226 
227  std::cout << std::endl << "Hit space to toggle paused, or 's' to step." << std::endl;
228 
229  // Publish last message from latch topics if the options_.time > 0.0:
230  if (options_.time > 0.0) {
231  // Retrieve all the latch topics before the initial time and create publishers if needed:
232  View full_latch_view;
233 
234  if (options_.topics.empty()) {
235  for (const auto& bag : bags_) {
236  full_latch_view.addQuery(*bag, full_initial_time, initial_time);
237  }
238  } else {
239  for (const auto& bag : bags_) {
240  full_latch_view.addQuery(*bag, topics, full_initial_time, initial_time);
241  }
242  }
243 
244  std::set<std::pair<std::string, std::string>> latch_topics;
245  for (const auto& c : full_latch_view.getConnections()) {
246  if (isLatching(c)) {
247  const auto header_iter = c->header->find("callerid");
248  const auto callerid = (header_iter != c->header->end() ? header_iter->second : string(""));
249 
250  latch_topics.emplace(callerid, c->topic);
251 
252  advertise(c);
253  }
254  }
255 
258  }
259 
260  // Publish the last message of each latch topic per callerid:
261  for (const auto& item : latch_topics) {
262  const auto& callerid = item.first;
263  const auto& topic = item.second;
264 
265  View latch_view;
266  for (const auto& bag : bags_) {
267  latch_view.addQuery(*bag, TopicQuery(topic), full_initial_time, initial_time);
268  }
269 
270  auto last_message = latch_view.end();
271  for (auto iter = latch_view.begin(); iter != latch_view.end(); ++iter) {
272  if (iter->getCallerId() == callerid) {
273  last_message = iter;
274  }
275  }
276 
277  if (last_message != latch_view.end()) {
278  const auto publisher = publishers_.find(callerid + topic);
279  ROS_ASSERT(publisher != publishers_.end());
280 
281  publisher->second.publish(*last_message);
282  }
283  }
284  } else if (options_.wait_for_subscribers) {
286  }
287 
288  while (true) {
289  // Set up our time_translator and publishers
290 
292 
293  start_time_ = view.begin()->getTime();
295  bag_length_ = view.getEndTime() - view.getBeginTime();
296 
297  // Set the last rate control to now, so the program doesn't start delayed.
299 
301 
304 
305 
307  if (options_.bag_time)
309  else
311 
312  paused_time_ = now_wt;
313 
314  // Call do-publish for each message
315  for (const MessageInstance& m : view) {
316  if (!node_handle_.ok())
317  break;
318 
319  doPublish(m);
320  }
321 
322  if (options_.keep_alive)
323  while (node_handle_.ok())
324  doKeepAlive();
325 
326  if (!node_handle_.ok()) {
327  std::cout << std::endl;
328  break;
329  }
330  if (!options_.loop) {
331  std::cout << std::endl << "Done." << std::endl;
332  break;
333  }
334  }
335 
336  ros::shutdown();
337 }
338 
340 {
342  std::string def = ssmsg->getMessageDefinition();
343  size_t length = ros::serialization::serializationLength(*ssmsg);
344 
345  // Check the message definition.
346  std::istringstream f(def);
347  std::string s;
348  bool flag = false;
349  while(std::getline(f, s, '\n')) {
350  if (!s.empty() && s.find("#") != 0) {
351  // Does not start with #, is not a comment.
352  if (s.find("Header ") == 0) {
353  flag = true;
354  }
355  break;
356  }
357  }
358  // If the header is not the first element in the message according to the definition, throw an error.
359  if (!flag) {
360  std::cout << std::endl << "WARNING: Rate control topic is bad, header is not first. MSG may be malformed." << std::endl;
361  return;
362  }
363 
364  std::vector<uint8_t> buffer(length);
365  ros::serialization::OStream ostream(&buffer[0], length);
367 
368  // Assuming that the header is the first several bytes of the message.
369  //uint32_t header_sequence_id = buffer[0] | (uint32_t)buffer[1] << 8 | (uint32_t)buffer[2] << 16 | (uint32_t)buffer[3] << 24;
370  int32_t header_timestamp_sec = buffer[4] | (uint32_t)buffer[5] << 8 | (uint32_t)buffer[6] << 16 | (uint32_t)buffer[7] << 24;
371  int32_t header_timestamp_nsec = buffer[8] | (uint32_t)buffer[9] << 8 | (uint32_t)buffer[10] << 16 | (uint32_t)buffer[11] << 24;
372 
373  last_rate_control_ = ros::Time(header_timestamp_sec, header_timestamp_nsec);
374 }
375 
377 {
378  if (!options_.quiet) {
379 
380  ros::Time current_time = time_publisher_.getTime();
381  ros::Duration d = current_time - start_time_;
382 
383 
384  if (paused_)
385  {
386  printf("\r [PAUSED ] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
387  }
388  else if (delayed_)
389  {
390  ros::Duration time_since_rate = std::max(ros::Time::now() - last_rate_control_, ros::Duration(0));
391  printf("\r [DELAYED] Bag Time: %13.6f Duration: %.6f / %.6f Delay: %.2f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec(), time_since_rate.toSec());
392  }
393  else
394  {
395  printf("\r [RUNNING] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
396  }
397  fflush(stdout);
398  }
399 }
400 
401 bool Player::pauseCallback(std_srvs::SetBool::Request &req, std_srvs::SetBool::Response &res)
402 {
403  pause_change_requested_ = (req.data != paused_);
404  requested_pause_state_ = req.data;
405 
406  res.success = pause_change_requested_;
407 
408  if (res.success)
409  {
410  res.message = std::string("Playback is now ") + (requested_pause_state_ ? "paused" : "resumed");
411  }
412  else
413  {
414  res.message = std::string("Bag is already ") + (requested_pause_state_ ? "paused." : "running.");
415  }
416 
417  return true;
418 }
419 
420 void Player::processPause(const bool paused, ros::WallTime &horizon)
421 {
422  paused_ = paused;
423 
424  if (paused_)
425  {
427  }
428  else
429  {
430  // Make sure time doesn't shift after leaving pause.
433 
435 
436  horizon += shift;
437  time_publisher_.setWCHorizon(horizon);
438  }
439 }
440 
442 {
443  bool all_topics_subscribed = false;
444  std::cout << "Waiting for subscribers." << std::endl;
445  while (!all_topics_subscribed) {
446  all_topics_subscribed = std::all_of(
447  std::begin(publishers_), std::end(publishers_),
448  [](const PublisherMap::value_type& pub) {
449  return pub.second.getNumSubscribers() > 0;
450  });
451  ros::WallDuration(0.1).sleep();
452  }
453  std::cout << "Finished waiting for subscribers." << std::endl;
454 }
455 
457 {
458  ros::M_string::const_iterator header_iter = c->header->find("callerid");
459  std::string callerid = (header_iter != c->header->end() ? header_iter->second : string(""));
460 
461  string callerid_topic = callerid + c->topic;
462 
463  map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
464  if (pub_iter == publishers_.end()) {
466 
468  publishers_.insert(publishers_.begin(), pair<string, ros::Publisher>(callerid_topic, pub));
469 
470  pub_iter = publishers_.find(callerid_topic);
471  }
472 }
473 
475  string const& topic = m.getTopic();
476  ros::Time const& time = m.getTime();
477  string callerid = m.getCallerId();
478 
479  ros::Time translated = time_translator_.translate(time);
480  ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
481 
483  time_publisher_.setWCHorizon(horizon);
484 
485  string callerid_topic = callerid + topic;
486 
487  map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
488  ROS_ASSERT(pub_iter != publishers_.end());
489 
490  // Update subscribers.
491  ros::spinOnce();
492 
493  // If immediate specified, play immediately
494  if (options_.at_once) {
496  pub_iter->second.publish(m);
497  printTime();
498  return;
499  }
500 
501  // If skip_empty is specified, skip this region and shift.
503  {
505 
506  ros::WallDuration shift = ros::WallTime::now() - horizon ;
508  horizon += shift;
509  time_publisher_.setWCHorizon(horizon);
510  (pub_iter->second).publish(m);
511  printTime();
512  return;
513  }
514 
515  if (pause_for_topics_)
516  {
517  for (std::vector<std::string>::iterator i = options_.pause_topics.begin();
518  i != options_.pause_topics.end();
519  ++i)
520  {
521  if (topic == *i)
522  {
523  paused_ = true;
525  }
526  }
527  }
528 
529  // Check if the rate control topic has posted recently enough to continue, or if a delay is needed.
530  // Delayed is separated from paused to allow more verbose printing.
531  if (rate_control_sub_ != NULL) {
533  delayed_ = true;
535  }
536  }
537 
539  {
540  bool charsleftorpaused = true;
541  while (charsleftorpaused && node_handle_.ok())
542  {
543  ros::spinOnce();
544 
546  {
548  pause_change_requested_ = false;
549  }
550 
551  switch (readCharFromStdin()){
552  case ' ':
553  processPause(!paused_, horizon);
554  break;
555  case 's':
556  if (paused_) {
558 
559  ros::WallDuration shift = ros::WallTime::now() - horizon ;
561 
563 
564  horizon += shift;
565  time_publisher_.setWCHorizon(horizon);
566 
567  (pub_iter->second).publish(m);
568 
569  printTime();
570  return;
571  }
572  break;
573  case 't':
575  break;
576  case EOF:
577  if (paused_)
578  {
579  printTime();
581  ros::spinOnce();
582  }
583  else if (delayed_)
584  {
585  printTime();
587  ros::spinOnce();
588  // You need to check the rate here too.
590  delayed_ = false;
591  // Make sure time doesn't shift after leaving delay.
594 
596 
597  horizon += shift;
598  time_publisher_.setWCHorizon(horizon);
599  }
600  }
601  else
602  charsleftorpaused = false;
603  }
604  }
605 
606  printTime();
608  ros::spinOnce();
609  }
610 
611  pub_iter->second.publish(m);
612 }
613 
614 
616  //Keep pushing ourself out in 10-sec increments (avoids fancy math dealing with the end of time)
617  ros::Time const& time = time_publisher_.getTime() + ros::Duration(10.0);
618 
619  ros::Time translated = time_translator_.translate(time);
620  ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
621 
623  time_publisher_.setWCHorizon(horizon);
624 
625  if (options_.at_once) {
626  return;
627  }
628 
629  // If we're done and just staying alive, don't watch the rate control topic. We aren't publishing anyway.
630  delayed_ = false;
631 
633  {
634  bool charsleftorpaused = true;
635  while (charsleftorpaused && node_handle_.ok())
636  {
637  switch (readCharFromStdin()){
638  case ' ':
639  paused_ = !paused_;
640  if (paused_) {
642  }
643  else
644  {
645  // Make sure time doesn't shift after leaving pause.
648 
650 
651  horizon += shift;
652  time_publisher_.setWCHorizon(horizon);
653  }
654  break;
655  case EOF:
656  if (paused_)
657  {
658  printTime();
660  ros::spinOnce();
661  }
662  else
663  charsleftorpaused = false;
664  }
665  }
666 
667  printTime();
669  ros::spinOnce();
670  }
671 }
672 
673 
674 
676  if (terminal_modified_)
677  return;
678 
679 #if defined(_MSC_VER)
680  input_handle = GetStdHandle(STD_INPUT_HANDLE);
681  if (input_handle == INVALID_HANDLE_VALUE)
682  {
683  std::cout << "Failed to set up standard input handle." << std::endl;
684  return;
685  }
686  if (! GetConsoleMode(input_handle, &stdin_set) )
687  {
688  std::cout << "Failed to save the console mode." << std::endl;
689  return;
690  }
691  // don't actually need anything but the default, alternatively try this
692  //DWORD event_mode = ENABLE_WINDOW_INPUT | ENABLE_MOUSE_INPUT;
693  //if (! SetConsoleMode(input_handle, event_mode) )
694  //{
695  // std::cout << "Failed to set the console mode." << std::endl;
696  // return;
697  //}
698  terminal_modified_ = true;
699 #else
700  const int fd = fileno(stdin);
701  termios flags;
702  tcgetattr(fd, &orig_flags_);
703  flags = orig_flags_;
704  flags.c_lflag &= ~ICANON; // set raw (unset canonical modes)
705  flags.c_cc[VMIN] = 0; // i.e. min 1 char for blocking, 0 chars for non-blocking
706  flags.c_cc[VTIME] = 0; // block if waiting for char
707  tcsetattr(fd, TCSANOW, &flags);
708 
709  FD_ZERO(&stdin_fdset_);
710  FD_SET(fd, &stdin_fdset_);
711  maxfd_ = fd + 1;
712  terminal_modified_ = true;
713 #endif
714 }
715 
717  if (!terminal_modified_)
718  return;
719 
720 #if defined(_MSC_VER)
721  SetConsoleMode(input_handle, stdin_set);
722 #else
723  const int fd = fileno(stdin);
724  tcsetattr(fd, TCSANOW, &orig_flags_);
725 #endif
726  terminal_modified_ = false;
727 }
728 
730 #ifdef __APPLE__
731  fd_set testfd;
732  FD_COPY(&stdin_fdset_, &testfd);
733 #elif !defined(_MSC_VER)
734  fd_set testfd = stdin_fdset_;
735 #endif
736 
737 #if defined(_MSC_VER)
738  DWORD events = 0;
739  INPUT_RECORD input_record[1];
740  DWORD input_size = 1;
741  BOOL b = GetNumberOfConsoleInputEvents(input_handle, &events);
742  if (b && events > 0)
743  {
744  b = ReadConsoleInput(input_handle, input_record, input_size, &events);
745  if (b)
746  {
747  for (unsigned int i = 0; i < events; ++i)
748  {
749  if (input_record[i].EventType & KEY_EVENT & input_record[i].Event.KeyEvent.bKeyDown)
750  {
751  CHAR ch = input_record[i].Event.KeyEvent.uChar.AsciiChar;
752  return ch;
753  }
754  }
755  }
756  }
757  return EOF;
758 #else
759  timeval tv;
760  tv.tv_sec = 0;
761  tv.tv_usec = 0;
762  if (select(maxfd_, &testfd, NULL, NULL, &tv) <= 0)
763  return EOF;
764  return getc(stdin);
765 #endif
766 }
767 
768 TimePublisher::TimePublisher() : time_scale_(1.0)
769 {
770  setPublishFrequency(-1.0);
771  time_pub_ = node_handle_.advertise<rosgraph_msgs::Clock>("clock",1);
772 }
773 
774 void TimePublisher::setPublishFrequency(double publish_frequency)
775 {
776  publish_frequency_ = publish_frequency;
777 
778  do_publish_ = (publish_frequency > 0.0);
779 
780  wall_step_.fromSec(1.0 / publish_frequency);
781 }
782 
783 void TimePublisher::setTimeScale(double time_scale)
784 {
785  time_scale_ = time_scale;
786 }
787 
789 {
790  horizon_ = horizon;
791 }
792 
794 {
795  wc_horizon_ = horizon;
796 }
797 
799 {
800  current_ = time;
801 }
802 
804 {
805  return current_;
806 }
807 
809 {
810  if (do_publish_)
811  {
812  rosgraph_msgs::Clock pub_msg;
813 
815  ros::WallTime done = t + duration;
816 
817  while (t < done && t < wc_horizon_)
818  {
819  ros::WallDuration leftHorizonWC = wc_horizon_ - t;
820 
821  ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
822  d *= time_scale_;
823 
824  current_ = horizon_ - d;
825 
826  if (current_ >= horizon_)
827  current_ = horizon_;
828 
829  if (t >= next_pub_)
830  {
831  pub_msg.clock = current_;
832  time_pub_.publish(pub_msg);
833  next_pub_ = t + wall_step_;
834  }
835 
836  ros::WallTime target = done;
837  if (target > wc_horizon_)
838  target = wc_horizon_;
839  if (target > next_pub_)
840  target = next_pub_;
841 
843 
844  t = ros::WallTime::now();
845  }
846  } else {
847 
849 
850  ros::WallDuration leftHorizonWC = wc_horizon_ - t;
851 
852  ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
853  d *= time_scale_;
854 
855  current_ = horizon_ - d;
856 
857  if (current_ >= horizon_)
858  current_ = horizon_;
859 
860  ros::WallTime target = ros::WallTime::now() + duration;
861 
862  if (target > wc_horizon_)
863  target = wc_horizon_;
864 
866  }
867 }
868 
870 {
871  if (do_publish_)
872  {
873  current_ = horizon_;
874 
875  rosgraph_msgs::Clock pub_msg;
876 
877  pub_msg.clock = current_;
878  time_pub_.publish(pub_msg);
879 
881  next_pub_ = t + wall_step_;
882  } else {
883  current_ = horizon_;
884  }
885 }
886 
888 {
889  if (do_publish_)
890  {
891  rosgraph_msgs::Clock pub_msg;
892 
894  ros::WallTime done = t + duration;
895 
896  while ( t < done )
897  {
898  if (t > next_pub_)
899  {
900  pub_msg.clock = current_;
901  time_pub_.publish(pub_msg);
902  next_pub_ = t + wall_step_;
903  }
904 
905  ros::WallTime target = done;
906 
907  if (target > next_pub_)
908  target = next_pub_;
909 
911 
912  t = ros::WallTime::now();
913  }
914  } else {
915  duration.sleep();
916  }
917 }
918 
920 {
921  return ros::WallTime::now() > wc_horizon_;
922 }
923 
924 } // namespace rosbag
void publish()
Definition: player.cpp:131
d
const boost::shared_ptr< ConstMessage > & getConstMessage() const
termios orig_flags_
Definition: player.h:230
static void write(Stream &stream, typename boost::call_traits< T >::param_type t)
void setupTerminal()
Definition: player.cpp:675
void setWCHorizon(const ros::WallTime &horizon)
Definition: player.cpp:793
bool wait_for_subscribers
Definition: player.h:96
ros::WallTime next_pub_
Definition: player.h:156
TimeTranslator time_translator_
Definition: player.h:235
std::string rate_control_topic
Definition: player.h:97
std::vector< const ConnectionInfo *> getConnections()
void setPublishFrequency(double publish_frequency)
Definition: player.cpp:774
f
ros::Time translate(ros::Time const &t)
PlayerOptions options_
Definition: player.h:201
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
void doPublish(rosbag::MessageInstance const &m)
Definition: player.cpp:474
std::vector< std::string > topics
Definition: player.h:102
ros::Time const & getTime() const
XmlRpcServer s
bool requested_pause_state_
Definition: player.h:214
std::vector< std::string > pause_topics
Definition: player.h:103
ros::WallDuration wall_step_
Definition: player.h:154
void doKeepAlive()
Definition: player.cpp:615
ros::Publisher time_pub_
Definition: player.h:152
std::string prefix
Definition: player.h:80
std::vector< std::string > bags
Definition: player.h:101
static bool sleepUntil(const WallTime &end)
ros::WallTime wc_horizon_
Definition: player.h:158
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
ros::Time const & getTime() const
Definition: player.cpp:803
float rate_control_max_delay
Definition: player.h:98
void setTranslatedStartTime(ros::Time const &t)
Increments the translated start time by shift. Useful for pausing.
ros::WallDuration advertise_sleep
Definition: player.h:88
double bag_time_frequency
Definition: player.h:85
uint32_t size()
void publish(const boost::shared_ptr< M > &message) const
SubscriptionCallbackHelperPtr helper
std::string const & getMD5Sum() const
ros::Time getBeginTime()
void addQuery(Bag const &bag, ros::Time const &start_time=ros::TIME_MIN, ros::Time const &end_time=ros::TIME_MAX)
WallDuration & fromSec(double t)
void setTimeScale(double const &s)
#define ROS_INFO(...)
ros::NodeHandle node_handle_
Definition: player.h:203
double publish_frequency_
Definition: player.h:148
ros::Time horizon_
Definition: player.h:159
void runClock(const ros::WallDuration &duration)
Definition: player.cpp:808
void setRealStartTime(ros::Time const &t)
ros::Subscriber rate_control_sub_
Definition: player.h:216
ROSTIME_DECL const Time TIME_MAX
bool paused_
Definition: player.h:207
void setTimeScale(double time_scale)
Definition: player.cpp:783
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
std::vector< boost::shared_ptr< Bag > > bags_
Definition: player.h:221
bool sleep() const
ros::ServiceServer pause_service_
Definition: player.h:205
void restoreTerminal()
Definition: player.cpp:716
void advertise(const ConnectionInfo *c)
Definition: player.cpp:456
void printTime()
Definition: player.cpp:376
bool delayed_
Definition: player.h:208
ros::WallTime paused_time_
Definition: player.h:219
bool pauseCallback(std_srvs::SetBool::Request &req, std_srvs::SetBool::Response &res)
Definition: player.cpp:401
Player(PlayerOptions const &options)
Definition: player.cpp:110
bool pause_for_topics_
Definition: player.h:210
fd_set stdin_fdset_
Definition: player.h:231
std::string const & getDataType() const
bool terminal_modified_
Definition: player.h:225
boost::shared_ptr< ros::M_string > header
void waitForSubscribers() const
Definition: player.cpp:441
std::string const & getMessageDefinition() const
uint32_t serializationLength(const T &t)
ros::Time getEndTime()
#define ROS_INFO_STREAM(args)
static WallTime now()
iterator end()
ros::NodeHandle node_handle_
Definition: player.h:151
iterator begin()
void setTime(const ros::Time &time)
Definition: player.cpp:798
ros::Duration bag_length_
Definition: player.h:239
void stepClock()
Step the clock to the horizon.
Definition: player.cpp:869
static Time now()
ros::AdvertiseOptions createAdvertiseOptions(MessageInstance const &msg, uint32_t queue_size, const std::string &prefix="")
Helper function to create AdvertiseOptions from a MessageInstance.
Definition: player.cpp:71
void shift(ros::Duration const &d)
Increments the translated start time by shift. Useful for pausing.
ros::Time last_rate_control_
Definition: player.h:217
ROSCPP_DECL void shutdown()
int readCharFromStdin()
Definition: player.cpp:729
PublisherMap publishers_
Definition: player.h:222
ros::Duration skip_empty
Definition: player.h:99
bool isLatching(const ConnectionInfo *c)
Definition: player.cpp:58
std::string getCallerId() const
std::string const & getTopic() const
bool pause_change_requested_
Definition: player.h:212
#define ROS_ASSERT(cond)
ros::Time start_time_
Definition: player.h:238
void updateRateTopicTime(const ros::MessageEvent< topic_tools::ShapeShifter const > &msg_event)
Definition: player.cpp:339
TimePublisher time_publisher_
Definition: player.h:236
ROSCPP_DECL void spinOnce()
bool ok() const
void processPause(const bool paused, ros::WallTime &horizon)
Definition: player.cpp:420
ROSTIME_DECL const Duration DURATION_MAX
void setHorizon(const ros::Time &horizon)
Definition: player.cpp:788
ros::Time current_
Definition: player.h:160
void runStalledClock(const ros::WallDuration &duration)
Sleep as necessary, but don&#39;t let the click run.
Definition: player.cpp:887


rosbag
Author(s): Tim Field, Jeremy Leibs, James Bowman, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:34:21