recorder.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2008, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of Willow Garage, Inc. nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 ********************************************************************/
00034 
00035 #include "rosbag/recorder.h"
00036 
00037 #include <sys/stat.h>
00038 #include <boost/filesystem.hpp>
00039 // Boost filesystem v3 is default in 1.46.0 and above
00040 // Fallback to original posix code (*nix only) if this is not true
00041 #if BOOST_FILESYSTEM_VERSION < 3
00042   #include <sys/statvfs.h>
00043 #endif
00044 #include <time.h>
00045 
00046 #include <queue>
00047 #include <set>
00048 #include <sstream>
00049 #include <string>
00050 
00051 #include <boost/foreach.hpp>
00052 #include <boost/lexical_cast.hpp>
00053 #include <boost/regex.hpp>
00054 #include <boost/thread.hpp>
00055 #include <boost/thread/xtime.hpp>
00056 #include <boost/date_time/local_time/local_time.hpp>
00057 
00058 #include <ros/ros.h>
00059 #include <topic_tools/shape_shifter.h>
00060 
00061 #include "ros/network.h"
00062 #include "ros/xmlrpc_manager.h"
00063 #include "XmlRpc.h"
00064 
00065 #define foreach BOOST_FOREACH
00066 
00067 using std::cout;
00068 using std::endl;
00069 using std::set;
00070 using std::string;
00071 using std::vector;
00072 using boost::shared_ptr;
00073 using ros::Time;
00074 
00075 namespace rosbag {
00076 
00077 // OutgoingMessage
00078 
00079 OutgoingMessage::OutgoingMessage(string const& _topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr<ros::M_string> _connection_header, Time _time) :
00080     topic(_topic), msg(_msg), connection_header(_connection_header), time(_time)
00081 {
00082 }
00083 
00084 // OutgoingQueue
00085 
00086 OutgoingQueue::OutgoingQueue(string const& _filename, std::queue<OutgoingMessage>* _queue, Time _time) :
00087     filename(_filename), queue(_queue), time(_time)
00088 {
00089 }
00090 
00091 // RecorderOptions
00092 
00093 RecorderOptions::RecorderOptions() :
00094     trigger(false),
00095     record_all(false),
00096     regex(false),
00097     do_exclude(false),
00098     quiet(false),
00099     append_date(true),
00100     snapshot(false),
00101     verbose(false),
00102     compression(compression::Uncompressed),
00103     prefix(""),
00104     name(""),
00105     exclude_regex(),
00106     buffer_size(1048576 * 256),
00107     chunk_size(1024 * 768),
00108     limit(0),
00109     split(false),
00110     max_size(0),
00111     max_duration(-1.0),
00112     node("")
00113 {
00114 }
00115 
00116 // Recorder
00117 
00118 Recorder::Recorder(RecorderOptions const& options) :
00119     options_(options),
00120     num_subscribers_(0),
00121     exit_code_(0),
00122     queue_size_(0),
00123     split_count_(0),
00124     writing_enabled_(true)
00125 {
00126 }
00127 
00128 int Recorder::run() {
00129     if (options_.trigger) {
00130         doTrigger();
00131         return 0;
00132     }
00133 
00134     if (options_.topics.size() == 0) {
00135         // Make sure limit is not specified with automatic topic subscription
00136         if (options_.limit > 0) {
00137             fprintf(stderr, "Specifing a count is not valid with automatic topic subscription.\n");
00138             return 1;
00139         }
00140 
00141         // Make sure topics are specified
00142         if (!options_.record_all && (options_.node == std::string(""))) {
00143             fprintf(stderr, "No topics specified.\n");
00144             return 1;
00145         }
00146     }
00147 
00148     ros::NodeHandle nh;
00149     if (!nh.ok())
00150         return 0;
00151 
00152     last_buffer_warn_ = Time();
00153     queue_ = new std::queue<OutgoingMessage>;
00154 
00155     // Subscribe to each topic
00156     if (!options_.regex) {
00157         foreach(string const& topic, options_.topics)
00158                         subscribe(topic);
00159     }
00160 
00161     if (!ros::Time::waitForValid(ros::WallDuration(2.0)))
00162       ROS_WARN("/use_sim_time set to true and no clock published.  Still waiting for valid time...");
00163 
00164     ros::Time::waitForValid();
00165 
00166     start_time_ = ros::Time::now();
00167 
00168     // Don't bother doing anything if we never got a valid time
00169     if (!nh.ok())
00170         return 0;
00171 
00172     ros::Subscriber trigger_sub;
00173 
00174     // Spin up a thread for writing to the file
00175     boost::thread record_thread;
00176     if (options_.snapshot)
00177     {
00178         record_thread = boost::thread(boost::bind(&Recorder::doRecordSnapshotter, this));
00179 
00180         // Subscribe to the snapshot trigger
00181         trigger_sub = nh.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::snapshotTrigger, this, _1));
00182     }
00183     else
00184         record_thread = boost::thread(boost::bind(&Recorder::doRecord, this));
00185 
00186 
00187 
00188     ros::Timer check_master_timer;
00189     if (options_.record_all || options_.regex || (options_.node != std::string("")))
00190         check_master_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&Recorder::doCheckMaster, this, _1, boost::ref(nh)));
00191 
00192     ros::MultiThreadedSpinner s(10);
00193     ros::spin(s);
00194 
00195     queue_condition_.notify_all();
00196 
00197     record_thread.join();
00198 
00199     delete queue_;
00200 
00201     return exit_code_;
00202 }
00203 
00204 shared_ptr<ros::Subscriber> Recorder::subscribe(string const& topic) {
00205         ROS_INFO("Subscribing to %s", topic.c_str());
00206 
00207     ros::NodeHandle nh;
00208     shared_ptr<int> count(new int(options_.limit));
00209     shared_ptr<ros::Subscriber> sub(new ros::Subscriber);
00210     *sub = nh.subscribe<topic_tools::ShapeShifter>(topic, 100, boost::bind(&Recorder::doQueue, this, _1, topic, sub, count));
00211     currently_recording_.insert(topic);
00212     num_subscribers_++;
00213 
00214     return sub;
00215 }
00216 
00217 bool Recorder::isSubscribed(string const& topic) const {
00218     return currently_recording_.find(topic) != currently_recording_.end();
00219 }
00220 
00221 bool Recorder::shouldSubscribeToTopic(std::string const& topic, bool from_node) {
00222     // ignore already known topics
00223     if (isSubscribed(topic)) {
00224         return false;
00225     }
00226 
00227     // subtract exclusion regex, if any
00228     if(options_.do_exclude && boost::regex_match(topic, options_.exclude_regex)) {
00229         return false;
00230     }
00231 
00232     if(options_.record_all || from_node) {
00233         return true;
00234     }
00235     
00236     if (options_.regex) {
00237         // Treat the topics as regular expressions
00238         foreach(string const& regex_str, options_.topics) {
00239             boost::regex e(regex_str);
00240             boost::smatch what;
00241             if (boost::regex_match(topic, what, e, boost::match_extra))
00242                 return true;
00243         }
00244     }
00245     else {
00246         foreach(string const& t, options_.topics)
00247             if (t == topic)
00248                 return true;
00249     }
00250     
00251     return false;
00252 }
00253 
00254 template<class T>
00255 std::string Recorder::timeToStr(T ros_t)
00256 {
00257     std::stringstream msg;
00258     const boost::posix_time::ptime now=
00259         boost::posix_time::second_clock::local_time();
00260     boost::posix_time::time_facet *const f=
00261         new boost::posix_time::time_facet("%Y-%m-%d-%H-%M-%S");
00262     msg.imbue(std::locale(msg.getloc(),f));
00263     msg << now;
00264     return msg.str();
00265 }
00266 
00268 void Recorder::doQueue(ros::MessageEvent<topic_tools::ShapeShifter const> msg_event, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
00269     //void Recorder::doQueue(topic_tools::ShapeShifter::ConstPtr msg, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
00270     Time rectime = Time::now();
00271     
00272     if (options_.verbose)
00273         cout << "Received message on topic " << subscriber->getTopic() << endl;
00274 
00275     OutgoingMessage out(topic, msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), rectime);
00276     
00277     {
00278         boost::mutex::scoped_lock lock(queue_mutex_);
00279 
00280         queue_->push(out);
00281         queue_size_ += out.msg->size();
00282         
00283         // Check to see if buffer has been exceeded
00284         while (options_.buffer_size > 0 && queue_size_ > options_.buffer_size) {
00285             OutgoingMessage drop = queue_->front();
00286             queue_->pop();
00287             queue_size_ -= drop.msg->size();
00288 
00289             if (!options_.snapshot) {
00290                 Time now = Time::now();
00291                 if (now > last_buffer_warn_ + ros::Duration(5.0)) {
00292                     ROS_WARN("rosbag record buffer exceeded.  Dropping oldest queued message.");
00293                     last_buffer_warn_ = now;
00294                 }
00295             }
00296         }
00297     }
00298   
00299     if (!options_.snapshot)
00300         queue_condition_.notify_all();
00301 
00302     // If we are book-keeping count, decrement and possibly shutdown
00303     if ((*count) > 0) {
00304         (*count)--;
00305         if ((*count) == 0) {
00306             subscriber->shutdown();
00307 
00308             num_subscribers_--;
00309 
00310             if (num_subscribers_ == 0)
00311                 ros::shutdown();
00312         }
00313     }
00314 }
00315 
00316 void Recorder::updateFilenames() {
00317     vector<string> parts;
00318 
00319     std::string prefix = options_.prefix;
00320     uint32_t ind = prefix.rfind(".bag");
00321 
00322     if (ind == prefix.size() - 4)
00323     {
00324       prefix.erase(ind);
00325       ind = prefix.rfind(".bag");
00326     }
00327 
00328     if (prefix.length() > 0)
00329         parts.push_back(prefix);
00330     if (options_.append_date)
00331         parts.push_back(timeToStr(ros::WallTime::now()));
00332     if (options_.split)
00333         parts.push_back(boost::lexical_cast<string>(split_count_));
00334 
00335     target_filename_ = parts[0];
00336     for (unsigned int i = 1; i < parts.size(); i++)
00337         target_filename_ += string("_") + parts[i];
00338 
00339     target_filename_ += string(".bag");
00340     write_filename_ = target_filename_ + string(".active");
00341 }
00342 
00344 void Recorder::snapshotTrigger(std_msgs::Empty::ConstPtr trigger) {
00345     updateFilenames();
00346     
00347     ROS_INFO("Triggered snapshot recording with name %s.", target_filename_.c_str());
00348     
00349     {
00350         boost::mutex::scoped_lock lock(queue_mutex_);
00351         queue_queue_.push(OutgoingQueue(target_filename_, queue_, Time::now()));
00352         queue_      = new std::queue<OutgoingMessage>;
00353         queue_size_ = 0;
00354     }
00355 
00356     queue_condition_.notify_all();
00357 }
00358 
00359 void Recorder::startWriting() {
00360     bag_.setCompression(options_.compression);
00361     bag_.setChunkThreshold(options_.chunk_size);
00362 
00363     updateFilenames();
00364     try {
00365         bag_.open(write_filename_, bagmode::Write);
00366     }
00367     catch (rosbag::BagException e) {
00368         ROS_ERROR("Error writing: %s", e.what());
00369         exit_code_ = 1;
00370         ros::shutdown();
00371     }
00372     ROS_INFO("Recording to %s.", target_filename_.c_str());
00373 }
00374 
00375 void Recorder::stopWriting() {
00376     ROS_INFO("Closing %s.", target_filename_.c_str());
00377     bag_.close();
00378     rename(write_filename_.c_str(), target_filename_.c_str());
00379 }
00380 
00381 bool Recorder::checkSize()
00382 {
00383     if (options_.max_size > 0)
00384     {
00385         if (bag_.getSize() > options_.max_size)
00386         {
00387             if (options_.split)
00388             {
00389                 stopWriting();
00390                 split_count_++;
00391                 startWriting();
00392             } else {
00393                 ros::shutdown();
00394                 return true;
00395             }
00396         }
00397     }
00398     return false;
00399 }
00400 
00401 bool Recorder::checkDuration(const ros::Time& t)
00402 {
00403     if (options_.max_duration > ros::Duration(0))
00404     {
00405         if (t - start_time_ > options_.max_duration)
00406         {
00407             if (options_.split)
00408             {
00409                 while (start_time_ + options_.max_duration < t)
00410                 {
00411                     stopWriting();
00412                     split_count_++;
00413                     start_time_ += options_.max_duration;
00414                     startWriting();
00415                 }
00416             } else {
00417                 ros::shutdown();
00418                 return true;
00419             }
00420         }
00421     }
00422     return false;
00423 }
00424 
00425 
00427 void Recorder::doRecord() {
00428     // Open bag file for writing
00429     startWriting();
00430 
00431     // Schedule the disk space check
00432     warn_next_ = ros::WallTime();
00433     checkDisk();
00434     check_disk_next_ = ros::WallTime::now() + ros::WallDuration().fromSec(20.0);
00435 
00436     // Technically the queue_mutex_ should be locked while checking empty.
00437     // Except it should only get checked if the node is not ok, and thus
00438     // it shouldn't be in contention.
00439     ros::NodeHandle nh;
00440     while (nh.ok() || !queue_->empty()) {
00441         boost::unique_lock<boost::mutex> lock(queue_mutex_);
00442 
00443         bool finished = false;
00444         while (queue_->empty()) {
00445             if (!nh.ok()) {
00446                 lock.release()->unlock();
00447                 finished = true;
00448                 break;
00449             }
00450             boost::xtime xt;
00451 #if BOOST_VERSION >= 105000
00452             boost::xtime_get(&xt, boost::TIME_UTC_);
00453 #else
00454             boost::xtime_get(&xt, boost::TIME_UTC);
00455 #endif
00456             xt.nsec += 250000000;
00457             queue_condition_.timed_wait(lock, xt);
00458             if (checkDuration(ros::Time::now()))
00459             {
00460                 finished = true;
00461                 break;
00462             }
00463         }
00464         if (finished)
00465             break;
00466 
00467         OutgoingMessage out = queue_->front();
00468         queue_->pop();
00469         queue_size_ -= out.msg->size();
00470         
00471         lock.release()->unlock();
00472         
00473         if (checkSize())
00474             break;
00475 
00476         if (checkDuration(out.time))
00477             break;
00478 
00479         if (scheduledCheckDisk() && checkLogging())
00480             bag_.write(out.topic, out.time, *out.msg, out.connection_header);
00481     }
00482 
00483     stopWriting();
00484 }
00485 
00486 void Recorder::doRecordSnapshotter() {
00487     ros::NodeHandle nh;
00488   
00489     while (nh.ok() || !queue_queue_.empty()) {
00490         boost::unique_lock<boost::mutex> lock(queue_mutex_);
00491         while (queue_queue_.empty()) {
00492             if (!nh.ok())
00493                 return;
00494             queue_condition_.wait(lock);
00495         }
00496         
00497         OutgoingQueue out_queue = queue_queue_.front();
00498         queue_queue_.pop();
00499         
00500         lock.release()->unlock();
00501         
00502         string target_filename = out_queue.filename;
00503         string write_filename  = target_filename + string(".active");
00504         
00505         try {
00506             bag_.open(write_filename, bagmode::Write);
00507         }
00508         catch (rosbag::BagException ex) {
00509             ROS_ERROR("Error writing: %s", ex.what());
00510             return;
00511         }
00512 
00513         while (!out_queue.queue->empty()) {
00514             OutgoingMessage out = out_queue.queue->front();
00515             out_queue.queue->pop();
00516 
00517             bag_.write(out.topic, out.time, *out.msg);
00518         }
00519 
00520         stopWriting();
00521     }
00522 }
00523 
00524 void Recorder::doCheckMaster(ros::TimerEvent const& e, ros::NodeHandle& node_handle) {
00525     ros::master::V_TopicInfo topics;
00526     if (ros::master::getTopics(topics)) {
00527                 foreach(ros::master::TopicInfo const& t, topics) {
00528                         if (shouldSubscribeToTopic(t.name))
00529                                 subscribe(t.name);
00530                 }
00531     }
00532     
00533     if (options_.node != std::string(""))
00534     {
00535 
00536       XmlRpc::XmlRpcValue req;
00537       req[0] = ros::this_node::getName();
00538       req[1] = options_.node;
00539       XmlRpc::XmlRpcValue resp;
00540       XmlRpc::XmlRpcValue payload;
00541 
00542       if (ros::master::execute("lookupNode", req, resp, payload, true))
00543       {
00544         std::string peer_host;
00545         uint32_t peer_port;
00546 
00547         if (!ros::network::splitURI(static_cast<std::string>(resp[2]), peer_host, peer_port))
00548         {
00549           ROS_ERROR("Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
00550         } else {
00551 
00552           XmlRpc::XmlRpcClient c(peer_host.c_str(), peer_port, "/");
00553           XmlRpc::XmlRpcValue req;
00554           XmlRpc::XmlRpcValue resp;
00555           req[0] = ros::this_node::getName();
00556           c.execute("getSubscriptions", req, resp);
00557           
00558           if (!c.isFault() && resp.size() > 0 && static_cast<int>(resp[0]) == 1)
00559           {
00560             for(int i = 0; i < resp[2].size(); i++)
00561             {
00562               if (shouldSubscribeToTopic(resp[2][i][0], true))
00563                 subscribe(resp[2][i][0]);
00564             }
00565           } else {
00566             ROS_ERROR("Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
00567           }
00568         }
00569       }
00570     }
00571 }
00572 
00573 void Recorder::doTrigger() {
00574     ros::NodeHandle nh;
00575     ros::Publisher pub = nh.advertise<std_msgs::Empty>("snapshot_trigger", 1, true);
00576     pub.publish(std_msgs::Empty());
00577 
00578     ros::Timer terminate_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&ros::shutdown));
00579     ros::spin();
00580 }
00581 
00582 bool Recorder::scheduledCheckDisk() {
00583     boost::mutex::scoped_lock lock(check_disk_mutex_);
00584 
00585     if (ros::WallTime::now() < check_disk_next_)
00586         return true;
00587 
00588     check_disk_next_ += ros::WallDuration().fromSec(20.0);
00589     return checkDisk();
00590 }
00591 
00592 bool Recorder::checkDisk() {
00593 #if BOOST_FILESYSTEM_VERSION < 3
00594     struct statvfs fiData;
00595     if ((statvfs(bag_.getFileName().c_str(), &fiData)) < 0)
00596     {
00597         ROS_WARN("Failed to check filesystem stats.");
00598         return true;
00599     }
00600     unsigned long long free_space = 0;
00601     free_space = (unsigned long long) (fiData.f_bsize) * (unsigned long long) (fiData.f_bavail);
00602     if (free_space < 1073741824ull)
00603     {
00604         ROS_ERROR("Less than 1GB of space free on disk with %s.  Disabling recording.", bag_.getFileName().c_str());
00605         writing_enabled_ = false;
00606         return false;
00607     }
00608     else if (free_space < 5368709120ull)
00609     {
00610         ROS_WARN("Less than 5GB of space free on disk with %s.", bag_.getFileName().c_str());
00611     }
00612     else
00613     {
00614         writing_enabled_ = true;
00615     }
00616 #else
00617     boost::filesystem::path p(boost::filesystem::system_complete(bag_.getFileName().c_str()));
00618     p = p.parent_path();
00619     boost::filesystem::space_info info;
00620     try
00621     {
00622         info = boost::filesystem::space(p);
00623     }
00624     catch (boost::filesystem::filesystem_error &e) 
00625     { 
00626         ROS_WARN("Failed to check filesystem stats [%s].", e.what());
00627         writing_enabled_ = false;
00628         return false;
00629     }
00630     if ( info.available < 1073741824ull)
00631     {
00632         ROS_ERROR("Less than 1GB of space free on disk with %s.  Disabling recording.", bag_.getFileName().c_str());
00633         writing_enabled_ = false;
00634         return false;
00635     }
00636     else if (info.available < 5368709120ull)
00637     {
00638         ROS_WARN("Less than 5GB of space free on disk with %s.", bag_.getFileName().c_str());
00639         writing_enabled_ = true;
00640     }
00641     else
00642     {
00643         writing_enabled_ = true;
00644     }
00645 #endif
00646     return true;
00647 }
00648 
00649 bool Recorder::checkLogging() {
00650     if (writing_enabled_)
00651         return true;
00652 
00653     ros::WallTime now = ros::WallTime::now();
00654     if (now >= warn_next_) {
00655         warn_next_ = now + ros::WallDuration().fromSec(5.0);
00656         ROS_WARN("Not logging message because logging disabled.  Most likely cause is a full disk.");
00657     }
00658     return false;
00659 }
00660 
00661 } // namespace rosbag


rosbag
Author(s): Tim Field, Jeremy Leibs, James Bowman
autogenerated on Mon Oct 6 2014 11:47:09