00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "rosbag/player.h"
00036 #include "rosbag/message_instance.h"
00037 #include "rosbag/view.h"
00038
00039 #if !defined(_MSC_VER)
00040 #include <sys/select.h>
00041 #endif
00042
00043 #include <boost/foreach.hpp>
00044 #include <boost/format.hpp>
00045
00046 #include "rosgraph_msgs/Clock.h"
00047
00048 #define foreach BOOST_FOREACH
00049
00050 using std::map;
00051 using std::pair;
00052 using std::string;
00053 using std::vector;
00054 using boost::shared_ptr;
00055 using ros::Exception;
00056
00057 namespace rosbag {
00058
00059
00060
00061 PlayerOptions::PlayerOptions() :
00062 quiet(false),
00063 start_paused(false),
00064 at_once(false),
00065 bag_time(false),
00066 bag_time_frequency(0.0),
00067 time_scale(1.0),
00068 queue_size(0),
00069 advertise_sleep(0.2),
00070 try_future(false),
00071 has_time(false),
00072 loop(false),
00073 time(0.0f),
00074 has_duration(false),
00075 duration(0.0f),
00076 keep_alive(false),
00077 skip_empty(ros::DURATION_MAX)
00078 {
00079 }
00080
00081 void PlayerOptions::check() {
00082 if (bags.size() == 0)
00083 throw Exception("You must specify at least one bag file to play from");
00084 if (has_duration && duration <= 0.0)
00085 throw Exception("Invalid duration, must be > 0.0");
00086 }
00087
00088
00089
00090 Player::Player(PlayerOptions const& options) :
00091 options_(options),
00092 paused_(false),
00093 terminal_modified_(false)
00094 {
00095 }
00096
00097 Player::~Player() {
00098 foreach(shared_ptr<Bag> bag, bags_)
00099 bag->close();
00100
00101 restoreTerminal();
00102 }
00103
00104 void Player::publish() {
00105 options_.check();
00106
00107
00108 foreach(string const& filename, options_.bags) {
00109 ROS_INFO("Opening %s", filename.c_str());
00110
00111 try
00112 {
00113 shared_ptr<Bag> bag(new Bag);
00114 bag->open(filename, bagmode::Read);
00115 bags_.push_back(bag);
00116 }
00117 catch (BagUnindexedException ex) {
00118 std::cerr << "Bag file " << filename << " is unindexed. Run rosbag reindex." << std::endl;
00119 return;
00120 }
00121 }
00122
00123 setupTerminal();
00124
00125 if (!node_handle_.ok())
00126 return;
00127
00128 if (!options_.quiet)
00129 puts("");
00130
00131
00132 View full_view;
00133 foreach(shared_ptr<Bag> bag, bags_)
00134 full_view.addQuery(*bag);
00135
00136 ros::Time initial_time = full_view.getBeginTime();
00137
00138 initial_time += ros::Duration(options_.time);
00139
00140 ros::Time finish_time = ros::TIME_MAX;
00141 if (options_.has_duration)
00142 {
00143 finish_time = initial_time + ros::Duration(options_.duration);
00144 }
00145
00146 View view;
00147 TopicQuery topics(options_.topics);
00148
00149 if (options_.topics.empty())
00150 {
00151 foreach(shared_ptr<Bag> bag, bags_)
00152 view.addQuery(*bag, initial_time, finish_time);
00153 } else {
00154 foreach(shared_ptr<Bag> bag, bags_)
00155 view.addQuery(*bag, topics, initial_time, finish_time);
00156 }
00157
00158 if (view.size() == 0)
00159 {
00160 std::cerr << "No messages to play on specified topics. Exiting." << std::endl;
00161 ros::shutdown();
00162 return;
00163 }
00164
00165
00166 foreach(const ConnectionInfo* c, view.getConnections())
00167 {
00168 ros::M_string::const_iterator header_iter = c->header->find("callerid");
00169 std::string callerid = (header_iter != c->header->end() ? header_iter->second : string(""));
00170
00171 string callerid_topic = callerid + c->topic;
00172
00173 map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00174 if (pub_iter == publishers_.end()) {
00175
00176 ros::AdvertiseOptions opts = createAdvertiseOptions(c, options_.queue_size);
00177
00178 ros::Publisher pub = node_handle_.advertise(opts);
00179 publishers_.insert(publishers_.begin(), pair<string, ros::Publisher>(callerid_topic, pub));
00180
00181 pub_iter = publishers_.find(callerid_topic);
00182 }
00183 }
00184
00185 std::cout << "Waiting " << options_.advertise_sleep.toSec() << " seconds after advertising topics..." << std::flush;
00186 options_.advertise_sleep.sleep();
00187 std::cout << " done." << std::endl;
00188
00189 std::cout << std::endl << "Hit space to toggle paused, or 's' to step." << std::endl;
00190
00191 paused_ = options_.start_paused;
00192
00193 while (true) {
00194
00195
00196 time_translator_.setTimeScale(options_.time_scale);
00197
00198 start_time_ = view.begin()->getTime();
00199 time_translator_.setRealStartTime(start_time_);
00200 bag_length_ = view.getEndTime() - view.getBeginTime();
00201
00202 time_publisher_.setTime(start_time_);
00203
00204 ros::WallTime now_wt = ros::WallTime::now();
00205 time_translator_.setTranslatedStartTime(ros::Time(now_wt.sec, now_wt.nsec));
00206
00207
00208 time_publisher_.setTimeScale(options_.time_scale);
00209 if (options_.bag_time)
00210 time_publisher_.setPublishFrequency(options_.bag_time_frequency);
00211 else
00212 time_publisher_.setPublishFrequency(-1.0);
00213
00214 paused_time_ = now_wt;
00215
00216
00217 foreach(MessageInstance m, view) {
00218 if (!node_handle_.ok())
00219 break;
00220
00221 doPublish(m);
00222 }
00223
00224 if (options_.keep_alive)
00225 while (node_handle_.ok())
00226 doKeepAlive();
00227
00228 if (!node_handle_.ok()) {
00229 std::cout << std::endl;
00230 break;
00231 }
00232 if (!options_.loop) {
00233 std::cout << std::endl << "Done." << std::endl;
00234 break;
00235 }
00236 }
00237
00238 ros::shutdown();
00239 }
00240
00241 void Player::printTime()
00242 {
00243 if (!options_.quiet) {
00244
00245 ros::Time current_time = time_publisher_.getTime();
00246 ros::Duration d = current_time - start_time_;
00247
00248 if (paused_)
00249 {
00250 printf("\r [PAUSED] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00251 }
00252 else
00253 {
00254 printf("\r [RUNNING] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00255 }
00256 fflush(stdout);
00257 }
00258 }
00259
00260 void Player::doPublish(MessageInstance const& m) {
00261 string const& topic = m.getTopic();
00262 ros::Time const& time = m.getTime();
00263 string callerid = m.getCallerId();
00264
00265 ros::Time translated = time_translator_.translate(time);
00266 ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00267
00268 time_publisher_.setHorizon(time);
00269 time_publisher_.setWCHorizon(horizon);
00270
00271 string callerid_topic = callerid + topic;
00272
00273 map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00274 ROS_ASSERT(pub_iter != publishers_.end());
00275
00276
00277 if (options_.at_once) {
00278 time_publisher_.stepClock();
00279 pub_iter->second.publish(m);
00280 printTime();
00281 return;
00282 }
00283
00284
00285 if (time - time_publisher_.getTime() > options_.skip_empty)
00286 {
00287 time_publisher_.stepClock();
00288
00289 ros::WallDuration shift = ros::WallTime::now() - horizon ;
00290 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00291 horizon += shift;
00292 time_publisher_.setWCHorizon(horizon);
00293 (pub_iter->second).publish(m);
00294 printTime();
00295 return;
00296 }
00297
00298 while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00299 {
00300 bool charsleftorpaused = true;
00301 while (charsleftorpaused && node_handle_.ok())
00302 {
00303 switch (readCharFromStdin()){
00304 case ' ':
00305 paused_ = !paused_;
00306 if (paused_) {
00307 paused_time_ = ros::WallTime::now();
00308 }
00309 else
00310 {
00311 ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00312 paused_time_ = ros::WallTime::now();
00313
00314 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00315
00316 horizon += shift;
00317 time_publisher_.setWCHorizon(horizon);
00318 }
00319 break;
00320 case 's':
00321 if (paused_) {
00322 time_publisher_.stepClock();
00323
00324 ros::WallDuration shift = ros::WallTime::now() - horizon ;
00325 paused_time_ = ros::WallTime::now();
00326
00327 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00328
00329 horizon += shift;
00330 time_publisher_.setWCHorizon(horizon);
00331
00332 (pub_iter->second).publish(m);
00333
00334 printTime();
00335 return;
00336 }
00337 break;
00338 case EOF:
00339 if (paused_)
00340 {
00341 printTime();
00342 time_publisher_.runStalledClock(ros::WallDuration(.1));
00343 }
00344 else
00345 charsleftorpaused = false;
00346 }
00347 }
00348
00349 printTime();
00350 time_publisher_.runClock(ros::WallDuration(.1));
00351 }
00352
00353 pub_iter->second.publish(m);
00354 }
00355
00356
00357 void Player::doKeepAlive() {
00358
00359 ros::Time const& time = time_publisher_.getTime() + ros::Duration(10.0);
00360
00361 ros::Time translated = time_translator_.translate(time);
00362 ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00363
00364 time_publisher_.setHorizon(time);
00365 time_publisher_.setWCHorizon(horizon);
00366
00367 if (options_.at_once) {
00368 return;
00369 }
00370
00371 while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00372 {
00373 bool charsleftorpaused = true;
00374 while (charsleftorpaused && node_handle_.ok())
00375 {
00376 switch (readCharFromStdin()){
00377 case ' ':
00378 paused_ = !paused_;
00379 if (paused_) {
00380 paused_time_ = ros::WallTime::now();
00381 }
00382 else
00383 {
00384 ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00385 paused_time_ = ros::WallTime::now();
00386
00387 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00388
00389 horizon += shift;
00390 time_publisher_.setWCHorizon(horizon);
00391 }
00392 break;
00393 case EOF:
00394 if (paused_)
00395 {
00396 printTime();
00397 time_publisher_.runStalledClock(ros::WallDuration(.1));
00398 }
00399 else
00400 charsleftorpaused = false;
00401 }
00402 }
00403
00404 printTime();
00405 time_publisher_.runClock(ros::WallDuration(.1));
00406 }
00407 }
00408
00409
00410
00411 void Player::setupTerminal() {
00412 if (terminal_modified_)
00413 return;
00414
00415 #if defined(_MSC_VER)
00416 input_handle = GetStdHandle(STD_INPUT_HANDLE);
00417 if (input_handle == INVALID_HANDLE_VALUE)
00418 {
00419 std::cout << "Failed to set up standard input handle." << std::endl;
00420 return;
00421 }
00422 if (! GetConsoleMode(input_handle, &stdin_set) )
00423 {
00424 std::cout << "Failed to save the console mode." << std::endl;
00425 return;
00426 }
00427
00428
00429
00430
00431
00432
00433
00434 terminal_modified_ = true;
00435 #else
00436 const int fd = fileno(stdin);
00437 termios flags;
00438 tcgetattr(fd, &orig_flags_);
00439 flags = orig_flags_;
00440 flags.c_lflag &= ~ICANON;
00441 flags.c_cc[VMIN] = 0;
00442 flags.c_cc[VTIME] = 0;
00443 tcsetattr(fd, TCSANOW, &flags);
00444
00445 FD_ZERO(&stdin_fdset_);
00446 FD_SET(fd, &stdin_fdset_);
00447 maxfd_ = fd + 1;
00448 terminal_modified_ = true;
00449 #endif
00450 }
00451
00452 void Player::restoreTerminal() {
00453 if (!terminal_modified_)
00454 return;
00455
00456 #if defined(_MSC_VER)
00457 SetConsoleMode(input_handle, stdin_set);
00458 #else
00459 const int fd = fileno(stdin);
00460 tcsetattr(fd, TCSANOW, &orig_flags_);
00461 #endif
00462 terminal_modified_ = false;
00463 }
00464
00465 int Player::readCharFromStdin() {
00466 #ifdef __APPLE__
00467 fd_set testfd;
00468 FD_COPY(&stdin_fdset_, &testfd);
00469 #elif !defined(_MSC_VER)
00470 fd_set testfd = stdin_fdset_;
00471 #endif
00472
00473 #if defined(_MSC_VER)
00474 DWORD events = 0;
00475 INPUT_RECORD input_record[1];
00476 DWORD input_size = 1;
00477 BOOL b = GetNumberOfConsoleInputEvents(input_handle, &events);
00478 if (b && events > 0)
00479 {
00480 b = ReadConsoleInput(input_handle, input_record, input_size, &events);
00481 if (b)
00482 {
00483 for (unsigned int i = 0; i < events; i++)
00484 {
00485 if (input_record[i].EventType & KEY_EVENT & input_record[i].Event.KeyEvent.bKeyDown)
00486 {
00487 CHAR ch = input_record[i].Event.KeyEvent.uChar.AsciiChar;
00488 return ch;
00489 }
00490 }
00491 }
00492 }
00493 return EOF;
00494 #else
00495 timeval tv;
00496 tv.tv_sec = 0;
00497 tv.tv_usec = 0;
00498 if (select(maxfd_, &testfd, NULL, NULL, &tv) <= 0)
00499 return EOF;
00500 return getc(stdin);
00501 #endif
00502 }
00503
00504 TimePublisher::TimePublisher() : time_scale_(1.0)
00505 {
00506 setPublishFrequency(-1.0);
00507 time_pub_ = node_handle_.advertise<rosgraph_msgs::Clock>("clock",1);
00508 }
00509
00510 void TimePublisher::setPublishFrequency(double publish_frequency)
00511 {
00512 publish_frequency_ = publish_frequency;
00513
00514 do_publish_ = (publish_frequency > 0.0);
00515
00516 wall_step_.fromSec(1.0 / publish_frequency);
00517 }
00518
00519 void TimePublisher::setTimeScale(double time_scale)
00520 {
00521 time_scale_ = time_scale;
00522 }
00523
00524 void TimePublisher::setHorizon(const ros::Time& horizon)
00525 {
00526 horizon_ = horizon;
00527 }
00528
00529 void TimePublisher::setWCHorizon(const ros::WallTime& horizon)
00530 {
00531 wc_horizon_ = horizon;
00532 }
00533
00534 void TimePublisher::setTime(const ros::Time& time)
00535 {
00536 current_ = time;
00537 }
00538
00539 ros::Time const& TimePublisher::getTime() const
00540 {
00541 return current_;
00542 }
00543
00544 void TimePublisher::runClock(const ros::WallDuration& duration)
00545 {
00546 if (do_publish_)
00547 {
00548 rosgraph_msgs::Clock pub_msg;
00549
00550 ros::WallTime t = ros::WallTime::now();
00551 ros::WallTime done = t + duration;
00552
00553 while (t < done && t < wc_horizon_)
00554 {
00555 ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00556
00557 ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00558 d *= time_scale_;
00559
00560 current_ = horizon_ - d;
00561
00562 if (current_ >= horizon_)
00563 current_ = horizon_;
00564
00565 if (t >= next_pub_)
00566 {
00567 pub_msg.clock = current_;
00568 time_pub_.publish(pub_msg);
00569 next_pub_ = t + wall_step_;
00570 }
00571
00572 ros::WallTime target = done;
00573 if (target > wc_horizon_)
00574 target = wc_horizon_;
00575 if (target > next_pub_)
00576 target = next_pub_;
00577
00578 ros::WallTime::sleepUntil(target);
00579
00580 t = ros::WallTime::now();
00581 }
00582 } else {
00583
00584 ros::WallTime t = ros::WallTime::now();
00585
00586 ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00587
00588 ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00589 d *= time_scale_;
00590
00591 current_ = horizon_ - d;
00592
00593 if (current_ >= horizon_)
00594 current_ = horizon_;
00595
00596 ros::WallTime target = ros::WallTime::now() + duration;
00597
00598 if (target > wc_horizon_)
00599 target = wc_horizon_;
00600
00601 ros::WallTime::sleepUntil(target);
00602 }
00603 }
00604
00605 void TimePublisher::stepClock()
00606 {
00607 if (do_publish_)
00608 {
00609 current_ = horizon_;
00610
00611 rosgraph_msgs::Clock pub_msg;
00612
00613 pub_msg.clock = current_;
00614 time_pub_.publish(pub_msg);
00615
00616 ros::WallTime t = ros::WallTime::now();
00617 next_pub_ = t + wall_step_;
00618 } else {
00619 current_ = horizon_;
00620 }
00621 }
00622
00623 void TimePublisher::runStalledClock(const ros::WallDuration& duration)
00624 {
00625 if (do_publish_)
00626 {
00627 rosgraph_msgs::Clock pub_msg;
00628
00629 ros::WallTime t = ros::WallTime::now();
00630 ros::WallTime done = t + duration;
00631
00632 while ( t < done )
00633 {
00634 if (t > next_pub_)
00635 {
00636 pub_msg.clock = current_;
00637 time_pub_.publish(pub_msg);
00638 next_pub_ = t + wall_step_;
00639 }
00640
00641 ros::WallTime target = done;
00642
00643 if (target > next_pub_)
00644 target = next_pub_;
00645
00646 ros::WallTime::sleepUntil(target);
00647
00648 t = ros::WallTime::now();
00649 }
00650 } else {
00651 duration.sleep();
00652 }
00653 }
00654
00655 bool TimePublisher::horizonReached()
00656 {
00657 return ros::WallTime::now() > wc_horizon_;
00658 }
00659
00660 }