00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "topic_logger/recorder.h"
00036
00037 #include <sys/stat.h>
00038 #include <sys/statvfs.h>
00039 #include <time.h>
00040
00041 #include <queue>
00042 #include <set>
00043 #include <sstream>
00044 #include <string>
00045
00046 #include <boost/foreach.hpp>
00047 #include <boost/lexical_cast.hpp>
00048 #include <boost/regex.hpp>
00049 #include <boost/thread.hpp>
00050 #include <boost/thread/xtime.hpp>
00051
00052 #include <ros/ros.h>
00053 #include <topic_tools/shape_shifter.h>
00054
00055 #include "ros/network.h"
00056 #include "ros/xmlrpc_manager.h"
00057 #include "XmlRpc.h"
00058
00059 #define foreach BOOST_FOREACH
00060
00061 using std::cout;
00062 using std::endl;
00063 using std::set;
00064 using std::string;
00065 using std::vector;
00066 using boost::shared_ptr;
00067 using ros::Time;
00068
00069 namespace rosbag {
00070
00071
00072
00073 OutgoingMessage::OutgoingMessage(string const& _topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr<ros::M_string> _connection_header, Time _time) :
00074 topic(_topic), msg(_msg), connection_header(_connection_header), time(_time)
00075 {
00076 }
00077
00078
00079
00080 OutgoingQueue::OutgoingQueue(string const& _filename, std::queue<OutgoingMessage>* _queue, Time _time) :
00081 filename(_filename), queue(_queue), time(_time)
00082 {
00083 }
00084
00085
00086
00087 RecorderOptions::RecorderOptions() :
00088 trigger(false),
00089 record_all(false),
00090 regex(false),
00091 do_exclude(false),
00092 quiet(false),
00093 append_date(true),
00094 snapshot(false),
00095 verbose(false),
00096 compression(compression::Uncompressed),
00097 prefix(""),
00098 name(""),
00099 exclude_regex(),
00100 buffer_size(1048576 * 256),
00101 limit(0),
00102 split(false),
00103 max_size(0),
00104 max_duration(-1.0),
00105 node("")
00106 {
00107 }
00108
00109
00110
00111 Recorder::Recorder(RecorderOptions const& options) :
00112 options_(options),
00113 num_subscribers_(0),
00114 exit_code_(0),
00115 queue_size_(0),
00116 split_count_(0),
00117 writing_enabled_(true)
00118 {
00119 }
00120
00121
00122 Recorder::Recorder() :
00123 num_subscribers_(0),
00124 exit_code_(0),
00125 queue_size_(0),
00126 split_count_(0),
00127 writing_enabled_(true)
00128 {
00129 RecorderOptions opts;
00130 options_ = opts;
00131 }
00132
00133
00134
00135
00136
00137 void Recorder::stop(){
00138 stop_ = true;
00139
00140 queue_condition_.notify_all();
00141 record_thread.join();
00142
00143 ROS_INFO("Unsubscribing from topics");
00144 for(unsigned int a = 0; a < subscriber_handles_.size(); a++)
00145 {
00146 subscriber_handles_[a]->shutdown();
00147 }
00148 subscriber_handles_.clear();
00149
00150 delete queue_;
00151
00152 return;
00153 }
00154
00155
00156 void Recorder::upload(){
00157
00158 ros::NodeHandle nh;
00159 std::string keyfileD, userWS, URLWS;
00160 if(!nh.getParam("keyfileDirectory_topicLogger", keyfileD))
00161 {
00162 ROS_INFO("Failed to load 'keyfileDirectory_topicLogger'. Upload not possible.");
00163 return;
00164 }
00165 if(!nh.getParam("usernameWebserver_topicLogger", userWS))
00166 {
00167 ROS_INFO("Failed to load 'usernameWebserver_topicLogger'. Upload not possible.");
00168 return;
00169 }
00170 if(!nh.getParam("URLWebserver_topicLogger", URLWS))
00171 {
00172 ROS_INFO("Failed to load 'URLWebserver_topicLogger'. Upload not possible.");
00173 return;
00174 }
00175
00176
00177 int upload_result;
00178 std::string command = "scp -i " + keyfileD + "/ec2-keypair.pem " + target_filename_ + " " + userWS + "@" + URLWS;
00179 if (system(NULL)){
00180
00181 }
00182 else{
00183 ROS_INFO("No processor available -> no upload available.");
00184 return;
00185 }
00186
00187
00188 upload_result = system(command.c_str());
00189 if (upload_result == 0){
00190 ROS_INFO("File successfully uploaded");
00191 }
00192
00193 return;
00194 }
00195
00196
00197 void Recorder::deleteFile()
00198 {
00199
00200 std::string cmd = "rm " + target_filename_;
00201 int res = system(cmd.c_str());
00202 if(res == 0)
00203 {
00204 ROS_INFO("Deleted local copy of bagfile");
00205 }
00206
00207 return;
00208 }
00209
00210
00211
00212 int Recorder::run() {
00213 stop_ = false;
00214
00215 if (options_.trigger) {
00216 doTrigger();
00217 return 0;
00218 }
00219
00220 if (options_.topics.size() == 0) {
00221
00222 if (options_.limit > 0) {
00223 fprintf(stderr, "Specifing a count is not valid with automatic topic subscription.\n");
00224 return 1;
00225 }
00226
00227
00228 if (!options_.record_all && (options_.node == std::string(""))) {
00229 fprintf(stderr, "No topics specified.\n");
00230 return 1;
00231 }
00232 }
00233
00234 ros::NodeHandle nh;
00235 if (!nh.ok())
00236 return 0;
00237
00238 last_buffer_warn_ = Time();
00239 queue_ = new std::queue<OutgoingMessage>;
00240
00241
00242 if (!options_.regex) {
00243 foreach(string const& topic, options_.topics)
00244 subscriber_handles_.push_back(subscribe(topic));
00245 }
00246
00247 if (!ros::Time::waitForValid(ros::WallDuration(2.0)))
00248 ROS_WARN("/use_sim_time set to true and no clock published. Still waiting for valid time...");
00249
00250 ros::Time::waitForValid();
00251
00252 start_time_ = ros::Time::now();
00253
00254
00255 if (!nh.ok())
00256 return 0;
00257
00258 ros::Subscriber trigger_sub;
00259
00260
00261 if (options_.snapshot)
00262 {
00263 record_thread = boost::thread(boost::bind(&Recorder::doRecordSnapshotter, this));
00264
00265
00266 trigger_sub = nh.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::snapshotTrigger, this, _1));
00267 }
00268 else
00269 record_thread = boost::thread(boost::bind(&Recorder::doRecord ,this));
00270
00271
00272 ros::Timer check_master_timer;
00273 if (options_.record_all || options_.regex || (options_.node != std::string("")))
00274 check_master_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&Recorder::doCheckMaster, this, _1, boost::ref(nh)));
00275
00276 return exit_code_;
00277 }
00278
00279 shared_ptr<ros::Subscriber> Recorder::subscribe(string const& topic) {
00280 ROS_INFO("Subscribing to %s", topic.c_str());
00281
00282 ros::NodeHandle nh;
00283 shared_ptr<int> count(new int(options_.limit));
00284 shared_ptr<ros::Subscriber> sub(new ros::Subscriber);
00285 *sub = nh.subscribe<topic_tools::ShapeShifter>(topic, 100, boost::bind(&Recorder::doQueue, this, _1, topic, sub, count));
00286 currently_recording_.insert(topic);
00287 num_subscribers_++;
00288
00289 return sub;
00290 }
00291
00292 bool Recorder::isSubscribed(string const& topic) const {
00293 return currently_recording_.find(topic) != currently_recording_.end();
00294 }
00295
00296 bool Recorder::shouldSubscribeToTopic(std::string const& topic, bool from_node) {
00297
00298 if (isSubscribed(topic)) {
00299 return false;
00300 }
00301
00302
00303 if(options_.do_exclude && boost::regex_match(topic, options_.exclude_regex)) {
00304 return false;
00305 }
00306
00307 if(options_.record_all || from_node) {
00308 return true;
00309 }
00310
00311 if (options_.regex) {
00312
00313 foreach(string const& regex_str, options_.topics) {
00314 boost::regex e(regex_str);
00315 boost::smatch what;
00316 if (boost::regex_match(topic, what, e, boost::match_extra))
00317 return true;
00318 }
00319 }
00320 else {
00321 foreach(string const& t, options_.topics)
00322 if (t == topic)
00323 return true;
00324 }
00325
00326 return false;
00327 }
00328
00329 template<class T>
00330 std::string Recorder::timeToStr(T ros_t) {
00331 char buf[1024] = "";
00332 time_t t = ros_t.sec;
00333 struct tm* tms = localtime(&t);
00334 strftime(buf, 1024, "%Y-%m-%d-%H-%M-%S", tms);
00335 return string(buf);
00336 }
00337
00339 void Recorder::doQueue(ros::MessageEvent<topic_tools::ShapeShifter const> msg_event, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
00340
00341
00342 Time rectime = Time::now();
00343
00344 if (options_.verbose)
00345 cout << "Received message on topic " << subscriber->getTopic() << endl;
00346
00347 OutgoingMessage out(topic, msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), rectime);
00348
00349 {
00350 boost::mutex::scoped_lock lock(queue_mutex_);
00351
00352 queue_->push(out);
00353 queue_size_ += out.msg->size();
00354
00355
00356 while (options_.buffer_size > 0 && queue_size_ > options_.buffer_size) {
00357 OutgoingMessage drop = queue_->front();
00358 queue_->pop();
00359 queue_size_ -= drop.msg->size();
00360
00361 if (!options_.snapshot) {
00362 Time now = Time::now();
00363 if (now > last_buffer_warn_ + ros::Duration(5.0)) {
00364 ROS_WARN("rosbag record buffer exceeded. Dropping oldest queued message.");
00365 last_buffer_warn_ = now;
00366 }
00367 }
00368 }
00369 }
00370
00371 if (!options_.snapshot)
00372 queue_condition_.notify_all();
00373
00374
00375 if ((*count) > 0) {
00376 (*count)--;
00377 if ((*count) == 0) {
00378 subscriber->shutdown();
00379
00380 num_subscribers_--;
00381
00382 if (num_subscribers_ == 0)
00383 ros::shutdown();
00384 }
00385 }
00386 }
00387
00388 void Recorder::updateFilenames() {
00389 vector<string> parts;
00390
00391 std::string prefix = options_.prefix;
00392 uint32_t ind = prefix.rfind(".bag");
00393
00394 if (ind == prefix.size() - 4)
00395 {
00396 prefix.erase(ind);
00397 ind = prefix.rfind(".bag");
00398 }
00399
00400 if (prefix.length() > 0)
00401 parts.push_back(prefix);
00402 if (options_.append_date)
00403 parts.push_back(timeToStr(ros::WallTime::now()));
00404 if (options_.split)
00405 parts.push_back(boost::lexical_cast<string>(split_count_));
00406
00407 target_filename_ = parts[0];
00408 for (unsigned int i = 1; i < parts.size(); i++)
00409 target_filename_ += string("_") + parts[i];
00410
00411 target_filename_ += string(".bag");
00412 write_filename_ = target_filename_ + string(".active");
00413 }
00414
00416 void Recorder::snapshotTrigger(std_msgs::Empty::ConstPtr trigger) {
00417 updateFilenames();
00418
00419 ROS_INFO("Triggered snapshot recording with name %s.", target_filename_.c_str());
00420
00421 {
00422 boost::mutex::scoped_lock lock(queue_mutex_);
00423 queue_queue_.push(OutgoingQueue(target_filename_, queue_, Time::now()));
00424 queue_ = new std::queue<OutgoingMessage>;
00425 queue_size_ = 0;
00426 }
00427
00428 queue_condition_.notify_all();
00429 }
00430
00431 void Recorder::startWriting() {
00432 bag_.setCompression(options_.compression);
00433
00434 updateFilenames();
00435 try {
00436 bag_.open(write_filename_, bagmode::Write);
00437 }
00438 catch (rosbag::BagException e) {
00439 ROS_ERROR("Error writing: %s", e.what());
00440 exit_code_ = 1;
00441 ros::shutdown();
00442 }
00443 ROS_INFO("Recording to %s.", target_filename_.c_str());
00444 }
00445
00446 void Recorder::stopWriting() {
00447 ROS_INFO("Closing %s.", target_filename_.c_str());
00448 bag_.close();
00449 rename(write_filename_.c_str(), target_filename_.c_str());
00450 }
00451
00452 bool Recorder::checkSize()
00453 {
00454 if (options_.max_size > 0)
00455 {
00456 if (bag_.getSize() > options_.max_size)
00457 {
00458 if (options_.split)
00459 {
00460 stopWriting();
00461 split_count_++;
00462 startWriting();
00463 } else {
00464 ros::shutdown();
00465 return true;
00466 }
00467 }
00468 }
00469 return false;
00470 }
00471
00472 bool Recorder::checkDuration(const ros::Time& t)
00473 {
00474 if (options_.max_duration > ros::Duration(0))
00475 {
00476 if (t - start_time_ > options_.max_duration)
00477 {
00478 if (options_.split)
00479 {
00480 while (start_time_ + options_.max_duration < t)
00481 {
00482 stopWriting();
00483 split_count_++;
00484 start_time_ += options_.max_duration;
00485 startWriting();
00486 }
00487 } else {
00488 ros::shutdown();
00489 return true;
00490 }
00491 }
00492 }
00493 return false;
00494 }
00495
00496
00498 void Recorder::doRecord() {
00499
00500
00501 startWriting();
00502
00503
00504 warn_next_ = ros::WallTime();
00505 checkDisk();
00506 check_disk_next_ = ros::WallTime::now() + ros::WallDuration().fromSec(20.0);
00507
00508
00509
00510
00511 ros::NodeHandle nh;
00512 while ((nh.ok() || !queue_->empty()) && !stop_)
00513 {
00514 boost::unique_lock<boost::mutex> lock(queue_mutex_);
00515
00516 bool finished = false;
00517 while (queue_->empty())
00518 {
00519 if (!nh.ok())
00520 {
00521 lock.release()->unlock();
00522 finished = true;
00523 break;
00524 }
00525 boost::xtime xt;
00526 boost::xtime_get(&xt, boost::TIME_UTC);
00527 xt.nsec += 250000000;
00528 queue_condition_.timed_wait(lock, xt);
00529 if (checkDuration(ros::Time::now()))
00530 {
00531 finished = true;
00532 break;
00533 }
00534 }
00535 if (finished)
00536 break;
00537
00538 OutgoingMessage out = queue_->front();
00539 queue_->pop();
00540 queue_size_ -= out.msg->size();
00541
00542 lock.release()->unlock();
00543
00544 if (checkSize())
00545 break;
00546 if (checkDuration(out.time))
00547 break;
00548
00549 if (scheduledCheckDisk() && checkLogging())
00550 bag_.write(out.topic, out.time, *out.msg, out.connection_header);
00551 }
00552
00553 stopWriting();
00554 }
00555
00556 void Recorder::doRecordSnapshotter() {
00557 ros::NodeHandle nh;
00558
00559 while (nh.ok() || !queue_queue_.empty()) {
00560 boost::unique_lock<boost::mutex> lock(queue_mutex_);
00561 while (queue_queue_.empty()) {
00562 if (!nh.ok())
00563 return;
00564 queue_condition_.wait(lock);
00565 }
00566
00567 OutgoingQueue out_queue = queue_queue_.front();
00568 queue_queue_.pop();
00569
00570 lock.release()->unlock();
00571
00572 string target_filename = out_queue.filename;
00573 string write_filename = target_filename + string(".active");
00574
00575 try {
00576 bag_.open(write_filename, bagmode::Write);
00577 }
00578 catch (rosbag::BagException ex) {
00579 ROS_ERROR("Error writing: %s", ex.what());
00580 return;
00581 }
00582
00583 while (!out_queue.queue->empty()) {
00584 OutgoingMessage out = out_queue.queue->front();
00585 out_queue.queue->pop();
00586
00587 bag_.write(out.topic, out.time, *out.msg);
00588 }
00589
00590 stopWriting();
00591 }
00592 }
00593
00594 void Recorder::doCheckMaster(ros::TimerEvent const& e, ros::NodeHandle& node_handle) {
00595 ros::master::V_TopicInfo topics;
00596 if (ros::master::getTopics(topics)) {
00597 foreach(ros::master::TopicInfo const& t, topics) {
00598 if (shouldSubscribeToTopic(t.name))
00599 subscribe(t.name);
00600 }
00601 }
00602
00603 if (options_.node != std::string(""))
00604 {
00605
00606 XmlRpc::XmlRpcValue req;
00607 req[0] = ros::this_node::getName();
00608 req[1] = options_.node;
00609 XmlRpc::XmlRpcValue resp;
00610 XmlRpc::XmlRpcValue payload;
00611
00612 if (ros::master::execute("lookupNode", req, resp, payload, true))
00613 {
00614 std::string peer_host;
00615 uint32_t peer_port;
00616
00617 if (!ros::network::splitURI(static_cast<std::string>(resp[2]), peer_host, peer_port))
00618 {
00619 ROS_ERROR("Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
00620 } else {
00621
00622 XmlRpc::XmlRpcClient c(peer_host.c_str(), peer_port, "/");
00623 XmlRpc::XmlRpcValue req;
00624 XmlRpc::XmlRpcValue resp;
00625 req[0] = ros::this_node::getName();
00626 c.execute("getSubscriptions", req, resp);
00627
00628 if (!c.isFault() && resp.size() > 0 && static_cast<int>(resp[0]) == 1)
00629 {
00630 for(int i = 0; i < resp[2].size(); i++)
00631 {
00632 if (shouldSubscribeToTopic(resp[2][i][0], true))
00633 subscribe(resp[2][i][0]);
00634 }
00635 } else {
00636 ROS_ERROR("Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
00637 }
00638 }
00639 }
00640 }
00641 }
00642
00643 void Recorder::doTrigger() {
00644 ros::NodeHandle nh;
00645 ros::Publisher pub = nh.advertise<std_msgs::Empty>("snapshot_trigger", 1, true);
00646 pub.publish(std_msgs::Empty());
00647
00648 ros::Timer terminate_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&ros::shutdown));
00649 ros::spin();
00650 }
00651
00652 bool Recorder::scheduledCheckDisk() {
00653 boost::mutex::scoped_lock lock(check_disk_mutex_);
00654
00655 if (ros::WallTime::now() < check_disk_next_)
00656 return true;
00657
00658 check_disk_next_ += ros::WallDuration().fromSec(20.0);
00659 return checkDisk();
00660 }
00661
00662 bool Recorder::checkDisk() {
00663 struct statvfs fiData;
00664 if ((statvfs(bag_.getFileName().c_str(), &fiData)) < 0) {
00665 ROS_WARN("Failed to check filesystem stats.");
00666 return true;
00667 }
00668
00669 unsigned long long free_space = 0;
00670 free_space = (unsigned long long) (fiData.f_bsize) * (unsigned long long) (fiData.f_bavail);
00671 if (free_space < 1073741824ull) {
00672 ROS_ERROR("Less than 1GB of space free on disk with %s. Disabling recording.", bag_.getFileName().c_str());
00673 writing_enabled_ = false;
00674 return false;
00675 }
00676 else if (free_space < 5368709120ull) {
00677 ROS_WARN("Less than 5GB of space free on disk with %s.", bag_.getFileName().c_str());
00678 }
00679 else {
00680 writing_enabled_ = true;
00681 }
00682
00683 return true;
00684 }
00685
00686 bool Recorder::checkLogging() {
00687 if (writing_enabled_)
00688 return true;
00689
00690 ros::WallTime now = ros::WallTime::now();
00691 if (now >= warn_next_) {
00692 warn_next_ = now + ros::WallDuration().fromSec(5.0);
00693 ROS_WARN("Not logging message because logging disabled. Most likely cause is a full disk.");
00694 }
00695 return false;
00696 }
00697
00698 }