$search
00001 /********************************************************************* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2008, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of Willow Garage, Inc. nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 ********************************************************************/ 00034 00035 #include "rosbag/recorder.h" 00036 00037 #include <sys/stat.h> 00038 #include <sys/statvfs.h> 00039 #include <time.h> 00040 00041 #include <queue> 00042 #include <set> 00043 #include <sstream> 00044 #include <string> 00045 00046 #include <boost/foreach.hpp> 00047 #include <boost/lexical_cast.hpp> 00048 #include <boost/regex.hpp> 00049 #include <boost/thread.hpp> 00050 #include <boost/thread/xtime.hpp> 00051 00052 #include <ros/ros.h> 00053 #include <topic_tools/shape_shifter.h> 00054 00055 #include "ros/network.h" 00056 #include "ros/xmlrpc_manager.h" 00057 #include "XmlRpc.h" 00058 00059 #define foreach BOOST_FOREACH 00060 00061 using std::cout; 00062 using std::endl; 00063 using std::set; 00064 using std::string; 00065 using std::vector; 00066 using boost::shared_ptr; 00067 using ros::Time; 00068 00069 namespace rosbag { 00070 00071 // OutgoingMessage 00072 00073 OutgoingMessage::OutgoingMessage(string const& _topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr<ros::M_string> _connection_header, Time _time) : 00074 topic(_topic), msg(_msg), connection_header(_connection_header), time(_time) 00075 { 00076 } 00077 00078 // OutgoingQueue 00079 00080 OutgoingQueue::OutgoingQueue(string const& _filename, std::queue<OutgoingMessage>* _queue, Time _time) : 00081 filename(_filename), queue(_queue), time(_time) 00082 { 00083 } 00084 00085 // RecorderOptions 00086 00087 RecorderOptions::RecorderOptions() : 00088 trigger(false), 00089 record_all(false), 00090 regex(false), 00091 do_exclude(false), 00092 quiet(false), 00093 append_date(true), 00094 snapshot(false), 00095 verbose(false), 00096 compression(compression::Uncompressed), 00097 prefix(""), 00098 name(""), 00099 exclude_regex(), 00100 buffer_size(1048576 * 256), 00101 limit(0), 00102 split(false), 00103 max_size(0), 00104 max_duration(-1.0), 00105 node("") 00106 { 00107 } 00108 00109 // Recorder 00110 00111 Recorder::Recorder(RecorderOptions const& options) : 00112 options_(options), 00113 num_subscribers_(0), 00114 exit_code_(0), 00115 queue_size_(0), 00116 split_count_(0), 00117 writing_enabled_(true) 00118 { 00119 } 00120 00121 int Recorder::run() { 00122 if (options_.trigger) { 00123 doTrigger(); 00124 return 0; 00125 } 00126 00127 if (options_.topics.size() == 0) { 00128 // Make sure limit is not specified with automatic topic subscription 00129 if (options_.limit > 0) { 00130 fprintf(stderr, "Specifing a count is not valid with automatic topic subscription.\n"); 00131 return 1; 00132 } 00133 00134 // Make sure topics are specified 00135 if (!options_.record_all && (options_.node == std::string(""))) { 00136 fprintf(stderr, "No topics specified.\n"); 00137 return 1; 00138 } 00139 } 00140 00141 ros::NodeHandle nh; 00142 if (!nh.ok()) 00143 return 0; 00144 00145 last_buffer_warn_ = Time(); 00146 queue_ = new std::queue<OutgoingMessage>; 00147 00148 // Subscribe to each topic 00149 if (!options_.regex) { 00150 foreach(string const& topic, options_.topics) 00151 subscribe(topic); 00152 } 00153 00154 if (!ros::Time::waitForValid(ros::WallDuration(2.0))) 00155 ROS_WARN("/use_sim_time set to true and no clock published. Still waiting for valid time..."); 00156 00157 ros::Time::waitForValid(); 00158 00159 start_time_ = ros::Time::now(); 00160 00161 // Don't bother doing anything if we never got a valid time 00162 if (!nh.ok()) 00163 return 0; 00164 00165 ros::Subscriber trigger_sub; 00166 00167 // Spin up a thread for writing to the file 00168 boost::thread record_thread; 00169 if (options_.snapshot) 00170 { 00171 record_thread = boost::thread(boost::bind(&Recorder::doRecordSnapshotter, this)); 00172 00173 // Subscribe to the snapshot trigger 00174 trigger_sub = nh.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::snapshotTrigger, this, _1)); 00175 } 00176 else 00177 record_thread = boost::thread(boost::bind(&Recorder::doRecord, this)); 00178 00179 00180 00181 ros::Timer check_master_timer; 00182 if (options_.record_all || options_.regex || (options_.node != std::string(""))) 00183 check_master_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&Recorder::doCheckMaster, this, _1, boost::ref(nh))); 00184 00185 ros::MultiThreadedSpinner s(10); 00186 ros::spin(s); 00187 00188 queue_condition_.notify_all(); 00189 00190 record_thread.join(); 00191 00192 delete queue_; 00193 00194 return exit_code_; 00195 } 00196 00197 shared_ptr<ros::Subscriber> Recorder::subscribe(string const& topic) { 00198 ROS_INFO("Subscribing to %s", topic.c_str()); 00199 00200 ros::NodeHandle nh; 00201 shared_ptr<int> count(new int(options_.limit)); 00202 shared_ptr<ros::Subscriber> sub(new ros::Subscriber); 00203 *sub = nh.subscribe<topic_tools::ShapeShifter>(topic, 100, boost::bind(&Recorder::doQueue, this, _1, topic, sub, count)); 00204 currently_recording_.insert(topic); 00205 num_subscribers_++; 00206 00207 return sub; 00208 } 00209 00210 bool Recorder::isSubscribed(string const& topic) const { 00211 return currently_recording_.find(topic) != currently_recording_.end(); 00212 } 00213 00214 bool Recorder::shouldSubscribeToTopic(std::string const& topic, bool from_node) { 00215 // ignore already known topics 00216 if (isSubscribed(topic)) { 00217 return false; 00218 } 00219 00220 // subtract exclusion regex, if any 00221 if(options_.do_exclude && boost::regex_match(topic, options_.exclude_regex)) { 00222 return false; 00223 } 00224 00225 if(options_.record_all || from_node) { 00226 return true; 00227 } 00228 00229 if (options_.regex) { 00230 // Treat the topics as regular expressions 00231 foreach(string const& regex_str, options_.topics) { 00232 boost::regex e(regex_str); 00233 boost::smatch what; 00234 if (boost::regex_match(topic, what, e, boost::match_extra)) 00235 return true; 00236 } 00237 } 00238 else { 00239 foreach(string const& t, options_.topics) 00240 if (t == topic) 00241 return true; 00242 } 00243 00244 return false; 00245 } 00246 00247 template<class T> 00248 std::string Recorder::timeToStr(T ros_t) { 00249 char buf[1024] = ""; 00250 time_t t = ros_t.sec; 00251 struct tm* tms = localtime(&t); 00252 strftime(buf, 1024, "%Y-%m-%d-%H-%M-%S", tms); 00253 return string(buf); 00254 } 00255 00257 void Recorder::doQueue(ros::MessageEvent<topic_tools::ShapeShifter const> msg_event, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) { 00258 //void Recorder::doQueue(topic_tools::ShapeShifter::ConstPtr msg, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) { 00259 Time rectime = Time::now(); 00260 00261 if (options_.verbose) 00262 cout << "Received message on topic " << subscriber->getTopic() << endl; 00263 00264 OutgoingMessage out(topic, msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), rectime); 00265 00266 { 00267 boost::mutex::scoped_lock lock(queue_mutex_); 00268 00269 queue_->push(out); 00270 queue_size_ += out.msg->size(); 00271 00272 // Check to see if buffer has been exceeded 00273 while (options_.buffer_size > 0 && queue_size_ > options_.buffer_size) { 00274 OutgoingMessage drop = queue_->front(); 00275 queue_->pop(); 00276 queue_size_ -= drop.msg->size(); 00277 00278 if (!options_.snapshot) { 00279 Time now = Time::now(); 00280 if (now > last_buffer_warn_ + ros::Duration(5.0)) { 00281 ROS_WARN("rosbag record buffer exceeded. Dropping oldest queued message."); 00282 last_buffer_warn_ = now; 00283 } 00284 } 00285 } 00286 } 00287 00288 if (!options_.snapshot) 00289 queue_condition_.notify_all(); 00290 00291 // If we are book-keeping count, decrement and possibly shutdown 00292 if ((*count) > 0) { 00293 (*count)--; 00294 if ((*count) == 0) { 00295 subscriber->shutdown(); 00296 00297 num_subscribers_--; 00298 00299 if (num_subscribers_ == 0) 00300 ros::shutdown(); 00301 } 00302 } 00303 } 00304 00305 void Recorder::updateFilenames() { 00306 vector<string> parts; 00307 00308 std::string prefix = options_.prefix; 00309 uint32_t ind = prefix.rfind(".bag"); 00310 00311 if (ind == prefix.size() - 4) 00312 { 00313 prefix.erase(ind); 00314 ind = prefix.rfind(".bag"); 00315 } 00316 00317 if (prefix.length() > 0) 00318 parts.push_back(prefix); 00319 if (options_.append_date) 00320 parts.push_back(timeToStr(ros::WallTime::now())); 00321 if (options_.split) 00322 parts.push_back(boost::lexical_cast<string>(split_count_)); 00323 00324 target_filename_ = parts[0]; 00325 for (unsigned int i = 1; i < parts.size(); i++) 00326 target_filename_ += string("_") + parts[i]; 00327 00328 target_filename_ += string(".bag"); 00329 write_filename_ = target_filename_ + string(".active"); 00330 } 00331 00333 void Recorder::snapshotTrigger(std_msgs::Empty::ConstPtr trigger) { 00334 updateFilenames(); 00335 00336 ROS_INFO("Triggered snapshot recording with name %s.", target_filename_.c_str()); 00337 00338 { 00339 boost::mutex::scoped_lock lock(queue_mutex_); 00340 queue_queue_.push(OutgoingQueue(target_filename_, queue_, Time::now())); 00341 queue_ = new std::queue<OutgoingMessage>; 00342 queue_size_ = 0; 00343 } 00344 00345 queue_condition_.notify_all(); 00346 } 00347 00348 void Recorder::startWriting() { 00349 bag_.setCompression(options_.compression); 00350 00351 updateFilenames(); 00352 try { 00353 bag_.open(write_filename_, bagmode::Write); 00354 } 00355 catch (rosbag::BagException e) { 00356 ROS_ERROR("Error writing: %s", e.what()); 00357 exit_code_ = 1; 00358 ros::shutdown(); 00359 } 00360 ROS_INFO("Recording to %s.", target_filename_.c_str()); 00361 } 00362 00363 void Recorder::stopWriting() { 00364 ROS_INFO("Closing %s.", target_filename_.c_str()); 00365 bag_.close(); 00366 rename(write_filename_.c_str(), target_filename_.c_str()); 00367 } 00368 00369 bool Recorder::checkSize() 00370 { 00371 if (options_.max_size > 0) 00372 { 00373 if (bag_.getSize() > options_.max_size) 00374 { 00375 if (options_.split) 00376 { 00377 stopWriting(); 00378 split_count_++; 00379 startWriting(); 00380 } else { 00381 ros::shutdown(); 00382 return true; 00383 } 00384 } 00385 } 00386 return false; 00387 } 00388 00389 bool Recorder::checkDuration(const ros::Time& t) 00390 { 00391 if (options_.max_duration > ros::Duration(0)) 00392 { 00393 if (t - start_time_ > options_.max_duration) 00394 { 00395 if (options_.split) 00396 { 00397 while (start_time_ + options_.max_duration < t) 00398 { 00399 stopWriting(); 00400 split_count_++; 00401 start_time_ += options_.max_duration; 00402 startWriting(); 00403 } 00404 } else { 00405 ros::shutdown(); 00406 return true; 00407 } 00408 } 00409 } 00410 return false; 00411 } 00412 00413 00415 void Recorder::doRecord() { 00416 // Open bag file for writing 00417 startWriting(); 00418 00419 // Schedule the disk space check 00420 warn_next_ = ros::WallTime(); 00421 checkDisk(); 00422 check_disk_next_ = ros::WallTime::now() + ros::WallDuration().fromSec(20.0); 00423 00424 // Technically the queue_mutex_ should be locked while checking empty. 00425 // Except it should only get checked if the node is not ok, and thus 00426 // it shouldn't be in contention. 00427 ros::NodeHandle nh; 00428 while (nh.ok() || !queue_->empty()) { 00429 boost::unique_lock<boost::mutex> lock(queue_mutex_); 00430 00431 bool finished = false; 00432 while (queue_->empty()) { 00433 if (!nh.ok()) { 00434 lock.release()->unlock(); 00435 finished = true; 00436 break; 00437 } 00438 boost::xtime xt; 00439 boost::xtime_get(&xt, boost::TIME_UTC); 00440 xt.nsec += 250000000; 00441 queue_condition_.timed_wait(lock, xt); 00442 if (checkDuration(ros::Time::now())) 00443 { 00444 finished = true; 00445 break; 00446 } 00447 } 00448 if (finished) 00449 break; 00450 00451 OutgoingMessage out = queue_->front(); 00452 queue_->pop(); 00453 queue_size_ -= out.msg->size(); 00454 00455 lock.release()->unlock(); 00456 00457 if (checkSize()) 00458 break; 00459 00460 if (checkDuration(out.time)) 00461 break; 00462 00463 if (scheduledCheckDisk() && checkLogging()) 00464 bag_.write(out.topic, out.time, *out.msg, out.connection_header); 00465 } 00466 00467 stopWriting(); 00468 } 00469 00470 void Recorder::doRecordSnapshotter() { 00471 ros::NodeHandle nh; 00472 00473 while (nh.ok() || !queue_queue_.empty()) { 00474 boost::unique_lock<boost::mutex> lock(queue_mutex_); 00475 while (queue_queue_.empty()) { 00476 if (!nh.ok()) 00477 return; 00478 queue_condition_.wait(lock); 00479 } 00480 00481 OutgoingQueue out_queue = queue_queue_.front(); 00482 queue_queue_.pop(); 00483 00484 lock.release()->unlock(); 00485 00486 string target_filename = out_queue.filename; 00487 string write_filename = target_filename + string(".active"); 00488 00489 try { 00490 bag_.open(write_filename, bagmode::Write); 00491 } 00492 catch (rosbag::BagException ex) { 00493 ROS_ERROR("Error writing: %s", ex.what()); 00494 return; 00495 } 00496 00497 while (!out_queue.queue->empty()) { 00498 OutgoingMessage out = out_queue.queue->front(); 00499 out_queue.queue->pop(); 00500 00501 bag_.write(out.topic, out.time, *out.msg); 00502 } 00503 00504 stopWriting(); 00505 } 00506 } 00507 00508 void Recorder::doCheckMaster(ros::TimerEvent const& e, ros::NodeHandle& node_handle) { 00509 ros::master::V_TopicInfo topics; 00510 if (ros::master::getTopics(topics)) { 00511 foreach(ros::master::TopicInfo const& t, topics) { 00512 if (shouldSubscribeToTopic(t.name)) 00513 subscribe(t.name); 00514 } 00515 } 00516 00517 if (options_.node != std::string("")) 00518 { 00519 00520 XmlRpc::XmlRpcValue req; 00521 req[0] = ros::this_node::getName(); 00522 req[1] = options_.node; 00523 XmlRpc::XmlRpcValue resp; 00524 XmlRpc::XmlRpcValue payload; 00525 00526 if (ros::master::execute("lookupNode", req, resp, payload, true)) 00527 { 00528 std::string peer_host; 00529 uint32_t peer_port; 00530 00531 if (!ros::network::splitURI(static_cast<std::string>(resp[2]), peer_host, peer_port)) 00532 { 00533 ROS_ERROR("Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str()); 00534 } else { 00535 00536 XmlRpc::XmlRpcClient c(peer_host.c_str(), peer_port, "/"); 00537 XmlRpc::XmlRpcValue req; 00538 XmlRpc::XmlRpcValue resp; 00539 req[0] = ros::this_node::getName(); 00540 c.execute("getSubscriptions", req, resp); 00541 00542 if (!c.isFault() && resp.size() > 0 && static_cast<int>(resp[0]) == 1) 00543 { 00544 for(int i = 0; i < resp[2].size(); i++) 00545 { 00546 if (shouldSubscribeToTopic(resp[2][i][0], true)) 00547 subscribe(resp[2][i][0]); 00548 } 00549 } else { 00550 ROS_ERROR("Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str()); 00551 } 00552 } 00553 } 00554 } 00555 } 00556 00557 void Recorder::doTrigger() { 00558 ros::NodeHandle nh; 00559 ros::Publisher pub = nh.advertise<std_msgs::Empty>("snapshot_trigger", 1, true); 00560 pub.publish(std_msgs::Empty()); 00561 00562 ros::Timer terminate_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&ros::shutdown)); 00563 ros::spin(); 00564 } 00565 00566 bool Recorder::scheduledCheckDisk() { 00567 boost::mutex::scoped_lock lock(check_disk_mutex_); 00568 00569 if (ros::WallTime::now() < check_disk_next_) 00570 return true; 00571 00572 check_disk_next_ += ros::WallDuration().fromSec(20.0); 00573 return checkDisk(); 00574 } 00575 00576 bool Recorder::checkDisk() { 00577 struct statvfs fiData; 00578 if ((statvfs(bag_.getFileName().c_str(), &fiData)) < 0) { 00579 ROS_WARN("Failed to check filesystem stats."); 00580 return true; 00581 } 00582 00583 unsigned long long free_space = 0; 00584 free_space = (unsigned long long) (fiData.f_bsize) * (unsigned long long) (fiData.f_bavail); 00585 if (free_space < 1073741824ull) { 00586 ROS_ERROR("Less than 1GB of space free on disk with %s. Disabling recording.", bag_.getFileName().c_str()); 00587 writing_enabled_ = false; 00588 return false; 00589 } 00590 else if (free_space < 5368709120ull) { 00591 ROS_WARN("Less than 5GB of space free on disk with %s.", bag_.getFileName().c_str()); 00592 } 00593 else { 00594 writing_enabled_ = true; 00595 } 00596 00597 return true; 00598 } 00599 00600 bool Recorder::checkLogging() { 00601 if (writing_enabled_) 00602 return true; 00603 00604 ros::WallTime now = ros::WallTime::now(); 00605 if (now >= warn_next_) { 00606 warn_next_ = now + ros::WallDuration().fromSec(5.0); 00607 ROS_WARN("Not logging message because logging disabled. Most likely cause is a full disk."); 00608 } 00609 return false; 00610 } 00611 00612 } // namespace rosbag