00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "rosbag/player.h"
00036 #include "rosbag/message_instance.h"
00037 #include "rosbag/view.h"
00038
00039 #if !defined(_MSC_VER)
00040 #include <sys/select.h>
00041 #endif
00042
00043 #include <boost/foreach.hpp>
00044 #include <boost/format.hpp>
00045
00046 #include "rosgraph_msgs/Clock.h"
00047
00048 #define foreach BOOST_FOREACH
00049
00050 using std::map;
00051 using std::pair;
00052 using std::string;
00053 using std::vector;
00054 using boost::shared_ptr;
00055 using ros::Exception;
00056
00057 namespace rosbag {
00058
00059 ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo* c, uint32_t queue_size) {
00060 ros::AdvertiseOptions opts(c->topic, queue_size, c->md5sum, c->datatype, c->msg_def);
00061 ros::M_string::const_iterator header_iter = c->header->find("latching");
00062 opts.latch = (header_iter != c->header->end() && header_iter->second == "1");
00063 return opts;
00064 }
00065
00066
00067 ros::AdvertiseOptions createAdvertiseOptions(MessageInstance const& m, uint32_t queue_size) {
00068 return ros::AdvertiseOptions(m.getTopic(), queue_size, m.getMD5Sum(), m.getDataType(), m.getMessageDefinition());
00069 }
00070
00071
00072
00073 PlayerOptions::PlayerOptions() :
00074 quiet(false),
00075 start_paused(false),
00076 at_once(false),
00077 bag_time(false),
00078 bag_time_frequency(0.0),
00079 time_scale(1.0),
00080 queue_size(0),
00081 advertise_sleep(0.2),
00082 try_future(false),
00083 has_time(false),
00084 loop(false),
00085 time(0.0f),
00086 has_duration(false),
00087 duration(0.0f),
00088 keep_alive(false),
00089 skip_empty(ros::DURATION_MAX)
00090 {
00091 }
00092
00093 void PlayerOptions::check() {
00094 if (bags.size() == 0)
00095 throw Exception("You must specify at least one bag file to play from");
00096 if (has_duration && duration <= 0.0)
00097 throw Exception("Invalid duration, must be > 0.0");
00098 }
00099
00100
00101
00102 Player::Player(PlayerOptions const& options) :
00103 options_(options),
00104 paused_(false),
00105 terminal_modified_(false)
00106 {
00107 }
00108
00109 Player::~Player() {
00110 foreach(shared_ptr<Bag> bag, bags_)
00111 bag->close();
00112
00113 restoreTerminal();
00114 }
00115
00116 void Player::publish() {
00117 options_.check();
00118
00119
00120 foreach(string const& filename, options_.bags) {
00121 ROS_INFO("Opening %s", filename.c_str());
00122
00123 try
00124 {
00125 shared_ptr<Bag> bag(new Bag);
00126 bag->open(filename, bagmode::Read);
00127 bags_.push_back(bag);
00128 }
00129 catch (BagUnindexedException ex) {
00130 std::cerr << "Bag file " << filename << " is unindexed. Run rosbag reindex." << std::endl;
00131 return;
00132 }
00133 }
00134
00135 setupTerminal();
00136
00137 if (!node_handle_.ok())
00138 return;
00139
00140 if (!options_.quiet)
00141 puts("");
00142
00143
00144 View full_view;
00145 foreach(shared_ptr<Bag> bag, bags_)
00146 full_view.addQuery(*bag);
00147
00148 ros::Time initial_time = full_view.getBeginTime();
00149
00150 initial_time += ros::Duration(options_.time);
00151
00152 ros::Time finish_time = ros::TIME_MAX;
00153 if (options_.has_duration)
00154 {
00155 finish_time = initial_time + ros::Duration(options_.duration);
00156 }
00157
00158 View view;
00159 TopicQuery topics(options_.topics);
00160
00161 if (options_.topics.empty())
00162 {
00163 foreach(shared_ptr<Bag> bag, bags_)
00164 view.addQuery(*bag, initial_time, finish_time);
00165 } else {
00166 foreach(shared_ptr<Bag> bag, bags_)
00167 view.addQuery(*bag, topics, initial_time, finish_time);
00168 }
00169
00170 if (view.size() == 0)
00171 {
00172 std::cerr << "No messages to play on specified topics. Exiting." << std::endl;
00173 ros::shutdown();
00174 return;
00175 }
00176
00177
00178 foreach(const ConnectionInfo* c, view.getConnections())
00179 {
00180 ros::M_string::const_iterator header_iter = c->header->find("callerid");
00181 std::string callerid = (header_iter != c->header->end() ? header_iter->second : string(""));
00182
00183 string callerid_topic = callerid + c->topic;
00184
00185 map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00186 if (pub_iter == publishers_.end()) {
00187
00188 ros::AdvertiseOptions opts = createAdvertiseOptions(c, options_.queue_size);
00189
00190 ros::Publisher pub = node_handle_.advertise(opts);
00191 publishers_.insert(publishers_.begin(), pair<string, ros::Publisher>(callerid_topic, pub));
00192
00193 pub_iter = publishers_.find(callerid_topic);
00194 }
00195 }
00196
00197 std::cout << "Waiting " << options_.advertise_sleep.toSec() << " seconds after advertising topics..." << std::flush;
00198 options_.advertise_sleep.sleep();
00199 std::cout << " done." << std::endl;
00200
00201 std::cout << std::endl << "Hit space to toggle paused, or 's' to step." << std::endl;
00202
00203 paused_ = options_.start_paused;
00204
00205 while (true) {
00206
00207
00208 time_translator_.setTimeScale(options_.time_scale);
00209
00210 start_time_ = view.begin()->getTime();
00211 time_translator_.setRealStartTime(start_time_);
00212 bag_length_ = view.getEndTime() - view.getBeginTime();
00213
00214 time_publisher_.setTime(start_time_);
00215
00216 ros::WallTime now_wt = ros::WallTime::now();
00217 time_translator_.setTranslatedStartTime(ros::Time(now_wt.sec, now_wt.nsec));
00218
00219
00220 time_publisher_.setTimeScale(options_.time_scale);
00221 if (options_.bag_time)
00222 time_publisher_.setPublishFrequency(options_.bag_time_frequency);
00223 else
00224 time_publisher_.setPublishFrequency(-1.0);
00225
00226 paused_time_ = now_wt;
00227
00228
00229 foreach(MessageInstance m, view) {
00230 if (!node_handle_.ok())
00231 break;
00232
00233 doPublish(m);
00234 }
00235
00236 if (options_.keep_alive)
00237 while (node_handle_.ok())
00238 doKeepAlive();
00239
00240 if (!node_handle_.ok()) {
00241 std::cout << std::endl;
00242 break;
00243 }
00244 if (!options_.loop) {
00245 std::cout << std::endl << "Done." << std::endl;
00246 break;
00247 }
00248 }
00249
00250 ros::shutdown();
00251 }
00252
00253 void Player::printTime()
00254 {
00255 if (!options_.quiet) {
00256
00257 ros::Time current_time = time_publisher_.getTime();
00258 ros::Duration d = current_time - start_time_;
00259
00260 if (paused_)
00261 {
00262 printf("\r [PAUSED] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00263 }
00264 else
00265 {
00266 printf("\r [RUNNING] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00267 }
00268 fflush(stdout);
00269 }
00270 }
00271
00272 void Player::doPublish(MessageInstance const& m) {
00273 string const& topic = m.getTopic();
00274 ros::Time const& time = m.getTime();
00275 string callerid = m.getCallerId();
00276
00277 ros::Time translated = time_translator_.translate(time);
00278 ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00279
00280 time_publisher_.setHorizon(time);
00281 time_publisher_.setWCHorizon(horizon);
00282
00283 string callerid_topic = callerid + topic;
00284
00285 map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00286 ROS_ASSERT(pub_iter != publishers_.end());
00287
00288
00289 if (options_.at_once) {
00290 time_publisher_.stepClock();
00291 pub_iter->second.publish(m);
00292 printTime();
00293 return;
00294 }
00295
00296
00297 if (time - time_publisher_.getTime() > options_.skip_empty)
00298 {
00299 time_publisher_.stepClock();
00300
00301 ros::WallDuration shift = ros::WallTime::now() - horizon ;
00302 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00303 horizon += shift;
00304 time_publisher_.setWCHorizon(horizon);
00305 (pub_iter->second).publish(m);
00306 printTime();
00307 return;
00308 }
00309
00310 while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00311 {
00312 bool charsleftorpaused = true;
00313 while (charsleftorpaused && node_handle_.ok())
00314 {
00315 switch (readCharFromStdin()){
00316 case ' ':
00317 paused_ = !paused_;
00318 if (paused_) {
00319 paused_time_ = ros::WallTime::now();
00320 }
00321 else
00322 {
00323 ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00324 paused_time_ = ros::WallTime::now();
00325
00326 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00327
00328 horizon += shift;
00329 time_publisher_.setWCHorizon(horizon);
00330 }
00331 break;
00332 case 's':
00333 if (paused_) {
00334 time_publisher_.stepClock();
00335
00336 ros::WallDuration shift = ros::WallTime::now() - horizon ;
00337 paused_time_ = ros::WallTime::now();
00338
00339 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00340
00341 horizon += shift;
00342 time_publisher_.setWCHorizon(horizon);
00343
00344 (pub_iter->second).publish(m);
00345
00346 printTime();
00347 return;
00348 }
00349 break;
00350 case EOF:
00351 if (paused_)
00352 {
00353 printTime();
00354 time_publisher_.runStalledClock(ros::WallDuration(.1));
00355 }
00356 else
00357 charsleftorpaused = false;
00358 }
00359 }
00360
00361 printTime();
00362 time_publisher_.runClock(ros::WallDuration(.1));
00363 }
00364
00365 pub_iter->second.publish(m);
00366 }
00367
00368
00369 void Player::doKeepAlive() {
00370
00371 ros::Time const& time = time_publisher_.getTime() + ros::Duration(10.0);
00372
00373 ros::Time translated = time_translator_.translate(time);
00374 ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00375
00376 time_publisher_.setHorizon(time);
00377 time_publisher_.setWCHorizon(horizon);
00378
00379 if (options_.at_once) {
00380 return;
00381 }
00382
00383 while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00384 {
00385 bool charsleftorpaused = true;
00386 while (charsleftorpaused && node_handle_.ok())
00387 {
00388 switch (readCharFromStdin()){
00389 case ' ':
00390 paused_ = !paused_;
00391 if (paused_) {
00392 paused_time_ = ros::WallTime::now();
00393 }
00394 else
00395 {
00396 ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00397 paused_time_ = ros::WallTime::now();
00398
00399 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00400
00401 horizon += shift;
00402 time_publisher_.setWCHorizon(horizon);
00403 }
00404 break;
00405 case EOF:
00406 if (paused_)
00407 {
00408 printTime();
00409 time_publisher_.runStalledClock(ros::WallDuration(.1));
00410 }
00411 else
00412 charsleftorpaused = false;
00413 }
00414 }
00415
00416 printTime();
00417 time_publisher_.runClock(ros::WallDuration(.1));
00418 }
00419 }
00420
00421
00422
00423 void Player::setupTerminal() {
00424 if (terminal_modified_)
00425 return;
00426
00427 #if defined(_MSC_VER)
00428 input_handle = GetStdHandle(STD_INPUT_HANDLE);
00429 if (input_handle == INVALID_HANDLE_VALUE)
00430 {
00431 std::cout << "Failed to set up standard input handle." << std::endl;
00432 return;
00433 }
00434 if (! GetConsoleMode(input_handle, &stdin_set) )
00435 {
00436 std::cout << "Failed to save the console mode." << std::endl;
00437 return;
00438 }
00439
00440
00441
00442
00443
00444
00445
00446 terminal_modified_ = true;
00447 #else
00448 const int fd = fileno(stdin);
00449 termios flags;
00450 tcgetattr(fd, &orig_flags_);
00451 flags = orig_flags_;
00452 flags.c_lflag &= ~ICANON;
00453 flags.c_cc[VMIN] = 0;
00454 flags.c_cc[VTIME] = 0;
00455 tcsetattr(fd, TCSANOW, &flags);
00456
00457 FD_ZERO(&stdin_fdset_);
00458 FD_SET(fd, &stdin_fdset_);
00459 maxfd_ = fd + 1;
00460 terminal_modified_ = true;
00461 #endif
00462 }
00463
00464 void Player::restoreTerminal() {
00465 if (!terminal_modified_)
00466 return;
00467
00468 #if defined(_MSC_VER)
00469 SetConsoleMode(input_handle, stdin_set);
00470 #else
00471 const int fd = fileno(stdin);
00472 tcsetattr(fd, TCSANOW, &orig_flags_);
00473 #endif
00474 terminal_modified_ = false;
00475 }
00476
00477 int Player::readCharFromStdin() {
00478 #ifdef __APPLE__
00479 fd_set testfd;
00480 FD_COPY(&stdin_fdset_, &testfd);
00481 #elif !defined(_MSC_VER)
00482 fd_set testfd = stdin_fdset_;
00483 #endif
00484
00485 #if defined(_MSC_VER)
00486 DWORD events = 0;
00487 INPUT_RECORD input_record[1];
00488 DWORD input_size = 1;
00489 BOOL b = GetNumberOfConsoleInputEvents(input_handle, &events);
00490 if (b && events > 0)
00491 {
00492 b = ReadConsoleInput(input_handle, input_record, input_size, &events);
00493 if (b)
00494 {
00495 for (unsigned int i = 0; i < events; i++)
00496 {
00497 if (input_record[i].EventType & KEY_EVENT & input_record[i].Event.KeyEvent.bKeyDown)
00498 {
00499 CHAR ch = input_record[i].Event.KeyEvent.uChar.AsciiChar;
00500 return ch;
00501 }
00502 }
00503 }
00504 }
00505 return EOF;
00506 #else
00507 timeval tv;
00508 tv.tv_sec = 0;
00509 tv.tv_usec = 0;
00510 if (select(maxfd_, &testfd, NULL, NULL, &tv) <= 0)
00511 return EOF;
00512 return getc(stdin);
00513 #endif
00514 }
00515
00516 TimePublisher::TimePublisher() : time_scale_(1.0)
00517 {
00518 setPublishFrequency(-1.0);
00519 time_pub_ = node_handle_.advertise<rosgraph_msgs::Clock>("clock",1);
00520 }
00521
00522 void TimePublisher::setPublishFrequency(double publish_frequency)
00523 {
00524 publish_frequency_ = publish_frequency;
00525
00526 do_publish_ = (publish_frequency > 0.0);
00527
00528 wall_step_.fromSec(1.0 / publish_frequency);
00529 }
00530
00531 void TimePublisher::setTimeScale(double time_scale)
00532 {
00533 time_scale_ = time_scale;
00534 }
00535
00536 void TimePublisher::setHorizon(const ros::Time& horizon)
00537 {
00538 horizon_ = horizon;
00539 }
00540
00541 void TimePublisher::setWCHorizon(const ros::WallTime& horizon)
00542 {
00543 wc_horizon_ = horizon;
00544 }
00545
00546 void TimePublisher::setTime(const ros::Time& time)
00547 {
00548 current_ = time;
00549 }
00550
00551 ros::Time const& TimePublisher::getTime() const
00552 {
00553 return current_;
00554 }
00555
00556 void TimePublisher::runClock(const ros::WallDuration& duration)
00557 {
00558 if (do_publish_)
00559 {
00560 rosgraph_msgs::Clock pub_msg;
00561
00562 ros::WallTime t = ros::WallTime::now();
00563 ros::WallTime done = t + duration;
00564
00565 while (t < done && t < wc_horizon_)
00566 {
00567 ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00568
00569 ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00570 d *= time_scale_;
00571
00572 current_ = horizon_ - d;
00573
00574 if (current_ >= horizon_)
00575 current_ = horizon_;
00576
00577 if (t >= next_pub_)
00578 {
00579 pub_msg.clock = current_;
00580 time_pub_.publish(pub_msg);
00581 next_pub_ = t + wall_step_;
00582 }
00583
00584 ros::WallTime target = done;
00585 if (target > wc_horizon_)
00586 target = wc_horizon_;
00587 if (target > next_pub_)
00588 target = next_pub_;
00589
00590 ros::WallTime::sleepUntil(target);
00591
00592 t = ros::WallTime::now();
00593 }
00594 } else {
00595
00596 ros::WallTime t = ros::WallTime::now();
00597
00598 ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00599
00600 ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00601 d *= time_scale_;
00602
00603 current_ = horizon_ - d;
00604
00605 if (current_ >= horizon_)
00606 current_ = horizon_;
00607
00608 ros::WallTime target = ros::WallTime::now() + duration;
00609
00610 if (target > wc_horizon_)
00611 target = wc_horizon_;
00612
00613 ros::WallTime::sleepUntil(target);
00614 }
00615 }
00616
00617 void TimePublisher::stepClock()
00618 {
00619 if (do_publish_)
00620 {
00621 current_ = horizon_;
00622
00623 rosgraph_msgs::Clock pub_msg;
00624
00625 pub_msg.clock = current_;
00626 time_pub_.publish(pub_msg);
00627
00628 ros::WallTime t = ros::WallTime::now();
00629 next_pub_ = t + wall_step_;
00630 } else {
00631 current_ = horizon_;
00632 }
00633 }
00634
00635 void TimePublisher::runStalledClock(const ros::WallDuration& duration)
00636 {
00637 if (do_publish_)
00638 {
00639 rosgraph_msgs::Clock pub_msg;
00640
00641 ros::WallTime t = ros::WallTime::now();
00642 ros::WallTime done = t + duration;
00643
00644 while ( t < done )
00645 {
00646 if (t > next_pub_)
00647 {
00648 pub_msg.clock = current_;
00649 time_pub_.publish(pub_msg);
00650 next_pub_ = t + wall_step_;
00651 }
00652
00653 ros::WallTime target = done;
00654
00655 if (target > next_pub_)
00656 target = next_pub_;
00657
00658 ros::WallTime::sleepUntil(target);
00659
00660 t = ros::WallTime::now();
00661 }
00662 } else {
00663 duration.sleep();
00664 }
00665 }
00666
00667 bool TimePublisher::horizonReached()
00668 {
00669 return ros::WallTime::now() > wc_horizon_;
00670 }
00671
00672 }