$search
00001 /********************************************************************* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2008, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of Willow Garage, Inc. nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 ********************************************************************/ 00034 00035 #include "rosbag/player.h" 00036 #include "rosbag/message_instance.h" 00037 #include "rosbag/view.h" 00038 00039 #include <sys/select.h> 00040 00041 #include <boost/foreach.hpp> 00042 #include <boost/format.hpp> 00043 00044 #include "rosgraph_msgs/Clock.h" 00045 00046 #define foreach BOOST_FOREACH 00047 00048 using std::map; 00049 using std::pair; 00050 using std::string; 00051 using std::vector; 00052 using boost::shared_ptr; 00053 using ros::Exception; 00054 00055 namespace rosbag { 00056 00057 // PlayerOptions 00058 00059 PlayerOptions::PlayerOptions() : 00060 quiet(false), 00061 start_paused(false), 00062 at_once(false), 00063 bag_time(false), 00064 bag_time_frequency(0.0), 00065 time_scale(1.0), 00066 queue_size(0), 00067 advertise_sleep(0.2), 00068 try_future(false), 00069 has_time(false), 00070 loop(false), 00071 time(0.0f), 00072 keep_alive(false), 00073 skip_empty(ros::DURATION_MAX) 00074 { 00075 } 00076 00077 void PlayerOptions::check() { 00078 if (bags.size() == 0) 00079 throw Exception("You must specify at least one bag file to play from"); 00080 } 00081 00082 // Player 00083 00084 Player::Player(PlayerOptions const& options) : 00085 options_(options), 00086 paused_(false), 00087 terminal_modified_(false) 00088 { 00089 } 00090 00091 Player::~Player() { 00092 foreach(shared_ptr<Bag> bag, bags_) 00093 bag->close(); 00094 00095 restoreTerminal(); 00096 } 00097 00098 void Player::publish() { 00099 options_.check(); 00100 00101 // Open all the bag files 00102 foreach(string const& filename, options_.bags) { 00103 ROS_INFO("Opening %s", filename.c_str()); 00104 00105 try 00106 { 00107 shared_ptr<Bag> bag(new Bag); 00108 bag->open(filename, bagmode::Read); 00109 bags_.push_back(bag); 00110 } 00111 catch (BagUnindexedException ex) { 00112 std::cerr << "Bag file " << filename << " is unindexed. Run rosbag reindex." << std::endl; 00113 return; 00114 } 00115 } 00116 00117 setupTerminal(); 00118 00119 if (!node_handle_.ok()) 00120 return; 00121 00122 if (!options_.quiet) 00123 puts(""); 00124 00125 // Publish all messages in the bags 00126 View full_view; 00127 foreach(shared_ptr<Bag> bag, bags_) 00128 full_view.addQuery(*bag); 00129 00130 ros::Time initial_time = full_view.getBeginTime(); 00131 00132 initial_time += ros::Duration(options_.time); 00133 00134 00135 View view; 00136 TopicQuery topics(options_.topics); 00137 00138 if (options_.topics.empty()) 00139 { 00140 foreach(shared_ptr<Bag> bag, bags_) 00141 view.addQuery(*bag, initial_time, ros::TIME_MAX); 00142 } else { 00143 foreach(shared_ptr<Bag> bag, bags_) 00144 view.addQuery(*bag, topics, initial_time, ros::TIME_MAX); 00145 } 00146 00147 if (view.size() == 0) 00148 { 00149 std::cerr << "No messages to play on specified topics. Exiting." << std::endl; 00150 ros::shutdown(); 00151 return; 00152 } 00153 00154 // Advertise all of our messages 00155 foreach(const ConnectionInfo* c, view.getConnections()) 00156 { 00157 ros::M_string::const_iterator header_iter = c->header->find("callerid"); 00158 std::string callerid = (header_iter != c->header->end() ? header_iter->second : string("")); 00159 00160 string callerid_topic = callerid + c->topic; 00161 00162 map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic); 00163 if (pub_iter == publishers_.end()) { 00164 00165 ros::AdvertiseOptions opts = createAdvertiseOptions(c, options_.queue_size); 00166 00167 ros::Publisher pub = node_handle_.advertise(opts); 00168 publishers_.insert(publishers_.begin(), pair<string, ros::Publisher>(callerid_topic, pub)); 00169 00170 pub_iter = publishers_.find(callerid_topic); 00171 } 00172 } 00173 00174 std::cout << "Waiting " << options_.advertise_sleep.toSec() << " seconds after advertising topics..." << std::flush; 00175 options_.advertise_sleep.sleep(); 00176 std::cout << " done." << std::endl; 00177 00178 std::cout << std::endl << "Hit space to toggle paused, or 's' to step." << std::endl; 00179 00180 paused_ = options_.start_paused; 00181 00182 while (true) { 00183 // Set up our time_translator and publishers 00184 00185 time_translator_.setTimeScale(options_.time_scale); 00186 00187 start_time_ = view.begin()->getTime(); 00188 time_translator_.setRealStartTime(start_time_); 00189 bag_length_ = view.getEndTime() - view.getBeginTime(); 00190 00191 time_publisher_.setTime(start_time_); 00192 00193 ros::WallTime now_wt = ros::WallTime::now(); 00194 time_translator_.setTranslatedStartTime(ros::Time(now_wt.sec, now_wt.nsec)); 00195 00196 00197 time_publisher_.setTimeScale(options_.time_scale); 00198 if (options_.bag_time) 00199 time_publisher_.setPublishFrequency(options_.bag_time_frequency); 00200 else 00201 time_publisher_.setPublishFrequency(-1.0); 00202 00203 paused_time_ = now_wt; 00204 00205 // Call do-publish for each message 00206 foreach(MessageInstance m, view) { 00207 if (!node_handle_.ok()) 00208 break; 00209 00210 doPublish(m); 00211 } 00212 00213 if (options_.keep_alive) 00214 while (node_handle_.ok()) 00215 doKeepAlive(); 00216 00217 if (!node_handle_.ok()) { 00218 std::cout << std::endl; 00219 break; 00220 } 00221 if (!options_.loop) { 00222 std::cout << std::endl << "Done." << std::endl; 00223 break; 00224 } 00225 } 00226 00227 ros::shutdown(); 00228 } 00229 00230 void Player::printTime() 00231 { 00232 if (!options_.quiet) { 00233 00234 ros::Time current_time = time_publisher_.getTime(); 00235 ros::Duration d = current_time - start_time_; 00236 00237 if (paused_) 00238 { 00239 printf("\r [PAUSED] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec()); 00240 } 00241 else 00242 { 00243 printf("\r [RUNNING] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec()); 00244 } 00245 fflush(stdout); 00246 } 00247 } 00248 00249 void Player::doPublish(MessageInstance const& m) { 00250 string const& topic = m.getTopic(); 00251 ros::Time const& time = m.getTime(); 00252 string callerid = m.getCallerId(); 00253 00254 ros::Time translated = time_translator_.translate(time); 00255 ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec); 00256 00257 time_publisher_.setHorizon(time); 00258 time_publisher_.setWCHorizon(horizon); 00259 00260 string callerid_topic = callerid + topic; 00261 00262 map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic); 00263 ROS_ASSERT(pub_iter != publishers_.end()); 00264 00265 // If immediate specified, play immediately 00266 if (options_.at_once) { 00267 time_publisher_.stepClock(); 00268 pub_iter->second.publish(m); 00269 printTime(); 00270 return; 00271 } 00272 00273 // If skip_empty is specified, skip this region and shift. 00274 if (time - time_publisher_.getTime() > options_.skip_empty) 00275 { 00276 time_publisher_.stepClock(); 00277 00278 ros::WallDuration shift = ros::WallTime::now() - horizon ; 00279 time_translator_.shift(ros::Duration(shift.sec, shift.nsec)); 00280 horizon += shift; 00281 time_publisher_.setWCHorizon(horizon); 00282 (pub_iter->second).publish(m); 00283 printTime(); 00284 return; 00285 } 00286 00287 while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok()) 00288 { 00289 bool charsleftorpaused = true; 00290 while (charsleftorpaused && node_handle_.ok()) 00291 { 00292 switch (readCharFromStdin()){ 00293 case ' ': 00294 paused_ = !paused_; 00295 if (paused_) { 00296 paused_time_ = ros::WallTime::now(); 00297 } 00298 else 00299 { 00300 ros::WallDuration shift = ros::WallTime::now() - paused_time_; 00301 paused_time_ = ros::WallTime::now(); 00302 00303 time_translator_.shift(ros::Duration(shift.sec, shift.nsec)); 00304 00305 horizon += shift; 00306 time_publisher_.setWCHorizon(horizon); 00307 } 00308 break; 00309 case 's': 00310 if (paused_) { 00311 time_publisher_.stepClock(); 00312 00313 ros::WallDuration shift = ros::WallTime::now() - horizon ; 00314 paused_time_ = ros::WallTime::now(); 00315 00316 time_translator_.shift(ros::Duration(shift.sec, shift.nsec)); 00317 00318 horizon += shift; 00319 time_publisher_.setWCHorizon(horizon); 00320 00321 (pub_iter->second).publish(m); 00322 00323 printTime(); 00324 return; 00325 } 00326 break; 00327 case EOF: 00328 if (paused_) 00329 { 00330 printTime(); 00331 time_publisher_.runStalledClock(ros::WallDuration(.1)); 00332 } 00333 else 00334 charsleftorpaused = false; 00335 } 00336 } 00337 00338 printTime(); 00339 time_publisher_.runClock(ros::WallDuration(.1)); 00340 } 00341 00342 pub_iter->second.publish(m); 00343 } 00344 00345 00346 void Player::doKeepAlive() { 00347 //Keep pushing ourself out in 10-sec increments (avoids fancy math dealing with the end of time) 00348 ros::Time const& time = time_publisher_.getTime() + ros::Duration(10.0); 00349 00350 ros::Time translated = time_translator_.translate(time); 00351 ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec); 00352 00353 time_publisher_.setHorizon(time); 00354 time_publisher_.setWCHorizon(horizon); 00355 00356 if (options_.at_once) { 00357 return; 00358 } 00359 00360 while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok()) 00361 { 00362 bool charsleftorpaused = true; 00363 while (charsleftorpaused && node_handle_.ok()) 00364 { 00365 switch (readCharFromStdin()){ 00366 case ' ': 00367 paused_ = !paused_; 00368 if (paused_) { 00369 paused_time_ = ros::WallTime::now(); 00370 } 00371 else 00372 { 00373 ros::WallDuration shift = ros::WallTime::now() - paused_time_; 00374 paused_time_ = ros::WallTime::now(); 00375 00376 time_translator_.shift(ros::Duration(shift.sec, shift.nsec)); 00377 00378 horizon += shift; 00379 time_publisher_.setWCHorizon(horizon); 00380 } 00381 break; 00382 case EOF: 00383 if (paused_) 00384 { 00385 printTime(); 00386 time_publisher_.runStalledClock(ros::WallDuration(.1)); 00387 } 00388 else 00389 charsleftorpaused = false; 00390 } 00391 } 00392 00393 printTime(); 00394 time_publisher_.runClock(ros::WallDuration(.1)); 00395 } 00396 } 00397 00398 00399 00400 void Player::setupTerminal() { 00401 if (terminal_modified_) 00402 return; 00403 00404 const int fd = fileno(stdin); 00405 termios flags; 00406 tcgetattr(fd, &orig_flags_); 00407 flags = orig_flags_; 00408 flags.c_lflag &= ~ICANON; // set raw (unset canonical modes) 00409 flags.c_cc[VMIN] = 0; // i.e. min 1 char for blocking, 0 chars for non-blocking 00410 flags.c_cc[VTIME] = 0; // block if waiting for char 00411 tcsetattr(fd, TCSANOW, &flags); 00412 00413 FD_ZERO(&stdin_fdset_); 00414 FD_SET(fd, &stdin_fdset_); 00415 maxfd_ = fd + 1; 00416 00417 terminal_modified_ = true; 00418 } 00419 00420 void Player::restoreTerminal() { 00421 if (!terminal_modified_) 00422 return; 00423 00424 const int fd = fileno(stdin); 00425 tcsetattr(fd, TCSANOW, &orig_flags_); 00426 00427 terminal_modified_ = false; 00428 } 00429 00430 char Player::readCharFromStdin() { 00431 #ifdef __APPLE__ 00432 fd_set testfd; 00433 FD_COPY(&stdin_fdset_, &testfd); 00434 #else 00435 fd_set testfd = stdin_fdset_; 00436 #endif 00437 00438 timeval tv; 00439 tv.tv_sec = 0; 00440 tv.tv_usec = 0; 00441 if (select(maxfd_, &testfd, NULL, NULL, &tv) <= 0) 00442 return EOF; 00443 00444 return getc(stdin); 00445 } 00446 00447 TimePublisher::TimePublisher() : time_scale_(1.0) 00448 { 00449 setPublishFrequency(-1.0); 00450 time_pub_ = node_handle_.advertise<rosgraph_msgs::Clock>("clock",1); 00451 } 00452 00453 void TimePublisher::setPublishFrequency(double publish_frequency) 00454 { 00455 publish_frequency_ = publish_frequency; 00456 00457 do_publish_ = (publish_frequency > 0.0); 00458 00459 wall_step_.fromSec(1.0 / publish_frequency); 00460 } 00461 00462 void TimePublisher::setTimeScale(double time_scale) 00463 { 00464 time_scale_ = time_scale; 00465 } 00466 00467 void TimePublisher::setHorizon(const ros::Time& horizon) 00468 { 00469 horizon_ = horizon; 00470 } 00471 00472 void TimePublisher::setWCHorizon(const ros::WallTime& horizon) 00473 { 00474 wc_horizon_ = horizon; 00475 } 00476 00477 void TimePublisher::setTime(const ros::Time& time) 00478 { 00479 current_ = time; 00480 } 00481 00482 ros::Time const& TimePublisher::getTime() const 00483 { 00484 return current_; 00485 } 00486 00487 void TimePublisher::runClock(const ros::WallDuration& duration) 00488 { 00489 if (do_publish_) 00490 { 00491 rosgraph_msgs::Clock pub_msg; 00492 00493 ros::WallTime t = ros::WallTime::now(); 00494 ros::WallTime done = t + duration; 00495 00496 while (t < done && t < wc_horizon_) 00497 { 00498 ros::WallDuration leftHorizonWC = wc_horizon_ - t; 00499 00500 ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec); 00501 d *= time_scale_; 00502 00503 current_ = horizon_ - d; 00504 00505 if (current_ >= horizon_) 00506 current_ = horizon_; 00507 00508 if (t >= next_pub_) 00509 { 00510 pub_msg.clock = current_; 00511 time_pub_.publish(pub_msg); 00512 next_pub_ = t + wall_step_; 00513 } 00514 00515 ros::WallTime target = done; 00516 if (target > wc_horizon_) 00517 target = wc_horizon_; 00518 if (target > next_pub_) 00519 target = next_pub_; 00520 00521 ros::WallTime::sleepUntil(target); 00522 00523 t = ros::WallTime::now(); 00524 } 00525 } else { 00526 00527 ros::WallTime t = ros::WallTime::now(); 00528 00529 ros::WallDuration leftHorizonWC = wc_horizon_ - t; 00530 00531 ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec); 00532 d *= time_scale_; 00533 00534 current_ = horizon_ - d; 00535 00536 if (current_ >= horizon_) 00537 current_ = horizon_; 00538 00539 ros::WallTime target = ros::WallTime::now() + duration; 00540 00541 if (target > wc_horizon_) 00542 target = wc_horizon_; 00543 00544 ros::WallTime::sleepUntil(target); 00545 } 00546 } 00547 00548 void TimePublisher::stepClock() 00549 { 00550 if (do_publish_) 00551 { 00552 current_ = horizon_; 00553 00554 rosgraph_msgs::Clock pub_msg; 00555 00556 pub_msg.clock = current_; 00557 time_pub_.publish(pub_msg); 00558 00559 ros::WallTime t = ros::WallTime::now(); 00560 next_pub_ = t + wall_step_; 00561 } else { 00562 current_ = horizon_; 00563 } 00564 } 00565 00566 void TimePublisher::runStalledClock(const ros::WallDuration& duration) 00567 { 00568 if (do_publish_) 00569 { 00570 rosgraph_msgs::Clock pub_msg; 00571 00572 ros::WallTime t = ros::WallTime::now(); 00573 ros::WallTime done = t + duration; 00574 00575 while ( t < done ) 00576 { 00577 if (t > next_pub_) 00578 { 00579 pub_msg.clock = current_; 00580 time_pub_.publish(pub_msg); 00581 next_pub_ = t + wall_step_; 00582 } 00583 00584 ros::WallTime target = done; 00585 00586 if (target > next_pub_) 00587 target = next_pub_; 00588 00589 ros::WallTime::sleepUntil(target); 00590 00591 t = ros::WallTime::now(); 00592 } 00593 } else { 00594 duration.sleep(); 00595 } 00596 } 00597 00598 bool TimePublisher::horizonReached() 00599 { 00600 return ros::WallTime::now() > wc_horizon_; 00601 } 00602 00603 } // namespace rosbag