00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "rosbag/recorder.h"
00036
00037 #include <sys/stat.h>
00038 #include <boost/filesystem.hpp>
00039
00040
00041 #if BOOST_FILESYSTEM_VERSION < 3
00042 #include <sys/statvfs.h>
00043 #endif
00044 #include <time.h>
00045
00046 #include <queue>
00047 #include <set>
00048 #include <sstream>
00049 #include <string>
00050
00051 #include <boost/foreach.hpp>
00052 #include <boost/lexical_cast.hpp>
00053 #include <boost/regex.hpp>
00054 #include <boost/thread.hpp>
00055 #include <boost/thread/xtime.hpp>
00056 #include <boost/date_time/local_time/local_time.hpp>
00057
00058 #include <ros/ros.h>
00059 #include <topic_tools/shape_shifter.h>
00060
00061 #include "ros/network.h"
00062 #include "ros/xmlrpc_manager.h"
00063 #include "XmlRpc.h"
00064
00065 #define foreach BOOST_FOREACH
00066
00067 using std::cout;
00068 using std::endl;
00069 using std::set;
00070 using std::string;
00071 using std::vector;
00072 using boost::shared_ptr;
00073 using ros::Time;
00074
00075 namespace rosbag {
00076
00077
00078
00079 OutgoingMessage::OutgoingMessage(string const& _topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr<ros::M_string> _connection_header, Time _time) :
00080 topic(_topic), msg(_msg), connection_header(_connection_header), time(_time)
00081 {
00082 }
00083
00084
00085
00086 OutgoingQueue::OutgoingQueue(string const& _filename, std::queue<OutgoingMessage>* _queue, Time _time) :
00087 filename(_filename), queue(_queue), time(_time)
00088 {
00089 }
00090
00091
00092
00093 RecorderOptions::RecorderOptions() :
00094 trigger(false),
00095 record_all(false),
00096 regex(false),
00097 do_exclude(false),
00098 quiet(false),
00099 append_date(true),
00100 snapshot(false),
00101 verbose(false),
00102 compression(compression::Uncompressed),
00103 prefix(""),
00104 name(""),
00105 exclude_regex(),
00106 buffer_size(1048576 * 256),
00107 chunk_size(1024 * 768),
00108 limit(0),
00109 split(false),
00110 max_size(0),
00111 max_duration(-1.0),
00112 node("")
00113 {
00114 }
00115
00116
00117
00118 Recorder::Recorder(RecorderOptions const& options) :
00119 options_(options),
00120 num_subscribers_(0),
00121 exit_code_(0),
00122 queue_size_(0),
00123 split_count_(0),
00124 writing_enabled_(true)
00125 {
00126 }
00127
00128 int Recorder::run() {
00129 if (options_.trigger) {
00130 doTrigger();
00131 return 0;
00132 }
00133
00134 if (options_.topics.size() == 0) {
00135
00136 if (options_.limit > 0) {
00137 fprintf(stderr, "Specifing a count is not valid with automatic topic subscription.\n");
00138 return 1;
00139 }
00140
00141
00142 if (!options_.record_all && (options_.node == std::string(""))) {
00143 fprintf(stderr, "No topics specified.\n");
00144 return 1;
00145 }
00146 }
00147
00148 ros::NodeHandle nh;
00149 if (!nh.ok())
00150 return 0;
00151
00152 last_buffer_warn_ = Time();
00153 queue_ = new std::queue<OutgoingMessage>;
00154
00155
00156 if (!options_.regex) {
00157 foreach(string const& topic, options_.topics)
00158 subscribe(topic);
00159 }
00160
00161 if (!ros::Time::waitForValid(ros::WallDuration(2.0)))
00162 ROS_WARN("/use_sim_time set to true and no clock published. Still waiting for valid time...");
00163
00164 ros::Time::waitForValid();
00165
00166 start_time_ = ros::Time::now();
00167
00168
00169 if (!nh.ok())
00170 return 0;
00171
00172 ros::Subscriber trigger_sub;
00173
00174
00175 boost::thread record_thread;
00176 if (options_.snapshot)
00177 {
00178 record_thread = boost::thread(boost::bind(&Recorder::doRecordSnapshotter, this));
00179
00180
00181 trigger_sub = nh.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::snapshotTrigger, this, _1));
00182 }
00183 else
00184 record_thread = boost::thread(boost::bind(&Recorder::doRecord, this));
00185
00186
00187
00188 ros::Timer check_master_timer;
00189 if (options_.record_all || options_.regex || (options_.node != std::string("")))
00190 {
00191
00192 doCheckMaster(ros::TimerEvent(), nh);
00193 check_master_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&Recorder::doCheckMaster, this, _1, boost::ref(nh)));
00194 }
00195
00196 ros::MultiThreadedSpinner s(10);
00197 ros::spin(s);
00198
00199 queue_condition_.notify_all();
00200
00201 record_thread.join();
00202
00203 delete queue_;
00204
00205 return exit_code_;
00206 }
00207
00208 shared_ptr<ros::Subscriber> Recorder::subscribe(string const& topic) {
00209 ROS_INFO("Subscribing to %s", topic.c_str());
00210
00211 ros::NodeHandle nh;
00212 shared_ptr<int> count(new int(options_.limit));
00213 shared_ptr<ros::Subscriber> sub(new ros::Subscriber);
00214 *sub = nh.subscribe<topic_tools::ShapeShifter>(topic, 100, boost::bind(&Recorder::doQueue, this, _1, topic, sub, count));
00215 currently_recording_.insert(topic);
00216 num_subscribers_++;
00217
00218 return sub;
00219 }
00220
00221 bool Recorder::isSubscribed(string const& topic) const {
00222 return currently_recording_.find(topic) != currently_recording_.end();
00223 }
00224
00225 bool Recorder::shouldSubscribeToTopic(std::string const& topic, bool from_node) {
00226
00227 if (isSubscribed(topic)) {
00228 return false;
00229 }
00230
00231
00232 if(options_.do_exclude && boost::regex_match(topic, options_.exclude_regex)) {
00233 return false;
00234 }
00235
00236 if(options_.record_all || from_node) {
00237 return true;
00238 }
00239
00240 if (options_.regex) {
00241
00242 foreach(string const& regex_str, options_.topics) {
00243 boost::regex e(regex_str);
00244 boost::smatch what;
00245 if (boost::regex_match(topic, what, e, boost::match_extra))
00246 return true;
00247 }
00248 }
00249 else {
00250 foreach(string const& t, options_.topics)
00251 if (t == topic)
00252 return true;
00253 }
00254
00255 return false;
00256 }
00257
00258 template<class T>
00259 std::string Recorder::timeToStr(T ros_t)
00260 {
00261 std::stringstream msg;
00262 const boost::posix_time::ptime now=
00263 boost::posix_time::second_clock::local_time();
00264 boost::posix_time::time_facet *const f=
00265 new boost::posix_time::time_facet("%Y-%m-%d-%H-%M-%S");
00266 msg.imbue(std::locale(msg.getloc(),f));
00267 msg << now;
00268 return msg.str();
00269 }
00270
00272 void Recorder::doQueue(ros::MessageEvent<topic_tools::ShapeShifter const> msg_event, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
00273
00274 Time rectime = Time::now();
00275
00276 if (options_.verbose)
00277 cout << "Received message on topic " << subscriber->getTopic() << endl;
00278
00279 OutgoingMessage out(topic, msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), rectime);
00280
00281 {
00282 boost::mutex::scoped_lock lock(queue_mutex_);
00283
00284 queue_->push(out);
00285 queue_size_ += out.msg->size();
00286
00287
00288 while (options_.buffer_size > 0 && queue_size_ > options_.buffer_size) {
00289 OutgoingMessage drop = queue_->front();
00290 queue_->pop();
00291 queue_size_ -= drop.msg->size();
00292
00293 if (!options_.snapshot) {
00294 Time now = Time::now();
00295 if (now > last_buffer_warn_ + ros::Duration(5.0)) {
00296 ROS_WARN("rosbag record buffer exceeded. Dropping oldest queued message.");
00297 last_buffer_warn_ = now;
00298 }
00299 }
00300 }
00301 }
00302
00303 if (!options_.snapshot)
00304 queue_condition_.notify_all();
00305
00306
00307 if ((*count) > 0) {
00308 (*count)--;
00309 if ((*count) == 0) {
00310 subscriber->shutdown();
00311
00312 num_subscribers_--;
00313
00314 if (num_subscribers_ == 0)
00315 ros::shutdown();
00316 }
00317 }
00318 }
00319
00320 void Recorder::updateFilenames() {
00321 vector<string> parts;
00322
00323 std::string prefix = options_.prefix;
00324 uint32_t ind = prefix.rfind(".bag");
00325
00326 if (ind == prefix.size() - 4)
00327 {
00328 prefix.erase(ind);
00329 ind = prefix.rfind(".bag");
00330 }
00331
00332 if (prefix.length() > 0)
00333 parts.push_back(prefix);
00334 if (options_.append_date)
00335 parts.push_back(timeToStr(ros::WallTime::now()));
00336 if (options_.split)
00337 parts.push_back(boost::lexical_cast<string>(split_count_));
00338
00339 target_filename_ = parts[0];
00340 for (unsigned int i = 1; i < parts.size(); i++)
00341 target_filename_ += string("_") + parts[i];
00342
00343 target_filename_ += string(".bag");
00344 write_filename_ = target_filename_ + string(".active");
00345 }
00346
00348 void Recorder::snapshotTrigger(std_msgs::Empty::ConstPtr trigger) {
00349 updateFilenames();
00350
00351 ROS_INFO("Triggered snapshot recording with name %s.", target_filename_.c_str());
00352
00353 {
00354 boost::mutex::scoped_lock lock(queue_mutex_);
00355 queue_queue_.push(OutgoingQueue(target_filename_, queue_, Time::now()));
00356 queue_ = new std::queue<OutgoingMessage>;
00357 queue_size_ = 0;
00358 }
00359
00360 queue_condition_.notify_all();
00361 }
00362
00363 void Recorder::startWriting() {
00364 bag_.setCompression(options_.compression);
00365 bag_.setChunkThreshold(options_.chunk_size);
00366
00367 updateFilenames();
00368 try {
00369 bag_.open(write_filename_, bagmode::Write);
00370 }
00371 catch (rosbag::BagException e) {
00372 ROS_ERROR("Error writing: %s", e.what());
00373 exit_code_ = 1;
00374 ros::shutdown();
00375 }
00376 ROS_INFO("Recording to %s.", target_filename_.c_str());
00377 }
00378
00379 void Recorder::stopWriting() {
00380 ROS_INFO("Closing %s.", target_filename_.c_str());
00381 bag_.close();
00382 rename(write_filename_.c_str(), target_filename_.c_str());
00383 }
00384
00385 bool Recorder::checkSize()
00386 {
00387 if (options_.max_size > 0)
00388 {
00389 if (bag_.getSize() > options_.max_size)
00390 {
00391 if (options_.split)
00392 {
00393 stopWriting();
00394 split_count_++;
00395 startWriting();
00396 } else {
00397 ros::shutdown();
00398 return true;
00399 }
00400 }
00401 }
00402 return false;
00403 }
00404
00405 bool Recorder::checkDuration(const ros::Time& t)
00406 {
00407 if (options_.max_duration > ros::Duration(0))
00408 {
00409 if (t - start_time_ > options_.max_duration)
00410 {
00411 if (options_.split)
00412 {
00413 while (start_time_ + options_.max_duration < t)
00414 {
00415 stopWriting();
00416 split_count_++;
00417 start_time_ += options_.max_duration;
00418 startWriting();
00419 }
00420 } else {
00421 ros::shutdown();
00422 return true;
00423 }
00424 }
00425 }
00426 return false;
00427 }
00428
00429
00431 void Recorder::doRecord() {
00432
00433 startWriting();
00434
00435
00436 warn_next_ = ros::WallTime();
00437 checkDisk();
00438 check_disk_next_ = ros::WallTime::now() + ros::WallDuration().fromSec(20.0);
00439
00440
00441
00442
00443 ros::NodeHandle nh;
00444 while (nh.ok() || !queue_->empty()) {
00445 boost::unique_lock<boost::mutex> lock(queue_mutex_);
00446
00447 bool finished = false;
00448 while (queue_->empty()) {
00449 if (!nh.ok()) {
00450 lock.release()->unlock();
00451 finished = true;
00452 break;
00453 }
00454 boost::xtime xt;
00455 #if BOOST_VERSION >= 105000
00456 boost::xtime_get(&xt, boost::TIME_UTC_);
00457 #else
00458 boost::xtime_get(&xt, boost::TIME_UTC);
00459 #endif
00460 xt.nsec += 250000000;
00461 queue_condition_.timed_wait(lock, xt);
00462 if (checkDuration(ros::Time::now()))
00463 {
00464 finished = true;
00465 break;
00466 }
00467 }
00468 if (finished)
00469 break;
00470
00471 OutgoingMessage out = queue_->front();
00472 queue_->pop();
00473 queue_size_ -= out.msg->size();
00474
00475 lock.release()->unlock();
00476
00477 if (checkSize())
00478 break;
00479
00480 if (checkDuration(out.time))
00481 break;
00482
00483 if (scheduledCheckDisk() && checkLogging())
00484 bag_.write(out.topic, out.time, *out.msg, out.connection_header);
00485 }
00486
00487 stopWriting();
00488 }
00489
00490 void Recorder::doRecordSnapshotter() {
00491 ros::NodeHandle nh;
00492
00493 while (nh.ok() || !queue_queue_.empty()) {
00494 boost::unique_lock<boost::mutex> lock(queue_mutex_);
00495 while (queue_queue_.empty()) {
00496 if (!nh.ok())
00497 return;
00498 queue_condition_.wait(lock);
00499 }
00500
00501 OutgoingQueue out_queue = queue_queue_.front();
00502 queue_queue_.pop();
00503
00504 lock.release()->unlock();
00505
00506 string target_filename = out_queue.filename;
00507 string write_filename = target_filename + string(".active");
00508
00509 try {
00510 bag_.open(write_filename, bagmode::Write);
00511 }
00512 catch (rosbag::BagException ex) {
00513 ROS_ERROR("Error writing: %s", ex.what());
00514 return;
00515 }
00516
00517 while (!out_queue.queue->empty()) {
00518 OutgoingMessage out = out_queue.queue->front();
00519 out_queue.queue->pop();
00520
00521 bag_.write(out.topic, out.time, *out.msg);
00522 }
00523
00524 stopWriting();
00525 }
00526 }
00527
00528 void Recorder::doCheckMaster(ros::TimerEvent const& e, ros::NodeHandle& node_handle) {
00529 ros::master::V_TopicInfo topics;
00530 if (ros::master::getTopics(topics)) {
00531 foreach(ros::master::TopicInfo const& t, topics) {
00532 if (shouldSubscribeToTopic(t.name))
00533 subscribe(t.name);
00534 }
00535 }
00536
00537 if (options_.node != std::string(""))
00538 {
00539
00540 XmlRpc::XmlRpcValue req;
00541 req[0] = ros::this_node::getName();
00542 req[1] = options_.node;
00543 XmlRpc::XmlRpcValue resp;
00544 XmlRpc::XmlRpcValue payload;
00545
00546 if (ros::master::execute("lookupNode", req, resp, payload, true))
00547 {
00548 std::string peer_host;
00549 uint32_t peer_port;
00550
00551 if (!ros::network::splitURI(static_cast<std::string>(resp[2]), peer_host, peer_port))
00552 {
00553 ROS_ERROR("Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
00554 } else {
00555
00556 XmlRpc::XmlRpcClient c(peer_host.c_str(), peer_port, "/");
00557 XmlRpc::XmlRpcValue req;
00558 XmlRpc::XmlRpcValue resp;
00559 req[0] = ros::this_node::getName();
00560 c.execute("getSubscriptions", req, resp);
00561
00562 if (!c.isFault() && resp.size() > 0 && static_cast<int>(resp[0]) == 1)
00563 {
00564 for(int i = 0; i < resp[2].size(); i++)
00565 {
00566 if (shouldSubscribeToTopic(resp[2][i][0], true))
00567 subscribe(resp[2][i][0]);
00568 }
00569 } else {
00570 ROS_ERROR("Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
00571 }
00572 }
00573 }
00574 }
00575 }
00576
00577 void Recorder::doTrigger() {
00578 ros::NodeHandle nh;
00579 ros::Publisher pub = nh.advertise<std_msgs::Empty>("snapshot_trigger", 1, true);
00580 pub.publish(std_msgs::Empty());
00581
00582 ros::Timer terminate_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&ros::shutdown));
00583 ros::spin();
00584 }
00585
00586 bool Recorder::scheduledCheckDisk() {
00587 boost::mutex::scoped_lock lock(check_disk_mutex_);
00588
00589 if (ros::WallTime::now() < check_disk_next_)
00590 return true;
00591
00592 check_disk_next_ += ros::WallDuration().fromSec(20.0);
00593 return checkDisk();
00594 }
00595
00596 bool Recorder::checkDisk() {
00597 #if BOOST_FILESYSTEM_VERSION < 3
00598 struct statvfs fiData;
00599 if ((statvfs(bag_.getFileName().c_str(), &fiData)) < 0)
00600 {
00601 ROS_WARN("Failed to check filesystem stats.");
00602 return true;
00603 }
00604 unsigned long long free_space = 0;
00605 free_space = (unsigned long long) (fiData.f_bsize) * (unsigned long long) (fiData.f_bavail);
00606 if (free_space < 1073741824ull)
00607 {
00608 ROS_ERROR("Less than 1GB of space free on disk with %s. Disabling recording.", bag_.getFileName().c_str());
00609 writing_enabled_ = false;
00610 return false;
00611 }
00612 else if (free_space < 5368709120ull)
00613 {
00614 ROS_WARN("Less than 5GB of space free on disk with %s.", bag_.getFileName().c_str());
00615 }
00616 else
00617 {
00618 writing_enabled_ = true;
00619 }
00620 #else
00621 boost::filesystem::path p(boost::filesystem::system_complete(bag_.getFileName().c_str()));
00622 p = p.parent_path();
00623 boost::filesystem::space_info info;
00624 try
00625 {
00626 info = boost::filesystem::space(p);
00627 }
00628 catch (boost::filesystem::filesystem_error &e)
00629 {
00630 ROS_WARN("Failed to check filesystem stats [%s].", e.what());
00631 writing_enabled_ = false;
00632 return false;
00633 }
00634 if ( info.available < 1073741824ull)
00635 {
00636 ROS_ERROR("Less than 1GB of space free on disk with %s. Disabling recording.", bag_.getFileName().c_str());
00637 writing_enabled_ = false;
00638 return false;
00639 }
00640 else if (info.available < 5368709120ull)
00641 {
00642 ROS_WARN("Less than 5GB of space free on disk with %s.", bag_.getFileName().c_str());
00643 writing_enabled_ = true;
00644 }
00645 else
00646 {
00647 writing_enabled_ = true;
00648 }
00649 #endif
00650 return true;
00651 }
00652
00653 bool Recorder::checkLogging() {
00654 if (writing_enabled_)
00655 return true;
00656
00657 ros::WallTime now = ros::WallTime::now();
00658 if (now >= warn_next_) {
00659 warn_next_ = now + ros::WallDuration().fromSec(5.0);
00660 ROS_WARN("Not logging message because logging disabled. Most likely cause is a full disk.");
00661 }
00662 return false;
00663 }
00664
00665 }