00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "rosbag/recorder.h"
00036
00037 #include <sys/stat.h>
00038 #include <sys/statvfs.h>
00039 #include <time.h>
00040
00041 #include <queue>
00042 #include <set>
00043 #include <sstream>
00044 #include <string>
00045
00046 #include <boost/foreach.hpp>
00047 #include <boost/lexical_cast.hpp>
00048 #include <boost/regex.hpp>
00049 #include <boost/thread.hpp>
00050 #include <boost/thread/xtime.hpp>
00051
00052 #include <ros/ros.h>
00053 #include <topic_tools/shape_shifter.h>
00054
00055 #include "ros/network.h"
00056 #include "ros/xmlrpc_manager.h"
00057 #include "XmlRpc.h"
00058
00059 #define foreach BOOST_FOREACH
00060
00061 using std::cout;
00062 using std::endl;
00063 using std::set;
00064 using std::string;
00065 using std::vector;
00066 using boost::shared_ptr;
00067 using ros::Time;
00068
00069 namespace rosbag {
00070
00071
00072
00073 OutgoingMessage::OutgoingMessage(string const& _topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr<ros::M_string> _connection_header, Time _time) :
00074 topic(_topic), msg(_msg), connection_header(_connection_header), time(_time)
00075 {
00076 }
00077
00078
00079
00080 OutgoingQueue::OutgoingQueue(string const& _filename, std::queue<OutgoingMessage>* _queue, Time _time) :
00081 filename(_filename), queue(_queue), time(_time)
00082 {
00083 }
00084
00085
00086
00087 RecorderOptions::RecorderOptions() :
00088 trigger(false),
00089 record_all(false),
00090 regex(false),
00091 do_exclude(false),
00092 quiet(false),
00093 append_date(true),
00094 snapshot(false),
00095 verbose(false),
00096 compression(compression::Uncompressed),
00097 prefix(""),
00098 name(""),
00099 exclude_regex(),
00100 buffer_size(1048576 * 256),
00101 limit(0),
00102 split(false),
00103 max_size(0),
00104 max_duration(-1.0),
00105 node("")
00106 {
00107 }
00108
00109
00110
00111 Recorder::Recorder(RecorderOptions const& options) :
00112 options_(options),
00113 num_subscribers_(0),
00114 exit_code_(0),
00115 queue_size_(0),
00116 split_count_(0),
00117 writing_enabled_(true)
00118 {
00119 }
00120
00121 int Recorder::run() {
00122 if (options_.trigger) {
00123 doTrigger();
00124 return 0;
00125 }
00126
00127 if (options_.topics.size() == 0) {
00128
00129 if (options_.limit > 0) {
00130 fprintf(stderr, "Specifing a count is not valid with automatic topic subscription.\n");
00131 return 1;
00132 }
00133
00134
00135 if (!options_.record_all && (options_.node == std::string(""))) {
00136 fprintf(stderr, "No topics specified.\n");
00137 return 1;
00138 }
00139 }
00140
00141 ros::NodeHandle nh;
00142 if (!nh.ok())
00143 return 0;
00144
00145 last_buffer_warn_ = Time();
00146 queue_ = new std::queue<OutgoingMessage>;
00147
00148
00149 if (!options_.regex) {
00150 foreach(string const& topic, options_.topics)
00151 subscribe(topic);
00152 }
00153
00154 if (!ros::Time::waitForValid(ros::WallDuration(2.0)))
00155 ROS_WARN("/use_sim_time set to true and no clock published. Still waiting for valid time...");
00156
00157 ros::Time::waitForValid();
00158
00159 start_time_ = ros::Time::now();
00160
00161
00162 if (!nh.ok())
00163 return 0;
00164
00165 ros::Subscriber trigger_sub;
00166
00167
00168 boost::thread record_thread;
00169 if (options_.snapshot)
00170 {
00171 record_thread = boost::thread(boost::bind(&Recorder::doRecordSnapshotter, this));
00172
00173
00174 trigger_sub = nh.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::snapshotTrigger, this, _1));
00175 }
00176 else
00177 record_thread = boost::thread(boost::bind(&Recorder::doRecord, this));
00178
00179
00180
00181 ros::Timer check_master_timer;
00182 if (options_.record_all || options_.regex || (options_.node != std::string("")))
00183 check_master_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&Recorder::doCheckMaster, this, _1, boost::ref(nh)));
00184
00185 ros::MultiThreadedSpinner s(10);
00186 ros::spin(s);
00187
00188 queue_condition_.notify_all();
00189
00190 record_thread.join();
00191
00192 delete queue_;
00193
00194 return exit_code_;
00195 }
00196
00197 shared_ptr<ros::Subscriber> Recorder::subscribe(string const& topic) {
00198 ROS_INFO("Subscribing to %s", topic.c_str());
00199
00200 ros::NodeHandle nh;
00201 shared_ptr<int> count(new int(options_.limit));
00202 shared_ptr<ros::Subscriber> sub(new ros::Subscriber);
00203 *sub = nh.subscribe<topic_tools::ShapeShifter>(topic, 100, boost::bind(&Recorder::doQueue, this, _1, topic, sub, count));
00204 currently_recording_.insert(topic);
00205 num_subscribers_++;
00206
00207 return sub;
00208 }
00209
00210 bool Recorder::isSubscribed(string const& topic) const {
00211 return currently_recording_.find(topic) != currently_recording_.end();
00212 }
00213
00214 bool Recorder::shouldSubscribeToTopic(std::string const& topic, bool from_node) {
00215
00216 if (isSubscribed(topic)) {
00217 return false;
00218 }
00219
00220
00221 if(options_.do_exclude && boost::regex_match(topic, options_.exclude_regex)) {
00222 return false;
00223 }
00224
00225 if(options_.record_all || from_node) {
00226 return true;
00227 }
00228
00229 if (options_.regex) {
00230
00231 foreach(string const& regex_str, options_.topics) {
00232 boost::regex e(regex_str);
00233 boost::smatch what;
00234 if (boost::regex_match(topic, what, e, boost::match_extra))
00235 return true;
00236 }
00237 }
00238 else {
00239 foreach(string const& t, options_.topics)
00240 if (t == topic)
00241 return true;
00242 }
00243
00244 return false;
00245 }
00246
00247 template<class T>
00248 std::string Recorder::timeToStr(T ros_t) {
00249 char buf[1024] = "";
00250 time_t t = ros_t.sec;
00251 struct tm* tms = localtime(&t);
00252 strftime(buf, 1024, "%Y-%m-%d-%H-%M-%S", tms);
00253 return string(buf);
00254 }
00255
00257 void Recorder::doQueue(ros::MessageEvent<topic_tools::ShapeShifter const> msg_event, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
00258
00259 Time rectime = Time::now();
00260
00261 if (options_.verbose)
00262 cout << "Received message on topic " << subscriber->getTopic() << endl;
00263
00264 OutgoingMessage out(topic, msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), rectime);
00265
00266 {
00267 boost::mutex::scoped_lock lock(queue_mutex_);
00268
00269 queue_->push(out);
00270 queue_size_ += out.msg->size();
00271
00272
00273 while (options_.buffer_size > 0 && queue_size_ > options_.buffer_size) {
00274 OutgoingMessage drop = queue_->front();
00275 queue_->pop();
00276 queue_size_ -= drop.msg->size();
00277
00278 if (!options_.snapshot) {
00279 Time now = Time::now();
00280 if (now > last_buffer_warn_ + ros::Duration(5.0)) {
00281 ROS_WARN("rosbag record buffer exceeded. Dropping oldest queued message.");
00282 last_buffer_warn_ = now;
00283 }
00284 }
00285 }
00286 }
00287
00288 if (!options_.snapshot)
00289 queue_condition_.notify_all();
00290
00291
00292 if ((*count) > 0) {
00293 (*count)--;
00294 if ((*count) == 0) {
00295 subscriber->shutdown();
00296
00297 num_subscribers_--;
00298
00299 if (num_subscribers_ == 0)
00300 ros::shutdown();
00301 }
00302 }
00303 }
00304
00305 void Recorder::updateFilenames() {
00306 vector<string> parts;
00307
00308 std::string prefix = options_.prefix;
00309 uint32_t ind = prefix.rfind(".bag");
00310
00311 if (ind == prefix.size() - 4)
00312 {
00313 prefix.erase(ind);
00314 ind = prefix.rfind(".bag");
00315 }
00316
00317 if (prefix.length() > 0)
00318 parts.push_back(prefix);
00319 if (options_.append_date)
00320 parts.push_back(timeToStr(ros::WallTime::now()));
00321 if (options_.split)
00322 parts.push_back(boost::lexical_cast<string>(split_count_));
00323
00324 target_filename_ = parts[0];
00325 for (unsigned int i = 1; i < parts.size(); i++)
00326 target_filename_ += string("_") + parts[i];
00327
00328 target_filename_ += string(".bag");
00329 write_filename_ = target_filename_ + string(".active");
00330 }
00331
00333 void Recorder::snapshotTrigger(std_msgs::Empty::ConstPtr trigger) {
00334 updateFilenames();
00335
00336 ROS_INFO("Triggered snapshot recording with name %s.", target_filename_.c_str());
00337
00338 {
00339 boost::mutex::scoped_lock lock(queue_mutex_);
00340 queue_queue_.push(OutgoingQueue(target_filename_, queue_, Time::now()));
00341 queue_ = new std::queue<OutgoingMessage>;
00342 queue_size_ = 0;
00343 }
00344
00345 queue_condition_.notify_all();
00346 }
00347
00348 void Recorder::startWriting() {
00349 bag_.setCompression(options_.compression);
00350
00351 updateFilenames();
00352 try {
00353 bag_.open(write_filename_, bagmode::Write);
00354 }
00355 catch (rosbag::BagException e) {
00356 ROS_ERROR("Error writing: %s", e.what());
00357 exit_code_ = 1;
00358 ros::shutdown();
00359 }
00360 ROS_INFO("Recording to %s.", target_filename_.c_str());
00361 }
00362
00363 void Recorder::stopWriting() {
00364 ROS_INFO("Closing %s.", target_filename_.c_str());
00365 bag_.close();
00366 rename(write_filename_.c_str(), target_filename_.c_str());
00367 }
00368
00369 bool Recorder::checkSize()
00370 {
00371 if (options_.max_size > 0)
00372 {
00373 if (bag_.getSize() > options_.max_size)
00374 {
00375 if (options_.split)
00376 {
00377 stopWriting();
00378 split_count_++;
00379 startWriting();
00380 } else {
00381 ros::shutdown();
00382 return true;
00383 }
00384 }
00385 }
00386 return false;
00387 }
00388
00389 bool Recorder::checkDuration(const ros::Time& t)
00390 {
00391 if (options_.max_duration > ros::Duration(0))
00392 {
00393 if (t - start_time_ > options_.max_duration)
00394 {
00395 if (options_.split)
00396 {
00397 while (start_time_ + options_.max_duration < t)
00398 {
00399 stopWriting();
00400 split_count_++;
00401 start_time_ += options_.max_duration;
00402 startWriting();
00403 }
00404 } else {
00405 ros::shutdown();
00406 return true;
00407 }
00408 }
00409 }
00410 return false;
00411 }
00412
00413
00415 void Recorder::doRecord() {
00416
00417 startWriting();
00418
00419
00420 warn_next_ = ros::WallTime();
00421 checkDisk();
00422 check_disk_next_ = ros::WallTime::now() + ros::WallDuration().fromSec(20.0);
00423
00424
00425
00426
00427 ros::NodeHandle nh;
00428 while (nh.ok() || !queue_->empty()) {
00429 boost::unique_lock<boost::mutex> lock(queue_mutex_);
00430
00431 bool finished = false;
00432 while (queue_->empty()) {
00433 if (!nh.ok()) {
00434 lock.release()->unlock();
00435 finished = true;
00436 break;
00437 }
00438 boost::xtime xt;
00439 #if BOOST_VERSION >= 105000
00440 boost::xtime_get(&xt, boost::TIME_UTC_);
00441 #else
00442 boost::xtime_get(&xt, boost::TIME_UTC);
00443 #endif
00444 xt.nsec += 250000000;
00445 queue_condition_.timed_wait(lock, xt);
00446 if (checkDuration(ros::Time::now()))
00447 {
00448 finished = true;
00449 break;
00450 }
00451 }
00452 if (finished)
00453 break;
00454
00455 OutgoingMessage out = queue_->front();
00456 queue_->pop();
00457 queue_size_ -= out.msg->size();
00458
00459 lock.release()->unlock();
00460
00461 if (checkSize())
00462 break;
00463
00464 if (checkDuration(out.time))
00465 break;
00466
00467 if (scheduledCheckDisk() && checkLogging())
00468 bag_.write(out.topic, out.time, *out.msg, out.connection_header);
00469 }
00470
00471 stopWriting();
00472 }
00473
00474 void Recorder::doRecordSnapshotter() {
00475 ros::NodeHandle nh;
00476
00477 while (nh.ok() || !queue_queue_.empty()) {
00478 boost::unique_lock<boost::mutex> lock(queue_mutex_);
00479 while (queue_queue_.empty()) {
00480 if (!nh.ok())
00481 return;
00482 queue_condition_.wait(lock);
00483 }
00484
00485 OutgoingQueue out_queue = queue_queue_.front();
00486 queue_queue_.pop();
00487
00488 lock.release()->unlock();
00489
00490 string target_filename = out_queue.filename;
00491 string write_filename = target_filename + string(".active");
00492
00493 try {
00494 bag_.open(write_filename, bagmode::Write);
00495 }
00496 catch (rosbag::BagException ex) {
00497 ROS_ERROR("Error writing: %s", ex.what());
00498 return;
00499 }
00500
00501 while (!out_queue.queue->empty()) {
00502 OutgoingMessage out = out_queue.queue->front();
00503 out_queue.queue->pop();
00504
00505 bag_.write(out.topic, out.time, *out.msg);
00506 }
00507
00508 stopWriting();
00509 }
00510 }
00511
00512 void Recorder::doCheckMaster(ros::TimerEvent const& e, ros::NodeHandle& node_handle) {
00513 ros::master::V_TopicInfo topics;
00514 if (ros::master::getTopics(topics)) {
00515 foreach(ros::master::TopicInfo const& t, topics) {
00516 if (shouldSubscribeToTopic(t.name))
00517 subscribe(t.name);
00518 }
00519 }
00520
00521 if (options_.node != std::string(""))
00522 {
00523
00524 XmlRpc::XmlRpcValue req;
00525 req[0] = ros::this_node::getName();
00526 req[1] = options_.node;
00527 XmlRpc::XmlRpcValue resp;
00528 XmlRpc::XmlRpcValue payload;
00529
00530 if (ros::master::execute("lookupNode", req, resp, payload, true))
00531 {
00532 std::string peer_host;
00533 uint32_t peer_port;
00534
00535 if (!ros::network::splitURI(static_cast<std::string>(resp[2]), peer_host, peer_port))
00536 {
00537 ROS_ERROR("Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
00538 } else {
00539
00540 XmlRpc::XmlRpcClient c(peer_host.c_str(), peer_port, "/");
00541 XmlRpc::XmlRpcValue req;
00542 XmlRpc::XmlRpcValue resp;
00543 req[0] = ros::this_node::getName();
00544 c.execute("getSubscriptions", req, resp);
00545
00546 if (!c.isFault() && resp.size() > 0 && static_cast<int>(resp[0]) == 1)
00547 {
00548 for(int i = 0; i < resp[2].size(); i++)
00549 {
00550 if (shouldSubscribeToTopic(resp[2][i][0], true))
00551 subscribe(resp[2][i][0]);
00552 }
00553 } else {
00554 ROS_ERROR("Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
00555 }
00556 }
00557 }
00558 }
00559 }
00560
00561 void Recorder::doTrigger() {
00562 ros::NodeHandle nh;
00563 ros::Publisher pub = nh.advertise<std_msgs::Empty>("snapshot_trigger", 1, true);
00564 pub.publish(std_msgs::Empty());
00565
00566 ros::Timer terminate_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&ros::shutdown));
00567 ros::spin();
00568 }
00569
00570 bool Recorder::scheduledCheckDisk() {
00571 boost::mutex::scoped_lock lock(check_disk_mutex_);
00572
00573 if (ros::WallTime::now() < check_disk_next_)
00574 return true;
00575
00576 check_disk_next_ += ros::WallDuration().fromSec(20.0);
00577 return checkDisk();
00578 }
00579
00580 bool Recorder::checkDisk() {
00581 struct statvfs fiData;
00582 if ((statvfs(bag_.getFileName().c_str(), &fiData)) < 0) {
00583 ROS_WARN("Failed to check filesystem stats.");
00584 return true;
00585 }
00586
00587 unsigned long long free_space = 0;
00588 free_space = (unsigned long long) (fiData.f_bsize) * (unsigned long long) (fiData.f_bavail);
00589 if (free_space < 1073741824ull) {
00590 ROS_ERROR("Less than 1GB of space free on disk with %s. Disabling recording.", bag_.getFileName().c_str());
00591 writing_enabled_ = false;
00592 return false;
00593 }
00594 else if (free_space < 5368709120ull) {
00595 ROS_WARN("Less than 5GB of space free on disk with %s.", bag_.getFileName().c_str());
00596 }
00597 else {
00598 writing_enabled_ = true;
00599 }
00600
00601 return true;
00602 }
00603
00604 bool Recorder::checkLogging() {
00605 if (writing_enabled_)
00606 return true;
00607
00608 ros::WallTime now = ros::WallTime::now();
00609 if (now >= warn_next_) {
00610 warn_next_ = now + ros::WallDuration().fromSec(5.0);
00611 ROS_WARN("Not logging message because logging disabled. Most likely cause is a full disk.");
00612 }
00613 return false;
00614 }
00615
00616 }