00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "rosbag/recorder.h"
00036
00037 #include <sys/stat.h>
00038 #include <sys/statvfs.h>
00039 #include <time.h>
00040
00041 #include <queue>
00042 #include <set>
00043 #include <sstream>
00044 #include <string>
00045
00046 #include <boost/foreach.hpp>
00047 #include <boost/lexical_cast.hpp>
00048 #include <boost/regex.hpp>
00049 #include <boost/thread.hpp>
00050 #include <boost/thread/xtime.hpp>
00051
00052 #include <ros/ros.h>
00053 #include <topic_tools/shape_shifter.h>
00054
00055 #include "ros/network.h"
00056 #include "ros/xmlrpc_manager.h"
00057 #include "XmlRpc.h"
00058
00059 #define foreach BOOST_FOREACH
00060
00061 using std::cout;
00062 using std::endl;
00063 using std::set;
00064 using std::string;
00065 using std::vector;
00066 using boost::shared_ptr;
00067 using ros::Time;
00068
00069 namespace rosbag {
00070
00071
00072
00073 OutgoingMessage::OutgoingMessage(string const& _topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr<ros::M_string> _connection_header, Time _time) :
00074 topic(_topic), msg(_msg), connection_header(_connection_header), time(_time)
00075 {
00076 }
00077
00078
00079
00080 OutgoingQueue::OutgoingQueue(string const& _filename, std::queue<OutgoingMessage>* _queue, Time _time) :
00081 filename(_filename), queue(_queue), time(_time)
00082 {
00083 }
00084
00085
00086
00087 RecorderOptions::RecorderOptions() :
00088 trigger(false),
00089 record_all(false),
00090 regex(false),
00091 do_exclude(false),
00092 quiet(false),
00093 append_date(true),
00094 snapshot(false),
00095 verbose(false),
00096 compression(compression::Uncompressed),
00097 prefix(""),
00098 name(""),
00099 exclude_regex(),
00100 buffer_size(1048576 * 256),
00101 limit(0),
00102 split(false),
00103 max_size(0),
00104 max_duration(-1.0),
00105 node("")
00106 {
00107 }
00108
00109
00110
00111 Recorder::Recorder(RecorderOptions const& options) :
00112 options_(options),
00113 num_subscribers_(0),
00114 exit_code_(0),
00115 queue_size_(0),
00116 split_count_(0),
00117 writing_enabled_(true)
00118 {
00119 }
00120
00121 int Recorder::run() {
00122 if (options_.trigger) {
00123 doTrigger();
00124 return 0;
00125 }
00126
00127 if (options_.topics.size() == 0) {
00128
00129 if (options_.limit > 0) {
00130 fprintf(stderr, "Specifing a count is not valid with automatic topic subscription.\n");
00131 return 1;
00132 }
00133
00134
00135 if (!options_.record_all && (options_.node == std::string(""))) {
00136 fprintf(stderr, "No topics specified.\n");
00137 return 1;
00138 }
00139 }
00140
00141 ros::NodeHandle nh;
00142 if (!nh.ok())
00143 return 0;
00144
00145 last_buffer_warn_ = Time();
00146 queue_ = new std::queue<OutgoingMessage>;
00147
00148
00149 if (!options_.regex) {
00150 foreach(string const& topic, options_.topics)
00151 subscribe(topic);
00152 }
00153
00154 if (!ros::Time::waitForValid(ros::WallDuration(2.0)))
00155 ROS_WARN("/use_sim_time set to true and no clock published. Still waiting for valid time...");
00156
00157 ros::Time::waitForValid();
00158
00159 start_time_ = ros::Time::now();
00160
00161
00162 if (!nh.ok())
00163 return 0;
00164
00165 ros::Subscriber trigger_sub;
00166
00167
00168 boost::thread record_thread;
00169 if (options_.snapshot)
00170 {
00171 record_thread = boost::thread(boost::bind(&Recorder::doRecordSnapshotter, this));
00172
00173
00174 trigger_sub = nh.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::snapshotTrigger, this, _1));
00175 }
00176 else
00177 record_thread = boost::thread(boost::bind(&Recorder::doRecord, this));
00178
00179
00180
00181 ros::Timer check_master_timer;
00182 if (options_.record_all || options_.regex || (options_.node != std::string("")))
00183 check_master_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&Recorder::doCheckMaster, this, _1, boost::ref(nh)));
00184
00185 ros::MultiThreadedSpinner s(10);
00186 ros::spin(s);
00187
00188 queue_condition_.notify_all();
00189
00190 record_thread.join();
00191
00192 delete queue_;
00193
00194 return exit_code_;
00195 }
00196
00197 shared_ptr<ros::Subscriber> Recorder::subscribe(string const& topic) {
00198 ROS_INFO("Subscribing to %s", topic.c_str());
00199
00200 ros::NodeHandle nh;
00201 shared_ptr<int> count(new int(options_.limit));
00202 shared_ptr<ros::Subscriber> sub(new ros::Subscriber);
00203 *sub = nh.subscribe<topic_tools::ShapeShifter>(topic, 100, boost::bind(&Recorder::doQueue, this, _1, topic, sub, count));
00204 currently_recording_.insert(topic);
00205 num_subscribers_++;
00206
00207 return sub;
00208 }
00209
00210 bool Recorder::isSubscribed(string const& topic) const {
00211 return currently_recording_.find(topic) != currently_recording_.end();
00212 }
00213
00214 bool Recorder::shouldSubscribeToTopic(std::string const& topic, bool from_node) {
00215
00216 if (isSubscribed(topic)) {
00217 return false;
00218 }
00219
00220
00221 if(options_.do_exclude && boost::regex_match(topic, options_.exclude_regex)) {
00222 return false;
00223 }
00224
00225 if(options_.record_all || from_node) {
00226 return true;
00227 }
00228
00229 if (options_.regex) {
00230
00231 foreach(string const& regex_str, options_.topics) {
00232 boost::regex e(regex_str);
00233 boost::smatch what;
00234 if (boost::regex_match(topic, what, e, boost::match_extra))
00235 return true;
00236 }
00237 }
00238 else {
00239 foreach(string const& t, options_.topics)
00240 if (t == topic)
00241 return true;
00242 }
00243
00244 return false;
00245 }
00246
00247 template<class T>
00248 std::string Recorder::timeToStr(T ros_t) {
00249 char buf[1024] = "";
00250 time_t t = ros_t.sec;
00251 struct tm* tms = localtime(&t);
00252 strftime(buf, 1024, "%Y-%m-%d-%H-%M-%S", tms);
00253 return string(buf);
00254 }
00255
00257 void Recorder::doQueue(ros::MessageEvent<topic_tools::ShapeShifter const> msg_event, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
00258
00259 Time rectime = Time::now();
00260
00261 if (options_.verbose)
00262 cout << "Received message on topic " << subscriber->getTopic() << endl;
00263
00264 OutgoingMessage out(topic, msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), rectime);
00265
00266 {
00267 boost::mutex::scoped_lock lock(queue_mutex_);
00268
00269 queue_->push(out);
00270 queue_size_ += out.msg->size();
00271
00272
00273 while (options_.buffer_size > 0 && queue_size_ > options_.buffer_size) {
00274 OutgoingMessage drop = queue_->front();
00275 queue_->pop();
00276 queue_size_ -= drop.msg->size();
00277
00278 if (!options_.snapshot) {
00279 Time now = Time::now();
00280 if (now > last_buffer_warn_ + ros::Duration(5.0)) {
00281 ROS_WARN("rosbag record buffer exceeded. Dropping oldest queued message.");
00282 last_buffer_warn_ = now;
00283 }
00284 }
00285 }
00286 }
00287
00288 if (!options_.snapshot)
00289 queue_condition_.notify_all();
00290
00291
00292 if ((*count) > 0) {
00293 (*count)--;
00294 if ((*count) == 0) {
00295 subscriber->shutdown();
00296
00297 num_subscribers_--;
00298
00299 if (num_subscribers_ == 0)
00300 ros::shutdown();
00301 }
00302 }
00303 }
00304
00305 void Recorder::updateFilenames() {
00306 vector<string> parts;
00307
00308 std::string prefix = options_.prefix;
00309 uint32_t ind = prefix.rfind(".bag");
00310
00311 if (ind == prefix.size() - 4)
00312 {
00313 prefix.erase(ind);
00314 ind = prefix.rfind(".bag");
00315 }
00316
00317 if (prefix.length() > 0)
00318 parts.push_back(prefix);
00319 if (options_.append_date)
00320 parts.push_back(timeToStr(ros::WallTime::now()));
00321 if (options_.split)
00322 parts.push_back(boost::lexical_cast<string>(split_count_));
00323
00324 target_filename_ = parts[0];
00325 for (unsigned int i = 1; i < parts.size(); i++)
00326 target_filename_ += string("_") + parts[i];
00327
00328 target_filename_ += string(".bag");
00329 write_filename_ = target_filename_ + string(".active");
00330 }
00331
00333 void Recorder::snapshotTrigger(std_msgs::Empty::ConstPtr trigger) {
00334 updateFilenames();
00335
00336 ROS_INFO("Triggered snapshot recording with name %s.", target_filename_.c_str());
00337
00338 {
00339 boost::mutex::scoped_lock lock(queue_mutex_);
00340 queue_queue_.push(OutgoingQueue(target_filename_, queue_, Time::now()));
00341 queue_ = new std::queue<OutgoingMessage>;
00342 queue_size_ = 0;
00343 }
00344
00345 queue_condition_.notify_all();
00346 }
00347
00348 void Recorder::startWriting() {
00349 bag_.setCompression(options_.compression);
00350
00351 updateFilenames();
00352 try {
00353 bag_.open(write_filename_, bagmode::Write);
00354 }
00355 catch (rosbag::BagException e) {
00356 ROS_ERROR("Error writing: %s", e.what());
00357 exit_code_ = 1;
00358 ros::shutdown();
00359 }
00360 ROS_INFO("Recording to %s.", target_filename_.c_str());
00361 }
00362
00363 void Recorder::stopWriting() {
00364 ROS_INFO("Closing %s.", target_filename_.c_str());
00365 bag_.close();
00366 rename(write_filename_.c_str(), target_filename_.c_str());
00367 }
00368
00369 bool Recorder::checkSize()
00370 {
00371 if (options_.max_size > 0)
00372 {
00373 if (bag_.getSize() > options_.max_size)
00374 {
00375 if (options_.split)
00376 {
00377 stopWriting();
00378 split_count_++;
00379 startWriting();
00380 } else {
00381 ros::shutdown();
00382 return true;
00383 }
00384 }
00385 }
00386 return false;
00387 }
00388
00389 bool Recorder::checkDuration(const ros::Time& t)
00390 {
00391 if (options_.max_duration > ros::Duration(0))
00392 {
00393 if (t - start_time_ > options_.max_duration)
00394 {
00395 if (options_.split)
00396 {
00397 while (start_time_ + options_.max_duration < t)
00398 {
00399 stopWriting();
00400 split_count_++;
00401 start_time_ += options_.max_duration;
00402 startWriting();
00403 }
00404 } else {
00405 ros::shutdown();
00406 return true;
00407 }
00408 }
00409 }
00410 return false;
00411 }
00412
00413
00415 void Recorder::doRecord() {
00416
00417 startWriting();
00418
00419
00420 warn_next_ = ros::WallTime();
00421 checkDisk();
00422 check_disk_next_ = ros::WallTime::now() + ros::WallDuration().fromSec(20.0);
00423
00424
00425
00426
00427 ros::NodeHandle nh;
00428 while (nh.ok() || !queue_->empty()) {
00429 boost::unique_lock<boost::mutex> lock(queue_mutex_);
00430
00431 bool finished = false;
00432 while (queue_->empty()) {
00433 if (!nh.ok()) {
00434 lock.release()->unlock();
00435 finished = true;
00436 break;
00437 }
00438 boost::xtime xt;
00439 boost::xtime_get(&xt, boost::TIME_UTC);
00440 xt.nsec += 250000000;
00441 queue_condition_.timed_wait(lock, xt);
00442 if (checkDuration(ros::Time::now()))
00443 {
00444 finished = true;
00445 break;
00446 }
00447 }
00448 if (finished)
00449 break;
00450
00451 OutgoingMessage out = queue_->front();
00452 queue_->pop();
00453 queue_size_ -= out.msg->size();
00454
00455 lock.release()->unlock();
00456
00457 if (checkSize())
00458 break;
00459
00460 if (checkDuration(out.time))
00461 break;
00462
00463 if (scheduledCheckDisk() && checkLogging())
00464 bag_.write(out.topic, out.time, *out.msg, out.connection_header);
00465 }
00466
00467 stopWriting();
00468 }
00469
00470 void Recorder::doRecordSnapshotter() {
00471 ros::NodeHandle nh;
00472
00473 while (nh.ok() || !queue_queue_.empty()) {
00474 boost::unique_lock<boost::mutex> lock(queue_mutex_);
00475 while (queue_queue_.empty()) {
00476 if (!nh.ok())
00477 return;
00478 queue_condition_.wait(lock);
00479 }
00480
00481 OutgoingQueue out_queue = queue_queue_.front();
00482 queue_queue_.pop();
00483
00484 lock.release()->unlock();
00485
00486 string target_filename = out_queue.filename;
00487 string write_filename = target_filename + string(".active");
00488
00489 try {
00490 bag_.open(write_filename, bagmode::Write);
00491 }
00492 catch (rosbag::BagException ex) {
00493 ROS_ERROR("Error writing: %s", ex.what());
00494 return;
00495 }
00496
00497 while (!out_queue.queue->empty()) {
00498 OutgoingMessage out = out_queue.queue->front();
00499 out_queue.queue->pop();
00500
00501 bag_.write(out.topic, out.time, *out.msg);
00502 }
00503
00504 stopWriting();
00505 }
00506 }
00507
00508 void Recorder::doCheckMaster(ros::TimerEvent const& e, ros::NodeHandle& node_handle) {
00509 ros::master::V_TopicInfo topics;
00510 if (ros::master::getTopics(topics)) {
00511 foreach(ros::master::TopicInfo const& t, topics) {
00512 if (shouldSubscribeToTopic(t.name))
00513 subscribe(t.name);
00514 }
00515 }
00516
00517 if (options_.node != std::string(""))
00518 {
00519
00520 XmlRpc::XmlRpcValue req;
00521 req[0] = ros::this_node::getName();
00522 req[1] = options_.node;
00523 XmlRpc::XmlRpcValue resp;
00524 XmlRpc::XmlRpcValue payload;
00525
00526 if (ros::master::execute("lookupNode", req, resp, payload, true))
00527 {
00528 std::string peer_host;
00529 uint32_t peer_port;
00530
00531 if (!ros::network::splitURI(static_cast<std::string>(resp[2]), peer_host, peer_port))
00532 {
00533 ROS_ERROR("Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
00534 } else {
00535
00536 XmlRpc::XmlRpcClient c(peer_host.c_str(), peer_port, "/");
00537 XmlRpc::XmlRpcValue req;
00538 XmlRpc::XmlRpcValue resp;
00539 req[0] = ros::this_node::getName();
00540 c.execute("getSubscriptions", req, resp);
00541
00542 if (!c.isFault() && resp.size() > 0 && static_cast<int>(resp[0]) == 1)
00543 {
00544 for(int i = 0; i < resp[2].size(); i++)
00545 {
00546 if (shouldSubscribeToTopic(resp[2][i][0], true))
00547 subscribe(resp[2][i][0]);
00548 }
00549 } else {
00550 ROS_ERROR("Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
00551 }
00552 }
00553 }
00554 }
00555 }
00556
00557 void Recorder::doTrigger() {
00558 ros::NodeHandle nh;
00559 ros::Publisher pub = nh.advertise<std_msgs::Empty>("snapshot_trigger", 1, true);
00560 pub.publish(std_msgs::Empty());
00561
00562 ros::Timer terminate_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&ros::shutdown));
00563 ros::spin();
00564 }
00565
00566 bool Recorder::scheduledCheckDisk() {
00567 boost::mutex::scoped_lock lock(check_disk_mutex_);
00568
00569 if (ros::WallTime::now() < check_disk_next_)
00570 return true;
00571
00572 check_disk_next_ += ros::WallDuration().fromSec(20.0);
00573 return checkDisk();
00574 }
00575
00576 bool Recorder::checkDisk() {
00577 struct statvfs fiData;
00578 if ((statvfs(bag_.getFileName().c_str(), &fiData)) < 0) {
00579 ROS_WARN("Failed to check filesystem stats.");
00580 return true;
00581 }
00582
00583 unsigned long long free_space = 0;
00584 free_space = (unsigned long long) (fiData.f_bsize) * (unsigned long long) (fiData.f_bavail);
00585 if (free_space < 1073741824ull) {
00586 ROS_ERROR("Less than 1GB of space free on disk with %s. Disabling recording.", bag_.getFileName().c_str());
00587 writing_enabled_ = false;
00588 return false;
00589 }
00590 else if (free_space < 5368709120ull) {
00591 ROS_WARN("Less than 5GB of space free on disk with %s.", bag_.getFileName().c_str());
00592 }
00593 else {
00594 writing_enabled_ = true;
00595 }
00596
00597 return true;
00598 }
00599
00600 bool Recorder::checkLogging() {
00601 if (writing_enabled_)
00602 return true;
00603
00604 ros::WallTime now = ros::WallTime::now();
00605 if (now >= warn_next_) {
00606 warn_next_ = now + ros::WallDuration().fromSec(5.0);
00607 ROS_WARN("Not logging message because logging disabled. Most likely cause is a full disk.");
00608 }
00609 return false;
00610 }
00611
00612 }