00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "rosbag/recorder.h"
00036
00037 #include <sys/stat.h>
00038 #include <boost/filesystem.hpp>
00039
00040
00041 #if BOOST_FILESYSTEM_VERSION < 3
00042 #include <sys/statvfs.h>
00043 #endif
00044 #include <time.h>
00045
00046 #include <queue>
00047 #include <set>
00048 #include <sstream>
00049 #include <string>
00050
00051 #include <boost/foreach.hpp>
00052 #include <boost/lexical_cast.hpp>
00053 #include <boost/regex.hpp>
00054 #include <boost/thread.hpp>
00055 #include <boost/thread/xtime.hpp>
00056 #include <boost/date_time/local_time/local_time.hpp>
00057
00058 #include <ros/ros.h>
00059 #include <topic_tools/shape_shifter.h>
00060
00061 #include "ros/network.h"
00062 #include "ros/xmlrpc_manager.h"
00063 #include "XmlRpc.h"
00064
00065 #define foreach BOOST_FOREACH
00066
00067 using std::cout;
00068 using std::endl;
00069 using std::set;
00070 using std::string;
00071 using std::vector;
00072 using boost::shared_ptr;
00073 using ros::Time;
00074
00075 namespace rosbag {
00076
00077
00078
00079 OutgoingMessage::OutgoingMessage(string const& _topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr<ros::M_string> _connection_header, Time _time) :
00080 topic(_topic), msg(_msg), connection_header(_connection_header), time(_time)
00081 {
00082 }
00083
00084
00085
00086 OutgoingQueue::OutgoingQueue(string const& _filename, std::queue<OutgoingMessage>* _queue, Time _time) :
00087 filename(_filename), queue(_queue), time(_time)
00088 {
00089 }
00090
00091
00092
00093 RecorderOptions::RecorderOptions() :
00094 trigger(false),
00095 record_all(false),
00096 regex(false),
00097 do_exclude(false),
00098 quiet(false),
00099 append_date(true),
00100 snapshot(false),
00101 verbose(false),
00102 compression(compression::Uncompressed),
00103 prefix(""),
00104 name(""),
00105 exclude_regex(),
00106 buffer_size(1048576 * 256),
00107 chunk_size(1024 * 768),
00108 limit(0),
00109 split(false),
00110 max_size(0),
00111 max_duration(-1.0),
00112 node(""),
00113 min_space(1024 * 1024 * 1024),
00114 min_space_str("1G")
00115 {
00116 }
00117
00118
00119
00120 Recorder::Recorder(RecorderOptions const& options) :
00121 options_(options),
00122 num_subscribers_(0),
00123 exit_code_(0),
00124 queue_size_(0),
00125 split_count_(0),
00126 writing_enabled_(true)
00127 {
00128 }
00129
00130 int Recorder::run() {
00131 if (options_.trigger) {
00132 doTrigger();
00133 return 0;
00134 }
00135
00136 if (options_.topics.size() == 0) {
00137
00138 if (options_.limit > 0) {
00139 fprintf(stderr, "Specifing a count is not valid with automatic topic subscription.\n");
00140 return 1;
00141 }
00142
00143
00144 if (!options_.record_all && (options_.node == std::string(""))) {
00145 fprintf(stderr, "No topics specified.\n");
00146 return 1;
00147 }
00148 }
00149
00150 ros::NodeHandle nh;
00151 if (!nh.ok())
00152 return 0;
00153
00154 last_buffer_warn_ = Time();
00155 queue_ = new std::queue<OutgoingMessage>;
00156
00157
00158 if (!options_.regex) {
00159 foreach(string const& topic, options_.topics)
00160 subscribe(topic);
00161 }
00162
00163 if (!ros::Time::waitForValid(ros::WallDuration(2.0)))
00164 ROS_WARN("/use_sim_time set to true and no clock published. Still waiting for valid time...");
00165
00166 ros::Time::waitForValid();
00167
00168 start_time_ = ros::Time::now();
00169
00170
00171 if (!nh.ok())
00172 return 0;
00173
00174 ros::Subscriber trigger_sub;
00175
00176
00177 boost::thread record_thread;
00178 if (options_.snapshot)
00179 {
00180 record_thread = boost::thread(boost::bind(&Recorder::doRecordSnapshotter, this));
00181
00182
00183 trigger_sub = nh.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::snapshotTrigger, this, _1));
00184 }
00185 else
00186 record_thread = boost::thread(boost::bind(&Recorder::doRecord, this));
00187
00188
00189
00190 ros::Timer check_master_timer;
00191 if (options_.record_all || options_.regex || (options_.node != std::string("")))
00192 {
00193
00194 doCheckMaster(ros::TimerEvent(), nh);
00195 check_master_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&Recorder::doCheckMaster, this, _1, boost::ref(nh)));
00196 }
00197
00198 ros::MultiThreadedSpinner s(10);
00199 ros::spin(s);
00200
00201 queue_condition_.notify_all();
00202
00203 record_thread.join();
00204
00205 delete queue_;
00206
00207 return exit_code_;
00208 }
00209
00210 shared_ptr<ros::Subscriber> Recorder::subscribe(string const& topic) {
00211 ROS_INFO("Subscribing to %s", topic.c_str());
00212
00213 ros::NodeHandle nh;
00214 shared_ptr<int> count(boost::make_shared<int>(options_.limit));
00215 shared_ptr<ros::Subscriber> sub(boost::make_shared<ros::Subscriber>());
00216
00217 ros::SubscribeOptions ops;
00218 ops.topic = topic;
00219 ops.queue_size = 100;
00220 ops.md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
00221 ops.datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
00222 ops.helper = boost::make_shared<ros::SubscriptionCallbackHelperT<
00223 const ros::MessageEvent<topic_tools::ShapeShifter const> &> >(
00224 boost::bind(&Recorder::doQueue, this, _1, topic, sub, count));
00225 *sub = nh.subscribe(ops);
00226
00227 currently_recording_.insert(topic);
00228 num_subscribers_++;
00229
00230 return sub;
00231 }
00232
00233 bool Recorder::isSubscribed(string const& topic) const {
00234 return currently_recording_.find(topic) != currently_recording_.end();
00235 }
00236
00237 bool Recorder::shouldSubscribeToTopic(std::string const& topic, bool from_node) {
00238
00239 if (isSubscribed(topic)) {
00240 return false;
00241 }
00242
00243
00244 if(options_.do_exclude && boost::regex_match(topic, options_.exclude_regex)) {
00245 return false;
00246 }
00247
00248 if(options_.record_all || from_node) {
00249 return true;
00250 }
00251
00252 if (options_.regex) {
00253
00254 foreach(string const& regex_str, options_.topics) {
00255 boost::regex e(regex_str);
00256 boost::smatch what;
00257 if (boost::regex_match(topic, what, e, boost::match_extra))
00258 return true;
00259 }
00260 }
00261 else {
00262 foreach(string const& t, options_.topics)
00263 if (t == topic)
00264 return true;
00265 }
00266
00267 return false;
00268 }
00269
00270 template<class T>
00271 std::string Recorder::timeToStr(T ros_t)
00272 {
00273 (void)ros_t;
00274 std::stringstream msg;
00275 const boost::posix_time::ptime now=
00276 boost::posix_time::second_clock::local_time();
00277 boost::posix_time::time_facet *const f=
00278 new boost::posix_time::time_facet("%Y-%m-%d-%H-%M-%S");
00279 msg.imbue(std::locale(msg.getloc(),f));
00280 msg << now;
00281 return msg.str();
00282 }
00283
00285 void Recorder::doQueue(const ros::MessageEvent<topic_tools::ShapeShifter const>& msg_event, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
00286
00287 Time rectime = Time::now();
00288
00289 if (options_.verbose)
00290 cout << "Received message on topic " << subscriber->getTopic() << endl;
00291
00292 OutgoingMessage out(topic, msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), rectime);
00293
00294 {
00295 boost::mutex::scoped_lock lock(queue_mutex_);
00296
00297 queue_->push(out);
00298 queue_size_ += out.msg->size();
00299
00300
00301 while (options_.buffer_size > 0 && queue_size_ > options_.buffer_size) {
00302 OutgoingMessage drop = queue_->front();
00303 queue_->pop();
00304 queue_size_ -= drop.msg->size();
00305
00306 if (!options_.snapshot) {
00307 Time now = Time::now();
00308 if (now > last_buffer_warn_ + ros::Duration(5.0)) {
00309 ROS_WARN("rosbag record buffer exceeded. Dropping oldest queued message.");
00310 last_buffer_warn_ = now;
00311 }
00312 }
00313 }
00314 }
00315
00316 if (!options_.snapshot)
00317 queue_condition_.notify_all();
00318
00319
00320 if ((*count) > 0) {
00321 (*count)--;
00322 if ((*count) == 0) {
00323 subscriber->shutdown();
00324
00325 num_subscribers_--;
00326
00327 if (num_subscribers_ == 0)
00328 ros::shutdown();
00329 }
00330 }
00331 }
00332
00333 void Recorder::updateFilenames() {
00334 vector<string> parts;
00335
00336 std::string prefix = options_.prefix;
00337 size_t ind = prefix.rfind(".bag");
00338
00339 if (ind != std::string::npos && ind == prefix.size() - 4)
00340 {
00341 prefix.erase(ind);
00342 }
00343
00344 if (prefix.length() > 0)
00345 parts.push_back(prefix);
00346 if (options_.append_date)
00347 parts.push_back(timeToStr(ros::WallTime::now()));
00348 if (options_.split)
00349 parts.push_back(boost::lexical_cast<string>(split_count_));
00350
00351 if (parts.size() == 0)
00352 {
00353 throw BagException("Bag filename is empty (neither of these was specified: prefix, append_date, split)");
00354 }
00355
00356 target_filename_ = parts[0];
00357 for (unsigned int i = 1; i < parts.size(); i++)
00358 target_filename_ += string("_") + parts[i];
00359
00360 target_filename_ += string(".bag");
00361 write_filename_ = target_filename_ + string(".active");
00362 }
00363
00365 void Recorder::snapshotTrigger(std_msgs::Empty::ConstPtr trigger) {
00366 (void)trigger;
00367 updateFilenames();
00368
00369 ROS_INFO("Triggered snapshot recording with name %s.", target_filename_.c_str());
00370
00371 {
00372 boost::mutex::scoped_lock lock(queue_mutex_);
00373 queue_queue_.push(OutgoingQueue(target_filename_, queue_, Time::now()));
00374 queue_ = new std::queue<OutgoingMessage>;
00375 queue_size_ = 0;
00376 }
00377
00378 queue_condition_.notify_all();
00379 }
00380
00381 void Recorder::startWriting() {
00382 bag_.setCompression(options_.compression);
00383 bag_.setChunkThreshold(options_.chunk_size);
00384
00385 updateFilenames();
00386 try {
00387 bag_.open(write_filename_, bagmode::Write);
00388 }
00389 catch (rosbag::BagException e) {
00390 ROS_ERROR("Error writing: %s", e.what());
00391 exit_code_ = 1;
00392 ros::shutdown();
00393 }
00394 ROS_INFO("Recording to %s.", target_filename_.c_str());
00395 }
00396
00397 void Recorder::stopWriting() {
00398 ROS_INFO("Closing %s.", target_filename_.c_str());
00399 bag_.close();
00400 rename(write_filename_.c_str(), target_filename_.c_str());
00401 }
00402
00403 bool Recorder::checkSize()
00404 {
00405 if (options_.max_size > 0)
00406 {
00407 if (bag_.getSize() > options_.max_size)
00408 {
00409 if (options_.split)
00410 {
00411 stopWriting();
00412 split_count_++;
00413 startWriting();
00414 } else {
00415 ros::shutdown();
00416 return true;
00417 }
00418 }
00419 }
00420 return false;
00421 }
00422
00423 bool Recorder::checkDuration(const ros::Time& t)
00424 {
00425 if (options_.max_duration > ros::Duration(0))
00426 {
00427 if (t - start_time_ > options_.max_duration)
00428 {
00429 if (options_.split)
00430 {
00431 while (start_time_ + options_.max_duration < t)
00432 {
00433 stopWriting();
00434 split_count_++;
00435 start_time_ += options_.max_duration;
00436 startWriting();
00437 }
00438 } else {
00439 ros::shutdown();
00440 return true;
00441 }
00442 }
00443 }
00444 return false;
00445 }
00446
00447
00449 void Recorder::doRecord() {
00450
00451 startWriting();
00452
00453
00454 warn_next_ = ros::WallTime();
00455 checkDisk();
00456 check_disk_next_ = ros::WallTime::now() + ros::WallDuration().fromSec(20.0);
00457
00458
00459
00460
00461 ros::NodeHandle nh;
00462 while (nh.ok() || !queue_->empty()) {
00463 boost::unique_lock<boost::mutex> lock(queue_mutex_);
00464
00465 bool finished = false;
00466 while (queue_->empty()) {
00467 if (!nh.ok()) {
00468 lock.release()->unlock();
00469 finished = true;
00470 break;
00471 }
00472 boost::xtime xt;
00473 #if BOOST_VERSION >= 105000
00474 boost::xtime_get(&xt, boost::TIME_UTC_);
00475 #else
00476 boost::xtime_get(&xt, boost::TIME_UTC);
00477 #endif
00478 xt.nsec += 250000000;
00479 queue_condition_.timed_wait(lock, xt);
00480 if (checkDuration(ros::Time::now()))
00481 {
00482 finished = true;
00483 break;
00484 }
00485 }
00486 if (finished)
00487 break;
00488
00489 OutgoingMessage out = queue_->front();
00490 queue_->pop();
00491 queue_size_ -= out.msg->size();
00492
00493 lock.release()->unlock();
00494
00495 if (checkSize())
00496 break;
00497
00498 if (checkDuration(out.time))
00499 break;
00500
00501 if (scheduledCheckDisk() && checkLogging())
00502 bag_.write(out.topic, out.time, *out.msg, out.connection_header);
00503 }
00504
00505 stopWriting();
00506 }
00507
00508 void Recorder::doRecordSnapshotter() {
00509 ros::NodeHandle nh;
00510
00511 while (nh.ok() || !queue_queue_.empty()) {
00512 boost::unique_lock<boost::mutex> lock(queue_mutex_);
00513 while (queue_queue_.empty()) {
00514 if (!nh.ok())
00515 return;
00516 queue_condition_.wait(lock);
00517 }
00518
00519 OutgoingQueue out_queue = queue_queue_.front();
00520 queue_queue_.pop();
00521
00522 lock.release()->unlock();
00523
00524 string target_filename = out_queue.filename;
00525 string write_filename = target_filename + string(".active");
00526
00527 try {
00528 bag_.open(write_filename, bagmode::Write);
00529 }
00530 catch (rosbag::BagException ex) {
00531 ROS_ERROR("Error writing: %s", ex.what());
00532 return;
00533 }
00534
00535 while (!out_queue.queue->empty()) {
00536 OutgoingMessage out = out_queue.queue->front();
00537 out_queue.queue->pop();
00538
00539 bag_.write(out.topic, out.time, *out.msg);
00540 }
00541
00542 stopWriting();
00543 }
00544 }
00545
00546 void Recorder::doCheckMaster(ros::TimerEvent const& e, ros::NodeHandle& node_handle) {
00547 (void)e;
00548 (void)node_handle;
00549 ros::master::V_TopicInfo topics;
00550 if (ros::master::getTopics(topics)) {
00551 foreach(ros::master::TopicInfo const& t, topics) {
00552 if (shouldSubscribeToTopic(t.name))
00553 subscribe(t.name);
00554 }
00555 }
00556
00557 if (options_.node != std::string(""))
00558 {
00559
00560 XmlRpc::XmlRpcValue req;
00561 req[0] = ros::this_node::getName();
00562 req[1] = options_.node;
00563 XmlRpc::XmlRpcValue resp;
00564 XmlRpc::XmlRpcValue payload;
00565
00566 if (ros::master::execute("lookupNode", req, resp, payload, true))
00567 {
00568 std::string peer_host;
00569 uint32_t peer_port;
00570
00571 if (!ros::network::splitURI(static_cast<std::string>(resp[2]), peer_host, peer_port))
00572 {
00573 ROS_ERROR("Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
00574 } else {
00575
00576 XmlRpc::XmlRpcClient c(peer_host.c_str(), peer_port, "/");
00577 XmlRpc::XmlRpcValue req2;
00578 XmlRpc::XmlRpcValue resp2;
00579 req2[0] = ros::this_node::getName();
00580 c.execute("getSubscriptions", req2, resp2);
00581
00582 if (!c.isFault() && resp2.valid() && resp2.size() > 0 && static_cast<int>(resp2[0]) == 1)
00583 {
00584 for(int i = 0; i < resp2[2].size(); i++)
00585 {
00586 if (shouldSubscribeToTopic(resp2[2][i][0], true))
00587 subscribe(resp2[2][i][0]);
00588 }
00589 } else {
00590 ROS_ERROR("Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
00591 }
00592 }
00593 }
00594 }
00595 }
00596
00597 void Recorder::doTrigger() {
00598 ros::NodeHandle nh;
00599 ros::Publisher pub = nh.advertise<std_msgs::Empty>("snapshot_trigger", 1, true);
00600 pub.publish(std_msgs::Empty());
00601
00602 ros::Timer terminate_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&ros::shutdown));
00603 ros::spin();
00604 }
00605
00606 bool Recorder::scheduledCheckDisk() {
00607 boost::mutex::scoped_lock lock(check_disk_mutex_);
00608
00609 if (ros::WallTime::now() < check_disk_next_)
00610 return true;
00611
00612 check_disk_next_ += ros::WallDuration().fromSec(20.0);
00613 return checkDisk();
00614 }
00615
00616 bool Recorder::checkDisk() {
00617 #if BOOST_FILESYSTEM_VERSION < 3
00618 struct statvfs fiData;
00619 if ((statvfs(bag_.getFileName().c_str(), &fiData)) < 0)
00620 {
00621 ROS_WARN("Failed to check filesystem stats.");
00622 return true;
00623 }
00624 unsigned long long free_space = 0;
00625 free_space = (unsigned long long) (fiData.f_bsize) * (unsigned long long) (fiData.f_bavail);
00626 if (free_space < options_.min_space)
00627 {
00628 ROS_ERROR("Less than %s of space free on disk with %s. Disabling recording.", options_.min_space_str.c_str(), bag_.getFileName().c_str());
00629 writing_enabled_ = false;
00630 return false;
00631 }
00632 else if (free_space < 5 * options_.min_space)
00633 {
00634 ROS_WARN("Less than 5 x %s of space free on disk with %s.", options_.min_space_str.c_str(), bag_.getFileName().c_str());
00635 }
00636 else
00637 {
00638 writing_enabled_ = true;
00639 }
00640 #else
00641 boost::filesystem::path p(boost::filesystem::system_complete(bag_.getFileName().c_str()));
00642 p = p.parent_path();
00643 boost::filesystem::space_info info;
00644 try
00645 {
00646 info = boost::filesystem::space(p);
00647 }
00648 catch (boost::filesystem::filesystem_error &e)
00649 {
00650 ROS_WARN("Failed to check filesystem stats [%s].", e.what());
00651 writing_enabled_ = false;
00652 return false;
00653 }
00654 if ( info.available < options_.min_space)
00655 {
00656 ROS_ERROR("Less than %s of space free on disk with %s. Disabling recording.", options_.min_space_str.c_str(), bag_.getFileName().c_str());
00657 writing_enabled_ = false;
00658 return false;
00659 }
00660 else if (info.available < 5 * options_.min_space)
00661 {
00662 ROS_WARN("Less than 5 x %s of space free on disk with %s.", options_.min_space_str.c_str(), bag_.getFileName().c_str());
00663 writing_enabled_ = true;
00664 }
00665 else
00666 {
00667 writing_enabled_ = true;
00668 }
00669 #endif
00670 return true;
00671 }
00672
00673 bool Recorder::checkLogging() {
00674 if (writing_enabled_)
00675 return true;
00676
00677 ros::WallTime now = ros::WallTime::now();
00678 if (now >= warn_next_) {
00679 warn_next_ = now + ros::WallDuration().fromSec(5.0);
00680 ROS_WARN("Not logging message because logging disabled. Most likely cause is a full disk.");
00681 }
00682 return false;
00683 }
00684
00685 }