player.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2008, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of Willow Garage, Inc. nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 ********************************************************************/
00034 
00035 #include "rosbag/player.h"
00036 #include "rosbag/message_instance.h"
00037 #include "rosbag/view.h"
00038 
00039 #if !defined(_MSC_VER)
00040   #include <sys/select.h>
00041 #endif
00042 
00043 #include <boost/foreach.hpp>
00044 #include <boost/format.hpp>
00045 
00046 #include "rosgraph_msgs/Clock.h"
00047 
00048 #define foreach BOOST_FOREACH
00049 
00050 using std::map;
00051 using std::pair;
00052 using std::string;
00053 using std::vector;
00054 using boost::shared_ptr;
00055 using ros::Exception;
00056 
00057 namespace rosbag {
00058 
00059 ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo* c, uint32_t queue_size, const std::string& prefix) {
00060     ros::AdvertiseOptions opts(prefix + c->topic, queue_size, c->md5sum, c->datatype, c->msg_def);
00061     ros::M_string::const_iterator header_iter = c->header->find("latching");
00062     opts.latch = (header_iter != c->header->end() && header_iter->second == "1");
00063     return opts;
00064 }
00065 
00066 
00067 ros::AdvertiseOptions createAdvertiseOptions(MessageInstance const& m, uint32_t queue_size, const std::string& prefix) {
00068     return ros::AdvertiseOptions(prefix + m.getTopic(), queue_size, m.getMD5Sum(), m.getDataType(), m.getMessageDefinition());
00069 }
00070 
00071 // PlayerOptions
00072 
00073 PlayerOptions::PlayerOptions() :
00074     prefix(""),
00075     quiet(false),
00076     start_paused(false),
00077     at_once(false),
00078     bag_time(false),
00079     bag_time_frequency(0.0),
00080     time_scale(1.0),
00081     queue_size(0),
00082     advertise_sleep(0.2),
00083     try_future(false),
00084     has_time(false),
00085     loop(false),
00086     time(0.0f),
00087     has_duration(false),
00088     duration(0.0f),
00089     keep_alive(false),
00090     skip_empty(ros::DURATION_MAX)
00091 {
00092 }
00093 
00094 void PlayerOptions::check() {
00095     if (bags.size() == 0)
00096         throw Exception("You must specify at least one bag file to play from");
00097     if (has_duration && duration <= 0.0)
00098         throw Exception("Invalid duration, must be > 0.0");
00099 }
00100 
00101 // Player
00102 
00103 Player::Player(PlayerOptions const& options) :
00104     options_(options),
00105     paused_(false),
00106     // If we were given a list of topics to pause on, then go into that mode
00107     // by default (it can be toggled later via 't' from the keyboard).
00108     pause_for_topics_(options_.pause_topics.size() > 0),
00109     terminal_modified_(false)
00110 {
00111 }
00112 
00113 Player::~Player() {
00114     foreach(shared_ptr<Bag> bag, bags_)
00115         bag->close();
00116 
00117     restoreTerminal();
00118 }
00119 
00120 void Player::publish() {
00121     options_.check();
00122 
00123     // Open all the bag files
00124     foreach(string const& filename, options_.bags) {
00125         ROS_INFO("Opening %s", filename.c_str());
00126 
00127         try
00128         {
00129             shared_ptr<Bag> bag(boost::make_shared<Bag>());
00130             bag->open(filename, bagmode::Read);
00131             bags_.push_back(bag);
00132         }
00133         catch (BagUnindexedException ex) {
00134             std::cerr << "Bag file " << filename << " is unindexed.  Run rosbag reindex." << std::endl;
00135             return;
00136         }
00137     }
00138 
00139     setupTerminal();
00140 
00141     if (!node_handle_.ok())
00142       return;
00143 
00144     if (!options_.prefix.empty())
00145     {
00146       ROS_INFO_STREAM("Using prefix '" << options_.prefix << "'' for topics ");
00147     }
00148 
00149     if (!options_.quiet)
00150       puts("");
00151     
00152     // Publish all messages in the bags
00153     View full_view;
00154     foreach(shared_ptr<Bag> bag, bags_)
00155         full_view.addQuery(*bag);
00156 
00157     ros::Time initial_time = full_view.getBeginTime();
00158 
00159     initial_time += ros::Duration(options_.time);
00160 
00161     ros::Time finish_time = ros::TIME_MAX;
00162     if (options_.has_duration)
00163     {
00164       finish_time = initial_time + ros::Duration(options_.duration);
00165     }
00166 
00167     View view;
00168     TopicQuery topics(options_.topics);
00169 
00170     if (options_.topics.empty())
00171     {
00172       foreach(shared_ptr<Bag> bag, bags_)
00173         view.addQuery(*bag, initial_time, finish_time);
00174     } else {
00175       foreach(shared_ptr<Bag> bag, bags_)
00176         view.addQuery(*bag, topics, initial_time, finish_time);
00177     }
00178 
00179     if (view.size() == 0)
00180     {
00181       std::cerr << "No messages to play on specified topics.  Exiting." << std::endl;
00182       ros::shutdown();
00183       return;
00184     }
00185 
00186     // Advertise all of our messages
00187     foreach(const ConnectionInfo* c, view.getConnections())
00188     {
00189         ros::M_string::const_iterator header_iter = c->header->find("callerid");
00190         std::string callerid = (header_iter != c->header->end() ? header_iter->second : string(""));
00191 
00192         string callerid_topic = callerid + c->topic;
00193 
00194         map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00195         if (pub_iter == publishers_.end()) {
00196 
00197             ros::AdvertiseOptions opts = createAdvertiseOptions(c, options_.queue_size, options_.prefix);
00198 
00199             ros::Publisher pub = node_handle_.advertise(opts);
00200             publishers_.insert(publishers_.begin(), pair<string, ros::Publisher>(callerid_topic, pub));
00201 
00202             pub_iter = publishers_.find(callerid_topic);
00203         }
00204     }
00205 
00206     std::cout << "Waiting " << options_.advertise_sleep.toSec() << " seconds after advertising topics..." << std::flush;
00207     options_.advertise_sleep.sleep();
00208     std::cout << " done." << std::endl;
00209 
00210     std::cout << std::endl << "Hit space to toggle paused, or 's' to step." << std::endl;
00211 
00212     paused_ = options_.start_paused;
00213 
00214     while (true) {
00215         // Set up our time_translator and publishers
00216 
00217         time_translator_.setTimeScale(options_.time_scale);
00218 
00219         start_time_ = view.begin()->getTime();
00220         time_translator_.setRealStartTime(start_time_);
00221         bag_length_ = view.getEndTime() - view.getBeginTime();
00222 
00223         time_publisher_.setTime(start_time_);
00224 
00225         ros::WallTime now_wt = ros::WallTime::now();
00226         time_translator_.setTranslatedStartTime(ros::Time(now_wt.sec, now_wt.nsec));
00227 
00228 
00229         time_publisher_.setTimeScale(options_.time_scale);
00230         if (options_.bag_time)
00231             time_publisher_.setPublishFrequency(options_.bag_time_frequency);
00232         else
00233             time_publisher_.setPublishFrequency(-1.0);
00234 
00235         paused_time_ = now_wt;
00236 
00237         // Call do-publish for each message
00238         foreach(MessageInstance m, view) {
00239             if (!node_handle_.ok())
00240                 break;
00241 
00242             doPublish(m);
00243         }
00244 
00245         if (options_.keep_alive)
00246             while (node_handle_.ok())
00247                 doKeepAlive();
00248 
00249         if (!node_handle_.ok()) {
00250             std::cout << std::endl;
00251             break;
00252         }
00253         if (!options_.loop) {
00254             std::cout << std::endl << "Done." << std::endl;
00255             break;
00256         }
00257     }
00258 
00259     ros::shutdown();
00260 }
00261 
00262 void Player::printTime()
00263 {
00264     if (!options_.quiet) {
00265 
00266         ros::Time current_time = time_publisher_.getTime();
00267         ros::Duration d = current_time - start_time_;
00268 
00269         if (paused_)
00270         {
00271             printf("\r [PAUSED]   Bag Time: %13.6f   Duration: %.6f / %.6f     \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00272         }
00273         else
00274         {
00275             printf("\r [RUNNING]  Bag Time: %13.6f   Duration: %.6f / %.6f     \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00276         }
00277         fflush(stdout);
00278     }
00279 }
00280 
00281 void Player::doPublish(MessageInstance const& m) {
00282     string const& topic   = m.getTopic();
00283     ros::Time const& time = m.getTime();
00284     string callerid       = m.getCallerId();
00285     
00286     ros::Time translated = time_translator_.translate(time);
00287     ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00288 
00289     time_publisher_.setHorizon(time);
00290     time_publisher_.setWCHorizon(horizon);
00291 
00292     string callerid_topic = callerid + topic;
00293 
00294     map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00295     ROS_ASSERT(pub_iter != publishers_.end());
00296 
00297     // If immediate specified, play immediately
00298     if (options_.at_once) {
00299         time_publisher_.stepClock();
00300         pub_iter->second.publish(m);
00301         printTime();
00302         return;
00303     }
00304 
00305     // If skip_empty is specified, skip this region and shift.
00306     if (time - time_publisher_.getTime() > options_.skip_empty)
00307     {
00308       time_publisher_.stepClock();
00309 
00310       ros::WallDuration shift = ros::WallTime::now() - horizon ;
00311       time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00312       horizon += shift;
00313       time_publisher_.setWCHorizon(horizon);
00314       (pub_iter->second).publish(m);
00315       printTime();
00316       return;
00317     }
00318 
00319     if (pause_for_topics_)
00320     {
00321         for (std::vector<std::string>::iterator i = options_.pause_topics.begin();
00322              i != options_.pause_topics.end();
00323              ++i)
00324         {
00325             if (topic == *i)
00326             {
00327                 paused_ = true;
00328                 paused_time_ = ros::WallTime::now();
00329             }
00330         }
00331     }
00332 
00333     while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00334     {
00335         bool charsleftorpaused = true;
00336         while (charsleftorpaused && node_handle_.ok())
00337         {
00338             switch (readCharFromStdin()){
00339             case ' ':
00340                 paused_ = !paused_;
00341                 if (paused_) {
00342                     paused_time_ = ros::WallTime::now();
00343                 }
00344                 else
00345                 {
00346                     ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00347                     paused_time_ = ros::WallTime::now();
00348          
00349                     time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00350 
00351                     horizon += shift;
00352                     time_publisher_.setWCHorizon(horizon);
00353                 }
00354                 break;
00355             case 's':
00356                 if (paused_) {
00357                     time_publisher_.stepClock();
00358 
00359                     ros::WallDuration shift = ros::WallTime::now() - horizon ;
00360                     paused_time_ = ros::WallTime::now();
00361 
00362                     time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00363 
00364                     horizon += shift;
00365                     time_publisher_.setWCHorizon(horizon);
00366             
00367                     (pub_iter->second).publish(m);
00368 
00369                     printTime();
00370                     return;
00371                 }
00372                 break;
00373             case 't':
00374                 pause_for_topics_ = !pause_for_topics_;
00375                 break;
00376             case EOF:
00377                 if (paused_)
00378                 {
00379                     printTime();
00380                     time_publisher_.runStalledClock(ros::WallDuration(.1));
00381                 }
00382                 else
00383                     charsleftorpaused = false;
00384             }
00385         }
00386 
00387         printTime();
00388         time_publisher_.runClock(ros::WallDuration(.1));
00389     }
00390 
00391     pub_iter->second.publish(m);
00392 }
00393 
00394 
00395 void Player::doKeepAlive() {
00396     //Keep pushing ourself out in 10-sec increments (avoids fancy math dealing with the end of time)
00397     ros::Time const& time = time_publisher_.getTime() + ros::Duration(10.0);
00398 
00399     ros::Time translated = time_translator_.translate(time);
00400     ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00401 
00402     time_publisher_.setHorizon(time);
00403     time_publisher_.setWCHorizon(horizon);
00404 
00405     if (options_.at_once) {
00406         return;
00407     }
00408 
00409     while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00410     {
00411         bool charsleftorpaused = true;
00412         while (charsleftorpaused && node_handle_.ok())
00413         {
00414             switch (readCharFromStdin()){
00415             case ' ':
00416                 paused_ = !paused_;
00417                 if (paused_) {
00418                     paused_time_ = ros::WallTime::now();
00419                 }
00420                 else
00421                 {
00422                     ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00423                     paused_time_ = ros::WallTime::now();
00424          
00425                     time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00426 
00427                     horizon += shift;
00428                     time_publisher_.setWCHorizon(horizon);
00429                 }
00430                 break;
00431             case EOF:
00432                 if (paused_)
00433                 {
00434                     printTime();
00435                     time_publisher_.runStalledClock(ros::WallDuration(.1));
00436                 }
00437                 else
00438                     charsleftorpaused = false;
00439             }
00440         }
00441 
00442         printTime();
00443         time_publisher_.runClock(ros::WallDuration(.1));
00444     }
00445 }
00446 
00447 
00448 
00449 void Player::setupTerminal() {
00450     if (terminal_modified_)
00451         return;
00452 
00453 #if defined(_MSC_VER)
00454     input_handle = GetStdHandle(STD_INPUT_HANDLE);
00455     if (input_handle == INVALID_HANDLE_VALUE)
00456     {
00457         std::cout << "Failed to set up standard input handle." << std::endl;
00458         return;
00459     }
00460     if (! GetConsoleMode(input_handle, &stdin_set) )
00461     {
00462         std::cout << "Failed to save the console mode." << std::endl;
00463         return;
00464     }
00465     // don't actually need anything but the default, alternatively try this
00466     //DWORD event_mode = ENABLE_WINDOW_INPUT | ENABLE_MOUSE_INPUT;
00467     //if (! SetConsoleMode(input_handle, event_mode) )
00468     //{
00469     // std::cout << "Failed to set the console mode." << std::endl;
00470     // return;
00471     //}
00472     terminal_modified_ = true;
00473 #else
00474     const int fd = fileno(stdin);
00475     termios flags;
00476     tcgetattr(fd, &orig_flags_);
00477     flags = orig_flags_;
00478     flags.c_lflag &= ~ICANON;      // set raw (unset canonical modes)
00479     flags.c_cc[VMIN]  = 0;         // i.e. min 1 char for blocking, 0 chars for non-blocking
00480     flags.c_cc[VTIME] = 0;         // block if waiting for char
00481     tcsetattr(fd, TCSANOW, &flags);
00482 
00483     FD_ZERO(&stdin_fdset_);
00484     FD_SET(fd, &stdin_fdset_);
00485     maxfd_ = fd + 1;
00486     terminal_modified_ = true;
00487 #endif
00488 }
00489 
00490 void Player::restoreTerminal() {
00491         if (!terminal_modified_)
00492                 return;
00493 
00494 #if defined(_MSC_VER)
00495     SetConsoleMode(input_handle, stdin_set);
00496 #else
00497     const int fd = fileno(stdin);
00498     tcsetattr(fd, TCSANOW, &orig_flags_);
00499 #endif
00500     terminal_modified_ = false;
00501 }
00502 
00503 int Player::readCharFromStdin() {
00504 #ifdef __APPLE__
00505     fd_set testfd;
00506     FD_COPY(&stdin_fdset_, &testfd);
00507 #elif !defined(_MSC_VER)
00508     fd_set testfd = stdin_fdset_;
00509 #endif
00510 
00511 #if defined(_MSC_VER)
00512     DWORD events = 0;
00513     INPUT_RECORD input_record[1];
00514     DWORD input_size = 1;
00515     BOOL b = GetNumberOfConsoleInputEvents(input_handle, &events);
00516     if (b && events > 0)
00517     {
00518         b = ReadConsoleInput(input_handle, input_record, input_size, &events);
00519         if (b)
00520         {
00521             for (unsigned int i = 0; i < events; ++i)
00522             {
00523                 if (input_record[i].EventType & KEY_EVENT & input_record[i].Event.KeyEvent.bKeyDown)
00524                 {
00525                     CHAR ch = input_record[i].Event.KeyEvent.uChar.AsciiChar;
00526                     return ch;
00527                 }
00528             }
00529         }
00530     }
00531     return EOF;
00532 #else
00533     timeval tv;
00534     tv.tv_sec  = 0;
00535     tv.tv_usec = 0;
00536     if (select(maxfd_, &testfd, NULL, NULL, &tv) <= 0)
00537         return EOF;
00538     return getc(stdin);
00539 #endif
00540 }
00541 
00542 TimePublisher::TimePublisher() : time_scale_(1.0)
00543 {
00544   setPublishFrequency(-1.0);
00545   time_pub_ = node_handle_.advertise<rosgraph_msgs::Clock>("clock",1);
00546 }
00547 
00548 void TimePublisher::setPublishFrequency(double publish_frequency)
00549 {
00550   publish_frequency_ = publish_frequency;
00551   
00552   do_publish_ = (publish_frequency > 0.0);
00553 
00554   wall_step_.fromSec(1.0 / publish_frequency);
00555 }
00556 
00557 void TimePublisher::setTimeScale(double time_scale)
00558 {
00559     time_scale_ = time_scale;
00560 }
00561 
00562 void TimePublisher::setHorizon(const ros::Time& horizon)
00563 {
00564     horizon_ = horizon;
00565 }
00566 
00567 void TimePublisher::setWCHorizon(const ros::WallTime& horizon)
00568 {
00569   wc_horizon_ = horizon;
00570 }
00571 
00572 void TimePublisher::setTime(const ros::Time& time)
00573 {
00574     current_ = time;
00575 }
00576 
00577 ros::Time const& TimePublisher::getTime() const
00578 {
00579     return current_;
00580 }
00581 
00582 void TimePublisher::runClock(const ros::WallDuration& duration)
00583 {
00584     if (do_publish_)
00585     {
00586         rosgraph_msgs::Clock pub_msg;
00587 
00588         ros::WallTime t = ros::WallTime::now();
00589         ros::WallTime done = t + duration;
00590 
00591         while (t < done && t < wc_horizon_)
00592         {
00593             ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00594 
00595             ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00596             d *= time_scale_;
00597 
00598             current_ = horizon_ - d;
00599 
00600             if (current_ >= horizon_)
00601               current_ = horizon_;
00602 
00603             if (t >= next_pub_)
00604             {
00605                 pub_msg.clock = current_;
00606                 time_pub_.publish(pub_msg);
00607                 next_pub_ = t + wall_step_;
00608             }
00609 
00610             ros::WallTime target = done;
00611             if (target > wc_horizon_)
00612               target = wc_horizon_;
00613             if (target > next_pub_)
00614               target = next_pub_;
00615 
00616             ros::WallTime::sleepUntil(target);
00617 
00618             t = ros::WallTime::now();
00619         }
00620     } else {
00621 
00622         ros::WallTime t = ros::WallTime::now();
00623 
00624         ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00625 
00626         ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00627         d *= time_scale_;
00628 
00629         current_ = horizon_ - d;
00630         
00631         if (current_ >= horizon_)
00632             current_ = horizon_;
00633 
00634         ros::WallTime target = ros::WallTime::now() + duration;
00635 
00636         if (target > wc_horizon_)
00637             target = wc_horizon_;
00638 
00639         ros::WallTime::sleepUntil(target);
00640     }
00641 }
00642 
00643 void TimePublisher::stepClock()
00644 {
00645     if (do_publish_)
00646     {
00647         current_ = horizon_;
00648 
00649         rosgraph_msgs::Clock pub_msg;
00650 
00651         pub_msg.clock = current_;
00652         time_pub_.publish(pub_msg);
00653 
00654         ros::WallTime t = ros::WallTime::now();
00655         next_pub_ = t + wall_step_;
00656     } else {
00657         current_ = horizon_;
00658     }
00659 }
00660 
00661 void TimePublisher::runStalledClock(const ros::WallDuration& duration)
00662 {
00663     if (do_publish_)
00664     {
00665         rosgraph_msgs::Clock pub_msg;
00666 
00667         ros::WallTime t = ros::WallTime::now();
00668         ros::WallTime done = t + duration;
00669 
00670         while ( t < done )
00671         {
00672             if (t > next_pub_)
00673             {
00674                 pub_msg.clock = current_;
00675                 time_pub_.publish(pub_msg);
00676                 next_pub_ = t + wall_step_;
00677             }
00678 
00679             ros::WallTime target = done;
00680 
00681             if (target > next_pub_)
00682               target = next_pub_;
00683 
00684             ros::WallTime::sleepUntil(target);
00685 
00686             t = ros::WallTime::now();
00687         }
00688     } else {
00689         duration.sleep();
00690     }
00691 }
00692 
00693 bool TimePublisher::horizonReached()
00694 {
00695   return ros::WallTime::now() > wc_horizon_;
00696 }
00697 
00698 } // namespace rosbag


rosbag
Author(s): Tim Field, Jeremy Leibs, James Bowman
autogenerated on Tue Mar 7 2017 03:45:48