recorder.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2008, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of Willow Garage, Inc. nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 ********************************************************************/
00034 
00035 #include "rosbag/recorder.h"
00036 
00037 #include <sys/stat.h>
00038 #include <sys/statvfs.h>
00039 #include <time.h>
00040 
00041 #include <queue>
00042 #include <set>
00043 #include <sstream>
00044 #include <string>
00045 
00046 #include <boost/foreach.hpp>
00047 #include <boost/lexical_cast.hpp>
00048 #include <boost/regex.hpp>
00049 #include <boost/thread.hpp>
00050 #include <boost/thread/xtime.hpp>
00051 
00052 #include <ros/ros.h>
00053 #include <topic_tools/shape_shifter.h>
00054 
00055 #include "ros/network.h"
00056 #include "ros/xmlrpc_manager.h"
00057 #include "XmlRpc.h"
00058 
00059 #define foreach BOOST_FOREACH
00060 
00061 using std::cout;
00062 using std::endl;
00063 using std::set;
00064 using std::string;
00065 using std::vector;
00066 using boost::shared_ptr;
00067 using ros::Time;
00068 
00069 namespace rosbag {
00070 
00071 // OutgoingMessage
00072 
00073 OutgoingMessage::OutgoingMessage(string const& _topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr<ros::M_string> _connection_header, Time _time) :
00074     topic(_topic), msg(_msg), connection_header(_connection_header), time(_time)
00075 {
00076 }
00077 
00078 // OutgoingQueue
00079 
00080 OutgoingQueue::OutgoingQueue(string const& _filename, std::queue<OutgoingMessage>* _queue, Time _time) :
00081     filename(_filename), queue(_queue), time(_time)
00082 {
00083 }
00084 
00085 // RecorderOptions
00086 
00087 RecorderOptions::RecorderOptions() :
00088     trigger(false),
00089     record_all(false),
00090     regex(false),
00091     do_exclude(false),
00092     quiet(false),
00093     append_date(true),
00094     snapshot(false),
00095     verbose(false),
00096     compression(compression::Uncompressed),
00097     prefix(""),
00098     name(""),
00099     exclude_regex(),
00100     buffer_size(1048576 * 256),
00101     limit(0),
00102     split(false),
00103     max_size(0),
00104     max_duration(-1.0),
00105     node("")
00106 {
00107 }
00108 
00109 // Recorder
00110 
00111 Recorder::Recorder(RecorderOptions const& options) :
00112     options_(options),
00113     num_subscribers_(0),
00114     exit_code_(0),
00115     queue_size_(0),
00116     split_count_(0),
00117     writing_enabled_(true)
00118 {
00119 }
00120 
00121 int Recorder::run() {
00122     if (options_.trigger) {
00123         doTrigger();
00124         return 0;
00125     }
00126 
00127     if (options_.topics.size() == 0) {
00128         // Make sure limit is not specified with automatic topic subscription
00129         if (options_.limit > 0) {
00130             fprintf(stderr, "Specifing a count is not valid with automatic topic subscription.\n");
00131             return 1;
00132         }
00133 
00134         // Make sure topics are specified
00135         if (!options_.record_all && (options_.node == std::string(""))) {
00136             fprintf(stderr, "No topics specified.\n");
00137             return 1;
00138         }
00139     }
00140 
00141     ros::NodeHandle nh;
00142     if (!nh.ok())
00143         return 0;
00144 
00145     last_buffer_warn_ = Time();
00146     queue_ = new std::queue<OutgoingMessage>;
00147 
00148     // Subscribe to each topic
00149     if (!options_.regex) {
00150         foreach(string const& topic, options_.topics)
00151                         subscribe(topic);
00152     }
00153 
00154     if (!ros::Time::waitForValid(ros::WallDuration(2.0)))
00155       ROS_WARN("/use_sim_time set to true and no clock published.  Still waiting for valid time...");
00156 
00157     ros::Time::waitForValid();
00158 
00159     start_time_ = ros::Time::now();
00160 
00161     // Don't bother doing anything if we never got a valid time
00162     if (!nh.ok())
00163         return 0;
00164 
00165     ros::Subscriber trigger_sub;
00166 
00167     // Spin up a thread for writing to the file
00168     boost::thread record_thread;
00169     if (options_.snapshot)
00170     {
00171         record_thread = boost::thread(boost::bind(&Recorder::doRecordSnapshotter, this));
00172 
00173         // Subscribe to the snapshot trigger
00174         trigger_sub = nh.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::snapshotTrigger, this, _1));
00175     }
00176     else
00177         record_thread = boost::thread(boost::bind(&Recorder::doRecord, this));
00178 
00179 
00180 
00181     ros::Timer check_master_timer;
00182     if (options_.record_all || options_.regex || (options_.node != std::string("")))
00183         check_master_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&Recorder::doCheckMaster, this, _1, boost::ref(nh)));
00184 
00185     ros::MultiThreadedSpinner s(10);
00186     ros::spin(s);
00187 
00188     queue_condition_.notify_all();
00189 
00190     record_thread.join();
00191 
00192     delete queue_;
00193 
00194     return exit_code_;
00195 }
00196 
00197 shared_ptr<ros::Subscriber> Recorder::subscribe(string const& topic) {
00198         ROS_INFO("Subscribing to %s", topic.c_str());
00199 
00200     ros::NodeHandle nh;
00201     shared_ptr<int> count(new int(options_.limit));
00202     shared_ptr<ros::Subscriber> sub(new ros::Subscriber);
00203     *sub = nh.subscribe<topic_tools::ShapeShifter>(topic, 100, boost::bind(&Recorder::doQueue, this, _1, topic, sub, count));
00204     currently_recording_.insert(topic);
00205     num_subscribers_++;
00206 
00207     return sub;
00208 }
00209 
00210 bool Recorder::isSubscribed(string const& topic) const {
00211     return currently_recording_.find(topic) != currently_recording_.end();
00212 }
00213 
00214 bool Recorder::shouldSubscribeToTopic(std::string const& topic, bool from_node) {
00215     // ignore already known topics
00216     if (isSubscribed(topic)) {
00217         return false;
00218     }
00219 
00220     // subtract exclusion regex, if any
00221     if(options_.do_exclude && boost::regex_match(topic, options_.exclude_regex)) {
00222         return false;
00223     }
00224 
00225     if(options_.record_all || from_node) {
00226         return true;
00227     }
00228     
00229     if (options_.regex) {
00230         // Treat the topics as regular expressions
00231         foreach(string const& regex_str, options_.topics) {
00232             boost::regex e(regex_str);
00233             boost::smatch what;
00234             if (boost::regex_match(topic, what, e, boost::match_extra))
00235                 return true;
00236         }
00237     }
00238     else {
00239         foreach(string const& t, options_.topics)
00240             if (t == topic)
00241                 return true;
00242     }
00243     
00244     return false;
00245 }
00246 
00247 template<class T>
00248 std::string Recorder::timeToStr(T ros_t) {
00249     char buf[1024] = "";
00250     time_t t = ros_t.sec;
00251     struct tm* tms = localtime(&t);
00252     strftime(buf, 1024, "%Y-%m-%d-%H-%M-%S", tms);
00253     return string(buf);
00254 }
00255 
00257 void Recorder::doQueue(ros::MessageEvent<topic_tools::ShapeShifter const> msg_event, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
00258     //void Recorder::doQueue(topic_tools::ShapeShifter::ConstPtr msg, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
00259     Time rectime = Time::now();
00260     
00261     if (options_.verbose)
00262         cout << "Received message on topic " << subscriber->getTopic() << endl;
00263 
00264     OutgoingMessage out(topic, msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), rectime);
00265     
00266     {
00267         boost::mutex::scoped_lock lock(queue_mutex_);
00268 
00269         queue_->push(out);
00270         queue_size_ += out.msg->size();
00271         
00272         // Check to see if buffer has been exceeded
00273         while (options_.buffer_size > 0 && queue_size_ > options_.buffer_size) {
00274             OutgoingMessage drop = queue_->front();
00275             queue_->pop();
00276             queue_size_ -= drop.msg->size();
00277 
00278             if (!options_.snapshot) {
00279                 Time now = Time::now();
00280                 if (now > last_buffer_warn_ + ros::Duration(5.0)) {
00281                     ROS_WARN("rosbag record buffer exceeded.  Dropping oldest queued message.");
00282                     last_buffer_warn_ = now;
00283                 }
00284             }
00285         }
00286     }
00287   
00288     if (!options_.snapshot)
00289         queue_condition_.notify_all();
00290 
00291     // If we are book-keeping count, decrement and possibly shutdown
00292     if ((*count) > 0) {
00293         (*count)--;
00294         if ((*count) == 0) {
00295             subscriber->shutdown();
00296 
00297             num_subscribers_--;
00298 
00299             if (num_subscribers_ == 0)
00300                 ros::shutdown();
00301         }
00302     }
00303 }
00304 
00305 void Recorder::updateFilenames() {
00306     vector<string> parts;
00307 
00308     std::string prefix = options_.prefix;
00309     uint32_t ind = prefix.rfind(".bag");
00310 
00311     if (ind == prefix.size() - 4)
00312     {
00313       prefix.erase(ind);
00314       ind = prefix.rfind(".bag");
00315     }
00316 
00317     if (prefix.length() > 0)
00318         parts.push_back(prefix);
00319     if (options_.append_date)
00320         parts.push_back(timeToStr(ros::WallTime::now()));
00321     if (options_.split)
00322         parts.push_back(boost::lexical_cast<string>(split_count_));
00323 
00324     target_filename_ = parts[0];
00325     for (unsigned int i = 1; i < parts.size(); i++)
00326         target_filename_ += string("_") + parts[i];
00327 
00328     target_filename_ += string(".bag");
00329     write_filename_ = target_filename_ + string(".active");
00330 }
00331 
00333 void Recorder::snapshotTrigger(std_msgs::Empty::ConstPtr trigger) {
00334     updateFilenames();
00335     
00336     ROS_INFO("Triggered snapshot recording with name %s.", target_filename_.c_str());
00337     
00338     {
00339         boost::mutex::scoped_lock lock(queue_mutex_);
00340         queue_queue_.push(OutgoingQueue(target_filename_, queue_, Time::now()));
00341         queue_      = new std::queue<OutgoingMessage>;
00342         queue_size_ = 0;
00343     }
00344 
00345     queue_condition_.notify_all();
00346 }
00347 
00348 void Recorder::startWriting() {
00349     bag_.setCompression(options_.compression);
00350 
00351     updateFilenames();
00352     try {
00353         bag_.open(write_filename_, bagmode::Write);
00354     }
00355     catch (rosbag::BagException e) {
00356         ROS_ERROR("Error writing: %s", e.what());
00357         exit_code_ = 1;
00358         ros::shutdown();
00359     }
00360     ROS_INFO("Recording to %s.", target_filename_.c_str());
00361 }
00362 
00363 void Recorder::stopWriting() {
00364     ROS_INFO("Closing %s.", target_filename_.c_str());
00365     bag_.close();
00366     rename(write_filename_.c_str(), target_filename_.c_str());
00367 }
00368 
00369 bool Recorder::checkSize()
00370 {
00371     if (options_.max_size > 0)
00372     {
00373         if (bag_.getSize() > options_.max_size)
00374         {
00375             if (options_.split)
00376             {
00377                 stopWriting();
00378                 split_count_++;
00379                 startWriting();
00380             } else {
00381                 ros::shutdown();
00382                 return true;
00383             }
00384         }
00385     }
00386     return false;
00387 }
00388 
00389 bool Recorder::checkDuration(const ros::Time& t)
00390 {
00391     if (options_.max_duration > ros::Duration(0))
00392     {
00393         if (t - start_time_ > options_.max_duration)
00394         {
00395             if (options_.split)
00396             {
00397                 while (start_time_ + options_.max_duration < t)
00398                 {
00399                     stopWriting();
00400                     split_count_++;
00401                     start_time_ += options_.max_duration;
00402                     startWriting();
00403                 }
00404             } else {
00405                 ros::shutdown();
00406                 return true;
00407             }
00408         }
00409     }
00410     return false;
00411 }
00412 
00413 
00415 void Recorder::doRecord() {
00416     // Open bag file for writing
00417     startWriting();
00418 
00419     // Schedule the disk space check
00420     warn_next_ = ros::WallTime();
00421     checkDisk();
00422     check_disk_next_ = ros::WallTime::now() + ros::WallDuration().fromSec(20.0);
00423 
00424     // Technically the queue_mutex_ should be locked while checking empty.
00425     // Except it should only get checked if the node is not ok, and thus
00426     // it shouldn't be in contention.
00427     ros::NodeHandle nh;
00428     while (nh.ok() || !queue_->empty()) {
00429         boost::unique_lock<boost::mutex> lock(queue_mutex_);
00430 
00431         bool finished = false;
00432         while (queue_->empty()) {
00433             if (!nh.ok()) {
00434                 lock.release()->unlock();
00435                 finished = true;
00436                 break;
00437             }
00438             boost::xtime xt;
00439 #if BOOST_VERSION >= 105000
00440             boost::xtime_get(&xt, boost::TIME_UTC_);
00441 #else
00442             boost::xtime_get(&xt, boost::TIME_UTC);
00443 #endif
00444             xt.nsec += 250000000;
00445             queue_condition_.timed_wait(lock, xt);
00446             if (checkDuration(ros::Time::now()))
00447             {
00448                 finished = true;
00449                 break;
00450             }
00451         }
00452         if (finished)
00453             break;
00454 
00455         OutgoingMessage out = queue_->front();
00456         queue_->pop();
00457         queue_size_ -= out.msg->size();
00458         
00459         lock.release()->unlock();
00460         
00461         if (checkSize())
00462             break;
00463 
00464         if (checkDuration(out.time))
00465             break;
00466 
00467         if (scheduledCheckDisk() && checkLogging())
00468             bag_.write(out.topic, out.time, *out.msg, out.connection_header);
00469     }
00470 
00471     stopWriting();
00472 }
00473 
00474 void Recorder::doRecordSnapshotter() {
00475     ros::NodeHandle nh;
00476   
00477     while (nh.ok() || !queue_queue_.empty()) {
00478         boost::unique_lock<boost::mutex> lock(queue_mutex_);
00479         while (queue_queue_.empty()) {
00480             if (!nh.ok())
00481                 return;
00482             queue_condition_.wait(lock);
00483         }
00484         
00485         OutgoingQueue out_queue = queue_queue_.front();
00486         queue_queue_.pop();
00487         
00488         lock.release()->unlock();
00489         
00490         string target_filename = out_queue.filename;
00491         string write_filename  = target_filename + string(".active");
00492         
00493         try {
00494             bag_.open(write_filename, bagmode::Write);
00495         }
00496         catch (rosbag::BagException ex) {
00497             ROS_ERROR("Error writing: %s", ex.what());
00498             return;
00499         }
00500 
00501         while (!out_queue.queue->empty()) {
00502             OutgoingMessage out = out_queue.queue->front();
00503             out_queue.queue->pop();
00504 
00505             bag_.write(out.topic, out.time, *out.msg);
00506         }
00507 
00508         stopWriting();
00509     }
00510 }
00511 
00512 void Recorder::doCheckMaster(ros::TimerEvent const& e, ros::NodeHandle& node_handle) {
00513     ros::master::V_TopicInfo topics;
00514     if (ros::master::getTopics(topics)) {
00515                 foreach(ros::master::TopicInfo const& t, topics) {
00516                         if (shouldSubscribeToTopic(t.name))
00517                                 subscribe(t.name);
00518                 }
00519     }
00520     
00521     if (options_.node != std::string(""))
00522     {
00523 
00524       XmlRpc::XmlRpcValue req;
00525       req[0] = ros::this_node::getName();
00526       req[1] = options_.node;
00527       XmlRpc::XmlRpcValue resp;
00528       XmlRpc::XmlRpcValue payload;
00529 
00530       if (ros::master::execute("lookupNode", req, resp, payload, true))
00531       {
00532         std::string peer_host;
00533         uint32_t peer_port;
00534 
00535         if (!ros::network::splitURI(static_cast<std::string>(resp[2]), peer_host, peer_port))
00536         {
00537           ROS_ERROR("Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
00538         } else {
00539 
00540           XmlRpc::XmlRpcClient c(peer_host.c_str(), peer_port, "/");
00541           XmlRpc::XmlRpcValue req;
00542           XmlRpc::XmlRpcValue resp;
00543           req[0] = ros::this_node::getName();
00544           c.execute("getSubscriptions", req, resp);
00545           
00546           if (!c.isFault() && resp.size() > 0 && static_cast<int>(resp[0]) == 1)
00547           {
00548             for(int i = 0; i < resp[2].size(); i++)
00549             {
00550               if (shouldSubscribeToTopic(resp[2][i][0], true))
00551                 subscribe(resp[2][i][0]);
00552             }
00553           } else {
00554             ROS_ERROR("Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
00555           }
00556         }
00557       }
00558     }
00559 }
00560 
00561 void Recorder::doTrigger() {
00562     ros::NodeHandle nh;
00563     ros::Publisher pub = nh.advertise<std_msgs::Empty>("snapshot_trigger", 1, true);
00564     pub.publish(std_msgs::Empty());
00565 
00566     ros::Timer terminate_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&ros::shutdown));
00567     ros::spin();
00568 }
00569 
00570 bool Recorder::scheduledCheckDisk() {
00571     boost::mutex::scoped_lock lock(check_disk_mutex_);
00572 
00573     if (ros::WallTime::now() < check_disk_next_)
00574         return true;
00575 
00576     check_disk_next_ += ros::WallDuration().fromSec(20.0);
00577     return checkDisk();
00578 }
00579 
00580 bool Recorder::checkDisk() {
00581     struct statvfs fiData;
00582     if ((statvfs(bag_.getFileName().c_str(), &fiData)) < 0) {
00583         ROS_WARN("Failed to check filesystem stats.");
00584         return true;
00585     }
00586 
00587     unsigned long long free_space = 0;
00588     free_space = (unsigned long long) (fiData.f_bsize) * (unsigned long long) (fiData.f_bavail);
00589     if (free_space < 1073741824ull) {
00590         ROS_ERROR("Less than 1GB of space free on disk with %s.  Disabling recording.", bag_.getFileName().c_str());
00591         writing_enabled_ = false;
00592         return false;
00593     }
00594     else if (free_space < 5368709120ull) {
00595         ROS_WARN("Less than 5GB of space free on disk with %s.", bag_.getFileName().c_str());
00596     }
00597     else {
00598         writing_enabled_ = true;
00599     }
00600 
00601     return true;
00602 }
00603 
00604 bool Recorder::checkLogging() {
00605     if (writing_enabled_)
00606         return true;
00607 
00608     ros::WallTime now = ros::WallTime::now();
00609     if (now >= warn_next_) {
00610         warn_next_ = now + ros::WallDuration().fromSec(5.0);
00611         ROS_WARN("Not logging message because logging disabled.  Most likely cause is a full disk.");
00612     }
00613     return false;
00614 }
00615 
00616 } // namespace rosbag


rosbag
Author(s): Tim Field (tfield@willowgarage.com), Jeremy Leibs (leibs@willowgarage.com), and James Bowman (jamesb@willowgarage.com)
autogenerated on Sat Dec 28 2013 17:43:05