00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "rosbag/recorder.h"
00036
00037 #include <sys/stat.h>
00038 #include <boost/filesystem.hpp>
00039
00040
00041 #if BOOST_FILESYSTEM_VERSION < 3
00042 #include <sys/statvfs.h>
00043 #endif
00044 #include <time.h>
00045
00046 #include <queue>
00047 #include <set>
00048 #include <sstream>
00049 #include <string>
00050
00051 #include <boost/foreach.hpp>
00052 #include <boost/lexical_cast.hpp>
00053 #include <boost/regex.hpp>
00054 #include <boost/thread.hpp>
00055 #include <boost/thread/xtime.hpp>
00056 #include <boost/date_time/local_time/local_time.hpp>
00057
00058 #include <ros/ros.h>
00059 #include <topic_tools/shape_shifter.h>
00060
00061 #include "ros/network.h"
00062 #include "ros/xmlrpc_manager.h"
00063 #include "XmlRpc.h"
00064
00065 #define foreach BOOST_FOREACH
00066
00067 using std::cout;
00068 using std::endl;
00069 using std::set;
00070 using std::string;
00071 using std::vector;
00072 using boost::shared_ptr;
00073 using ros::Time;
00074
00075 namespace rosbag {
00076
00077
00078
00079 OutgoingMessage::OutgoingMessage(string const& _topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr<ros::M_string> _connection_header, Time _time) :
00080 topic(_topic), msg(_msg), connection_header(_connection_header), time(_time)
00081 {
00082 }
00083
00084
00085
00086 OutgoingQueue::OutgoingQueue(string const& _filename, std::queue<OutgoingMessage>* _queue, Time _time) :
00087 filename(_filename), queue(_queue), time(_time)
00088 {
00089 }
00090
00091
00092
00093 RecorderOptions::RecorderOptions() :
00094 trigger(false),
00095 record_all(false),
00096 regex(false),
00097 do_exclude(false),
00098 quiet(false),
00099 append_date(true),
00100 snapshot(false),
00101 verbose(false),
00102 compression(compression::Uncompressed),
00103 prefix(""),
00104 name(""),
00105 exclude_regex(),
00106 buffer_size(1048576 * 256),
00107 chunk_size(1024 * 768),
00108 limit(0),
00109 split(false),
00110 max_size(0),
00111 max_duration(-1.0),
00112 node("")
00113 {
00114 }
00115
00116
00117
00118 Recorder::Recorder(RecorderOptions const& options) :
00119 options_(options),
00120 num_subscribers_(0),
00121 exit_code_(0),
00122 queue_size_(0),
00123 split_count_(0),
00124 writing_enabled_(true)
00125 {
00126 }
00127
00128 int Recorder::run() {
00129 if (options_.trigger) {
00130 doTrigger();
00131 return 0;
00132 }
00133
00134 if (options_.topics.size() == 0) {
00135
00136 if (options_.limit > 0) {
00137 fprintf(stderr, "Specifing a count is not valid with automatic topic subscription.\n");
00138 return 1;
00139 }
00140
00141
00142 if (!options_.record_all && (options_.node == std::string(""))) {
00143 fprintf(stderr, "No topics specified.\n");
00144 return 1;
00145 }
00146 }
00147
00148 ros::NodeHandle nh;
00149 if (!nh.ok())
00150 return 0;
00151
00152 last_buffer_warn_ = Time();
00153 queue_ = new std::queue<OutgoingMessage>;
00154
00155
00156 if (!options_.regex) {
00157 foreach(string const& topic, options_.topics)
00158 subscribe(topic);
00159 }
00160
00161 if (!ros::Time::waitForValid(ros::WallDuration(2.0)))
00162 ROS_WARN("/use_sim_time set to true and no clock published. Still waiting for valid time...");
00163
00164 ros::Time::waitForValid();
00165
00166 start_time_ = ros::Time::now();
00167
00168
00169 if (!nh.ok())
00170 return 0;
00171
00172 ros::Subscriber trigger_sub;
00173
00174
00175 boost::thread record_thread;
00176 if (options_.snapshot)
00177 {
00178 record_thread = boost::thread(boost::bind(&Recorder::doRecordSnapshotter, this));
00179
00180
00181 trigger_sub = nh.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::snapshotTrigger, this, _1));
00182 }
00183 else
00184 record_thread = boost::thread(boost::bind(&Recorder::doRecord, this));
00185
00186
00187
00188 ros::Timer check_master_timer;
00189 if (options_.record_all || options_.regex || (options_.node != std::string("")))
00190 check_master_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&Recorder::doCheckMaster, this, _1, boost::ref(nh)));
00191
00192 ros::MultiThreadedSpinner s(10);
00193 ros::spin(s);
00194
00195 queue_condition_.notify_all();
00196
00197 record_thread.join();
00198
00199 delete queue_;
00200
00201 return exit_code_;
00202 }
00203
00204 shared_ptr<ros::Subscriber> Recorder::subscribe(string const& topic) {
00205 ROS_INFO("Subscribing to %s", topic.c_str());
00206
00207 ros::NodeHandle nh;
00208 shared_ptr<int> count(new int(options_.limit));
00209 shared_ptr<ros::Subscriber> sub(new ros::Subscriber);
00210 *sub = nh.subscribe<topic_tools::ShapeShifter>(topic, 100, boost::bind(&Recorder::doQueue, this, _1, topic, sub, count));
00211 currently_recording_.insert(topic);
00212 num_subscribers_++;
00213
00214 return sub;
00215 }
00216
00217 bool Recorder::isSubscribed(string const& topic) const {
00218 return currently_recording_.find(topic) != currently_recording_.end();
00219 }
00220
00221 bool Recorder::shouldSubscribeToTopic(std::string const& topic, bool from_node) {
00222
00223 if (isSubscribed(topic)) {
00224 return false;
00225 }
00226
00227
00228 if(options_.do_exclude && boost::regex_match(topic, options_.exclude_regex)) {
00229 return false;
00230 }
00231
00232 if(options_.record_all || from_node) {
00233 return true;
00234 }
00235
00236 if (options_.regex) {
00237
00238 foreach(string const& regex_str, options_.topics) {
00239 boost::regex e(regex_str);
00240 boost::smatch what;
00241 if (boost::regex_match(topic, what, e, boost::match_extra))
00242 return true;
00243 }
00244 }
00245 else {
00246 foreach(string const& t, options_.topics)
00247 if (t == topic)
00248 return true;
00249 }
00250
00251 return false;
00252 }
00253
00254 template<class T>
00255 std::string Recorder::timeToStr(T ros_t)
00256 {
00257 std::stringstream msg;
00258 const boost::posix_time::ptime now=
00259 boost::posix_time::second_clock::local_time();
00260 boost::posix_time::time_facet *const f=
00261 new boost::posix_time::time_facet("%Y-%m-%d-%H-%M-%S");
00262 msg.imbue(std::locale(msg.getloc(),f));
00263 msg << now;
00264 return msg.str();
00265 }
00266
00268 void Recorder::doQueue(ros::MessageEvent<topic_tools::ShapeShifter const> msg_event, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
00269
00270 Time rectime = Time::now();
00271
00272 if (options_.verbose)
00273 cout << "Received message on topic " << subscriber->getTopic() << endl;
00274
00275 OutgoingMessage out(topic, msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), rectime);
00276
00277 {
00278 boost::mutex::scoped_lock lock(queue_mutex_);
00279
00280 queue_->push(out);
00281 queue_size_ += out.msg->size();
00282
00283
00284 while (options_.buffer_size > 0 && queue_size_ > options_.buffer_size) {
00285 OutgoingMessage drop = queue_->front();
00286 queue_->pop();
00287 queue_size_ -= drop.msg->size();
00288
00289 if (!options_.snapshot) {
00290 Time now = Time::now();
00291 if (now > last_buffer_warn_ + ros::Duration(5.0)) {
00292 ROS_WARN("rosbag record buffer exceeded. Dropping oldest queued message.");
00293 last_buffer_warn_ = now;
00294 }
00295 }
00296 }
00297 }
00298
00299 if (!options_.snapshot)
00300 queue_condition_.notify_all();
00301
00302
00303 if ((*count) > 0) {
00304 (*count)--;
00305 if ((*count) == 0) {
00306 subscriber->shutdown();
00307
00308 num_subscribers_--;
00309
00310 if (num_subscribers_ == 0)
00311 ros::shutdown();
00312 }
00313 }
00314 }
00315
00316 void Recorder::updateFilenames() {
00317 vector<string> parts;
00318
00319 std::string prefix = options_.prefix;
00320 uint32_t ind = prefix.rfind(".bag");
00321
00322 if (ind == prefix.size() - 4)
00323 {
00324 prefix.erase(ind);
00325 ind = prefix.rfind(".bag");
00326 }
00327
00328 if (prefix.length() > 0)
00329 parts.push_back(prefix);
00330 if (options_.append_date)
00331 parts.push_back(timeToStr(ros::WallTime::now()));
00332 if (options_.split)
00333 parts.push_back(boost::lexical_cast<string>(split_count_));
00334
00335 target_filename_ = parts[0];
00336 for (unsigned int i = 1; i < parts.size(); i++)
00337 target_filename_ += string("_") + parts[i];
00338
00339 target_filename_ += string(".bag");
00340 write_filename_ = target_filename_ + string(".active");
00341 }
00342
00344 void Recorder::snapshotTrigger(std_msgs::Empty::ConstPtr trigger) {
00345 updateFilenames();
00346
00347 ROS_INFO("Triggered snapshot recording with name %s.", target_filename_.c_str());
00348
00349 {
00350 boost::mutex::scoped_lock lock(queue_mutex_);
00351 queue_queue_.push(OutgoingQueue(target_filename_, queue_, Time::now()));
00352 queue_ = new std::queue<OutgoingMessage>;
00353 queue_size_ = 0;
00354 }
00355
00356 queue_condition_.notify_all();
00357 }
00358
00359 void Recorder::startWriting() {
00360 bag_.setCompression(options_.compression);
00361 bag_.setChunkThreshold(options_.chunk_size);
00362
00363 updateFilenames();
00364 try {
00365 bag_.open(write_filename_, bagmode::Write);
00366 }
00367 catch (rosbag::BagException e) {
00368 ROS_ERROR("Error writing: %s", e.what());
00369 exit_code_ = 1;
00370 ros::shutdown();
00371 }
00372 ROS_INFO("Recording to %s.", target_filename_.c_str());
00373 }
00374
00375 void Recorder::stopWriting() {
00376 ROS_INFO("Closing %s.", target_filename_.c_str());
00377 bag_.close();
00378 rename(write_filename_.c_str(), target_filename_.c_str());
00379 }
00380
00381 bool Recorder::checkSize()
00382 {
00383 if (options_.max_size > 0)
00384 {
00385 if (bag_.getSize() > options_.max_size)
00386 {
00387 if (options_.split)
00388 {
00389 stopWriting();
00390 split_count_++;
00391 startWriting();
00392 } else {
00393 ros::shutdown();
00394 return true;
00395 }
00396 }
00397 }
00398 return false;
00399 }
00400
00401 bool Recorder::checkDuration(const ros::Time& t)
00402 {
00403 if (options_.max_duration > ros::Duration(0))
00404 {
00405 if (t - start_time_ > options_.max_duration)
00406 {
00407 if (options_.split)
00408 {
00409 while (start_time_ + options_.max_duration < t)
00410 {
00411 stopWriting();
00412 split_count_++;
00413 start_time_ += options_.max_duration;
00414 startWriting();
00415 }
00416 } else {
00417 ros::shutdown();
00418 return true;
00419 }
00420 }
00421 }
00422 return false;
00423 }
00424
00425
00427 void Recorder::doRecord() {
00428
00429 startWriting();
00430
00431
00432 warn_next_ = ros::WallTime();
00433 checkDisk();
00434 check_disk_next_ = ros::WallTime::now() + ros::WallDuration().fromSec(20.0);
00435
00436
00437
00438
00439 ros::NodeHandle nh;
00440 while (nh.ok() || !queue_->empty()) {
00441 boost::unique_lock<boost::mutex> lock(queue_mutex_);
00442
00443 bool finished = false;
00444 while (queue_->empty()) {
00445 if (!nh.ok()) {
00446 lock.release()->unlock();
00447 finished = true;
00448 break;
00449 }
00450 boost::xtime xt;
00451 #if BOOST_VERSION >= 105000
00452 boost::xtime_get(&xt, boost::TIME_UTC_);
00453 #else
00454 boost::xtime_get(&xt, boost::TIME_UTC);
00455 #endif
00456 xt.nsec += 250000000;
00457 queue_condition_.timed_wait(lock, xt);
00458 if (checkDuration(ros::Time::now()))
00459 {
00460 finished = true;
00461 break;
00462 }
00463 }
00464 if (finished)
00465 break;
00466
00467 OutgoingMessage out = queue_->front();
00468 queue_->pop();
00469 queue_size_ -= out.msg->size();
00470
00471 lock.release()->unlock();
00472
00473 if (checkSize())
00474 break;
00475
00476 if (checkDuration(out.time))
00477 break;
00478
00479 if (scheduledCheckDisk() && checkLogging())
00480 bag_.write(out.topic, out.time, *out.msg, out.connection_header);
00481 }
00482
00483 stopWriting();
00484 }
00485
00486 void Recorder::doRecordSnapshotter() {
00487 ros::NodeHandle nh;
00488
00489 while (nh.ok() || !queue_queue_.empty()) {
00490 boost::unique_lock<boost::mutex> lock(queue_mutex_);
00491 while (queue_queue_.empty()) {
00492 if (!nh.ok())
00493 return;
00494 queue_condition_.wait(lock);
00495 }
00496
00497 OutgoingQueue out_queue = queue_queue_.front();
00498 queue_queue_.pop();
00499
00500 lock.release()->unlock();
00501
00502 string target_filename = out_queue.filename;
00503 string write_filename = target_filename + string(".active");
00504
00505 try {
00506 bag_.open(write_filename, bagmode::Write);
00507 }
00508 catch (rosbag::BagException ex) {
00509 ROS_ERROR("Error writing: %s", ex.what());
00510 return;
00511 }
00512
00513 while (!out_queue.queue->empty()) {
00514 OutgoingMessage out = out_queue.queue->front();
00515 out_queue.queue->pop();
00516
00517 bag_.write(out.topic, out.time, *out.msg);
00518 }
00519
00520 stopWriting();
00521 }
00522 }
00523
00524 void Recorder::doCheckMaster(ros::TimerEvent const& e, ros::NodeHandle& node_handle) {
00525 ros::master::V_TopicInfo topics;
00526 if (ros::master::getTopics(topics)) {
00527 foreach(ros::master::TopicInfo const& t, topics) {
00528 if (shouldSubscribeToTopic(t.name))
00529 subscribe(t.name);
00530 }
00531 }
00532
00533 if (options_.node != std::string(""))
00534 {
00535
00536 XmlRpc::XmlRpcValue req;
00537 req[0] = ros::this_node::getName();
00538 req[1] = options_.node;
00539 XmlRpc::XmlRpcValue resp;
00540 XmlRpc::XmlRpcValue payload;
00541
00542 if (ros::master::execute("lookupNode", req, resp, payload, true))
00543 {
00544 std::string peer_host;
00545 uint32_t peer_port;
00546
00547 if (!ros::network::splitURI(static_cast<std::string>(resp[2]), peer_host, peer_port))
00548 {
00549 ROS_ERROR("Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
00550 } else {
00551
00552 XmlRpc::XmlRpcClient c(peer_host.c_str(), peer_port, "/");
00553 XmlRpc::XmlRpcValue req;
00554 XmlRpc::XmlRpcValue resp;
00555 req[0] = ros::this_node::getName();
00556 c.execute("getSubscriptions", req, resp);
00557
00558 if (!c.isFault() && resp.size() > 0 && static_cast<int>(resp[0]) == 1)
00559 {
00560 for(int i = 0; i < resp[2].size(); i++)
00561 {
00562 if (shouldSubscribeToTopic(resp[2][i][0], true))
00563 subscribe(resp[2][i][0]);
00564 }
00565 } else {
00566 ROS_ERROR("Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
00567 }
00568 }
00569 }
00570 }
00571 }
00572
00573 void Recorder::doTrigger() {
00574 ros::NodeHandle nh;
00575 ros::Publisher pub = nh.advertise<std_msgs::Empty>("snapshot_trigger", 1, true);
00576 pub.publish(std_msgs::Empty());
00577
00578 ros::Timer terminate_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&ros::shutdown));
00579 ros::spin();
00580 }
00581
00582 bool Recorder::scheduledCheckDisk() {
00583 boost::mutex::scoped_lock lock(check_disk_mutex_);
00584
00585 if (ros::WallTime::now() < check_disk_next_)
00586 return true;
00587
00588 check_disk_next_ += ros::WallDuration().fromSec(20.0);
00589 return checkDisk();
00590 }
00591
00592 bool Recorder::checkDisk() {
00593 #if BOOST_FILESYSTEM_VERSION < 3
00594 struct statvfs fiData;
00595 if ((statvfs(bag_.getFileName().c_str(), &fiData)) < 0)
00596 {
00597 ROS_WARN("Failed to check filesystem stats.");
00598 return true;
00599 }
00600 unsigned long long free_space = 0;
00601 free_space = (unsigned long long) (fiData.f_bsize) * (unsigned long long) (fiData.f_bavail);
00602 if (free_space < 1073741824ull)
00603 {
00604 ROS_ERROR("Less than 1GB of space free on disk with %s. Disabling recording.", bag_.getFileName().c_str());
00605 writing_enabled_ = false;
00606 return false;
00607 }
00608 else if (free_space < 5368709120ull)
00609 {
00610 ROS_WARN("Less than 5GB of space free on disk with %s.", bag_.getFileName().c_str());
00611 }
00612 else
00613 {
00614 writing_enabled_ = true;
00615 }
00616 #else
00617 boost::filesystem::path p(boost::filesystem::system_complete(bag_.getFileName().c_str()));
00618 p = p.parent_path();
00619 boost::filesystem::space_info info;
00620 try
00621 {
00622 info = boost::filesystem::space(p);
00623 }
00624 catch (boost::filesystem::filesystem_error &e)
00625 {
00626 ROS_WARN("Failed to check filesystem stats [%s].", e.what());
00627 writing_enabled_ = false;
00628 return false;
00629 }
00630 if ( info.available < 1073741824ull)
00631 {
00632 ROS_ERROR("Less than 1GB of space free on disk with %s. Disabling recording.", bag_.getFileName().c_str());
00633 writing_enabled_ = false;
00634 return false;
00635 }
00636 else if (info.available < 5368709120ull)
00637 {
00638 ROS_WARN("Less than 5GB of space free on disk with %s.", bag_.getFileName().c_str());
00639 writing_enabled_ = true;
00640 }
00641 else
00642 {
00643 writing_enabled_ = true;
00644 }
00645 #endif
00646 return true;
00647 }
00648
00649 bool Recorder::checkLogging() {
00650 if (writing_enabled_)
00651 return true;
00652
00653 ros::WallTime now = ros::WallTime::now();
00654 if (now >= warn_next_) {
00655 warn_next_ = now + ros::WallDuration().fromSec(5.0);
00656 ROS_WARN("Not logging message because logging disabled. Most likely cause is a full disk.");
00657 }
00658 return false;
00659 }
00660
00661 }