00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "rosbag/player.h"
00036 #include "rosbag/message_instance.h"
00037 #include "rosbag/view.h"
00038
00039 #if !defined(_MSC_VER)
00040 #include <sys/select.h>
00041 #endif
00042
00043 #include <boost/foreach.hpp>
00044 #include <boost/format.hpp>
00045
00046 #include "rosgraph_msgs/Clock.h"
00047
00048 #define foreach BOOST_FOREACH
00049
00050 using std::map;
00051 using std::pair;
00052 using std::string;
00053 using std::vector;
00054 using boost::shared_ptr;
00055 using ros::Exception;
00056
00057 namespace rosbag {
00058
00059 ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo* c, uint32_t queue_size, const std::string& prefix) {
00060 ros::AdvertiseOptions opts(prefix + c->topic, queue_size, c->md5sum, c->datatype, c->msg_def);
00061 ros::M_string::const_iterator header_iter = c->header->find("latching");
00062 opts.latch = (header_iter != c->header->end() && header_iter->second == "1");
00063 return opts;
00064 }
00065
00066
00067 ros::AdvertiseOptions createAdvertiseOptions(MessageInstance const& m, uint32_t queue_size, const std::string& prefix) {
00068 return ros::AdvertiseOptions(prefix + m.getTopic(), queue_size, m.getMD5Sum(), m.getDataType(), m.getMessageDefinition());
00069 }
00070
00071
00072
00073 PlayerOptions::PlayerOptions() :
00074 prefix(""),
00075 quiet(false),
00076 start_paused(false),
00077 at_once(false),
00078 bag_time(false),
00079 bag_time_frequency(0.0),
00080 time_scale(1.0),
00081 queue_size(0),
00082 advertise_sleep(0.2),
00083 try_future(false),
00084 has_time(false),
00085 loop(false),
00086 time(0.0f),
00087 has_duration(false),
00088 duration(0.0f),
00089 keep_alive(false),
00090 skip_empty(ros::DURATION_MAX)
00091 {
00092 }
00093
00094 void PlayerOptions::check() {
00095 if (bags.size() == 0)
00096 throw Exception("You must specify at least one bag file to play from");
00097 if (has_duration && duration <= 0.0)
00098 throw Exception("Invalid duration, must be > 0.0");
00099 }
00100
00101
00102
00103 Player::Player(PlayerOptions const& options) :
00104 options_(options),
00105 paused_(false),
00106
00107
00108 pause_for_topics_(options_.pause_topics.size() > 0),
00109 terminal_modified_(false)
00110 {
00111 }
00112
00113 Player::~Player() {
00114 foreach(shared_ptr<Bag> bag, bags_)
00115 bag->close();
00116
00117 restoreTerminal();
00118 }
00119
00120 void Player::publish() {
00121 options_.check();
00122
00123
00124 foreach(string const& filename, options_.bags) {
00125 ROS_INFO("Opening %s", filename.c_str());
00126
00127 try
00128 {
00129 shared_ptr<Bag> bag(boost::make_shared<Bag>());
00130 bag->open(filename, bagmode::Read);
00131 bags_.push_back(bag);
00132 }
00133 catch (BagUnindexedException ex) {
00134 std::cerr << "Bag file " << filename << " is unindexed. Run rosbag reindex." << std::endl;
00135 return;
00136 }
00137 }
00138
00139 setupTerminal();
00140
00141 if (!node_handle_.ok())
00142 return;
00143
00144 if (!options_.prefix.empty())
00145 {
00146 ROS_INFO_STREAM("Using prefix '" << options_.prefix << "'' for topics ");
00147 }
00148
00149 if (!options_.quiet)
00150 puts("");
00151
00152
00153 View full_view;
00154 foreach(shared_ptr<Bag> bag, bags_)
00155 full_view.addQuery(*bag);
00156
00157 ros::Time initial_time = full_view.getBeginTime();
00158
00159 initial_time += ros::Duration(options_.time);
00160
00161 ros::Time finish_time = ros::TIME_MAX;
00162 if (options_.has_duration)
00163 {
00164 finish_time = initial_time + ros::Duration(options_.duration);
00165 }
00166
00167 View view;
00168 TopicQuery topics(options_.topics);
00169
00170 if (options_.topics.empty())
00171 {
00172 foreach(shared_ptr<Bag> bag, bags_)
00173 view.addQuery(*bag, initial_time, finish_time);
00174 } else {
00175 foreach(shared_ptr<Bag> bag, bags_)
00176 view.addQuery(*bag, topics, initial_time, finish_time);
00177 }
00178
00179 if (view.size() == 0)
00180 {
00181 std::cerr << "No messages to play on specified topics. Exiting." << std::endl;
00182 ros::shutdown();
00183 return;
00184 }
00185
00186
00187 foreach(const ConnectionInfo* c, view.getConnections())
00188 {
00189 ros::M_string::const_iterator header_iter = c->header->find("callerid");
00190 std::string callerid = (header_iter != c->header->end() ? header_iter->second : string(""));
00191
00192 string callerid_topic = callerid + c->topic;
00193
00194 map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00195 if (pub_iter == publishers_.end()) {
00196
00197 ros::AdvertiseOptions opts = createAdvertiseOptions(c, options_.queue_size, options_.prefix);
00198
00199 ros::Publisher pub = node_handle_.advertise(opts);
00200 publishers_.insert(publishers_.begin(), pair<string, ros::Publisher>(callerid_topic, pub));
00201
00202 pub_iter = publishers_.find(callerid_topic);
00203 }
00204 }
00205
00206 std::cout << "Waiting " << options_.advertise_sleep.toSec() << " seconds after advertising topics..." << std::flush;
00207 options_.advertise_sleep.sleep();
00208 std::cout << " done." << std::endl;
00209
00210 std::cout << std::endl << "Hit space to toggle paused, or 's' to step." << std::endl;
00211
00212 paused_ = options_.start_paused;
00213
00214 while (true) {
00215
00216
00217 time_translator_.setTimeScale(options_.time_scale);
00218
00219 start_time_ = view.begin()->getTime();
00220 time_translator_.setRealStartTime(start_time_);
00221 bag_length_ = view.getEndTime() - view.getBeginTime();
00222
00223 time_publisher_.setTime(start_time_);
00224
00225 ros::WallTime now_wt = ros::WallTime::now();
00226 time_translator_.setTranslatedStartTime(ros::Time(now_wt.sec, now_wt.nsec));
00227
00228
00229 time_publisher_.setTimeScale(options_.time_scale);
00230 if (options_.bag_time)
00231 time_publisher_.setPublishFrequency(options_.bag_time_frequency);
00232 else
00233 time_publisher_.setPublishFrequency(-1.0);
00234
00235 paused_time_ = now_wt;
00236
00237
00238 foreach(MessageInstance m, view) {
00239 if (!node_handle_.ok())
00240 break;
00241
00242 doPublish(m);
00243 }
00244
00245 if (options_.keep_alive)
00246 while (node_handle_.ok())
00247 doKeepAlive();
00248
00249 if (!node_handle_.ok()) {
00250 std::cout << std::endl;
00251 break;
00252 }
00253 if (!options_.loop) {
00254 std::cout << std::endl << "Done." << std::endl;
00255 break;
00256 }
00257 }
00258
00259 ros::shutdown();
00260 }
00261
00262 void Player::printTime()
00263 {
00264 if (!options_.quiet) {
00265
00266 ros::Time current_time = time_publisher_.getTime();
00267 ros::Duration d = current_time - start_time_;
00268
00269 if (paused_)
00270 {
00271 printf("\r [PAUSED] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00272 }
00273 else
00274 {
00275 printf("\r [RUNNING] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00276 }
00277 fflush(stdout);
00278 }
00279 }
00280
00281 void Player::doPublish(MessageInstance const& m) {
00282 string const& topic = m.getTopic();
00283 ros::Time const& time = m.getTime();
00284 string callerid = m.getCallerId();
00285
00286 ros::Time translated = time_translator_.translate(time);
00287 ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00288
00289 time_publisher_.setHorizon(time);
00290 time_publisher_.setWCHorizon(horizon);
00291
00292 string callerid_topic = callerid + topic;
00293
00294 map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00295 ROS_ASSERT(pub_iter != publishers_.end());
00296
00297
00298 if (options_.at_once) {
00299 time_publisher_.stepClock();
00300 pub_iter->second.publish(m);
00301 printTime();
00302 return;
00303 }
00304
00305
00306 if (time - time_publisher_.getTime() > options_.skip_empty)
00307 {
00308 time_publisher_.stepClock();
00309
00310 ros::WallDuration shift = ros::WallTime::now() - horizon ;
00311 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00312 horizon += shift;
00313 time_publisher_.setWCHorizon(horizon);
00314 (pub_iter->second).publish(m);
00315 printTime();
00316 return;
00317 }
00318
00319 if (pause_for_topics_)
00320 {
00321 for (std::vector<std::string>::iterator i = options_.pause_topics.begin();
00322 i != options_.pause_topics.end();
00323 ++i)
00324 {
00325 if (topic == *i)
00326 {
00327 paused_ = true;
00328 paused_time_ = ros::WallTime::now();
00329 }
00330 }
00331 }
00332
00333 while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00334 {
00335 bool charsleftorpaused = true;
00336 while (charsleftorpaused && node_handle_.ok())
00337 {
00338 switch (readCharFromStdin()){
00339 case ' ':
00340 paused_ = !paused_;
00341 if (paused_) {
00342 paused_time_ = ros::WallTime::now();
00343 }
00344 else
00345 {
00346 ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00347 paused_time_ = ros::WallTime::now();
00348
00349 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00350
00351 horizon += shift;
00352 time_publisher_.setWCHorizon(horizon);
00353 }
00354 break;
00355 case 's':
00356 if (paused_) {
00357 time_publisher_.stepClock();
00358
00359 ros::WallDuration shift = ros::WallTime::now() - horizon ;
00360 paused_time_ = ros::WallTime::now();
00361
00362 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00363
00364 horizon += shift;
00365 time_publisher_.setWCHorizon(horizon);
00366
00367 (pub_iter->second).publish(m);
00368
00369 printTime();
00370 return;
00371 }
00372 break;
00373 case 't':
00374 pause_for_topics_ = !pause_for_topics_;
00375 break;
00376 case EOF:
00377 if (paused_)
00378 {
00379 printTime();
00380 time_publisher_.runStalledClock(ros::WallDuration(.1));
00381 }
00382 else
00383 charsleftorpaused = false;
00384 }
00385 }
00386
00387 printTime();
00388 time_publisher_.runClock(ros::WallDuration(.1));
00389 }
00390
00391 pub_iter->second.publish(m);
00392 }
00393
00394
00395 void Player::doKeepAlive() {
00396
00397 ros::Time const& time = time_publisher_.getTime() + ros::Duration(10.0);
00398
00399 ros::Time translated = time_translator_.translate(time);
00400 ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00401
00402 time_publisher_.setHorizon(time);
00403 time_publisher_.setWCHorizon(horizon);
00404
00405 if (options_.at_once) {
00406 return;
00407 }
00408
00409 while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00410 {
00411 bool charsleftorpaused = true;
00412 while (charsleftorpaused && node_handle_.ok())
00413 {
00414 switch (readCharFromStdin()){
00415 case ' ':
00416 paused_ = !paused_;
00417 if (paused_) {
00418 paused_time_ = ros::WallTime::now();
00419 }
00420 else
00421 {
00422 ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00423 paused_time_ = ros::WallTime::now();
00424
00425 time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00426
00427 horizon += shift;
00428 time_publisher_.setWCHorizon(horizon);
00429 }
00430 break;
00431 case EOF:
00432 if (paused_)
00433 {
00434 printTime();
00435 time_publisher_.runStalledClock(ros::WallDuration(.1));
00436 }
00437 else
00438 charsleftorpaused = false;
00439 }
00440 }
00441
00442 printTime();
00443 time_publisher_.runClock(ros::WallDuration(.1));
00444 }
00445 }
00446
00447
00448
00449 void Player::setupTerminal() {
00450 if (terminal_modified_)
00451 return;
00452
00453 #if defined(_MSC_VER)
00454 input_handle = GetStdHandle(STD_INPUT_HANDLE);
00455 if (input_handle == INVALID_HANDLE_VALUE)
00456 {
00457 std::cout << "Failed to set up standard input handle." << std::endl;
00458 return;
00459 }
00460 if (! GetConsoleMode(input_handle, &stdin_set) )
00461 {
00462 std::cout << "Failed to save the console mode." << std::endl;
00463 return;
00464 }
00465
00466
00467
00468
00469
00470
00471
00472 terminal_modified_ = true;
00473 #else
00474 const int fd = fileno(stdin);
00475 termios flags;
00476 tcgetattr(fd, &orig_flags_);
00477 flags = orig_flags_;
00478 flags.c_lflag &= ~ICANON;
00479 flags.c_cc[VMIN] = 0;
00480 flags.c_cc[VTIME] = 0;
00481 tcsetattr(fd, TCSANOW, &flags);
00482
00483 FD_ZERO(&stdin_fdset_);
00484 FD_SET(fd, &stdin_fdset_);
00485 maxfd_ = fd + 1;
00486 terminal_modified_ = true;
00487 #endif
00488 }
00489
00490 void Player::restoreTerminal() {
00491 if (!terminal_modified_)
00492 return;
00493
00494 #if defined(_MSC_VER)
00495 SetConsoleMode(input_handle, stdin_set);
00496 #else
00497 const int fd = fileno(stdin);
00498 tcsetattr(fd, TCSANOW, &orig_flags_);
00499 #endif
00500 terminal_modified_ = false;
00501 }
00502
00503 int Player::readCharFromStdin() {
00504 #ifdef __APPLE__
00505 fd_set testfd;
00506 FD_COPY(&stdin_fdset_, &testfd);
00507 #elif !defined(_MSC_VER)
00508 fd_set testfd = stdin_fdset_;
00509 #endif
00510
00511 #if defined(_MSC_VER)
00512 DWORD events = 0;
00513 INPUT_RECORD input_record[1];
00514 DWORD input_size = 1;
00515 BOOL b = GetNumberOfConsoleInputEvents(input_handle, &events);
00516 if (b && events > 0)
00517 {
00518 b = ReadConsoleInput(input_handle, input_record, input_size, &events);
00519 if (b)
00520 {
00521 for (unsigned int i = 0; i < events; ++i)
00522 {
00523 if (input_record[i].EventType & KEY_EVENT & input_record[i].Event.KeyEvent.bKeyDown)
00524 {
00525 CHAR ch = input_record[i].Event.KeyEvent.uChar.AsciiChar;
00526 return ch;
00527 }
00528 }
00529 }
00530 }
00531 return EOF;
00532 #else
00533 timeval tv;
00534 tv.tv_sec = 0;
00535 tv.tv_usec = 0;
00536 if (select(maxfd_, &testfd, NULL, NULL, &tv) <= 0)
00537 return EOF;
00538 return getc(stdin);
00539 #endif
00540 }
00541
00542 TimePublisher::TimePublisher() : time_scale_(1.0)
00543 {
00544 setPublishFrequency(-1.0);
00545 time_pub_ = node_handle_.advertise<rosgraph_msgs::Clock>("clock",1);
00546 }
00547
00548 void TimePublisher::setPublishFrequency(double publish_frequency)
00549 {
00550 publish_frequency_ = publish_frequency;
00551
00552 do_publish_ = (publish_frequency > 0.0);
00553
00554 wall_step_.fromSec(1.0 / publish_frequency);
00555 }
00556
00557 void TimePublisher::setTimeScale(double time_scale)
00558 {
00559 time_scale_ = time_scale;
00560 }
00561
00562 void TimePublisher::setHorizon(const ros::Time& horizon)
00563 {
00564 horizon_ = horizon;
00565 }
00566
00567 void TimePublisher::setWCHorizon(const ros::WallTime& horizon)
00568 {
00569 wc_horizon_ = horizon;
00570 }
00571
00572 void TimePublisher::setTime(const ros::Time& time)
00573 {
00574 current_ = time;
00575 }
00576
00577 ros::Time const& TimePublisher::getTime() const
00578 {
00579 return current_;
00580 }
00581
00582 void TimePublisher::runClock(const ros::WallDuration& duration)
00583 {
00584 if (do_publish_)
00585 {
00586 rosgraph_msgs::Clock pub_msg;
00587
00588 ros::WallTime t = ros::WallTime::now();
00589 ros::WallTime done = t + duration;
00590
00591 while (t < done && t < wc_horizon_)
00592 {
00593 ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00594
00595 ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00596 d *= time_scale_;
00597
00598 current_ = horizon_ - d;
00599
00600 if (current_ >= horizon_)
00601 current_ = horizon_;
00602
00603 if (t >= next_pub_)
00604 {
00605 pub_msg.clock = current_;
00606 time_pub_.publish(pub_msg);
00607 next_pub_ = t + wall_step_;
00608 }
00609
00610 ros::WallTime target = done;
00611 if (target > wc_horizon_)
00612 target = wc_horizon_;
00613 if (target > next_pub_)
00614 target = next_pub_;
00615
00616 ros::WallTime::sleepUntil(target);
00617
00618 t = ros::WallTime::now();
00619 }
00620 } else {
00621
00622 ros::WallTime t = ros::WallTime::now();
00623
00624 ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00625
00626 ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00627 d *= time_scale_;
00628
00629 current_ = horizon_ - d;
00630
00631 if (current_ >= horizon_)
00632 current_ = horizon_;
00633
00634 ros::WallTime target = ros::WallTime::now() + duration;
00635
00636 if (target > wc_horizon_)
00637 target = wc_horizon_;
00638
00639 ros::WallTime::sleepUntil(target);
00640 }
00641 }
00642
00643 void TimePublisher::stepClock()
00644 {
00645 if (do_publish_)
00646 {
00647 current_ = horizon_;
00648
00649 rosgraph_msgs::Clock pub_msg;
00650
00651 pub_msg.clock = current_;
00652 time_pub_.publish(pub_msg);
00653
00654 ros::WallTime t = ros::WallTime::now();
00655 next_pub_ = t + wall_step_;
00656 } else {
00657 current_ = horizon_;
00658 }
00659 }
00660
00661 void TimePublisher::runStalledClock(const ros::WallDuration& duration)
00662 {
00663 if (do_publish_)
00664 {
00665 rosgraph_msgs::Clock pub_msg;
00666
00667 ros::WallTime t = ros::WallTime::now();
00668 ros::WallTime done = t + duration;
00669
00670 while ( t < done )
00671 {
00672 if (t > next_pub_)
00673 {
00674 pub_msg.clock = current_;
00675 time_pub_.publish(pub_msg);
00676 next_pub_ = t + wall_step_;
00677 }
00678
00679 ros::WallTime target = done;
00680
00681 if (target > next_pub_)
00682 target = next_pub_;
00683
00684 ros::WallTime::sleepUntil(target);
00685
00686 t = ros::WallTime::now();
00687 }
00688 } else {
00689 duration.sleep();
00690 }
00691 }
00692
00693 bool TimePublisher::horizonReached()
00694 {
00695 return ros::WallTime::now() > wc_horizon_;
00696 }
00697
00698 }