recorder.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2008, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of Willow Garage, Inc. nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 ********************************************************************/
00034 
00035 #include "rosbag/recorder.h"
00036 
00037 #include <sys/stat.h>
00038 #include <boost/filesystem.hpp>
00039 // Boost filesystem v3 is default in 1.46.0 and above
00040 // Fallback to original posix code (*nix only) if this is not true
00041 #if BOOST_FILESYSTEM_VERSION < 3
00042   #include <sys/statvfs.h>
00043 #endif
00044 #include <time.h>
00045 
00046 #include <queue>
00047 #include <set>
00048 #include <sstream>
00049 #include <string>
00050 
00051 #include <boost/foreach.hpp>
00052 #include <boost/lexical_cast.hpp>
00053 #include <boost/regex.hpp>
00054 #include <boost/thread.hpp>
00055 #include <boost/thread/xtime.hpp>
00056 #include <boost/date_time/local_time/local_time.hpp>
00057 
00058 #include <ros/ros.h>
00059 #include <topic_tools/shape_shifter.h>
00060 
00061 #include "ros/network.h"
00062 #include "ros/xmlrpc_manager.h"
00063 #include "XmlRpc.h"
00064 
00065 #define foreach BOOST_FOREACH
00066 
00067 using std::cout;
00068 using std::endl;
00069 using std::set;
00070 using std::string;
00071 using std::vector;
00072 using boost::shared_ptr;
00073 using ros::Time;
00074 
00075 namespace rosbag {
00076 
00077 // OutgoingMessage
00078 
00079 OutgoingMessage::OutgoingMessage(string const& _topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr<ros::M_string> _connection_header, Time _time) :
00080     topic(_topic), msg(_msg), connection_header(_connection_header), time(_time)
00081 {
00082 }
00083 
00084 // OutgoingQueue
00085 
00086 OutgoingQueue::OutgoingQueue(string const& _filename, std::queue<OutgoingMessage>* _queue, Time _time) :
00087     filename(_filename), queue(_queue), time(_time)
00088 {
00089 }
00090 
00091 // RecorderOptions
00092 
00093 RecorderOptions::RecorderOptions() :
00094     trigger(false),
00095     record_all(false),
00096     regex(false),
00097     do_exclude(false),
00098     quiet(false),
00099     append_date(true),
00100     snapshot(false),
00101     verbose(false),
00102     compression(compression::Uncompressed),
00103     prefix(""),
00104     name(""),
00105     exclude_regex(),
00106     buffer_size(1048576 * 256),
00107     chunk_size(1024 * 768),
00108     limit(0),
00109     split(false),
00110     max_size(0),
00111     max_duration(-1.0),
00112     node("")
00113 {
00114 }
00115 
00116 // Recorder
00117 
00118 Recorder::Recorder(RecorderOptions const& options) :
00119     options_(options),
00120     num_subscribers_(0),
00121     exit_code_(0),
00122     queue_size_(0),
00123     split_count_(0),
00124     writing_enabled_(true)
00125 {
00126 }
00127 
00128 int Recorder::run() {
00129     if (options_.trigger) {
00130         doTrigger();
00131         return 0;
00132     }
00133 
00134     if (options_.topics.size() == 0) {
00135         // Make sure limit is not specified with automatic topic subscription
00136         if (options_.limit > 0) {
00137             fprintf(stderr, "Specifing a count is not valid with automatic topic subscription.\n");
00138             return 1;
00139         }
00140 
00141         // Make sure topics are specified
00142         if (!options_.record_all && (options_.node == std::string(""))) {
00143             fprintf(stderr, "No topics specified.\n");
00144             return 1;
00145         }
00146     }
00147 
00148     ros::NodeHandle nh;
00149     if (!nh.ok())
00150         return 0;
00151 
00152     last_buffer_warn_ = Time();
00153     queue_ = new std::queue<OutgoingMessage>;
00154 
00155     // Subscribe to each topic
00156     if (!options_.regex) {
00157         foreach(string const& topic, options_.topics)
00158                         subscribe(topic);
00159     }
00160 
00161     if (!ros::Time::waitForValid(ros::WallDuration(2.0)))
00162       ROS_WARN("/use_sim_time set to true and no clock published.  Still waiting for valid time...");
00163 
00164     ros::Time::waitForValid();
00165 
00166     start_time_ = ros::Time::now();
00167 
00168     // Don't bother doing anything if we never got a valid time
00169     if (!nh.ok())
00170         return 0;
00171 
00172     ros::Subscriber trigger_sub;
00173 
00174     // Spin up a thread for writing to the file
00175     boost::thread record_thread;
00176     if (options_.snapshot)
00177     {
00178         record_thread = boost::thread(boost::bind(&Recorder::doRecordSnapshotter, this));
00179 
00180         // Subscribe to the snapshot trigger
00181         trigger_sub = nh.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::snapshotTrigger, this, _1));
00182     }
00183     else
00184         record_thread = boost::thread(boost::bind(&Recorder::doRecord, this));
00185 
00186 
00187 
00188     ros::Timer check_master_timer;
00189     if (options_.record_all || options_.regex || (options_.node != std::string("")))
00190     {
00191         // check for master first
00192         doCheckMaster(ros::TimerEvent(), nh);
00193         check_master_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&Recorder::doCheckMaster, this, _1, boost::ref(nh)));
00194     }
00195 
00196     ros::MultiThreadedSpinner s(10);
00197     ros::spin(s);
00198 
00199     queue_condition_.notify_all();
00200 
00201     record_thread.join();
00202 
00203     delete queue_;
00204 
00205     return exit_code_;
00206 }
00207 
00208 shared_ptr<ros::Subscriber> Recorder::subscribe(string const& topic) {
00209         ROS_INFO("Subscribing to %s", topic.c_str());
00210 
00211     ros::NodeHandle nh;
00212     shared_ptr<int> count(new int(options_.limit));
00213     shared_ptr<ros::Subscriber> sub(new ros::Subscriber);
00214     *sub = nh.subscribe<topic_tools::ShapeShifter>(topic, 100, boost::bind(&Recorder::doQueue, this, _1, topic, sub, count));
00215     currently_recording_.insert(topic);
00216     num_subscribers_++;
00217 
00218     return sub;
00219 }
00220 
00221 bool Recorder::isSubscribed(string const& topic) const {
00222     return currently_recording_.find(topic) != currently_recording_.end();
00223 }
00224 
00225 bool Recorder::shouldSubscribeToTopic(std::string const& topic, bool from_node) {
00226     // ignore already known topics
00227     if (isSubscribed(topic)) {
00228         return false;
00229     }
00230 
00231     // subtract exclusion regex, if any
00232     if(options_.do_exclude && boost::regex_match(topic, options_.exclude_regex)) {
00233         return false;
00234     }
00235 
00236     if(options_.record_all || from_node) {
00237         return true;
00238     }
00239     
00240     if (options_.regex) {
00241         // Treat the topics as regular expressions
00242         foreach(string const& regex_str, options_.topics) {
00243             boost::regex e(regex_str);
00244             boost::smatch what;
00245             if (boost::regex_match(topic, what, e, boost::match_extra))
00246                 return true;
00247         }
00248     }
00249     else {
00250         foreach(string const& t, options_.topics)
00251             if (t == topic)
00252                 return true;
00253     }
00254     
00255     return false;
00256 }
00257 
00258 template<class T>
00259 std::string Recorder::timeToStr(T ros_t)
00260 {
00261     std::stringstream msg;
00262     const boost::posix_time::ptime now=
00263         boost::posix_time::second_clock::local_time();
00264     boost::posix_time::time_facet *const f=
00265         new boost::posix_time::time_facet("%Y-%m-%d-%H-%M-%S");
00266     msg.imbue(std::locale(msg.getloc(),f));
00267     msg << now;
00268     return msg.str();
00269 }
00270 
00272 void Recorder::doQueue(ros::MessageEvent<topic_tools::ShapeShifter const> msg_event, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
00273     //void Recorder::doQueue(topic_tools::ShapeShifter::ConstPtr msg, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
00274     Time rectime = Time::now();
00275     
00276     if (options_.verbose)
00277         cout << "Received message on topic " << subscriber->getTopic() << endl;
00278 
00279     OutgoingMessage out(topic, msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), rectime);
00280     
00281     {
00282         boost::mutex::scoped_lock lock(queue_mutex_);
00283 
00284         queue_->push(out);
00285         queue_size_ += out.msg->size();
00286         
00287         // Check to see if buffer has been exceeded
00288         while (options_.buffer_size > 0 && queue_size_ > options_.buffer_size) {
00289             OutgoingMessage drop = queue_->front();
00290             queue_->pop();
00291             queue_size_ -= drop.msg->size();
00292 
00293             if (!options_.snapshot) {
00294                 Time now = Time::now();
00295                 if (now > last_buffer_warn_ + ros::Duration(5.0)) {
00296                     ROS_WARN("rosbag record buffer exceeded.  Dropping oldest queued message.");
00297                     last_buffer_warn_ = now;
00298                 }
00299             }
00300         }
00301     }
00302   
00303     if (!options_.snapshot)
00304         queue_condition_.notify_all();
00305 
00306     // If we are book-keeping count, decrement and possibly shutdown
00307     if ((*count) > 0) {
00308         (*count)--;
00309         if ((*count) == 0) {
00310             subscriber->shutdown();
00311 
00312             num_subscribers_--;
00313 
00314             if (num_subscribers_ == 0)
00315                 ros::shutdown();
00316         }
00317     }
00318 }
00319 
00320 void Recorder::updateFilenames() {
00321     vector<string> parts;
00322 
00323     std::string prefix = options_.prefix;
00324     uint32_t ind = prefix.rfind(".bag");
00325 
00326     if (ind == prefix.size() - 4)
00327     {
00328       prefix.erase(ind);
00329       ind = prefix.rfind(".bag");
00330     }
00331 
00332     if (prefix.length() > 0)
00333         parts.push_back(prefix);
00334     if (options_.append_date)
00335         parts.push_back(timeToStr(ros::WallTime::now()));
00336     if (options_.split)
00337         parts.push_back(boost::lexical_cast<string>(split_count_));
00338 
00339     target_filename_ = parts[0];
00340     for (unsigned int i = 1; i < parts.size(); i++)
00341         target_filename_ += string("_") + parts[i];
00342 
00343     target_filename_ += string(".bag");
00344     write_filename_ = target_filename_ + string(".active");
00345 }
00346 
00348 void Recorder::snapshotTrigger(std_msgs::Empty::ConstPtr trigger) {
00349     updateFilenames();
00350     
00351     ROS_INFO("Triggered snapshot recording with name %s.", target_filename_.c_str());
00352     
00353     {
00354         boost::mutex::scoped_lock lock(queue_mutex_);
00355         queue_queue_.push(OutgoingQueue(target_filename_, queue_, Time::now()));
00356         queue_      = new std::queue<OutgoingMessage>;
00357         queue_size_ = 0;
00358     }
00359 
00360     queue_condition_.notify_all();
00361 }
00362 
00363 void Recorder::startWriting() {
00364     bag_.setCompression(options_.compression);
00365     bag_.setChunkThreshold(options_.chunk_size);
00366 
00367     updateFilenames();
00368     try {
00369         bag_.open(write_filename_, bagmode::Write);
00370     }
00371     catch (rosbag::BagException e) {
00372         ROS_ERROR("Error writing: %s", e.what());
00373         exit_code_ = 1;
00374         ros::shutdown();
00375     }
00376     ROS_INFO("Recording to %s.", target_filename_.c_str());
00377 }
00378 
00379 void Recorder::stopWriting() {
00380     ROS_INFO("Closing %s.", target_filename_.c_str());
00381     bag_.close();
00382     rename(write_filename_.c_str(), target_filename_.c_str());
00383 }
00384 
00385 bool Recorder::checkSize()
00386 {
00387     if (options_.max_size > 0)
00388     {
00389         if (bag_.getSize() > options_.max_size)
00390         {
00391             if (options_.split)
00392             {
00393                 stopWriting();
00394                 split_count_++;
00395                 startWriting();
00396             } else {
00397                 ros::shutdown();
00398                 return true;
00399             }
00400         }
00401     }
00402     return false;
00403 }
00404 
00405 bool Recorder::checkDuration(const ros::Time& t)
00406 {
00407     if (options_.max_duration > ros::Duration(0))
00408     {
00409         if (t - start_time_ > options_.max_duration)
00410         {
00411             if (options_.split)
00412             {
00413                 while (start_time_ + options_.max_duration < t)
00414                 {
00415                     stopWriting();
00416                     split_count_++;
00417                     start_time_ += options_.max_duration;
00418                     startWriting();
00419                 }
00420             } else {
00421                 ros::shutdown();
00422                 return true;
00423             }
00424         }
00425     }
00426     return false;
00427 }
00428 
00429 
00431 void Recorder::doRecord() {
00432     // Open bag file for writing
00433     startWriting();
00434 
00435     // Schedule the disk space check
00436     warn_next_ = ros::WallTime();
00437     checkDisk();
00438     check_disk_next_ = ros::WallTime::now() + ros::WallDuration().fromSec(20.0);
00439 
00440     // Technically the queue_mutex_ should be locked while checking empty.
00441     // Except it should only get checked if the node is not ok, and thus
00442     // it shouldn't be in contention.
00443     ros::NodeHandle nh;
00444     while (nh.ok() || !queue_->empty()) {
00445         boost::unique_lock<boost::mutex> lock(queue_mutex_);
00446 
00447         bool finished = false;
00448         while (queue_->empty()) {
00449             if (!nh.ok()) {
00450                 lock.release()->unlock();
00451                 finished = true;
00452                 break;
00453             }
00454             boost::xtime xt;
00455 #if BOOST_VERSION >= 105000
00456             boost::xtime_get(&xt, boost::TIME_UTC_);
00457 #else
00458             boost::xtime_get(&xt, boost::TIME_UTC);
00459 #endif
00460             xt.nsec += 250000000;
00461             queue_condition_.timed_wait(lock, xt);
00462             if (checkDuration(ros::Time::now()))
00463             {
00464                 finished = true;
00465                 break;
00466             }
00467         }
00468         if (finished)
00469             break;
00470 
00471         OutgoingMessage out = queue_->front();
00472         queue_->pop();
00473         queue_size_ -= out.msg->size();
00474         
00475         lock.release()->unlock();
00476         
00477         if (checkSize())
00478             break;
00479 
00480         if (checkDuration(out.time))
00481             break;
00482 
00483         if (scheduledCheckDisk() && checkLogging())
00484             bag_.write(out.topic, out.time, *out.msg, out.connection_header);
00485     }
00486 
00487     stopWriting();
00488 }
00489 
00490 void Recorder::doRecordSnapshotter() {
00491     ros::NodeHandle nh;
00492   
00493     while (nh.ok() || !queue_queue_.empty()) {
00494         boost::unique_lock<boost::mutex> lock(queue_mutex_);
00495         while (queue_queue_.empty()) {
00496             if (!nh.ok())
00497                 return;
00498             queue_condition_.wait(lock);
00499         }
00500         
00501         OutgoingQueue out_queue = queue_queue_.front();
00502         queue_queue_.pop();
00503         
00504         lock.release()->unlock();
00505         
00506         string target_filename = out_queue.filename;
00507         string write_filename  = target_filename + string(".active");
00508         
00509         try {
00510             bag_.open(write_filename, bagmode::Write);
00511         }
00512         catch (rosbag::BagException ex) {
00513             ROS_ERROR("Error writing: %s", ex.what());
00514             return;
00515         }
00516 
00517         while (!out_queue.queue->empty()) {
00518             OutgoingMessage out = out_queue.queue->front();
00519             out_queue.queue->pop();
00520 
00521             bag_.write(out.topic, out.time, *out.msg);
00522         }
00523 
00524         stopWriting();
00525     }
00526 }
00527 
00528 void Recorder::doCheckMaster(ros::TimerEvent const& e, ros::NodeHandle& node_handle) {
00529     ros::master::V_TopicInfo topics;
00530     if (ros::master::getTopics(topics)) {
00531                 foreach(ros::master::TopicInfo const& t, topics) {
00532                         if (shouldSubscribeToTopic(t.name))
00533                                 subscribe(t.name);
00534                 }
00535     }
00536     
00537     if (options_.node != std::string(""))
00538     {
00539 
00540       XmlRpc::XmlRpcValue req;
00541       req[0] = ros::this_node::getName();
00542       req[1] = options_.node;
00543       XmlRpc::XmlRpcValue resp;
00544       XmlRpc::XmlRpcValue payload;
00545 
00546       if (ros::master::execute("lookupNode", req, resp, payload, true))
00547       {
00548         std::string peer_host;
00549         uint32_t peer_port;
00550 
00551         if (!ros::network::splitURI(static_cast<std::string>(resp[2]), peer_host, peer_port))
00552         {
00553           ROS_ERROR("Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
00554         } else {
00555 
00556           XmlRpc::XmlRpcClient c(peer_host.c_str(), peer_port, "/");
00557           XmlRpc::XmlRpcValue req;
00558           XmlRpc::XmlRpcValue resp;
00559           req[0] = ros::this_node::getName();
00560           c.execute("getSubscriptions", req, resp);
00561           
00562           if (!c.isFault() && resp.size() > 0 && static_cast<int>(resp[0]) == 1)
00563           {
00564             for(int i = 0; i < resp[2].size(); i++)
00565             {
00566               if (shouldSubscribeToTopic(resp[2][i][0], true))
00567                 subscribe(resp[2][i][0]);
00568             }
00569           } else {
00570             ROS_ERROR("Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
00571           }
00572         }
00573       }
00574     }
00575 }
00576 
00577 void Recorder::doTrigger() {
00578     ros::NodeHandle nh;
00579     ros::Publisher pub = nh.advertise<std_msgs::Empty>("snapshot_trigger", 1, true);
00580     pub.publish(std_msgs::Empty());
00581 
00582     ros::Timer terminate_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&ros::shutdown));
00583     ros::spin();
00584 }
00585 
00586 bool Recorder::scheduledCheckDisk() {
00587     boost::mutex::scoped_lock lock(check_disk_mutex_);
00588 
00589     if (ros::WallTime::now() < check_disk_next_)
00590         return true;
00591 
00592     check_disk_next_ += ros::WallDuration().fromSec(20.0);
00593     return checkDisk();
00594 }
00595 
00596 bool Recorder::checkDisk() {
00597 #if BOOST_FILESYSTEM_VERSION < 3
00598     struct statvfs fiData;
00599     if ((statvfs(bag_.getFileName().c_str(), &fiData)) < 0)
00600     {
00601         ROS_WARN("Failed to check filesystem stats.");
00602         return true;
00603     }
00604     unsigned long long free_space = 0;
00605     free_space = (unsigned long long) (fiData.f_bsize) * (unsigned long long) (fiData.f_bavail);
00606     if (free_space < 1073741824ull)
00607     {
00608         ROS_ERROR("Less than 1GB of space free on disk with %s.  Disabling recording.", bag_.getFileName().c_str());
00609         writing_enabled_ = false;
00610         return false;
00611     }
00612     else if (free_space < 5368709120ull)
00613     {
00614         ROS_WARN("Less than 5GB of space free on disk with %s.", bag_.getFileName().c_str());
00615     }
00616     else
00617     {
00618         writing_enabled_ = true;
00619     }
00620 #else
00621     boost::filesystem::path p(boost::filesystem::system_complete(bag_.getFileName().c_str()));
00622     p = p.parent_path();
00623     boost::filesystem::space_info info;
00624     try
00625     {
00626         info = boost::filesystem::space(p);
00627     }
00628     catch (boost::filesystem::filesystem_error &e) 
00629     { 
00630         ROS_WARN("Failed to check filesystem stats [%s].", e.what());
00631         writing_enabled_ = false;
00632         return false;
00633     }
00634     if ( info.available < 1073741824ull)
00635     {
00636         ROS_ERROR("Less than 1GB of space free on disk with %s.  Disabling recording.", bag_.getFileName().c_str());
00637         writing_enabled_ = false;
00638         return false;
00639     }
00640     else if (info.available < 5368709120ull)
00641     {
00642         ROS_WARN("Less than 5GB of space free on disk with %s.", bag_.getFileName().c_str());
00643         writing_enabled_ = true;
00644     }
00645     else
00646     {
00647         writing_enabled_ = true;
00648     }
00649 #endif
00650     return true;
00651 }
00652 
00653 bool Recorder::checkLogging() {
00654     if (writing_enabled_)
00655         return true;
00656 
00657     ros::WallTime now = ros::WallTime::now();
00658     if (now >= warn_next_) {
00659         warn_next_ = now + ros::WallDuration().fromSec(5.0);
00660         ROS_WARN("Not logging message because logging disabled.  Most likely cause is a full disk.");
00661     }
00662     return false;
00663 }
00664 
00665 } // namespace rosbag


rosbag
Author(s): Tim Field, Jeremy Leibs, James Bowman
autogenerated on Fri Aug 28 2015 12:33:52