$search
00001 /********************************************************************* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2011, Robert Bosch LLC 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of Willow Garage, Inc. nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 ********************************************************************/ 00034 00035 #include "topic_logger/recorder.h" 00036 00037 #include <sys/stat.h> 00038 #include <sys/statvfs.h> 00039 #include <time.h> 00040 00041 #include <queue> 00042 #include <set> 00043 #include <sstream> 00044 #include <string> 00045 00046 #include <boost/foreach.hpp> 00047 #include <boost/lexical_cast.hpp> 00048 #include <boost/regex.hpp> 00049 #include <boost/thread.hpp> 00050 #include <boost/thread/xtime.hpp> 00051 00052 #include <ros/ros.h> 00053 #include <topic_tools/shape_shifter.h> 00054 00055 #include "ros/network.h" 00056 #include "ros/xmlrpc_manager.h" 00057 #include "XmlRpc.h" 00058 00059 #define foreach BOOST_FOREACH 00060 00061 using std::cout; 00062 using std::endl; 00063 using std::set; 00064 using std::string; 00065 using std::vector; 00066 using boost::shared_ptr; 00067 using ros::Time; 00068 00069 namespace rosbag { 00070 00071 // OutgoingMessage 00072 00073 OutgoingMessage::OutgoingMessage(string const& _topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr<ros::M_string> _connection_header, Time _time) : 00074 topic(_topic), msg(_msg), connection_header(_connection_header), time(_time) 00075 { 00076 } 00077 00078 // OutgoingQueue 00079 00080 OutgoingQueue::OutgoingQueue(string const& _filename, std::queue<OutgoingMessage>* _queue, Time _time) : 00081 filename(_filename), queue(_queue), time(_time) 00082 { 00083 } 00084 00085 // RecorderOptions 00086 00087 RecorderOptions::RecorderOptions() : 00088 trigger(false), 00089 record_all(false), 00090 regex(false), 00091 do_exclude(false), 00092 quiet(false), 00093 append_date(true), 00094 snapshot(false), 00095 verbose(false), 00096 compression(compression::Uncompressed), 00097 prefix(""), 00098 name(""), 00099 exclude_regex(), 00100 buffer_size(1048576 * 256), 00101 limit(0), 00102 split(false), 00103 max_size(0), 00104 max_duration(-1.0), 00105 node("") 00106 { 00107 } 00108 00109 // Recorder 00110 00111 Recorder::Recorder(RecorderOptions const& options) : 00112 options_(options), 00113 num_subscribers_(0), 00114 exit_code_(0), 00115 queue_size_(0), 00116 split_count_(0), 00117 writing_enabled_(true) 00118 { 00119 } 00120 00121 00122 Recorder::Recorder() : 00123 num_subscribers_(0), 00124 exit_code_(0), 00125 queue_size_(0), 00126 split_count_(0), 00127 writing_enabled_(true) 00128 { 00129 RecorderOptions opts; 00130 options_ = opts; 00131 } 00132 00133 00134 00135 00136 //stop a running recorder thread and close bagfile 00137 void Recorder::stop(){ 00138 stop_ = true; 00139 00140 queue_condition_.notify_all(); 00141 record_thread.join(); 00142 00143 ROS_INFO("Unsubscribing from topics"); 00144 for(unsigned int a = 0; a < subscriber_handles_.size(); a++) 00145 { 00146 subscriber_handles_[a]->shutdown(); 00147 } 00148 subscriber_handles_.clear(); 00149 00150 delete queue_; 00151 00152 return; 00153 } 00154 00155 //load a recorded bagfile up to a webserver via scp 00156 void Recorder::upload(){ 00157 //get necessary params from param server (keyfileDirectory_topicLogger, usernameWebserver_topicLogger, URLWebserver_topicLogger) 00158 ros::NodeHandle nh; 00159 std::string keyfileD, userWS, URLWS; 00160 if(!nh.getParam("keyfileDirectory_topicLogger", keyfileD)) 00161 { 00162 ROS_INFO("Failed to load 'keyfileDirectory_topicLogger'. Upload not possible."); 00163 return; 00164 } 00165 if(!nh.getParam("usernameWebserver_topicLogger", userWS)) 00166 { 00167 ROS_INFO("Failed to load 'usernameWebserver_topicLogger'. Upload not possible."); 00168 return; 00169 } 00170 if(!nh.getParam("URLWebserver_topicLogger", URLWS)) 00171 { 00172 ROS_INFO("Failed to load 'URLWebserver_topicLogger'. Upload not possible."); 00173 return; 00174 } 00175 00176 // create scp upload command and check processor availability 00177 int upload_result; 00178 std::string command = "scp -i " + keyfileD + "/ec2-keypair.pem " + target_filename_ + " " + userWS + "@" + URLWS; 00179 if (system(NULL)){ 00180 //ROS_INFO("Processor is available."); 00181 } 00182 else{ 00183 ROS_INFO("No processor available -> no upload available."); 00184 return; 00185 } 00186 00187 // execute upload 00188 upload_result = system(command.c_str()); 00189 if (upload_result == 0){ 00190 ROS_INFO("File successfully uploaded"); 00191 } 00192 00193 return; 00194 } 00195 00196 //delete the local copy of a bagfile (useful after a successful upload) 00197 void Recorder::deleteFile() 00198 { 00199 //delete local copy of bagfile after successful upload 00200 std::string cmd = "rm " + target_filename_; 00201 int res = system(cmd.c_str()); 00202 if(res == 0) 00203 { 00204 ROS_INFO("Deleted local copy of bagfile"); 00205 } 00206 00207 return; 00208 } 00209 00210 00211 //start a new thread to record a bagfile 00212 int Recorder::run() { 00213 stop_ = false; 00214 00215 if (options_.trigger) { 00216 doTrigger(); 00217 return 0; 00218 } 00219 00220 if (options_.topics.size() == 0) { 00221 // Make sure limit is not specified with automatic topic subscription 00222 if (options_.limit > 0) { 00223 fprintf(stderr, "Specifing a count is not valid with automatic topic subscription.\n"); 00224 return 1; 00225 } 00226 00227 // Make sure topics are specified 00228 if (!options_.record_all && (options_.node == std::string(""))) { 00229 fprintf(stderr, "No topics specified.\n"); 00230 return 1; 00231 } 00232 } 00233 00234 ros::NodeHandle nh; 00235 if (!nh.ok()) 00236 return 0; 00237 00238 last_buffer_warn_ = Time(); 00239 queue_ = new std::queue<OutgoingMessage>; 00240 00241 // Subscribe to each topic 00242 if (!options_.regex) { 00243 foreach(string const& topic, options_.topics) 00244 subscriber_handles_.push_back(subscribe(topic)); 00245 } 00246 00247 if (!ros::Time::waitForValid(ros::WallDuration(2.0))) 00248 ROS_WARN("/use_sim_time set to true and no clock published. Still waiting for valid time..."); 00249 00250 ros::Time::waitForValid(); 00251 00252 start_time_ = ros::Time::now(); 00253 00254 // Don't bother doing anything if we never got a valid time 00255 if (!nh.ok()) 00256 return 0; 00257 00258 ros::Subscriber trigger_sub; 00259 00260 // Spin up a thread for writing to the file 00261 if (options_.snapshot) 00262 { 00263 record_thread = boost::thread(boost::bind(&Recorder::doRecordSnapshotter, this)); 00264 00265 // Subscribe to the snapshot trigger 00266 trigger_sub = nh.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::snapshotTrigger, this, _1)); 00267 } 00268 else 00269 record_thread = boost::thread(boost::bind(&Recorder::doRecord ,this)); 00270 00271 00272 ros::Timer check_master_timer; 00273 if (options_.record_all || options_.regex || (options_.node != std::string(""))) 00274 check_master_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&Recorder::doCheckMaster, this, _1, boost::ref(nh))); 00275 00276 return exit_code_; 00277 } 00278 00279 shared_ptr<ros::Subscriber> Recorder::subscribe(string const& topic) { 00280 ROS_INFO("Subscribing to %s", topic.c_str()); 00281 00282 ros::NodeHandle nh; 00283 shared_ptr<int> count(new int(options_.limit)); 00284 shared_ptr<ros::Subscriber> sub(new ros::Subscriber); 00285 *sub = nh.subscribe<topic_tools::ShapeShifter>(topic, 100, boost::bind(&Recorder::doQueue, this, _1, topic, sub, count)); 00286 currently_recording_.insert(topic); 00287 num_subscribers_++; 00288 00289 return sub; 00290 } 00291 00292 bool Recorder::isSubscribed(string const& topic) const { 00293 return currently_recording_.find(topic) != currently_recording_.end(); 00294 } 00295 00296 bool Recorder::shouldSubscribeToTopic(std::string const& topic, bool from_node) { 00297 // ignore already known topics 00298 if (isSubscribed(topic)) { 00299 return false; 00300 } 00301 00302 // subtract exclusion regex, if any 00303 if(options_.do_exclude && boost::regex_match(topic, options_.exclude_regex)) { 00304 return false; 00305 } 00306 00307 if(options_.record_all || from_node) { 00308 return true; 00309 } 00310 00311 if (options_.regex) { 00312 // Treat the topics as regular expressions 00313 foreach(string const& regex_str, options_.topics) { 00314 boost::regex e(regex_str); 00315 boost::smatch what; 00316 if (boost::regex_match(topic, what, e, boost::match_extra)) 00317 return true; 00318 } 00319 } 00320 else { 00321 foreach(string const& t, options_.topics) 00322 if (t == topic) 00323 return true; 00324 } 00325 00326 return false; 00327 } 00328 00329 template<class T> 00330 std::string Recorder::timeToStr(T ros_t) { 00331 char buf[1024] = ""; 00332 time_t t = ros_t.sec; 00333 struct tm* tms = localtime(&t); 00334 strftime(buf, 1024, "%Y-%m-%d-%H-%M-%S", tms); 00335 return string(buf); 00336 } 00337 00339 void Recorder::doQueue(ros::MessageEvent<topic_tools::ShapeShifter const> msg_event, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) { 00340 //void Recorder::doQueue(topic_tools::ShapeShifter::ConstPtr msg, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) { 00341 00342 Time rectime = Time::now(); 00343 00344 if (options_.verbose) 00345 cout << "Received message on topic " << subscriber->getTopic() << endl; 00346 00347 OutgoingMessage out(topic, msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), rectime); 00348 00349 { 00350 boost::mutex::scoped_lock lock(queue_mutex_); 00351 00352 queue_->push(out); 00353 queue_size_ += out.msg->size(); 00354 00355 // Check to see if buffer has been exceeded 00356 while (options_.buffer_size > 0 && queue_size_ > options_.buffer_size) { 00357 OutgoingMessage drop = queue_->front(); 00358 queue_->pop(); 00359 queue_size_ -= drop.msg->size(); 00360 00361 if (!options_.snapshot) { 00362 Time now = Time::now(); 00363 if (now > last_buffer_warn_ + ros::Duration(5.0)) { 00364 ROS_WARN("rosbag record buffer exceeded. Dropping oldest queued message."); 00365 last_buffer_warn_ = now; 00366 } 00367 } 00368 } 00369 } 00370 00371 if (!options_.snapshot) 00372 queue_condition_.notify_all(); 00373 00374 // If we are book-keeping count, decrement and possibly shutdown 00375 if ((*count) > 0) { 00376 (*count)--; 00377 if ((*count) == 0) { 00378 subscriber->shutdown(); 00379 00380 num_subscribers_--; 00381 00382 if (num_subscribers_ == 0) 00383 ros::shutdown(); 00384 } 00385 } 00386 } 00387 00388 void Recorder::updateFilenames() { 00389 vector<string> parts; 00390 00391 std::string prefix = options_.prefix; 00392 uint32_t ind = prefix.rfind(".bag"); 00393 00394 if (ind == prefix.size() - 4) 00395 { 00396 prefix.erase(ind); 00397 ind = prefix.rfind(".bag"); 00398 } 00399 00400 if (prefix.length() > 0) 00401 parts.push_back(prefix); 00402 if (options_.append_date) 00403 parts.push_back(timeToStr(ros::WallTime::now())); 00404 if (options_.split) 00405 parts.push_back(boost::lexical_cast<string>(split_count_)); 00406 00407 target_filename_ = parts[0]; 00408 for (unsigned int i = 1; i < parts.size(); i++) 00409 target_filename_ += string("_") + parts[i]; 00410 00411 target_filename_ += string(".bag"); 00412 write_filename_ = target_filename_ + string(".active"); 00413 } 00414 00416 void Recorder::snapshotTrigger(std_msgs::Empty::ConstPtr trigger) { 00417 updateFilenames(); 00418 00419 ROS_INFO("Triggered snapshot recording with name %s.", target_filename_.c_str()); 00420 00421 { 00422 boost::mutex::scoped_lock lock(queue_mutex_); 00423 queue_queue_.push(OutgoingQueue(target_filename_, queue_, Time::now())); 00424 queue_ = new std::queue<OutgoingMessage>; 00425 queue_size_ = 0; 00426 } 00427 00428 queue_condition_.notify_all(); 00429 } 00430 00431 void Recorder::startWriting() { 00432 bag_.setCompression(options_.compression); 00433 00434 updateFilenames(); 00435 try { 00436 bag_.open(write_filename_, bagmode::Write); 00437 } 00438 catch (rosbag::BagException e) { 00439 ROS_ERROR("Error writing: %s", e.what()); 00440 exit_code_ = 1; 00441 ros::shutdown(); 00442 } 00443 ROS_INFO("Recording to %s.", target_filename_.c_str()); 00444 } 00445 00446 void Recorder::stopWriting() { 00447 ROS_INFO("Closing %s.", target_filename_.c_str()); 00448 bag_.close(); 00449 rename(write_filename_.c_str(), target_filename_.c_str()); 00450 } 00451 00452 bool Recorder::checkSize() 00453 { 00454 if (options_.max_size > 0) 00455 { 00456 if (bag_.getSize() > options_.max_size) 00457 { 00458 if (options_.split) 00459 { 00460 stopWriting(); 00461 split_count_++; 00462 startWriting(); 00463 } else { 00464 ros::shutdown(); 00465 return true; 00466 } 00467 } 00468 } 00469 return false; 00470 } 00471 00472 bool Recorder::checkDuration(const ros::Time& t) 00473 { 00474 if (options_.max_duration > ros::Duration(0)) 00475 { 00476 if (t - start_time_ > options_.max_duration) 00477 { 00478 if (options_.split) 00479 { 00480 while (start_time_ + options_.max_duration < t) 00481 { 00482 stopWriting(); 00483 split_count_++; 00484 start_time_ += options_.max_duration; 00485 startWriting(); 00486 } 00487 } else { 00488 ros::shutdown(); 00489 return true; 00490 } 00491 } 00492 } 00493 return false; 00494 } 00495 00496 00498 void Recorder::doRecord() { 00499 00500 // Open bag file for writing 00501 startWriting(); 00502 00503 // Schedule the disk space check 00504 warn_next_ = ros::WallTime(); 00505 checkDisk(); 00506 check_disk_next_ = ros::WallTime::now() + ros::WallDuration().fromSec(20.0); 00507 00508 // Technically the queue_mutex_ should be locked while checking empty. 00509 // Except it should only get checked if the node is not ok, and thus 00510 // it shouldn't be in contention. 00511 ros::NodeHandle nh; 00512 while ((nh.ok() || !queue_->empty()) && !stop_) //check stop_ to identify whether recording should be stopped 00513 { 00514 boost::unique_lock<boost::mutex> lock(queue_mutex_); 00515 00516 bool finished = false; 00517 while (queue_->empty()) 00518 { 00519 if (!nh.ok()) 00520 { 00521 lock.release()->unlock(); 00522 finished = true; 00523 break; 00524 } 00525 boost::xtime xt; 00526 boost::xtime_get(&xt, boost::TIME_UTC); 00527 xt.nsec += 250000000; 00528 queue_condition_.timed_wait(lock, xt); 00529 if (checkDuration(ros::Time::now())) 00530 { 00531 finished = true; 00532 break; 00533 } 00534 } 00535 if (finished) 00536 break; 00537 00538 OutgoingMessage out = queue_->front(); 00539 queue_->pop(); 00540 queue_size_ -= out.msg->size(); 00541 00542 lock.release()->unlock(); 00543 00544 if (checkSize()) 00545 break; 00546 if (checkDuration(out.time)) 00547 break; 00548 00549 if (scheduledCheckDisk() && checkLogging()) 00550 bag_.write(out.topic, out.time, *out.msg, out.connection_header); 00551 } 00552 00553 stopWriting(); 00554 } 00555 00556 void Recorder::doRecordSnapshotter() { 00557 ros::NodeHandle nh; 00558 00559 while (nh.ok() || !queue_queue_.empty()) { 00560 boost::unique_lock<boost::mutex> lock(queue_mutex_); 00561 while (queue_queue_.empty()) { 00562 if (!nh.ok()) 00563 return; 00564 queue_condition_.wait(lock); 00565 } 00566 00567 OutgoingQueue out_queue = queue_queue_.front(); 00568 queue_queue_.pop(); 00569 00570 lock.release()->unlock(); 00571 00572 string target_filename = out_queue.filename; 00573 string write_filename = target_filename + string(".active"); 00574 00575 try { 00576 bag_.open(write_filename, bagmode::Write); 00577 } 00578 catch (rosbag::BagException ex) { 00579 ROS_ERROR("Error writing: %s", ex.what()); 00580 return; 00581 } 00582 00583 while (!out_queue.queue->empty()) { 00584 OutgoingMessage out = out_queue.queue->front(); 00585 out_queue.queue->pop(); 00586 00587 bag_.write(out.topic, out.time, *out.msg); 00588 } 00589 00590 stopWriting(); 00591 } 00592 } 00593 00594 void Recorder::doCheckMaster(ros::TimerEvent const& e, ros::NodeHandle& node_handle) { 00595 ros::master::V_TopicInfo topics; 00596 if (ros::master::getTopics(topics)) { 00597 foreach(ros::master::TopicInfo const& t, topics) { 00598 if (shouldSubscribeToTopic(t.name)) 00599 subscribe(t.name); 00600 } 00601 } 00602 00603 if (options_.node != std::string("")) 00604 { 00605 00606 XmlRpc::XmlRpcValue req; 00607 req[0] = ros::this_node::getName(); 00608 req[1] = options_.node; 00609 XmlRpc::XmlRpcValue resp; 00610 XmlRpc::XmlRpcValue payload; 00611 00612 if (ros::master::execute("lookupNode", req, resp, payload, true)) 00613 { 00614 std::string peer_host; 00615 uint32_t peer_port; 00616 00617 if (!ros::network::splitURI(static_cast<std::string>(resp[2]), peer_host, peer_port)) 00618 { 00619 ROS_ERROR("Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str()); 00620 } else { 00621 00622 XmlRpc::XmlRpcClient c(peer_host.c_str(), peer_port, "/"); 00623 XmlRpc::XmlRpcValue req; 00624 XmlRpc::XmlRpcValue resp; 00625 req[0] = ros::this_node::getName(); 00626 c.execute("getSubscriptions", req, resp); 00627 00628 if (!c.isFault() && resp.size() > 0 && static_cast<int>(resp[0]) == 1) 00629 { 00630 for(int i = 0; i < resp[2].size(); i++) 00631 { 00632 if (shouldSubscribeToTopic(resp[2][i][0], true)) 00633 subscribe(resp[2][i][0]); 00634 } 00635 } else { 00636 ROS_ERROR("Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str()); 00637 } 00638 } 00639 } 00640 } 00641 } 00642 00643 void Recorder::doTrigger() { 00644 ros::NodeHandle nh; 00645 ros::Publisher pub = nh.advertise<std_msgs::Empty>("snapshot_trigger", 1, true); 00646 pub.publish(std_msgs::Empty()); 00647 00648 ros::Timer terminate_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&ros::shutdown)); 00649 ros::spin(); 00650 } 00651 00652 bool Recorder::scheduledCheckDisk() { 00653 boost::mutex::scoped_lock lock(check_disk_mutex_); 00654 00655 if (ros::WallTime::now() < check_disk_next_) 00656 return true; 00657 00658 check_disk_next_ += ros::WallDuration().fromSec(20.0); 00659 return checkDisk(); 00660 } 00661 00662 bool Recorder::checkDisk() { 00663 struct statvfs fiData; 00664 if ((statvfs(bag_.getFileName().c_str(), &fiData)) < 0) { 00665 ROS_WARN("Failed to check filesystem stats."); 00666 return true; 00667 } 00668 00669 unsigned long long free_space = 0; 00670 free_space = (unsigned long long) (fiData.f_bsize) * (unsigned long long) (fiData.f_bavail); 00671 if (free_space < 1073741824ull) { 00672 ROS_ERROR("Less than 1GB of space free on disk with %s. Disabling recording.", bag_.getFileName().c_str()); 00673 writing_enabled_ = false; 00674 return false; 00675 } 00676 else if (free_space < 5368709120ull) { 00677 ROS_WARN("Less than 5GB of space free on disk with %s.", bag_.getFileName().c_str()); 00678 } 00679 else { 00680 writing_enabled_ = true; 00681 } 00682 00683 return true; 00684 } 00685 00686 bool Recorder::checkLogging() { 00687 if (writing_enabled_) 00688 return true; 00689 00690 ros::WallTime now = ros::WallTime::now(); 00691 if (now >= warn_next_) { 00692 warn_next_ = now + ros::WallDuration().fromSec(5.0); 00693 ROS_WARN("Not logging message because logging disabled. Most likely cause is a full disk."); 00694 } 00695 return false; 00696 } 00697 00698 } // namespace rosbag