player.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2008, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of Willow Garage, Inc. nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 ********************************************************************/
00034 
00035 #include "rosbag/player.h"
00036 #include "rosbag/message_instance.h"
00037 #include "rosbag/view.h"
00038 
00039 #if !defined(_MSC_VER)
00040   #include <sys/select.h>
00041 #endif
00042 
00043 #include <boost/foreach.hpp>
00044 #include <boost/format.hpp>
00045 
00046 #include "rosgraph_msgs/Clock.h"
00047 
00048 #define foreach BOOST_FOREACH
00049 
00050 using std::map;
00051 using std::pair;
00052 using std::string;
00053 using std::vector;
00054 using boost::shared_ptr;
00055 using ros::Exception;
00056 
00057 namespace rosbag {
00058 
00059 // PlayerOptions
00060 
00061 PlayerOptions::PlayerOptions() :
00062     quiet(false),
00063     start_paused(false),
00064     at_once(false),
00065     bag_time(false),
00066     bag_time_frequency(0.0),
00067     time_scale(1.0),
00068     queue_size(0),
00069     advertise_sleep(0.2),
00070     try_future(false),
00071     has_time(false),
00072     loop(false),
00073     time(0.0f),
00074     has_duration(false),
00075     duration(0.0f),
00076     keep_alive(false),
00077     skip_empty(ros::DURATION_MAX)
00078 {
00079 }
00080 
00081 void PlayerOptions::check() {
00082     if (bags.size() == 0)
00083         throw Exception("You must specify at least one bag file to play from");
00084     if (has_duration && duration <= 0.0)
00085         throw Exception("Invalid duration, must be > 0.0");
00086 }
00087 
00088 // Player
00089 
00090 Player::Player(PlayerOptions const& options) :
00091     options_(options),
00092     paused_(false),
00093     terminal_modified_(false)
00094 {
00095 }
00096 
00097 Player::~Player() {
00098     foreach(shared_ptr<Bag> bag, bags_)
00099         bag->close();
00100 
00101     restoreTerminal();
00102 }
00103 
00104 void Player::publish() {
00105     options_.check();
00106 
00107     // Open all the bag files
00108     foreach(string const& filename, options_.bags) {
00109         ROS_INFO("Opening %s", filename.c_str());
00110 
00111         try
00112         {
00113             shared_ptr<Bag> bag(new Bag);
00114             bag->open(filename, bagmode::Read);
00115             bags_.push_back(bag);
00116         }
00117         catch (BagUnindexedException ex) {
00118             std::cerr << "Bag file " << filename << " is unindexed.  Run rosbag reindex." << std::endl;
00119             return;
00120         }
00121     }
00122 
00123     setupTerminal();
00124 
00125     if (!node_handle_.ok())
00126       return;
00127 
00128     if (!options_.quiet)
00129       puts("");
00130     
00131     // Publish all messages in the bags
00132     View full_view;
00133     foreach(shared_ptr<Bag> bag, bags_)
00134         full_view.addQuery(*bag);
00135 
00136     ros::Time initial_time = full_view.getBeginTime();
00137 
00138     initial_time += ros::Duration(options_.time);
00139 
00140     ros::Time finish_time = ros::TIME_MAX;
00141     if (options_.has_duration)
00142     {
00143       finish_time = initial_time + ros::Duration(options_.duration);
00144     }
00145 
00146     View view;
00147     TopicQuery topics(options_.topics);
00148 
00149     if (options_.topics.empty())
00150     {
00151       foreach(shared_ptr<Bag> bag, bags_)
00152         view.addQuery(*bag, initial_time, finish_time);
00153     } else {
00154       foreach(shared_ptr<Bag> bag, bags_)
00155         view.addQuery(*bag, topics, initial_time, finish_time);
00156     }
00157 
00158     if (view.size() == 0)
00159     {
00160       std::cerr << "No messages to play on specified topics.  Exiting." << std::endl;
00161       ros::shutdown();
00162       return;
00163     }
00164 
00165     // Advertise all of our messages
00166     foreach(const ConnectionInfo* c, view.getConnections())
00167     {
00168         ros::M_string::const_iterator header_iter = c->header->find("callerid");
00169         std::string callerid = (header_iter != c->header->end() ? header_iter->second : string(""));
00170 
00171         string callerid_topic = callerid + c->topic;
00172 
00173         map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00174         if (pub_iter == publishers_.end()) {
00175 
00176             ros::AdvertiseOptions opts = createAdvertiseOptions(c, options_.queue_size);
00177 
00178             ros::Publisher pub = node_handle_.advertise(opts);
00179             publishers_.insert(publishers_.begin(), pair<string, ros::Publisher>(callerid_topic, pub));
00180 
00181             pub_iter = publishers_.find(callerid_topic);
00182         }
00183     }
00184 
00185     std::cout << "Waiting " << options_.advertise_sleep.toSec() << " seconds after advertising topics..." << std::flush;
00186     options_.advertise_sleep.sleep();
00187     std::cout << " done." << std::endl;
00188 
00189     std::cout << std::endl << "Hit space to toggle paused, or 's' to step." << std::endl;
00190 
00191     paused_ = options_.start_paused;
00192 
00193     while (true) {
00194         // Set up our time_translator and publishers
00195 
00196         time_translator_.setTimeScale(options_.time_scale);
00197 
00198         start_time_ = view.begin()->getTime();
00199         time_translator_.setRealStartTime(start_time_);
00200         bag_length_ = view.getEndTime() - view.getBeginTime();
00201 
00202         time_publisher_.setTime(start_time_);
00203 
00204         ros::WallTime now_wt = ros::WallTime::now();
00205         time_translator_.setTranslatedStartTime(ros::Time(now_wt.sec, now_wt.nsec));
00206 
00207 
00208         time_publisher_.setTimeScale(options_.time_scale);
00209         if (options_.bag_time)
00210             time_publisher_.setPublishFrequency(options_.bag_time_frequency);
00211         else
00212             time_publisher_.setPublishFrequency(-1.0);
00213 
00214         paused_time_ = now_wt;
00215 
00216         // Call do-publish for each message
00217         foreach(MessageInstance m, view) {
00218             if (!node_handle_.ok())
00219                 break;
00220 
00221             doPublish(m);
00222         }
00223 
00224         if (options_.keep_alive)
00225             while (node_handle_.ok())
00226                 doKeepAlive();
00227 
00228         if (!node_handle_.ok()) {
00229             std::cout << std::endl;
00230             break;
00231         }
00232         if (!options_.loop) {
00233             std::cout << std::endl << "Done." << std::endl;
00234             break;
00235         }
00236     }
00237 
00238     ros::shutdown();
00239 }
00240 
00241 void Player::printTime()
00242 {
00243     if (!options_.quiet) {
00244 
00245         ros::Time current_time = time_publisher_.getTime();
00246         ros::Duration d = current_time - start_time_;
00247 
00248         if (paused_)
00249         {
00250             printf("\r [PAUSED]   Bag Time: %13.6f   Duration: %.6f / %.6f     \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00251         }
00252         else
00253         {
00254             printf("\r [RUNNING]  Bag Time: %13.6f   Duration: %.6f / %.6f     \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00255         }
00256         fflush(stdout);
00257     }
00258 }
00259 
00260 void Player::doPublish(MessageInstance const& m) {
00261     string const& topic   = m.getTopic();
00262     ros::Time const& time = m.getTime();
00263     string callerid       = m.getCallerId();
00264     
00265     ros::Time translated = time_translator_.translate(time);
00266     ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00267 
00268     time_publisher_.setHorizon(time);
00269     time_publisher_.setWCHorizon(horizon);
00270 
00271     string callerid_topic = callerid + topic;
00272 
00273     map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00274     ROS_ASSERT(pub_iter != publishers_.end());
00275 
00276     // If immediate specified, play immediately
00277     if (options_.at_once) {
00278         time_publisher_.stepClock();
00279         pub_iter->second.publish(m);
00280         printTime();
00281         return;
00282     }
00283 
00284     // If skip_empty is specified, skip this region and shift.
00285     if (time - time_publisher_.getTime() > options_.skip_empty)
00286     {
00287       time_publisher_.stepClock();
00288 
00289       ros::WallDuration shift = ros::WallTime::now() - horizon ;
00290       time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00291       horizon += shift;
00292       time_publisher_.setWCHorizon(horizon);
00293       (pub_iter->second).publish(m);
00294       printTime();
00295       return;
00296     }
00297 
00298     while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00299     {
00300         bool charsleftorpaused = true;
00301         while (charsleftorpaused && node_handle_.ok())
00302         {
00303             switch (readCharFromStdin()){
00304             case ' ':
00305                 paused_ = !paused_;
00306                 if (paused_) {
00307                     paused_time_ = ros::WallTime::now();
00308                 }
00309                 else
00310                 {
00311                     ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00312                     paused_time_ = ros::WallTime::now();
00313          
00314                     time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00315 
00316                     horizon += shift;
00317                     time_publisher_.setWCHorizon(horizon);
00318                 }
00319                 break;
00320             case 's':
00321                 if (paused_) {
00322                     time_publisher_.stepClock();
00323 
00324                     ros::WallDuration shift = ros::WallTime::now() - horizon ;
00325                     paused_time_ = ros::WallTime::now();
00326 
00327                     time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00328 
00329                     horizon += shift;
00330                     time_publisher_.setWCHorizon(horizon);
00331             
00332                     (pub_iter->second).publish(m);
00333 
00334                     printTime();
00335                     return;
00336                 }
00337                 break;
00338             case EOF:
00339                 if (paused_)
00340                 {
00341                     printTime();
00342                     time_publisher_.runStalledClock(ros::WallDuration(.1));
00343                 }
00344                 else
00345                     charsleftorpaused = false;
00346             }
00347         }
00348 
00349         printTime();
00350         time_publisher_.runClock(ros::WallDuration(.1));
00351     }
00352 
00353     pub_iter->second.publish(m);
00354 }
00355 
00356 
00357 void Player::doKeepAlive() {
00358     //Keep pushing ourself out in 10-sec increments (avoids fancy math dealing with the end of time)
00359     ros::Time const& time = time_publisher_.getTime() + ros::Duration(10.0);
00360 
00361     ros::Time translated = time_translator_.translate(time);
00362     ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00363 
00364     time_publisher_.setHorizon(time);
00365     time_publisher_.setWCHorizon(horizon);
00366 
00367     if (options_.at_once) {
00368         return;
00369     }
00370 
00371     while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00372     {
00373         bool charsleftorpaused = true;
00374         while (charsleftorpaused && node_handle_.ok())
00375         {
00376             switch (readCharFromStdin()){
00377             case ' ':
00378                 paused_ = !paused_;
00379                 if (paused_) {
00380                     paused_time_ = ros::WallTime::now();
00381                 }
00382                 else
00383                 {
00384                     ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00385                     paused_time_ = ros::WallTime::now();
00386          
00387                     time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00388 
00389                     horizon += shift;
00390                     time_publisher_.setWCHorizon(horizon);
00391                 }
00392                 break;
00393             case EOF:
00394                 if (paused_)
00395                 {
00396                     printTime();
00397                     time_publisher_.runStalledClock(ros::WallDuration(.1));
00398                 }
00399                 else
00400                     charsleftorpaused = false;
00401             }
00402         }
00403 
00404         printTime();
00405         time_publisher_.runClock(ros::WallDuration(.1));
00406     }
00407 }
00408 
00409 
00410 
00411 void Player::setupTerminal() {
00412     if (terminal_modified_)
00413         return;
00414 
00415 #if defined(_MSC_VER)
00416     input_handle = GetStdHandle(STD_INPUT_HANDLE);
00417     if (input_handle == INVALID_HANDLE_VALUE)
00418     {
00419         std::cout << "Failed to set up standard input handle." << std::endl;
00420         return;
00421     }
00422     if (! GetConsoleMode(input_handle, &stdin_set) )
00423     {
00424         std::cout << "Failed to save the console mode." << std::endl;
00425         return;
00426     }
00427     // don't actually need anything but the default, alternatively try this
00428     //DWORD event_mode = ENABLE_WINDOW_INPUT | ENABLE_MOUSE_INPUT;
00429     //if (! SetConsoleMode(input_handle, event_mode) )
00430     //{
00431     // std::cout << "Failed to set the console mode." << std::endl;
00432     // return;
00433     //}
00434     terminal_modified_ = true;
00435 #else
00436     const int fd = fileno(stdin);
00437     termios flags;
00438     tcgetattr(fd, &orig_flags_);
00439     flags = orig_flags_;
00440     flags.c_lflag &= ~ICANON;      // set raw (unset canonical modes)
00441     flags.c_cc[VMIN]  = 0;         // i.e. min 1 char for blocking, 0 chars for non-blocking
00442     flags.c_cc[VTIME] = 0;         // block if waiting for char
00443     tcsetattr(fd, TCSANOW, &flags);
00444 
00445     FD_ZERO(&stdin_fdset_);
00446     FD_SET(fd, &stdin_fdset_);
00447     maxfd_ = fd + 1;
00448     terminal_modified_ = true;
00449 #endif
00450 }
00451 
00452 void Player::restoreTerminal() {
00453         if (!terminal_modified_)
00454                 return;
00455 
00456 #if defined(_MSC_VER)
00457     SetConsoleMode(input_handle, stdin_set);
00458 #else
00459     const int fd = fileno(stdin);
00460     tcsetattr(fd, TCSANOW, &orig_flags_);
00461 #endif
00462     terminal_modified_ = false;
00463 }
00464 
00465 int Player::readCharFromStdin() {
00466 #ifdef __APPLE__
00467     fd_set testfd;
00468     FD_COPY(&stdin_fdset_, &testfd);
00469 #elif !defined(_MSC_VER)
00470     fd_set testfd = stdin_fdset_;
00471 #endif
00472 
00473 #if defined(_MSC_VER)
00474     DWORD events = 0;
00475     INPUT_RECORD input_record[1];
00476     DWORD input_size = 1;
00477     BOOL b = GetNumberOfConsoleInputEvents(input_handle, &events);
00478     if (b && events > 0)
00479     {
00480         b = ReadConsoleInput(input_handle, input_record, input_size, &events);
00481         if (b)
00482         {
00483             for (unsigned int i = 0; i < events; i++)
00484             {
00485                 if (input_record[i].EventType & KEY_EVENT & input_record[i].Event.KeyEvent.bKeyDown)
00486                 {
00487                     CHAR ch = input_record[i].Event.KeyEvent.uChar.AsciiChar;
00488                     return ch;
00489                 }
00490             }
00491         }
00492     }
00493     return EOF;
00494 #else
00495     timeval tv;
00496     tv.tv_sec  = 0;
00497     tv.tv_usec = 0;
00498     if (select(maxfd_, &testfd, NULL, NULL, &tv) <= 0)
00499         return EOF;
00500     return getc(stdin);
00501 #endif
00502 }
00503 
00504 TimePublisher::TimePublisher() : time_scale_(1.0)
00505 {
00506   setPublishFrequency(-1.0);
00507   time_pub_ = node_handle_.advertise<rosgraph_msgs::Clock>("clock",1);
00508 }
00509 
00510 void TimePublisher::setPublishFrequency(double publish_frequency)
00511 {
00512   publish_frequency_ = publish_frequency;
00513   
00514   do_publish_ = (publish_frequency > 0.0);
00515 
00516   wall_step_.fromSec(1.0 / publish_frequency);
00517 }
00518 
00519 void TimePublisher::setTimeScale(double time_scale)
00520 {
00521     time_scale_ = time_scale;
00522 }
00523 
00524 void TimePublisher::setHorizon(const ros::Time& horizon)
00525 {
00526     horizon_ = horizon;
00527 }
00528 
00529 void TimePublisher::setWCHorizon(const ros::WallTime& horizon)
00530 {
00531   wc_horizon_ = horizon;
00532 }
00533 
00534 void TimePublisher::setTime(const ros::Time& time)
00535 {
00536     current_ = time;
00537 }
00538 
00539 ros::Time const& TimePublisher::getTime() const
00540 {
00541     return current_;
00542 }
00543 
00544 void TimePublisher::runClock(const ros::WallDuration& duration)
00545 {
00546     if (do_publish_)
00547     {
00548         rosgraph_msgs::Clock pub_msg;
00549 
00550         ros::WallTime t = ros::WallTime::now();
00551         ros::WallTime done = t + duration;
00552 
00553         while (t < done && t < wc_horizon_)
00554         {
00555             ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00556 
00557             ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00558             d *= time_scale_;
00559 
00560             current_ = horizon_ - d;
00561 
00562             if (current_ >= horizon_)
00563               current_ = horizon_;
00564 
00565             if (t >= next_pub_)
00566             {
00567                 pub_msg.clock = current_;
00568                 time_pub_.publish(pub_msg);
00569                 next_pub_ = t + wall_step_;
00570             }
00571 
00572             ros::WallTime target = done;
00573             if (target > wc_horizon_)
00574               target = wc_horizon_;
00575             if (target > next_pub_)
00576               target = next_pub_;
00577 
00578             ros::WallTime::sleepUntil(target);
00579 
00580             t = ros::WallTime::now();
00581         }
00582     } else {
00583 
00584         ros::WallTime t = ros::WallTime::now();
00585 
00586         ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00587 
00588         ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00589         d *= time_scale_;
00590 
00591         current_ = horizon_ - d;
00592         
00593         if (current_ >= horizon_)
00594             current_ = horizon_;
00595 
00596         ros::WallTime target = ros::WallTime::now() + duration;
00597 
00598         if (target > wc_horizon_)
00599             target = wc_horizon_;
00600 
00601         ros::WallTime::sleepUntil(target);
00602     }
00603 }
00604 
00605 void TimePublisher::stepClock()
00606 {
00607     if (do_publish_)
00608     {
00609         current_ = horizon_;
00610 
00611         rosgraph_msgs::Clock pub_msg;
00612 
00613         pub_msg.clock = current_;
00614         time_pub_.publish(pub_msg);
00615 
00616         ros::WallTime t = ros::WallTime::now();
00617         next_pub_ = t + wall_step_;
00618     } else {
00619         current_ = horizon_;
00620     }
00621 }
00622 
00623 void TimePublisher::runStalledClock(const ros::WallDuration& duration)
00624 {
00625     if (do_publish_)
00626     {
00627         rosgraph_msgs::Clock pub_msg;
00628 
00629         ros::WallTime t = ros::WallTime::now();
00630         ros::WallTime done = t + duration;
00631 
00632         while ( t < done )
00633         {
00634             if (t > next_pub_)
00635             {
00636                 pub_msg.clock = current_;
00637                 time_pub_.publish(pub_msg);
00638                 next_pub_ = t + wall_step_;
00639             }
00640 
00641             ros::WallTime target = done;
00642 
00643             if (target > next_pub_)
00644               target = next_pub_;
00645 
00646             ros::WallTime::sleepUntil(target);
00647 
00648             t = ros::WallTime::now();
00649         }
00650     } else {
00651         duration.sleep();
00652     }
00653 }
00654 
00655 bool TimePublisher::horizonReached()
00656 {
00657   return ros::WallTime::now() > wc_horizon_;
00658 }
00659 
00660 } // namespace rosbag


rosbag
Author(s): Tim Field, Jeremy Leibs, James Bowman
autogenerated on Mon Oct 6 2014 11:47:09