player.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2008, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of Willow Garage, Inc. nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 ********************************************************************/
00034 
00035 #include "rosbag/player.h"
00036 #include "rosbag/message_instance.h"
00037 #include "rosbag/view.h"
00038 
00039 #include <sys/select.h>
00040 
00041 #include <boost/foreach.hpp>
00042 #include <boost/format.hpp>
00043 
00044 #include "rosgraph_msgs/Clock.h"
00045 
00046 #define foreach BOOST_FOREACH
00047 
00048 using std::map;
00049 using std::pair;
00050 using std::string;
00051 using std::vector;
00052 using boost::shared_ptr;
00053 using ros::Exception;
00054 
00055 namespace rosbag {
00056 
00057 // PlayerOptions
00058 
00059 PlayerOptions::PlayerOptions() :
00060     quiet(false),
00061     start_paused(false),
00062     at_once(false),
00063     bag_time(false),
00064     bag_time_frequency(0.0),
00065     time_scale(1.0),
00066     queue_size(0),
00067     advertise_sleep(0.2),
00068     try_future(false),
00069     has_time(false),
00070     loop(false),
00071     time(0.0f),
00072     keep_alive(false),
00073     skip_empty(ros::DURATION_MAX)
00074 {
00075 }
00076 
00077 void PlayerOptions::check() {
00078     if (bags.size() == 0)
00079         throw Exception("You must specify at least one bag file to play from");
00080 }
00081 
00082 // Player
00083 
00084 Player::Player(PlayerOptions const& options) :
00085     options_(options),
00086     paused_(false),
00087     terminal_modified_(false)
00088 {
00089 }
00090 
00091 Player::~Player() {
00092     foreach(shared_ptr<Bag> bag, bags_)
00093         bag->close();
00094 
00095     restoreTerminal();
00096 }
00097 
00098 void Player::publish() {
00099     options_.check();
00100 
00101     // Open all the bag files
00102     foreach(string const& filename, options_.bags) {
00103         ROS_INFO("Opening %s", filename.c_str());
00104 
00105         try
00106         {
00107             shared_ptr<Bag> bag(new Bag);
00108             bag->open(filename, bagmode::Read);
00109             bags_.push_back(bag);
00110         }
00111         catch (BagUnindexedException ex) {
00112             std::cerr << "Bag file " << filename << " is unindexed.  Run rosbag reindex." << std::endl;
00113             return;
00114         }
00115     }
00116 
00117     setupTerminal();
00118 
00119     if (!node_handle_.ok())
00120       return;
00121 
00122     if (!options_.quiet)
00123       puts("");
00124     
00125     // Publish all messages in the bags
00126     View full_view;
00127     foreach(shared_ptr<Bag> bag, bags_)
00128         full_view.addQuery(*bag);
00129 
00130     ros::Time initial_time = full_view.getBeginTime();
00131 
00132     initial_time += ros::Duration(options_.time);
00133 
00134 
00135     View view;
00136     TopicQuery topics(options_.topics);
00137 
00138     if (options_.topics.empty())
00139     {
00140       foreach(shared_ptr<Bag> bag, bags_)
00141         view.addQuery(*bag, initial_time, ros::TIME_MAX);
00142     } else {
00143       foreach(shared_ptr<Bag> bag, bags_)
00144         view.addQuery(*bag, topics, initial_time, ros::TIME_MAX);
00145     }
00146 
00147     if (view.size() == 0)
00148     {
00149       std::cerr << "No messages to play on specified topics.  Exiting." << std::endl;
00150       ros::shutdown();
00151       return;
00152     }
00153 
00154     // Advertise all of our messages
00155     foreach(const ConnectionInfo* c, view.getConnections())
00156     {
00157         ros::M_string::const_iterator header_iter = c->header->find("callerid");
00158         std::string callerid = (header_iter != c->header->end() ? header_iter->second : string(""));
00159 
00160         string callerid_topic = callerid + c->topic;
00161 
00162         map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00163         if (pub_iter == publishers_.end()) {
00164 
00165             ros::AdvertiseOptions opts = createAdvertiseOptions(c, options_.queue_size);
00166 
00167             ros::Publisher pub = node_handle_.advertise(opts);
00168             publishers_.insert(publishers_.begin(), pair<string, ros::Publisher>(callerid_topic, pub));
00169 
00170             pub_iter = publishers_.find(callerid_topic);
00171         }
00172     }
00173 
00174     std::cout << "Waiting " << options_.advertise_sleep.toSec() << " seconds after advertising topics..." << std::flush;
00175     options_.advertise_sleep.sleep();
00176     std::cout << " done." << std::endl;
00177 
00178     std::cout << std::endl << "Hit space to toggle paused, or 's' to step." << std::endl;
00179 
00180     paused_ = options_.start_paused;
00181 
00182     while (true) {
00183         // Set up our time_translator and publishers
00184 
00185         time_translator_.setTimeScale(options_.time_scale);
00186 
00187         start_time_ = view.begin()->getTime();
00188         time_translator_.setRealStartTime(start_time_);
00189         bag_length_ = view.getEndTime() - view.getBeginTime();
00190 
00191         time_publisher_.setTime(start_time_);
00192 
00193         ros::WallTime now_wt = ros::WallTime::now();
00194         time_translator_.setTranslatedStartTime(ros::Time(now_wt.sec, now_wt.nsec));
00195 
00196 
00197         time_publisher_.setTimeScale(options_.time_scale);
00198         if (options_.bag_time)
00199             time_publisher_.setPublishFrequency(options_.bag_time_frequency);
00200         else
00201             time_publisher_.setPublishFrequency(-1.0);
00202 
00203         paused_time_ = now_wt;
00204 
00205         // Call do-publish for each message
00206         foreach(MessageInstance m, view) {
00207             if (!node_handle_.ok())
00208                 break;
00209 
00210             doPublish(m);
00211         }
00212 
00213         if (options_.keep_alive)
00214             while (node_handle_.ok())
00215                 doKeepAlive();
00216 
00217         if (!node_handle_.ok()) {
00218             std::cout << std::endl;
00219             break;
00220         }
00221         if (!options_.loop) {
00222             std::cout << std::endl << "Done." << std::endl;
00223             break;
00224         }
00225     }
00226 
00227     ros::shutdown();
00228 }
00229 
00230 void Player::printTime()
00231 {
00232     if (!options_.quiet) {
00233 
00234         ros::Time current_time = time_publisher_.getTime();
00235         ros::Duration d = current_time - start_time_;
00236 
00237         if (paused_)
00238         {
00239             printf("\r [PAUSED]   Bag Time: %13.6f   Duration: %.6f / %.6f     \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00240         }
00241         else
00242         {
00243             printf("\r [RUNNING]  Bag Time: %13.6f   Duration: %.6f / %.6f     \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
00244         }
00245         fflush(stdout);
00246     }
00247 }
00248 
00249 void Player::doPublish(MessageInstance const& m) {
00250     string const& topic   = m.getTopic();
00251     ros::Time const& time = m.getTime();
00252     string callerid       = m.getCallerId();
00253     
00254     ros::Time translated = time_translator_.translate(time);
00255     ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00256 
00257     time_publisher_.setHorizon(time);
00258     time_publisher_.setWCHorizon(horizon);
00259 
00260     string callerid_topic = callerid + topic;
00261 
00262     map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
00263     ROS_ASSERT(pub_iter != publishers_.end());
00264 
00265     // If immediate specified, play immediately
00266     if (options_.at_once) {
00267         time_publisher_.stepClock();
00268         pub_iter->second.publish(m);
00269         printTime();
00270         return;
00271     }
00272 
00273     // If skip_empty is specified, skip this region and shift.
00274     if (time - time_publisher_.getTime() > options_.skip_empty)
00275     {
00276       time_publisher_.stepClock();
00277 
00278       ros::WallDuration shift = ros::WallTime::now() - horizon ;
00279       time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00280       horizon += shift;
00281       time_publisher_.setWCHorizon(horizon);
00282       (pub_iter->second).publish(m);
00283       printTime();
00284       return;
00285     }
00286 
00287     while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00288     {
00289         bool charsleftorpaused = true;
00290         while (charsleftorpaused && node_handle_.ok())
00291         {
00292             switch (readCharFromStdin()){
00293             case ' ':
00294                 paused_ = !paused_;
00295                 if (paused_) {
00296                     paused_time_ = ros::WallTime::now();
00297                 }
00298                 else
00299                 {
00300                     ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00301                     paused_time_ = ros::WallTime::now();
00302          
00303                     time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00304 
00305                     horizon += shift;
00306                     time_publisher_.setWCHorizon(horizon);
00307                 }
00308                 break;
00309             case 's':
00310                 if (paused_) {
00311                     time_publisher_.stepClock();
00312 
00313                     ros::WallDuration shift = ros::WallTime::now() - horizon ;
00314                     paused_time_ = ros::WallTime::now();
00315 
00316                     time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00317 
00318                     horizon += shift;
00319                     time_publisher_.setWCHorizon(horizon);
00320             
00321                     (pub_iter->second).publish(m);
00322 
00323                     printTime();
00324                     return;
00325                 }
00326                 break;
00327             case EOF:
00328                 if (paused_)
00329                 {
00330                     printTime();
00331                     time_publisher_.runStalledClock(ros::WallDuration(.1));
00332                 }
00333                 else
00334                     charsleftorpaused = false;
00335             }
00336         }
00337 
00338         printTime();
00339         time_publisher_.runClock(ros::WallDuration(.1));
00340     }
00341 
00342     pub_iter->second.publish(m);
00343 }
00344 
00345 
00346 void Player::doKeepAlive() {
00347     //Keep pushing ourself out in 10-sec increments (avoids fancy math dealing with the end of time)
00348     ros::Time const& time = time_publisher_.getTime() + ros::Duration(10.0);
00349 
00350     ros::Time translated = time_translator_.translate(time);
00351     ros::WallTime horizon = ros::WallTime(translated.sec, translated.nsec);
00352 
00353     time_publisher_.setHorizon(time);
00354     time_publisher_.setWCHorizon(horizon);
00355 
00356     if (options_.at_once) {
00357         return;
00358     }
00359 
00360     while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
00361     {
00362         bool charsleftorpaused = true;
00363         while (charsleftorpaused && node_handle_.ok())
00364         {
00365             switch (readCharFromStdin()){
00366             case ' ':
00367                 paused_ = !paused_;
00368                 if (paused_) {
00369                     paused_time_ = ros::WallTime::now();
00370                 }
00371                 else
00372                 {
00373                     ros::WallDuration shift = ros::WallTime::now() - paused_time_;
00374                     paused_time_ = ros::WallTime::now();
00375          
00376                     time_translator_.shift(ros::Duration(shift.sec, shift.nsec));
00377 
00378                     horizon += shift;
00379                     time_publisher_.setWCHorizon(horizon);
00380                 }
00381                 break;
00382             case EOF:
00383                 if (paused_)
00384                 {
00385                     printTime();
00386                     time_publisher_.runStalledClock(ros::WallDuration(.1));
00387                 }
00388                 else
00389                     charsleftorpaused = false;
00390             }
00391         }
00392 
00393         printTime();
00394         time_publisher_.runClock(ros::WallDuration(.1));
00395     }
00396 }
00397 
00398 
00399 
00400 void Player::setupTerminal() {
00401         if (terminal_modified_)
00402                 return;
00403 
00404     const int fd = fileno(stdin);
00405     termios flags;
00406     tcgetattr(fd, &orig_flags_);
00407     flags = orig_flags_;
00408     flags.c_lflag &= ~ICANON;      // set raw (unset canonical modes)
00409     flags.c_cc[VMIN]  = 0;         // i.e. min 1 char for blocking, 0 chars for non-blocking
00410     flags.c_cc[VTIME] = 0;         // block if waiting for char
00411     tcsetattr(fd, TCSANOW, &flags);
00412 
00413     FD_ZERO(&stdin_fdset_);
00414     FD_SET(fd, &stdin_fdset_);
00415     maxfd_ = fd + 1;
00416 
00417     terminal_modified_ = true;
00418 }
00419 
00420 void Player::restoreTerminal() {
00421         if (!terminal_modified_)
00422                 return;
00423 
00424     const int fd = fileno(stdin);
00425     tcsetattr(fd, TCSANOW, &orig_flags_);
00426 
00427     terminal_modified_ = false;
00428 }
00429 
00430 char Player::readCharFromStdin() {
00431 #ifdef __APPLE__
00432     fd_set testfd;
00433     FD_COPY(&stdin_fdset_, &testfd);
00434 #else
00435     fd_set testfd = stdin_fdset_;
00436 #endif
00437 
00438     timeval tv;
00439     tv.tv_sec  = 0;
00440     tv.tv_usec = 0;
00441     if (select(maxfd_, &testfd, NULL, NULL, &tv) <= 0)
00442         return EOF;
00443 
00444     return getc(stdin);
00445 }
00446 
00447 TimePublisher::TimePublisher() : time_scale_(1.0)
00448 {
00449   setPublishFrequency(-1.0);
00450   time_pub_ = node_handle_.advertise<rosgraph_msgs::Clock>("clock",1);
00451 }
00452 
00453 void TimePublisher::setPublishFrequency(double publish_frequency)
00454 {
00455   publish_frequency_ = publish_frequency;
00456   
00457   do_publish_ = (publish_frequency > 0.0);
00458 
00459   wall_step_.fromSec(1.0 / publish_frequency);
00460 }
00461 
00462 void TimePublisher::setTimeScale(double time_scale)
00463 {
00464     time_scale_ = time_scale;
00465 }
00466 
00467 void TimePublisher::setHorizon(const ros::Time& horizon)
00468 {
00469     horizon_ = horizon;
00470 }
00471 
00472 void TimePublisher::setWCHorizon(const ros::WallTime& horizon)
00473 {
00474   wc_horizon_ = horizon;
00475 }
00476 
00477 void TimePublisher::setTime(const ros::Time& time)
00478 {
00479     current_ = time;
00480 }
00481 
00482 ros::Time const& TimePublisher::getTime() const
00483 {
00484     return current_;
00485 }
00486 
00487 void TimePublisher::runClock(const ros::WallDuration& duration)
00488 {
00489     if (do_publish_)
00490     {
00491         rosgraph_msgs::Clock pub_msg;
00492 
00493         ros::WallTime t = ros::WallTime::now();
00494         ros::WallTime done = t + duration;
00495 
00496         while (t < done && t < wc_horizon_)
00497         {
00498             ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00499 
00500             ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00501             d *= time_scale_;
00502 
00503             current_ = horizon_ - d;
00504 
00505             if (current_ >= horizon_)
00506               current_ = horizon_;
00507 
00508             if (t >= next_pub_)
00509             {
00510                 pub_msg.clock = current_;
00511                 time_pub_.publish(pub_msg);
00512                 next_pub_ = t + wall_step_;
00513             }
00514 
00515             ros::WallTime target = done;
00516             if (target > wc_horizon_)
00517               target = wc_horizon_;
00518             if (target > next_pub_)
00519               target = next_pub_;
00520 
00521             ros::WallTime::sleepUntil(target);
00522 
00523             t = ros::WallTime::now();
00524         }
00525     } else {
00526 
00527         ros::WallTime t = ros::WallTime::now();
00528 
00529         ros::WallDuration leftHorizonWC = wc_horizon_ - t;
00530 
00531         ros::Duration d(leftHorizonWC.sec, leftHorizonWC.nsec);
00532         d *= time_scale_;
00533 
00534         current_ = horizon_ - d;
00535         
00536         if (current_ >= horizon_)
00537             current_ = horizon_;
00538 
00539         ros::WallTime target = ros::WallTime::now() + duration;
00540 
00541         if (target > wc_horizon_)
00542             target = wc_horizon_;
00543 
00544         ros::WallTime::sleepUntil(target);
00545     }
00546 }
00547 
00548 void TimePublisher::stepClock()
00549 {
00550     if (do_publish_)
00551     {
00552         current_ = horizon_;
00553 
00554         rosgraph_msgs::Clock pub_msg;
00555 
00556         pub_msg.clock = current_;
00557         time_pub_.publish(pub_msg);
00558 
00559         ros::WallTime t = ros::WallTime::now();
00560         next_pub_ = t + wall_step_;
00561     } else {
00562         current_ = horizon_;
00563     }
00564 }
00565 
00566 void TimePublisher::runStalledClock(const ros::WallDuration& duration)
00567 {
00568     if (do_publish_)
00569     {
00570         rosgraph_msgs::Clock pub_msg;
00571 
00572         ros::WallTime t = ros::WallTime::now();
00573         ros::WallTime done = t + duration;
00574 
00575         while ( t < done )
00576         {
00577             if (t > next_pub_)
00578             {
00579                 pub_msg.clock = current_;
00580                 time_pub_.publish(pub_msg);
00581                 next_pub_ = t + wall_step_;
00582             }
00583 
00584             ros::WallTime target = done;
00585 
00586             if (target > next_pub_)
00587               target = next_pub_;
00588 
00589             ros::WallTime::sleepUntil(target);
00590 
00591             t = ros::WallTime::now();
00592         }
00593     } else {
00594         duration.sleep();
00595     }
00596 }
00597 
00598 bool TimePublisher::horizonReached()
00599 {
00600   return ros::WallTime::now() > wc_horizon_;
00601 }
00602 
00603 } // namespace rosbag


rosbag
Author(s): Tim Field (tfield@willowgarage.com), Jeremy Leibs (leibs@willowgarage.com), and James Bowman (jamesb@willowgarage.com)
autogenerated on Sat Dec 28 2013 17:43:05