00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "rosbag/player.h"
00036 #include "rosbag/message_instance.h"
00037 #include "rosbag/view.h"
00038
00039 #include <sys/select.h>
00040
00041 #include <boost/foreach.hpp>
00042 #include <boost/format.hpp>
00043
00044 #include "rosgraph_msgs/Clock.h"
00045
00046 #define foreach BOOST_FOREACH
00047
00048 using std::map;
00049 using std::pair;
00050 using std::string;
00051 using std::vector;
00052 using boost::shared_ptr;
00053 using ros::Exception;
00054
00055 namespace rosbag {
00056
00057
00058
00059 PlayerOptions::PlayerOptions() :
00060 quiet(false),
00061 start_paused(false),
00062 at_once(false),
00063 bag_time(false),
00064 bag_time_frequency(0.0),
00065 time_scale(1.0),
00066 queue_size(0),
00067 advertise_sleep(0.2),
00068 try_future(false),
00069 has_time(false),
00070 loop(false),
00071 time(0.0f),
00072 keep_alive(false),
00073 skip_empty(ros::DURATION_MAX)
00074 {
00075 }
00076
00077 void PlayerOptions::check() {
00078 if (bags.size() == 0)
00079 throw Exception("You must specify at least one bag file to play from");
00080 }
00081
00082
00083
00084 Player::Player(PlayerOptions const& options) :
00085 options_(options),
00086 paused_(false),
00087 terminal_modified_(false)
00088 {
00089 }
00090
00091 Player::~Player() {
00092 foreach(shared_ptr<Bag> bag, bags_)
00093 bag->close();
00094
00095 restoreTerminal();
00096 }
00097
00098 void Player::publish() {
00099 options_.check();
00100
00101
00102 foreach(string const& filename, options_.bags) {
00103 ROS_INFO("Opening %s", filename.c_str());
00104
00105 try
00106 {
00107 shared_ptr<Bag> bag(new Bag);
00108 bag->open(filename, bagmode::Read);
00109 bags_.push_back(bag);
00110 }
00111 catch (BagUnindexedException ex) {
00112 std::cerr << "Bag file " << filename << " is unindexed. Run rosbag reindex." << std::endl;
00113 return;
00114 }
00115 }
00116
00117 setupTerminal();
00118
00119 if (!node_handle_.ok())
00120 return;
00121
00122 if (!options_.quiet)
00123 puts("");
00124
00125
00126 View full_view;
00127 foreach(shared_ptr<Bag> bag, bags_)
00128 full_view.addQuery(*bag);
00129
00130 ros::Time initial_time = full_view.getBeginTime();
00131
00132 initial_time += ros::Duration(options_.time);
00133
00134
00135 View view;
00136 TopicQuery topics(options_.topics);
00137
00138 if (options_.topics.empty())
00139 {
00140 foreach(shared_ptr<Bag> bag, bags_)
00141 view.addQuery(*bag, initial_time, ros::TIME_MAX);
00142 } else {
00143 foreach(shared_ptr<Bag> bag, bags_)
00144 view.addQuery(*bag, topics, initial_time, ros::TIME_MAX);
00145 }
00146
00147 if (view.size() == 0)
00148 {
00149 std::cerr << "No messages to play on specified topics. Exiting." << std::endl;
00150 ros::shutdown();
00151 return;
00152 }
00153
00154
00155 foreach(const ConnectionInfo* c, view.getConnections())
00156 {
00157 ros::M_string::const_iterator header_iter = c->header->find("callerid");
00158 std::string callerid = (header_iter != c->header->end() ? header_iter->second : string(""));
00159
00160 string callerid_topic = callerid + c->topic;
00161
00162 map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00163 if (pub_iter == publishers_.end()) {
00164
00165 ros::AdvertiseOptions opts = createAdvertiseOptions(c, options_.queue_size);
00166
00167 ros::Publisher pub = node_handle_.advertise(opts);
00168 publishers_.insert(publishers_.begin(), pair<string, ros::Publisher>(callerid_topic, pub));
00169
00170 pub_iter = publishers_.find(callerid_topic);
00171 }
00172 }
00173
00174 std::cout << "Waiting " << options_.advertise_sleep.toSec() << " seconds after advertising topics..." << std::flush;
00175 options_.advertise_sleep.sleep();
00176 std::cout << " done." << std::endl;
00177
00178 std::cout << std::endl << "Hit space to toggle paused, or 's' to step." << std::endl;
00179
00180 paused_ = options_.start_paused;
00181
00182 while (true) {
00183
00184
00185 time_translator_.setTimeScale(options_.time_scale);
00186
00187 start_time_ = view.begin()->getTime();
00188 time_translator_.setRealStartTime(start_time_);
00189 bag_length_ = view.getEndTime() - view.getBeginTime();
00190
00191 time_publisher_.setTime(start_time_);
00192
00193 ros::WallTime now_wt = ros::WallTime::now();
00194 time_translator_.setTranslatedStartTime(ros::Time(now_wt.sec, now_wt.nsec));
00195
00196
00197 time_publisher_.setTimeScale(options_.time_scale);
00198 if (options_.bag_time)
00199 time_publisher_.setPublishFrequency(options_.bag_time_frequency);
00200 else
00201 time_publisher_.setPublishFrequency(-1.0);
00202
00203 paused_time_ = now_wt;
00204
00205
00206 foreach(MessageInstance m, view) {
00207 if (!node_handle_.ok())
00208 break;
00209
00210 doPublish(m);
00211 }
00212
00213 if (options_.keep_alive)
00214 while (node_handle_.ok())
00215 doKeepAlive();
00216
00217 if (!node_handle_.ok()) {
00218 std::cout << std::endl;
00219 break;
00220 }
00221 if (!options_.loop) {
00222 std::cout << std::endl << "Done." << std::endl;
00223 break;
00224 }
00225 }
00226
00227 ros::shutdown();
00228 }
00229
00230 void Player::printTime()
00231 {
00232 if (!options_.quiet) {
00233
00234 ros::Time current_time = time_publisher_.getTime();
00235 ros::Duration d = current_time - start_time_;
00236
00237 if (paused_)
00238 {
00239 printf("\r [PAUSED] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00240 }
00241 else
00242 {
00243 printf("\r [RUNNING] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00244 }
00245 fflush(stdout);
00246 }
00247 }
00248
00249 void Player::doPublish(MessageInstance const& m) {
00250 string const& topic = m.getTopic();
00251 ros::Time const& time = m.getTime();
00252 string callerid = m.getCallerId();
00253
00254 ros::Time translated = time_translator_.translate(time);
00255 ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00256
00257 time_publisher_.setHorizon(time);
00258 time_publisher_.setWCHorizon(horizon);
00259
00260 string callerid_topic = callerid + topic;
00261
00262 map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00263 ROS_ASSERT(pub_iter != publishers_.end());
00264
00265
00266 if (options_.at_once) {
00267 time_publisher_.stepClock();
00268 pub_iter->second.publish(m);
00269 printTime();
00270 return;
00271 }
00272
00273
00274 if (time - time_publisher_.getTime() > options_.skip_empty)
00275 {
00276 time_publisher_.stepClock();
00277
00278 ros::WallDuration shift = ros::WallTime::now() - horizon ;
00279 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00280 horizon += shift;
00281 time_publisher_.setWCHorizon(horizon);
00282 (pub_iter->second).publish(m);
00283 printTime();
00284 return;
00285 }
00286
00287 while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00288 {
00289 bool charsleftorpaused = true;
00290 while (charsleftorpaused && node_handle_.ok())
00291 {
00292 switch (readCharFromStdin()){
00293 case ' ':
00294 paused_ = !paused_;
00295 if (paused_) {
00296 paused_time_ = ros::WallTime::now();
00297 }
00298 else
00299 {
00300 ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00301 paused_time_ = ros::WallTime::now();
00302
00303 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00304
00305 horizon += shift;
00306 time_publisher_.setWCHorizon(horizon);
00307 }
00308 break;
00309 case 's':
00310 if (paused_) {
00311 time_publisher_.stepClock();
00312
00313 ros::WallDuration shift = ros::WallTime::now() - horizon ;
00314 paused_time_ = ros::WallTime::now();
00315
00316 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00317
00318 horizon += shift;
00319 time_publisher_.setWCHorizon(horizon);
00320
00321 (pub_iter->second).publish(m);
00322
00323 printTime();
00324 return;
00325 }
00326 break;
00327 case EOF:
00328 if (paused_)
00329 {
00330 printTime();
00331 time_publisher_.runStalledClock(ros::WallDuration(.1));
00332 }
00333 else
00334 charsleftorpaused = false;
00335 }
00336 }
00337
00338 printTime();
00339 time_publisher_.runClock(ros::WallDuration(.1));
00340 }
00341
00342 pub_iter->second.publish(m);
00343 }
00344
00345
00346 void Player::doKeepAlive() {
00347
00348 ros::Time const& time = time_publisher_.getTime() + ros::Duration(10.0);
00349
00350 ros::Time translated = time_translator_.translate(time);
00351 ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00352
00353 time_publisher_.setHorizon(time);
00354 time_publisher_.setWCHorizon(horizon);
00355
00356 if (options_.at_once) {
00357 return;
00358 }
00359
00360 while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00361 {
00362 bool charsleftorpaused = true;
00363 while (charsleftorpaused && node_handle_.ok())
00364 {
00365 switch (readCharFromStdin()){
00366 case ' ':
00367 paused_ = !paused_;
00368 if (paused_) {
00369 paused_time_ = ros::WallTime::now();
00370 }
00371 else
00372 {
00373 ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00374 paused_time_ = ros::WallTime::now();
00375
00376 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00377
00378 horizon += shift;
00379 time_publisher_.setWCHorizon(horizon);
00380 }
00381 break;
00382 case EOF:
00383 if (paused_)
00384 {
00385 printTime();
00386 time_publisher_.runStalledClock(ros::WallDuration(.1));
00387 }
00388 else
00389 charsleftorpaused = false;
00390 }
00391 }
00392
00393 printTime();
00394 time_publisher_.runClock(ros::WallDuration(.1));
00395 }
00396 }
00397
00398
00399
00400 void Player::setupTerminal() {
00401 if (terminal_modified_)
00402 return;
00403
00404 const int fd = fileno(stdin);
00405 termios flags;
00406 tcgetattr(fd, &orig_flags_);
00407 flags = orig_flags_;
00408 flags.c_lflag &= ~ICANON;
00409 flags.c_cc[VMIN] = 0;
00410 flags.c_cc[VTIME] = 0;
00411 tcsetattr(fd, TCSANOW, &flags);
00412
00413 FD_ZERO(&stdin_fdset_);
00414 FD_SET(fd, &stdin_fdset_);
00415 maxfd_ = fd + 1;
00416
00417 terminal_modified_ = true;
00418 }
00419
00420 void Player::restoreTerminal() {
00421 if (!terminal_modified_)
00422 return;
00423
00424 const int fd = fileno(stdin);
00425 tcsetattr(fd, TCSANOW, &orig_flags_);
00426
00427 terminal_modified_ = false;
00428 }
00429
00430 char Player::readCharFromStdin() {
00431 #ifdef __APPLE__
00432 fd_set testfd;
00433 FD_COPY(&stdin_fdset_, &testfd);
00434 #else
00435 fd_set testfd = stdin_fdset_;
00436 #endif
00437
00438 timeval tv;
00439 tv.tv_sec = 0;
00440 tv.tv_usec = 0;
00441 if (select(maxfd_, &testfd, NULL, NULL, &tv) <= 0)
00442 return EOF;
00443
00444 return getc(stdin);
00445 }
00446
00447 TimePublisher::TimePublisher() : time_scale_(1.0)
00448 {
00449 setPublishFrequency(-1.0);
00450 time_pub_ = node_handle_.advertise<rosgraph_msgs::Clock>("clock",1);
00451 }
00452
00453 void TimePublisher::setPublishFrequency(double publish_frequency)
00454 {
00455 publish_frequency_ = publish_frequency;
00456
00457 do_publish_ = (publish_frequency > 0.0);
00458
00459 wall_step_.fromSec(1.0 / publish_frequency);
00460 }
00461
00462 void TimePublisher::setTimeScale(double time_scale)
00463 {
00464 time_scale_ = time_scale;
00465 }
00466
00467 void TimePublisher::setHorizon(const ros::Time& horizon)
00468 {
00469 horizon_ = horizon;
00470 }
00471
00472 void TimePublisher::setWCHorizon(const ros::WallTime& horizon)
00473 {
00474 wc_horizon_ = horizon;
00475 }
00476
00477 void TimePublisher::setTime(const ros::Time& time)
00478 {
00479 current_ = time;
00480 }
00481
00482 ros::Time const& TimePublisher::getTime() const
00483 {
00484 return current_;
00485 }
00486
00487 void TimePublisher::runClock(const ros::WallDuration& duration)
00488 {
00489 if (do_publish_)
00490 {
00491 rosgraph_msgs::Clock pub_msg;
00492
00493 ros::WallTime t = ros::WallTime::now();
00494 ros::WallTime done = t + duration;
00495
00496 while (t < done && t < wc_horizon_)
00497 {
00498 ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00499
00500 ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00501 d *= time_scale_;
00502
00503 current_ = horizon_ - d;
00504
00505 if (current_ >= horizon_)
00506 current_ = horizon_;
00507
00508 if (t >= next_pub_)
00509 {
00510 pub_msg.clock = current_;
00511 time_pub_.publish(pub_msg);
00512 next_pub_ = t + wall_step_;
00513 }
00514
00515 ros::WallTime target = done;
00516 if (target > wc_horizon_)
00517 target = wc_horizon_;
00518 if (target > next_pub_)
00519 target = next_pub_;
00520
00521 ros::WallTime::sleepUntil(target);
00522
00523 t = ros::WallTime::now();
00524 }
00525 } else {
00526
00527 ros::WallTime t = ros::WallTime::now();
00528
00529 ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00530
00531 ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00532 d *= time_scale_;
00533
00534 current_ = horizon_ - d;
00535
00536 if (current_ >= horizon_)
00537 current_ = horizon_;
00538
00539 ros::WallTime target = ros::WallTime::now() + duration;
00540
00541 if (target > wc_horizon_)
00542 target = wc_horizon_;
00543
00544 ros::WallTime::sleepUntil(target);
00545 }
00546 }
00547
00548 void TimePublisher::stepClock()
00549 {
00550 if (do_publish_)
00551 {
00552 current_ = horizon_;
00553
00554 rosgraph_msgs::Clock pub_msg;
00555
00556 pub_msg.clock = current_;
00557 time_pub_.publish(pub_msg);
00558
00559 ros::WallTime t = ros::WallTime::now();
00560 next_pub_ = t + wall_step_;
00561 } else {
00562 current_ = horizon_;
00563 }
00564 }
00565
00566 void TimePublisher::runStalledClock(const ros::WallDuration& duration)
00567 {
00568 if (do_publish_)
00569 {
00570 rosgraph_msgs::Clock pub_msg;
00571
00572 ros::WallTime t = ros::WallTime::now();
00573 ros::WallTime done = t + duration;
00574
00575 while ( t < done )
00576 {
00577 if (t > next_pub_)
00578 {
00579 pub_msg.clock = current_;
00580 time_pub_.publish(pub_msg);
00581 next_pub_ = t + wall_step_;
00582 }
00583
00584 ros::WallTime target = done;
00585
00586 if (target > next_pub_)
00587 target = next_pub_;
00588
00589 ros::WallTime::sleepUntil(target);
00590
00591 t = ros::WallTime::now();
00592 }
00593 } else {
00594 duration.sleep();
00595 }
00596 }
00597
00598 bool TimePublisher::horizonReached()
00599 {
00600 return ros::WallTime::now() > wc_horizon_;
00601 }
00602
00603 }