player.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2008, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of Willow Garage, Inc. nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 ********************************************************************/
00034 
00035 #include "rosbag/player.h"
00036 #include "rosbag/message_instance.h"
00037 #include "rosbag/view.h"
00038 
00039 #if !defined(_MSC_VER)
00040   #include <sys/select.h>
00041 #endif
00042 
00043 #include <boost/foreach.hpp>
00044 #include <boost/format.hpp>
00045 
00046 #include "rosgraph_msgs/Clock.h"
00047 
00048 #define foreach BOOST_FOREACH
00049 
00050 using std::map;
00051 using std::pair;
00052 using std::string;
00053 using std::vector;
00054 using boost::shared_ptr;
00055 using ros::Exception;
00056 
00057 namespace rosbag {
00058 
00059 ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo* c, uint32_t queue_size) {
00060     ros::AdvertiseOptions opts(c->topic, queue_size, c->md5sum, c->datatype, c->msg_def);
00061     ros::M_string::const_iterator header_iter = c->header->find("latching");
00062     opts.latch = (header_iter != c->header->end() && header_iter->second == "1");
00063     return opts;
00064 }
00065 
00066 
00067 ros::AdvertiseOptions createAdvertiseOptions(MessageInstance const& m, uint32_t queue_size) {
00068     return ros::AdvertiseOptions(m.getTopic(), queue_size, m.getMD5Sum(), m.getDataType(), m.getMessageDefinition());
00069 }
00070 
00071 // PlayerOptions
00072 
00073 PlayerOptions::PlayerOptions() :
00074     quiet(false),
00075     start_paused(false),
00076     at_once(false),
00077     bag_time(false),
00078     bag_time_frequency(0.0),
00079     time_scale(1.0),
00080     queue_size(0),
00081     advertise_sleep(0.2),
00082     try_future(false),
00083     has_time(false),
00084     loop(false),
00085     time(0.0f),
00086     has_duration(false),
00087     duration(0.0f),
00088     keep_alive(false),
00089     skip_empty(ros::DURATION_MAX)
00090 {
00091 }
00092 
00093 void PlayerOptions::check() {
00094     if (bags.size() == 0)
00095         throw Exception("You must specify at least one bag file to play from");
00096     if (has_duration && duration <= 0.0)
00097         throw Exception("Invalid duration, must be > 0.0");
00098 }
00099 
00100 // Player
00101 
00102 Player::Player(PlayerOptions const& options) :
00103     options_(options),
00104     paused_(false),
00105     terminal_modified_(false)
00106 {
00107 }
00108 
00109 Player::~Player() {
00110     foreach(shared_ptr<Bag> bag, bags_)
00111         bag->close();
00112 
00113     restoreTerminal();
00114 }
00115 
00116 void Player::publish() {
00117     options_.check();
00118 
00119     // Open all the bag files
00120     foreach(string const& filename, options_.bags) {
00121         ROS_INFO("Opening %s", filename.c_str());
00122 
00123         try
00124         {
00125             shared_ptr<Bag> bag(new Bag);
00126             bag->open(filename, bagmode::Read);
00127             bags_.push_back(bag);
00128         }
00129         catch (BagUnindexedException ex) {
00130             std::cerr << "Bag file " << filename << " is unindexed.  Run rosbag reindex." << std::endl;
00131             return;
00132         }
00133     }
00134 
00135     setupTerminal();
00136 
00137     if (!node_handle_.ok())
00138       return;
00139 
00140     if (!options_.quiet)
00141       puts("");
00142     
00143     // Publish all messages in the bags
00144     View full_view;
00145     foreach(shared_ptr<Bag> bag, bags_)
00146         full_view.addQuery(*bag);
00147 
00148     ros::Time initial_time = full_view.getBeginTime();
00149 
00150     initial_time += ros::Duration(options_.time);
00151 
00152     ros::Time finish_time = ros::TIME_MAX;
00153     if (options_.has_duration)
00154     {
00155       finish_time = initial_time + ros::Duration(options_.duration);
00156     }
00157 
00158     View view;
00159     TopicQuery topics(options_.topics);
00160 
00161     if (options_.topics.empty())
00162     {
00163       foreach(shared_ptr<Bag> bag, bags_)
00164         view.addQuery(*bag, initial_time, finish_time);
00165     } else {
00166       foreach(shared_ptr<Bag> bag, bags_)
00167         view.addQuery(*bag, topics, initial_time, finish_time);
00168     }
00169 
00170     if (view.size() == 0)
00171     {
00172       std::cerr << "No messages to play on specified topics.  Exiting." << std::endl;
00173       ros::shutdown();
00174       return;
00175     }
00176 
00177     // Advertise all of our messages
00178     foreach(const ConnectionInfo* c, view.getConnections())
00179     {
00180         ros::M_string::const_iterator header_iter = c->header->find("callerid");
00181         std::string callerid = (header_iter != c->header->end() ? header_iter->second : string(""));
00182 
00183         string callerid_topic = callerid + c->topic;
00184 
00185         map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00186         if (pub_iter == publishers_.end()) {
00187 
00188             ros::AdvertiseOptions opts = createAdvertiseOptions(c, options_.queue_size);
00189 
00190             ros::Publisher pub = node_handle_.advertise(opts);
00191             publishers_.insert(publishers_.begin(), pair<string, ros::Publisher>(callerid_topic, pub));
00192 
00193             pub_iter = publishers_.find(callerid_topic);
00194         }
00195     }
00196 
00197     std::cout << "Waiting " << options_.advertise_sleep.toSec() << " seconds after advertising topics..." << std::flush;
00198     options_.advertise_sleep.sleep();
00199     std::cout << " done." << std::endl;
00200 
00201     std::cout << std::endl << "Hit space to toggle paused, or 's' to step." << std::endl;
00202 
00203     paused_ = options_.start_paused;
00204 
00205     while (true) {
00206         // Set up our time_translator and publishers
00207 
00208         time_translator_.setTimeScale(options_.time_scale);
00209 
00210         start_time_ = view.begin()->getTime();
00211         time_translator_.setRealStartTime(start_time_);
00212         bag_length_ = view.getEndTime() - view.getBeginTime();
00213 
00214         time_publisher_.setTime(start_time_);
00215 
00216         ros::WallTime now_wt = ros::WallTime::now();
00217         time_translator_.setTranslatedStartTime(ros::Time(now_wt.sec, now_wt.nsec));
00218 
00219 
00220         time_publisher_.setTimeScale(options_.time_scale);
00221         if (options_.bag_time)
00222             time_publisher_.setPublishFrequency(options_.bag_time_frequency);
00223         else
00224             time_publisher_.setPublishFrequency(-1.0);
00225 
00226         paused_time_ = now_wt;
00227 
00228         // Call do-publish for each message
00229         foreach(MessageInstance m, view) {
00230             if (!node_handle_.ok())
00231                 break;
00232 
00233             doPublish(m);
00234         }
00235 
00236         if (options_.keep_alive)
00237             while (node_handle_.ok())
00238                 doKeepAlive();
00239 
00240         if (!node_handle_.ok()) {
00241             std::cout << std::endl;
00242             break;
00243         }
00244         if (!options_.loop) {
00245             std::cout << std::endl << "Done." << std::endl;
00246             break;
00247         }
00248     }
00249 
00250     ros::shutdown();
00251 }
00252 
00253 void Player::printTime()
00254 {
00255     if (!options_.quiet) {
00256 
00257         ros::Time current_time = time_publisher_.getTime();
00258         ros::Duration d = current_time - start_time_;
00259 
00260         if (paused_)
00261         {
00262             printf("\r [PAUSED]   Bag Time: %13.6f   Duration: %.6f / %.6f     \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00263         }
00264         else
00265         {
00266             printf("\r [RUNNING]  Bag Time: %13.6f   Duration: %.6f / %.6f     \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00267         }
00268         fflush(stdout);
00269     }
00270 }
00271 
00272 void Player::doPublish(MessageInstance const& m) {
00273     string const& topic   = m.getTopic();
00274     ros::Time const& time = m.getTime();
00275     string callerid       = m.getCallerId();
00276     
00277     ros::Time translated = time_translator_.translate(time);
00278     ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00279 
00280     time_publisher_.setHorizon(time);
00281     time_publisher_.setWCHorizon(horizon);
00282 
00283     string callerid_topic = callerid + topic;
00284 
00285     map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00286     ROS_ASSERT(pub_iter != publishers_.end());
00287 
00288     // If immediate specified, play immediately
00289     if (options_.at_once) {
00290         time_publisher_.stepClock();
00291         pub_iter->second.publish(m);
00292         printTime();
00293         return;
00294     }
00295 
00296     // If skip_empty is specified, skip this region and shift.
00297     if (time - time_publisher_.getTime() > options_.skip_empty)
00298     {
00299       time_publisher_.stepClock();
00300 
00301       ros::WallDuration shift = ros::WallTime::now() - horizon ;
00302       time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00303       horizon += shift;
00304       time_publisher_.setWCHorizon(horizon);
00305       (pub_iter->second).publish(m);
00306       printTime();
00307       return;
00308     }
00309 
00310     while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00311     {
00312         bool charsleftorpaused = true;
00313         while (charsleftorpaused && node_handle_.ok())
00314         {
00315             switch (readCharFromStdin()){
00316             case ' ':
00317                 paused_ = !paused_;
00318                 if (paused_) {
00319                     paused_time_ = ros::WallTime::now();
00320                 }
00321                 else
00322                 {
00323                     ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00324                     paused_time_ = ros::WallTime::now();
00325          
00326                     time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00327 
00328                     horizon += shift;
00329                     time_publisher_.setWCHorizon(horizon);
00330                 }
00331                 break;
00332             case 's':
00333                 if (paused_) {
00334                     time_publisher_.stepClock();
00335 
00336                     ros::WallDuration shift = ros::WallTime::now() - horizon ;
00337                     paused_time_ = ros::WallTime::now();
00338 
00339                     time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00340 
00341                     horizon += shift;
00342                     time_publisher_.setWCHorizon(horizon);
00343             
00344                     (pub_iter->second).publish(m);
00345 
00346                     printTime();
00347                     return;
00348                 }
00349                 break;
00350             case EOF:
00351                 if (paused_)
00352                 {
00353                     printTime();
00354                     time_publisher_.runStalledClock(ros::WallDuration(.1));
00355                 }
00356                 else
00357                     charsleftorpaused = false;
00358             }
00359         }
00360 
00361         printTime();
00362         time_publisher_.runClock(ros::WallDuration(.1));
00363     }
00364 
00365     pub_iter->second.publish(m);
00366 }
00367 
00368 
00369 void Player::doKeepAlive() {
00370     //Keep pushing ourself out in 10-sec increments (avoids fancy math dealing with the end of time)
00371     ros::Time const& time = time_publisher_.getTime() + ros::Duration(10.0);
00372 
00373     ros::Time translated = time_translator_.translate(time);
00374     ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00375 
00376     time_publisher_.setHorizon(time);
00377     time_publisher_.setWCHorizon(horizon);
00378 
00379     if (options_.at_once) {
00380         return;
00381     }
00382 
00383     while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00384     {
00385         bool charsleftorpaused = true;
00386         while (charsleftorpaused && node_handle_.ok())
00387         {
00388             switch (readCharFromStdin()){
00389             case ' ':
00390                 paused_ = !paused_;
00391                 if (paused_) {
00392                     paused_time_ = ros::WallTime::now();
00393                 }
00394                 else
00395                 {
00396                     ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00397                     paused_time_ = ros::WallTime::now();
00398          
00399                     time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00400 
00401                     horizon += shift;
00402                     time_publisher_.setWCHorizon(horizon);
00403                 }
00404                 break;
00405             case EOF:
00406                 if (paused_)
00407                 {
00408                     printTime();
00409                     time_publisher_.runStalledClock(ros::WallDuration(.1));
00410                 }
00411                 else
00412                     charsleftorpaused = false;
00413             }
00414         }
00415 
00416         printTime();
00417         time_publisher_.runClock(ros::WallDuration(.1));
00418     }
00419 }
00420 
00421 
00422 
00423 void Player::setupTerminal() {
00424     if (terminal_modified_)
00425         return;
00426 
00427 #if defined(_MSC_VER)
00428     input_handle = GetStdHandle(STD_INPUT_HANDLE);
00429     if (input_handle == INVALID_HANDLE_VALUE)
00430     {
00431         std::cout << "Failed to set up standard input handle." << std::endl;
00432         return;
00433     }
00434     if (! GetConsoleMode(input_handle, &stdin_set) )
00435     {
00436         std::cout << "Failed to save the console mode." << std::endl;
00437         return;
00438     }
00439     // don't actually need anything but the default, alternatively try this
00440     //DWORD event_mode = ENABLE_WINDOW_INPUT | ENABLE_MOUSE_INPUT;
00441     //if (! SetConsoleMode(input_handle, event_mode) )
00442     //{
00443     // std::cout << "Failed to set the console mode." << std::endl;
00444     // return;
00445     //}
00446     terminal_modified_ = true;
00447 #else
00448     const int fd = fileno(stdin);
00449     termios flags;
00450     tcgetattr(fd, &orig_flags_);
00451     flags = orig_flags_;
00452     flags.c_lflag &= ~ICANON;      // set raw (unset canonical modes)
00453     flags.c_cc[VMIN]  = 0;         // i.e. min 1 char for blocking, 0 chars for non-blocking
00454     flags.c_cc[VTIME] = 0;         // block if waiting for char
00455     tcsetattr(fd, TCSANOW, &flags);
00456 
00457     FD_ZERO(&stdin_fdset_);
00458     FD_SET(fd, &stdin_fdset_);
00459     maxfd_ = fd + 1;
00460     terminal_modified_ = true;
00461 #endif
00462 }
00463 
00464 void Player::restoreTerminal() {
00465         if (!terminal_modified_)
00466                 return;
00467 
00468 #if defined(_MSC_VER)
00469     SetConsoleMode(input_handle, stdin_set);
00470 #else
00471     const int fd = fileno(stdin);
00472     tcsetattr(fd, TCSANOW, &orig_flags_);
00473 #endif
00474     terminal_modified_ = false;
00475 }
00476 
00477 int Player::readCharFromStdin() {
00478 #ifdef __APPLE__
00479     fd_set testfd;
00480     FD_COPY(&stdin_fdset_, &testfd);
00481 #elif !defined(_MSC_VER)
00482     fd_set testfd = stdin_fdset_;
00483 #endif
00484 
00485 #if defined(_MSC_VER)
00486     DWORD events = 0;
00487     INPUT_RECORD input_record[1];
00488     DWORD input_size = 1;
00489     BOOL b = GetNumberOfConsoleInputEvents(input_handle, &events);
00490     if (b && events > 0)
00491     {
00492         b = ReadConsoleInput(input_handle, input_record, input_size, &events);
00493         if (b)
00494         {
00495             for (unsigned int i = 0; i < events; i++)
00496             {
00497                 if (input_record[i].EventType & KEY_EVENT & input_record[i].Event.KeyEvent.bKeyDown)
00498                 {
00499                     CHAR ch = input_record[i].Event.KeyEvent.uChar.AsciiChar;
00500                     return ch;
00501                 }
00502             }
00503         }
00504     }
00505     return EOF;
00506 #else
00507     timeval tv;
00508     tv.tv_sec  = 0;
00509     tv.tv_usec = 0;
00510     if (select(maxfd_, &testfd, NULL, NULL, &tv) <= 0)
00511         return EOF;
00512     return getc(stdin);
00513 #endif
00514 }
00515 
00516 TimePublisher::TimePublisher() : time_scale_(1.0)
00517 {
00518   setPublishFrequency(-1.0);
00519   time_pub_ = node_handle_.advertise<rosgraph_msgs::Clock>("clock",1);
00520 }
00521 
00522 void TimePublisher::setPublishFrequency(double publish_frequency)
00523 {
00524   publish_frequency_ = publish_frequency;
00525   
00526   do_publish_ = (publish_frequency > 0.0);
00527 
00528   wall_step_.fromSec(1.0 / publish_frequency);
00529 }
00530 
00531 void TimePublisher::setTimeScale(double time_scale)
00532 {
00533     time_scale_ = time_scale;
00534 }
00535 
00536 void TimePublisher::setHorizon(const ros::Time& horizon)
00537 {
00538     horizon_ = horizon;
00539 }
00540 
00541 void TimePublisher::setWCHorizon(const ros::WallTime& horizon)
00542 {
00543   wc_horizon_ = horizon;
00544 }
00545 
00546 void TimePublisher::setTime(const ros::Time& time)
00547 {
00548     current_ = time;
00549 }
00550 
00551 ros::Time const& TimePublisher::getTime() const
00552 {
00553     return current_;
00554 }
00555 
00556 void TimePublisher::runClock(const ros::WallDuration& duration)
00557 {
00558     if (do_publish_)
00559     {
00560         rosgraph_msgs::Clock pub_msg;
00561 
00562         ros::WallTime t = ros::WallTime::now();
00563         ros::WallTime done = t + duration;
00564 
00565         while (t < done && t < wc_horizon_)
00566         {
00567             ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00568 
00569             ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00570             d *= time_scale_;
00571 
00572             current_ = horizon_ - d;
00573 
00574             if (current_ >= horizon_)
00575               current_ = horizon_;
00576 
00577             if (t >= next_pub_)
00578             {
00579                 pub_msg.clock = current_;
00580                 time_pub_.publish(pub_msg);
00581                 next_pub_ = t + wall_step_;
00582             }
00583 
00584             ros::WallTime target = done;
00585             if (target > wc_horizon_)
00586               target = wc_horizon_;
00587             if (target > next_pub_)
00588               target = next_pub_;
00589 
00590             ros::WallTime::sleepUntil(target);
00591 
00592             t = ros::WallTime::now();
00593         }
00594     } else {
00595 
00596         ros::WallTime t = ros::WallTime::now();
00597 
00598         ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00599 
00600         ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00601         d *= time_scale_;
00602 
00603         current_ = horizon_ - d;
00604         
00605         if (current_ >= horizon_)
00606             current_ = horizon_;
00607 
00608         ros::WallTime target = ros::WallTime::now() + duration;
00609 
00610         if (target > wc_horizon_)
00611             target = wc_horizon_;
00612 
00613         ros::WallTime::sleepUntil(target);
00614     }
00615 }
00616 
00617 void TimePublisher::stepClock()
00618 {
00619     if (do_publish_)
00620     {
00621         current_ = horizon_;
00622 
00623         rosgraph_msgs::Clock pub_msg;
00624 
00625         pub_msg.clock = current_;
00626         time_pub_.publish(pub_msg);
00627 
00628         ros::WallTime t = ros::WallTime::now();
00629         next_pub_ = t + wall_step_;
00630     } else {
00631         current_ = horizon_;
00632     }
00633 }
00634 
00635 void TimePublisher::runStalledClock(const ros::WallDuration& duration)
00636 {
00637     if (do_publish_)
00638     {
00639         rosgraph_msgs::Clock pub_msg;
00640 
00641         ros::WallTime t = ros::WallTime::now();
00642         ros::WallTime done = t + duration;
00643 
00644         while ( t < done )
00645         {
00646             if (t > next_pub_)
00647             {
00648                 pub_msg.clock = current_;
00649                 time_pub_.publish(pub_msg);
00650                 next_pub_ = t + wall_step_;
00651             }
00652 
00653             ros::WallTime target = done;
00654 
00655             if (target > next_pub_)
00656               target = next_pub_;
00657 
00658             ros::WallTime::sleepUntil(target);
00659 
00660             t = ros::WallTime::now();
00661         }
00662     } else {
00663         duration.sleep();
00664     }
00665 }
00666 
00667 bool TimePublisher::horizonReached()
00668 {
00669   return ros::WallTime::now() > wc_horizon_;
00670 }
00671 
00672 } // namespace rosbag


rosbag
Author(s): Tim Field, Jeremy Leibs, James Bowman
autogenerated on Fri Aug 28 2015 12:33:52