00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 #include "rosbag/recorder.h"
00036 
00037 #include <sys/stat.h>
00038 #include <boost/filesystem.hpp>
00039 
00040 
00041 #if BOOST_FILESYSTEM_VERSION < 3
00042   #include <sys/statvfs.h>
00043 #endif
00044 #include <time.h>
00045 
00046 #include <queue>
00047 #include <set>
00048 #include <sstream>
00049 #include <string>
00050 
00051 #include <boost/foreach.hpp>
00052 #include <boost/lexical_cast.hpp>
00053 #include <boost/regex.hpp>
00054 #include <boost/thread.hpp>
00055 #include <boost/thread/xtime.hpp>
00056 #include <boost/date_time/local_time/local_time.hpp>
00057 
00058 #include <ros/ros.h>
00059 #include <topic_tools/shape_shifter.h>
00060 
00061 #include "ros/network.h"
00062 #include "ros/xmlrpc_manager.h"
00063 #include "XmlRpc.h"
00064 
00065 #define foreach BOOST_FOREACH
00066 
00067 using std::cout;
00068 using std::endl;
00069 using std::set;
00070 using std::string;
00071 using std::vector;
00072 using boost::shared_ptr;
00073 using ros::Time;
00074 
00075 namespace rosbag {
00076 
00077 
00078 
00079 OutgoingMessage::OutgoingMessage(string const& _topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr<ros::M_string> _connection_header, Time _time) :
00080     topic(_topic), msg(_msg), connection_header(_connection_header), time(_time)
00081 {
00082 }
00083 
00084 
00085 
00086 OutgoingQueue::OutgoingQueue(string const& _filename, std::queue<OutgoingMessage>* _queue, Time _time) :
00087     filename(_filename), queue(_queue), time(_time)
00088 {
00089 }
00090 
00091 
00092 
00093 RecorderOptions::RecorderOptions() :
00094     trigger(false),
00095     record_all(false),
00096     regex(false),
00097     do_exclude(false),
00098     quiet(false),
00099     append_date(true),
00100     snapshot(false),
00101     verbose(false),
00102     compression(compression::Uncompressed),
00103     prefix(""),
00104     name(""),
00105     exclude_regex(),
00106     buffer_size(1048576 * 256),
00107     chunk_size(1024 * 768),
00108     limit(0),
00109     split(false),
00110     max_size(0),
00111     max_duration(-1.0),
00112     node(""),
00113     min_space(1024 * 1024 * 1024),
00114     min_space_str("1G")
00115 {
00116 }
00117 
00118 
00119 
00120 Recorder::Recorder(RecorderOptions const& options) :
00121     options_(options),
00122     num_subscribers_(0),
00123     exit_code_(0),
00124     queue_size_(0),
00125     split_count_(0),
00126     writing_enabled_(true)
00127 {
00128 }
00129 
00130 int Recorder::run() {
00131     if (options_.trigger) {
00132         doTrigger();
00133         return 0;
00134     }
00135 
00136     if (options_.topics.size() == 0) {
00137         
00138         if (options_.limit > 0) {
00139             fprintf(stderr, "Specifing a count is not valid with automatic topic subscription.\n");
00140             return 1;
00141         }
00142 
00143         
00144         if (!options_.record_all && (options_.node == std::string(""))) {
00145             fprintf(stderr, "No topics specified.\n");
00146             return 1;
00147         }
00148     }
00149 
00150     ros::NodeHandle nh;
00151     if (!nh.ok())
00152         return 0;
00153 
00154     last_buffer_warn_ = Time();
00155     queue_ = new std::queue<OutgoingMessage>;
00156 
00157     
00158     if (!options_.regex) {
00159         foreach(string const& topic, options_.topics)
00160                         subscribe(topic);
00161     }
00162 
00163     if (!ros::Time::waitForValid(ros::WallDuration(2.0)))
00164       ROS_WARN("/use_sim_time set to true and no clock published.  Still waiting for valid time...");
00165 
00166     ros::Time::waitForValid();
00167 
00168     start_time_ = ros::Time::now();
00169 
00170     
00171     if (!nh.ok())
00172         return 0;
00173 
00174     ros::Subscriber trigger_sub;
00175 
00176     
00177     boost::thread record_thread;
00178     if (options_.snapshot)
00179     {
00180         record_thread = boost::thread(boost::bind(&Recorder::doRecordSnapshotter, this));
00181 
00182         
00183         trigger_sub = nh.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::snapshotTrigger, this, _1));
00184     }
00185     else
00186         record_thread = boost::thread(boost::bind(&Recorder::doRecord, this));
00187 
00188 
00189 
00190     ros::Timer check_master_timer;
00191     if (options_.record_all || options_.regex || (options_.node != std::string("")))
00192     {
00193         
00194         doCheckMaster(ros::TimerEvent(), nh);
00195         check_master_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&Recorder::doCheckMaster, this, _1, boost::ref(nh)));
00196     }
00197 
00198     ros::MultiThreadedSpinner s(10);
00199     ros::spin(s);
00200 
00201     queue_condition_.notify_all();
00202 
00203     record_thread.join();
00204 
00205     delete queue_;
00206 
00207     return exit_code_;
00208 }
00209 
00210 shared_ptr<ros::Subscriber> Recorder::subscribe(string const& topic) {
00211         ROS_INFO("Subscribing to %s", topic.c_str());
00212 
00213     ros::NodeHandle nh;
00214     shared_ptr<int> count(boost::make_shared<int>(options_.limit));
00215     shared_ptr<ros::Subscriber> sub(boost::make_shared<ros::Subscriber>());
00216 
00217     ros::SubscribeOptions ops;
00218     ops.topic = topic;
00219     ops.queue_size = 100;
00220     ops.md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
00221     ops.datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
00222     ops.helper = boost::make_shared<ros::SubscriptionCallbackHelperT<
00223         const ros::MessageEvent<topic_tools::ShapeShifter const> &> >(
00224             boost::bind(&Recorder::doQueue, this, _1, topic, sub, count));
00225     *sub = nh.subscribe(ops);
00226 
00227     currently_recording_.insert(topic);
00228     num_subscribers_++;
00229 
00230     return sub;
00231 }
00232 
00233 bool Recorder::isSubscribed(string const& topic) const {
00234     return currently_recording_.find(topic) != currently_recording_.end();
00235 }
00236 
00237 bool Recorder::shouldSubscribeToTopic(std::string const& topic, bool from_node) {
00238     
00239     if (isSubscribed(topic)) {
00240         return false;
00241     }
00242 
00243     
00244     if(options_.do_exclude && boost::regex_match(topic, options_.exclude_regex)) {
00245         return false;
00246     }
00247 
00248     if(options_.record_all || from_node) {
00249         return true;
00250     }
00251     
00252     if (options_.regex) {
00253         
00254         foreach(string const& regex_str, options_.topics) {
00255             boost::regex e(regex_str);
00256             boost::smatch what;
00257             if (boost::regex_match(topic, what, e, boost::match_extra))
00258                 return true;
00259         }
00260     }
00261     else {
00262         foreach(string const& t, options_.topics)
00263             if (t == topic)
00264                 return true;
00265     }
00266     
00267     return false;
00268 }
00269 
00270 template<class T>
00271 std::string Recorder::timeToStr(T ros_t)
00272 {
00273     (void)ros_t;
00274     std::stringstream msg;
00275     const boost::posix_time::ptime now=
00276         boost::posix_time::second_clock::local_time();
00277     boost::posix_time::time_facet *const f=
00278         new boost::posix_time::time_facet("%Y-%m-%d-%H-%M-%S");
00279     msg.imbue(std::locale(msg.getloc(),f));
00280     msg << now;
00281     return msg.str();
00282 }
00283 
00285 void Recorder::doQueue(const ros::MessageEvent<topic_tools::ShapeShifter const>& msg_event, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
00286     
00287     Time rectime = Time::now();
00288     
00289     if (options_.verbose)
00290         cout << "Received message on topic " << subscriber->getTopic() << endl;
00291 
00292     OutgoingMessage out(topic, msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), rectime);
00293     
00294     {
00295         boost::mutex::scoped_lock lock(queue_mutex_);
00296 
00297         queue_->push(out);
00298         queue_size_ += out.msg->size();
00299         
00300         
00301         while (options_.buffer_size > 0 && queue_size_ > options_.buffer_size) {
00302             OutgoingMessage drop = queue_->front();
00303             queue_->pop();
00304             queue_size_ -= drop.msg->size();
00305 
00306             if (!options_.snapshot) {
00307                 Time now = Time::now();
00308                 if (now > last_buffer_warn_ + ros::Duration(5.0)) {
00309                     ROS_WARN("rosbag record buffer exceeded.  Dropping oldest queued message.");
00310                     last_buffer_warn_ = now;
00311                 }
00312             }
00313         }
00314     }
00315   
00316     if (!options_.snapshot)
00317         queue_condition_.notify_all();
00318 
00319     
00320     if ((*count) > 0) {
00321         (*count)--;
00322         if ((*count) == 0) {
00323             subscriber->shutdown();
00324 
00325             num_subscribers_--;
00326 
00327             if (num_subscribers_ == 0)
00328                 ros::shutdown();
00329         }
00330     }
00331 }
00332 
00333 void Recorder::updateFilenames() {
00334     vector<string> parts;
00335 
00336     std::string prefix = options_.prefix;
00337     size_t ind = prefix.rfind(".bag");
00338 
00339     if (ind != std::string::npos && ind == prefix.size() - 4)
00340     {
00341       prefix.erase(ind);
00342     }
00343 
00344     if (prefix.length() > 0)
00345         parts.push_back(prefix);
00346     if (options_.append_date)
00347         parts.push_back(timeToStr(ros::WallTime::now()));
00348     if (options_.split)
00349         parts.push_back(boost::lexical_cast<string>(split_count_));
00350 
00351     if (parts.size() == 0)
00352     {
00353       throw BagException("Bag filename is empty (neither of these was specified: prefix, append_date, split)");
00354     }
00355 
00356     target_filename_ = parts[0];
00357     for (unsigned int i = 1; i < parts.size(); i++)
00358         target_filename_ += string("_") + parts[i];
00359 
00360     target_filename_ += string(".bag");
00361     write_filename_ = target_filename_ + string(".active");
00362 }
00363 
00365 void Recorder::snapshotTrigger(std_msgs::Empty::ConstPtr trigger) {
00366     (void)trigger;
00367     updateFilenames();
00368     
00369     ROS_INFO("Triggered snapshot recording with name %s.", target_filename_.c_str());
00370     
00371     {
00372         boost::mutex::scoped_lock lock(queue_mutex_);
00373         queue_queue_.push(OutgoingQueue(target_filename_, queue_, Time::now()));
00374         queue_      = new std::queue<OutgoingMessage>;
00375         queue_size_ = 0;
00376     }
00377 
00378     queue_condition_.notify_all();
00379 }
00380 
00381 void Recorder::startWriting() {
00382     bag_.setCompression(options_.compression);
00383     bag_.setChunkThreshold(options_.chunk_size);
00384 
00385     updateFilenames();
00386     try {
00387         bag_.open(write_filename_, bagmode::Write);
00388     }
00389     catch (rosbag::BagException e) {
00390         ROS_ERROR("Error writing: %s", e.what());
00391         exit_code_ = 1;
00392         ros::shutdown();
00393     }
00394     ROS_INFO("Recording to %s.", target_filename_.c_str());
00395 }
00396 
00397 void Recorder::stopWriting() {
00398     ROS_INFO("Closing %s.", target_filename_.c_str());
00399     bag_.close();
00400     rename(write_filename_.c_str(), target_filename_.c_str());
00401 }
00402 
00403 bool Recorder::checkSize()
00404 {
00405     if (options_.max_size > 0)
00406     {
00407         if (bag_.getSize() > options_.max_size)
00408         {
00409             if (options_.split)
00410             {
00411                 stopWriting();
00412                 split_count_++;
00413                 startWriting();
00414             } else {
00415                 ros::shutdown();
00416                 return true;
00417             }
00418         }
00419     }
00420     return false;
00421 }
00422 
00423 bool Recorder::checkDuration(const ros::Time& t)
00424 {
00425     if (options_.max_duration > ros::Duration(0))
00426     {
00427         if (t - start_time_ > options_.max_duration)
00428         {
00429             if (options_.split)
00430             {
00431                 while (start_time_ + options_.max_duration < t)
00432                 {
00433                     stopWriting();
00434                     split_count_++;
00435                     start_time_ += options_.max_duration;
00436                     startWriting();
00437                 }
00438             } else {
00439                 ros::shutdown();
00440                 return true;
00441             }
00442         }
00443     }
00444     return false;
00445 }
00446 
00447 
00449 void Recorder::doRecord() {
00450     
00451     startWriting();
00452 
00453     
00454     warn_next_ = ros::WallTime();
00455     checkDisk();
00456     check_disk_next_ = ros::WallTime::now() + ros::WallDuration().fromSec(20.0);
00457 
00458     
00459     
00460     
00461     ros::NodeHandle nh;
00462     while (nh.ok() || !queue_->empty()) {
00463         boost::unique_lock<boost::mutex> lock(queue_mutex_);
00464 
00465         bool finished = false;
00466         while (queue_->empty()) {
00467             if (!nh.ok()) {
00468                 lock.release()->unlock();
00469                 finished = true;
00470                 break;
00471             }
00472             boost::xtime xt;
00473 #if BOOST_VERSION >= 105000
00474             boost::xtime_get(&xt, boost::TIME_UTC_);
00475 #else
00476             boost::xtime_get(&xt, boost::TIME_UTC);
00477 #endif
00478             xt.nsec += 250000000;
00479             queue_condition_.timed_wait(lock, xt);
00480             if (checkDuration(ros::Time::now()))
00481             {
00482                 finished = true;
00483                 break;
00484             }
00485         }
00486         if (finished)
00487             break;
00488 
00489         OutgoingMessage out = queue_->front();
00490         queue_->pop();
00491         queue_size_ -= out.msg->size();
00492         
00493         lock.release()->unlock();
00494         
00495         if (checkSize())
00496             break;
00497 
00498         if (checkDuration(out.time))
00499             break;
00500 
00501         if (scheduledCheckDisk() && checkLogging())
00502             bag_.write(out.topic, out.time, *out.msg, out.connection_header);
00503     }
00504 
00505     stopWriting();
00506 }
00507 
00508 void Recorder::doRecordSnapshotter() {
00509     ros::NodeHandle nh;
00510   
00511     while (nh.ok() || !queue_queue_.empty()) {
00512         boost::unique_lock<boost::mutex> lock(queue_mutex_);
00513         while (queue_queue_.empty()) {
00514             if (!nh.ok())
00515                 return;
00516             queue_condition_.wait(lock);
00517         }
00518         
00519         OutgoingQueue out_queue = queue_queue_.front();
00520         queue_queue_.pop();
00521         
00522         lock.release()->unlock();
00523         
00524         string target_filename = out_queue.filename;
00525         string write_filename  = target_filename + string(".active");
00526         
00527         try {
00528             bag_.open(write_filename, bagmode::Write);
00529         }
00530         catch (rosbag::BagException ex) {
00531             ROS_ERROR("Error writing: %s", ex.what());
00532             return;
00533         }
00534 
00535         while (!out_queue.queue->empty()) {
00536             OutgoingMessage out = out_queue.queue->front();
00537             out_queue.queue->pop();
00538 
00539             bag_.write(out.topic, out.time, *out.msg);
00540         }
00541 
00542         stopWriting();
00543     }
00544 }
00545 
00546 void Recorder::doCheckMaster(ros::TimerEvent const& e, ros::NodeHandle& node_handle) {
00547     (void)e;
00548     (void)node_handle;
00549     ros::master::V_TopicInfo topics;
00550     if (ros::master::getTopics(topics)) {
00551                 foreach(ros::master::TopicInfo const& t, topics) {
00552                         if (shouldSubscribeToTopic(t.name))
00553                                 subscribe(t.name);
00554                 }
00555     }
00556     
00557     if (options_.node != std::string(""))
00558     {
00559 
00560       XmlRpc::XmlRpcValue req;
00561       req[0] = ros::this_node::getName();
00562       req[1] = options_.node;
00563       XmlRpc::XmlRpcValue resp;
00564       XmlRpc::XmlRpcValue payload;
00565 
00566       if (ros::master::execute("lookupNode", req, resp, payload, true))
00567       {
00568         std::string peer_host;
00569         uint32_t peer_port;
00570 
00571         if (!ros::network::splitURI(static_cast<std::string>(resp[2]), peer_host, peer_port))
00572         {
00573           ROS_ERROR("Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
00574         } else {
00575 
00576           XmlRpc::XmlRpcClient c(peer_host.c_str(), peer_port, "/");
00577           XmlRpc::XmlRpcValue req2;
00578           XmlRpc::XmlRpcValue resp2;
00579           req2[0] = ros::this_node::getName();
00580           c.execute("getSubscriptions", req2, resp2);
00581           
00582           if (!c.isFault() && resp2.valid() && resp2.size() > 0 && static_cast<int>(resp2[0]) == 1)
00583           {
00584             for(int i = 0; i < resp2[2].size(); i++)
00585             {
00586               if (shouldSubscribeToTopic(resp2[2][i][0], true))
00587                 subscribe(resp2[2][i][0]);
00588             }
00589           } else {
00590             ROS_ERROR("Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
00591           }
00592         }
00593       }
00594     }
00595 }
00596 
00597 void Recorder::doTrigger() {
00598     ros::NodeHandle nh;
00599     ros::Publisher pub = nh.advertise<std_msgs::Empty>("snapshot_trigger", 1, true);
00600     pub.publish(std_msgs::Empty());
00601 
00602     ros::Timer terminate_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&ros::shutdown));
00603     ros::spin();
00604 }
00605 
00606 bool Recorder::scheduledCheckDisk() {
00607     boost::mutex::scoped_lock lock(check_disk_mutex_);
00608 
00609     if (ros::WallTime::now() < check_disk_next_)
00610         return true;
00611 
00612     check_disk_next_ += ros::WallDuration().fromSec(20.0);
00613     return checkDisk();
00614 }
00615 
00616 bool Recorder::checkDisk() {
00617 #if BOOST_FILESYSTEM_VERSION < 3
00618     struct statvfs fiData;
00619     if ((statvfs(bag_.getFileName().c_str(), &fiData)) < 0)
00620     {
00621         ROS_WARN("Failed to check filesystem stats.");
00622         return true;
00623     }
00624     unsigned long long free_space = 0;
00625     free_space = (unsigned long long) (fiData.f_bsize) * (unsigned long long) (fiData.f_bavail);
00626     if (free_space < options_.min_space)
00627     {
00628         ROS_ERROR("Less than %s of space free on disk with %s.  Disabling recording.", options_.min_space_str.c_str(), bag_.getFileName().c_str());
00629         writing_enabled_ = false;
00630         return false;
00631     }
00632     else if (free_space < 5 * options_.min_space)
00633     {
00634         ROS_WARN("Less than 5 x %s of space free on disk with %s.", options_.min_space_str.c_str(), bag_.getFileName().c_str());
00635     }
00636     else
00637     {
00638         writing_enabled_ = true;
00639     }
00640 #else
00641     boost::filesystem::path p(boost::filesystem::system_complete(bag_.getFileName().c_str()));
00642     p = p.parent_path();
00643     boost::filesystem::space_info info;
00644     try
00645     {
00646         info = boost::filesystem::space(p);
00647     }
00648     catch (boost::filesystem::filesystem_error &e) 
00649     { 
00650         ROS_WARN("Failed to check filesystem stats [%s].", e.what());
00651         writing_enabled_ = false;
00652         return false;
00653     }
00654     if ( info.available < options_.min_space)
00655     {
00656         ROS_ERROR("Less than %s of space free on disk with %s.  Disabling recording.", options_.min_space_str.c_str(), bag_.getFileName().c_str());
00657         writing_enabled_ = false;
00658         return false;
00659     }
00660     else if (info.available < 5 * options_.min_space)
00661     {
00662         ROS_WARN("Less than 5 x %s of space free on disk with %s.", options_.min_space_str.c_str(), bag_.getFileName().c_str());
00663         writing_enabled_ = true;
00664     }
00665     else
00666     {
00667         writing_enabled_ = true;
00668     }
00669 #endif
00670     return true;
00671 }
00672 
00673 bool Recorder::checkLogging() {
00674     if (writing_enabled_)
00675         return true;
00676 
00677     ros::WallTime now = ros::WallTime::now();
00678     if (now >= warn_next_) {
00679         warn_next_ = now + ros::WallDuration().fromSec(5.0);
00680         ROS_WARN("Not logging message because logging disabled.  Most likely cause is a full disk.");
00681     }
00682     return false;
00683 }
00684 
00685 }