recorder.cpp
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2008, Willow Garage, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of Willow Garage, Inc. nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 ********************************************************************/
34 
35 #include "rosbag/recorder.h"
36 
37 #include <sys/stat.h>
38 #include <boost/filesystem.hpp>
39 // Boost filesystem v3 is default in 1.46.0 and above
40 // Fallback to original posix code (*nix only) if this is not true
41 #if BOOST_FILESYSTEM_VERSION < 3
42  #include <sys/statvfs.h>
43 #endif
44 #include <time.h>
45 
46 #include <queue>
47 #include <set>
48 #include <sstream>
49 #include <string>
50 
51 #include <boost/lexical_cast.hpp>
52 #include <boost/regex.hpp>
53 #include <boost/thread.hpp>
54 #include <boost/thread/xtime.hpp>
55 #include <boost/date_time/local_time/local_time.hpp>
56 
57 #include <ros/ros.h>
59 
60 #include "ros/network.h"
61 #include "ros/xmlrpc_manager.h"
62 #include "xmlrpcpp/XmlRpc.h"
63 
64 using std::cout;
65 using std::endl;
66 using std::set;
67 using std::string;
68 using std::vector;
69 using boost::shared_ptr;
70 using ros::Time;
71 
72 namespace rosbag {
73 
74 // OutgoingMessage
75 
77  topic(_topic), msg(_msg), connection_header(_connection_header), time(_time)
78 {
79 }
80 
81 // OutgoingQueue
82 
83 OutgoingQueue::OutgoingQueue(string const& _filename, std::queue<OutgoingMessage>* _queue, Time _time) :
84  filename(_filename), queue(_queue), time(_time)
85 {
86 }
87 
88 // RecorderOptions
89 
91  trigger(false),
92  record_all(false),
93  regex(false),
94  do_exclude(false),
95  quiet(false),
96  append_date(true),
97  snapshot(false),
98  verbose(false),
99  publish(false),
100  compression(compression::Uncompressed),
101  prefix(""),
102  name(""),
103  exclude_regex(),
104  buffer_size(1048576 * 256),
105  chunk_size(1024 * 768),
106  limit(0),
107  split(false),
108  max_size(0),
109  max_splits(0),
110  max_duration(-1.0),
111  node(""),
112  min_space(1024 * 1024 * 1024),
113  min_space_str("1G")
114 {
115 }
116 
117 // Recorder
118 
120  options_(options),
121  num_subscribers_(0),
122  exit_code_(0),
123  queue_size_(0),
124  split_count_(0),
125  writing_enabled_(true)
126 {
127 }
128 
130  if (options_.trigger) {
131  doTrigger();
132  return 0;
133  }
134 
135  if (options_.topics.size() == 0) {
136  // Make sure limit is not specified with automatic topic subscription
137  if (options_.limit > 0) {
138  fprintf(stderr, "Specifying a count is not valid with automatic topic subscription.\n");
139  return 1;
140  }
141 
142  // Make sure topics are specified
143  if (!options_.record_all && (options_.node == std::string(""))) {
144  fprintf(stderr, "No topics specified.\n");
145  return 1;
146  }
147  }
148 
149  ros::NodeHandle nh;
150  if (!nh.ok())
151  return 0;
152 
153  if (options_.publish)
154  {
155  pub_begin_write = nh.advertise<std_msgs::String>("begin_write", 1, true);
156  }
157 
159  queue_ = new std::queue<OutgoingMessage>;
160 
161  // Subscribe to each topic
162  if (!options_.regex) {
163  for (string const& topic : options_.topics)
164  subscribe(topic);
165  }
166 
168  ROS_WARN("/use_sim_time set to true and no clock published. Still waiting for valid time...");
169 
171 
173 
174  // Don't bother doing anything if we never got a valid time
175  if (!nh.ok())
176  return 0;
177 
178  ros::Subscriber trigger_sub;
179 
180  // Spin up a thread for writing to the file
181  boost::thread record_thread;
182  if (options_.snapshot)
183  {
184  record_thread = boost::thread([this]() {
185  try
186  {
187  this->doRecordSnapshotter();
188  }
189  catch (const rosbag::BagException& ex)
190  {
191  ROS_ERROR_STREAM(ex.what());
192  exit_code_ = 1;
193  }
194  catch (const std::exception& ex)
195  {
196  ROS_ERROR_STREAM(ex.what());
197  exit_code_ = 2;
198  }
199  catch (...)
200  {
201  ROS_ERROR_STREAM("Unknown exception thrown while recording bag, exiting.");
202  exit_code_ = 3;
203  }
204  });
205 
206  // Subscribe to the snapshot trigger
207  trigger_sub = nh.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::snapshotTrigger, this, _1));
208  }
209  else
210  {
211  record_thread = boost::thread([this]() {
212  try
213  {
214  this->doRecord();
215  }
216  catch (const rosbag::BagException& ex)
217  {
218  ROS_ERROR_STREAM(ex.what());
219  exit_code_ = 1;
220  }
221  catch (const std::exception& ex)
222  {
223  ROS_ERROR_STREAM(ex.what());
224  exit_code_ = 2;
225  }
226  catch (...)
227  {
228  ROS_ERROR_STREAM("Unknown exception thrown while recording bag, exiting.");
229  exit_code_ = 3;
230  }
231  });
232  }
233 
234 
235 
236  ros::Timer check_master_timer;
237  if (options_.record_all || options_.regex || (options_.node != std::string("")))
238  {
239  // check for master first
241  check_master_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&Recorder::doCheckMaster, this, _1, boost::ref(nh)));
242  }
243 
244  ros::AsyncSpinner s(10);
245  s.start();
246 
247  record_thread.join();
248  queue_condition_.notify_all();
249  delete queue_;
250 
251  return exit_code_;
252 }
253 
255  ROS_INFO("Subscribing to %s", topic.c_str());
256 
257  ros::NodeHandle nh;
258  shared_ptr<int> count(boost::make_shared<int>(options_.limit));
259  shared_ptr<ros::Subscriber> sub(boost::make_shared<ros::Subscriber>());
260 
262  ops.topic = topic;
263  ops.queue_size = 100;
264  ops.md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
265  ops.datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
266  ops.helper = boost::make_shared<ros::SubscriptionCallbackHelperT<
268  boost::bind(&Recorder::doQueue, this, _1, topic, sub, count));
270  *sub = nh.subscribe(ops);
271 
272  currently_recording_.insert(topic);
274 
275  return sub;
276 }
277 
278 bool Recorder::isSubscribed(string const& topic) const {
279  return currently_recording_.find(topic) != currently_recording_.end();
280 }
281 
282 bool Recorder::shouldSubscribeToTopic(std::string const& topic, bool from_node) {
283  // ignore already known topics
284  if (isSubscribed(topic)) {
285  return false;
286  }
287 
288  // subtract exclusion regex, if any
289  if(options_.do_exclude && boost::regex_match(topic, options_.exclude_regex)) {
290  return false;
291  }
292 
293  if(options_.record_all || from_node) {
294  return true;
295  }
296 
297  if (options_.regex) {
298  // Treat the topics as regular expressions
299  return std::any_of(
300  std::begin(options_.topics), std::end(options_.topics),
301  [&topic] (string const& regex_str){
302  boost::regex e(regex_str);
303  boost::smatch what;
304  return boost::regex_match(topic, what, e, boost::match_extra);
305  });
306  }
307 
308  return std::find(std::begin(options_.topics), std::end(options_.topics), topic)
309  != std::end(options_.topics);
310 }
311 
312 template<class T>
313 std::string Recorder::timeToStr(T ros_t)
314 {
315  (void)ros_t;
316  std::stringstream msg;
317  const boost::posix_time::ptime now=
318  boost::posix_time::second_clock::local_time();
319  boost::posix_time::time_facet *const f=
320  new boost::posix_time::time_facet("%Y-%m-%d-%H-%M-%S");
321  msg.imbue(std::locale(msg.getloc(),f));
322  msg << now;
323  return msg.str();
324 }
325 
328  //void Recorder::doQueue(topic_tools::ShapeShifter::ConstPtr msg, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
329  Time rectime = Time::now();
330 
331  if (options_.verbose)
332  cout << "Received message on topic " << subscriber->getTopic() << endl;
333 
334  OutgoingMessage out(topic, msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), rectime);
335 
336  {
337  boost::mutex::scoped_lock lock(queue_mutex_);
338 
339  queue_->push(out);
340  queue_size_ += out.msg->size();
341 
342  // Check to see if buffer has been exceeded
344  OutgoingMessage drop = queue_->front();
345  queue_->pop();
346  queue_size_ -= drop.msg->size();
347 
348  if (!options_.snapshot) {
349  Time now = Time::now();
350  if (now > last_buffer_warn_ + ros::Duration(5.0)) {
351  ROS_WARN("rosbag record buffer exceeded. Dropping oldest queued message.");
352  last_buffer_warn_ = now;
353  }
354  }
355  }
356  }
357 
358  if (!options_.snapshot)
359  queue_condition_.notify_all();
360 
361  // If we are book-keeping count, decrement and possibly shutdown
362  if ((*count) > 0) {
363  (*count)--;
364  if ((*count) == 0) {
365  subscriber->shutdown();
366 
368 
369  if (num_subscribers_ == 0)
370  ros::shutdown();
371  }
372  }
373 }
374 
376  vector<string> parts;
377 
378  std::string prefix = options_.prefix;
379  size_t ind = prefix.rfind(".bag");
380 
381  if (ind != std::string::npos && ind == prefix.size() - 4)
382  {
383  prefix.erase(ind);
384  }
385 
386  if (prefix.length() > 0)
387  parts.push_back(prefix);
388  if (options_.append_date)
389  parts.push_back(timeToStr(ros::WallTime::now()));
390  if (options_.split)
391  parts.push_back(boost::lexical_cast<string>(split_count_));
392 
393  if (parts.size() == 0)
394  {
395  throw BagException("Bag filename is empty (neither of these was specified: prefix, append_date, split)");
396  }
397 
398  target_filename_ = parts[0];
399  for (unsigned int i = 1; i < parts.size(); i++)
400  target_filename_ += string("_") + parts[i];
401 
402  target_filename_ += string(".bag");
403  write_filename_ = target_filename_ + string(".active");
404 }
405 
407 void Recorder::snapshotTrigger(std_msgs::Empty::ConstPtr trigger) {
408  (void)trigger;
409  updateFilenames();
410 
411  ROS_INFO("Triggered snapshot recording with name '%s'.", target_filename_.c_str());
412 
413  {
414  boost::mutex::scoped_lock lock(queue_mutex_);
415  queue_queue_.push(OutgoingQueue(target_filename_, queue_, Time::now()));
416  queue_ = new std::queue<OutgoingMessage>;
417  queue_size_ = 0;
418  }
419 
420  queue_condition_.notify_all();
421 }
422 
426 
427  updateFilenames();
428  try {
430  }
431  catch (const rosbag::BagException& e) {
432  ROS_ERROR("Error writing: %s", e.what());
433  exit_code_ = 1;
434  ros::shutdown();
435  }
436  ROS_INFO("Recording to '%s'.", target_filename_.c_str());
437 
438  if (options_.publish)
439  {
440  std_msgs::String msg;
441  msg.data = target_filename_.c_str();
443  }
444 }
445 
447  ROS_INFO("Closing '%s'.", target_filename_.c_str());
448  bag_.close();
449  rename(write_filename_.c_str(), target_filename_.c_str());
450 }
451 
453 {
454  if(options_.max_splits>0)
455  {
456  current_files_.push_back(target_filename_);
458  {
459  int err = unlink(current_files_.front().c_str());
460  if(err != 0)
461  {
462  ROS_ERROR("Unable to remove %s: %s", current_files_.front().c_str(), strerror(errno));
463  }
464  current_files_.pop_front();
465  }
466  }
467 }
468 
470 {
471  if (options_.max_size > 0)
472  {
473  if (bag_.getSize() > options_.max_size)
474  {
475  if (options_.split)
476  {
477  stopWriting();
478  split_count_++;
479  checkNumSplits();
480  startWriting();
481  } else {
482  ros::shutdown();
483  return true;
484  }
485  }
486  }
487  return false;
488 }
489 
491 {
493  {
495  {
496  if (options_.split)
497  {
498  while (start_time_ + options_.max_duration < t)
499  {
500  stopWriting();
501  split_count_++;
502  checkNumSplits();
504  startWriting();
505  }
506  } else {
507  ros::shutdown();
508  return true;
509  }
510  }
511  }
512  return false;
513 }
514 
515 
518  // Open bag file for writing
519  startWriting();
520 
521  // Schedule the disk space check
523 
524  try
525  {
526  checkDisk();
527  }
528  catch (const rosbag::BagException& ex)
529  {
530  ROS_ERROR_STREAM(ex.what());
531  exit_code_ = 1;
532  stopWriting();
533  return;
534  }
535 
537 
538  // Technically the queue_mutex_ should be locked while checking empty.
539  // Except it should only get checked if the node is not ok, and thus
540  // it shouldn't be in contention.
541  ros::NodeHandle nh;
542  while (nh.ok() || !queue_->empty()) {
543  boost::unique_lock<boost::mutex> lock(queue_mutex_);
544 
545  bool finished = false;
546  while (queue_->empty()) {
547  if (!nh.ok()) {
548  lock.release()->unlock();
549  finished = true;
550  break;
551  }
552  boost::xtime xt;
553 #if BOOST_VERSION >= 105000
554  boost::xtime_get(&xt, boost::TIME_UTC_);
555 #else
556  boost::xtime_get(&xt, boost::TIME_UTC);
557 #endif
558  xt.nsec += 250000000;
559  queue_condition_.timed_wait(lock, xt);
561  {
562  finished = true;
563  break;
564  }
565  }
566  if (finished)
567  break;
568 
569  OutgoingMessage out = queue_->front();
570  queue_->pop();
571  queue_size_ -= out.msg->size();
572 
573  lock.release()->unlock();
574 
575  if (checkSize())
576  break;
577 
578  if (checkDuration(out.time))
579  break;
580 
581  try
582  {
583  if (scheduledCheckDisk() && checkLogging())
584  bag_.write(out.topic, out.time, *out.msg, out.connection_header);
585  }
586  catch (const rosbag::BagException& ex)
587  {
588  ROS_ERROR_STREAM(ex.what());
589  exit_code_ = 1;
590  break;
591  }
592  }
593 
594  stopWriting();
595 }
596 
598  ros::NodeHandle nh;
599 
600  while (nh.ok() || !queue_queue_.empty()) {
601  boost::unique_lock<boost::mutex> lock(queue_mutex_);
602  while (queue_queue_.empty()) {
603  if (!nh.ok())
604  return;
605  queue_condition_.wait(lock);
606  }
607 
608  OutgoingQueue out_queue = queue_queue_.front();
609  queue_queue_.pop();
610 
611  lock.release()->unlock();
612 
613  string target_filename = out_queue.filename;
614  string write_filename = target_filename + string(".active");
615 
616  try {
617  bag_.open(write_filename, bagmode::Write);
618  }
619  catch (const rosbag::BagException& ex) {
620  ROS_ERROR("Error writing: %s", ex.what());
621  return;
622  }
623 
624  while (!out_queue.queue->empty()) {
625  OutgoingMessage out = out_queue.queue->front();
626  out_queue.queue->pop();
627 
628  bag_.write(out.topic, out.time, *out.msg);
629  }
630 
631  stopWriting();
632  }
633 }
634 
636  (void)e;
637  (void)node_handle;
639  if (ros::master::getTopics(topics)) {
640  for (ros::master::TopicInfo const& t : topics) {
641  if (shouldSubscribeToTopic(t.name))
642  subscribe(t.name);
643  }
644  }
645 
646  if (options_.node != std::string(""))
647  {
648 
650  req[0] = ros::this_node::getName();
651  req[1] = options_.node;
652  XmlRpc::XmlRpcValue resp;
653  XmlRpc::XmlRpcValue payload;
654 
655  if (ros::master::execute("lookupNode", req, resp, payload, true))
656  {
657  std::string peer_host;
658  uint32_t peer_port;
659 
660  if (!ros::network::splitURI(static_cast<std::string>(resp[2]), peer_host, peer_port))
661  {
662  ROS_ERROR("Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
663  } else {
664 
665  XmlRpc::XmlRpcClient c(peer_host.c_str(), peer_port, "/");
666  XmlRpc::XmlRpcValue req2;
667  XmlRpc::XmlRpcValue resp2;
668  req2[0] = ros::this_node::getName();
669  c.execute("getSubscriptions", req2, resp2);
670 
671  if (!c.isFault() && resp2.valid() && resp2.size() > 0 && static_cast<int>(resp2[0]) == 1)
672  {
673  for(int i = 0; i < resp2[2].size(); i++)
674  {
675  if (shouldSubscribeToTopic(resp2[2][i][0], true))
676  subscribe(resp2[2][i][0]);
677  }
678  } else {
679  ROS_ERROR("Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
680  }
681  }
682  }
683  }
684 }
685 
687  ros::NodeHandle nh;
688  ros::Publisher pub = nh.advertise<std_msgs::Empty>("snapshot_trigger", 1, true);
689  pub.publish(std_msgs::Empty());
690 
691  ros::Timer terminate_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&ros::shutdown));
692  ros::spin();
693 }
694 
696  boost::mutex::scoped_lock lock(check_disk_mutex_);
697 
699  return true;
700 
702  return checkDisk();
703 }
704 
706 #if BOOST_FILESYSTEM_VERSION < 3
707  struct statvfs fiData;
708  if ((statvfs(bag_.getFileName().c_str(), &fiData)) < 0)
709  {
710  ROS_WARN("Failed to check filesystem stats.");
711  return true;
712  }
713  unsigned long long free_space = 0;
714  free_space = (unsigned long long) (fiData.f_bsize) * (unsigned long long) (fiData.f_bavail);
715  if (free_space < options_.min_space)
716  {
717  ROS_ERROR("Less than %s of space free on disk with '%s'. Disabling recording.", options_.min_space_str.c_str(), bag_.getFileName().c_str());
718  writing_enabled_ = false;
719  return false;
720  }
721  else if (free_space < 5 * options_.min_space)
722  {
723  ROS_WARN("Less than 5 x %s of space free on disk with '%s'.", options_.min_space_str.c_str(), bag_.getFileName().c_str());
724  }
725  else
726  {
727  writing_enabled_ = true;
728  }
729 #else
730  boost::filesystem::path p(boost::filesystem::system_complete(bag_.getFileName().c_str()));
731  p = p.parent_path();
732  boost::filesystem::space_info info;
733  try
734  {
735  info = boost::filesystem::space(p);
736  }
737  catch (const boost::filesystem::filesystem_error& e)
738  {
739  ROS_WARN("Failed to check filesystem stats [%s].", e.what());
740  writing_enabled_ = false;
741  return false;
742  }
743  if ( info.available < options_.min_space)
744  {
745  writing_enabled_ = false;
746  throw BagException("Less than " + options_.min_space_str + " of space free on disk with " + bag_.getFileName() + ". Disabling recording.");
747  }
748  else if (info.available < 5 * options_.min_space)
749  {
750  ROS_WARN("Less than 5 x %s of space free on disk with '%s'.", options_.min_space_str.c_str(), bag_.getFileName().c_str());
751  writing_enabled_ = true;
752  }
753  else
754  {
755  writing_enabled_ = true;
756  }
757 #endif
758  return true;
759 }
760 
762  if (writing_enabled_)
763  return true;
764 
766  if (now >= warn_next_) {
767  warn_next_ = now + ros::WallDuration().fromSec(5.0);
768  ROS_WARN("Not logging message because logging disabled. Most likely cause is a full disk.");
769  }
770  return false;
771 }
772 
773 } // namespace rosbag
void updateFilenames()
Definition: recorder.cpp:375
void checkNumSplits()
Definition: recorder.cpp:452
ros::TransportHints transport_hints
Definition: recorder.h:115
void doRecordSnapshotter()
Definition: recorder.cpp:597
ros::Time last_buffer_warn_
Definition: recorder.h:183
int exit_code_
eventual exit code
Definition: recorder.h:171
void open(std::string const &filename, uint32_t mode=bagmode::Read)
f
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
uint64_t queue_size_
queue size
Definition: recorder.h:176
ros::WallTime check_disk_next_
Definition: recorder.h:189
boost::shared_ptr< M > getMessage() const
boost::mutex queue_mutex_
mutex for queue
Definition: recorder.h:174
void setChunkThreshold(uint32_t chunk_threshold)
std::string target_filename_
Definition: recorder.h:164
XmlRpcServer s
bool valid() const
boost::regex exclude_regex
Definition: recorder.h:104
ROSCPP_DECL bool splitURI(const std::string &uri, std::string &host, uint32_t &port)
OutgoingQueue(std::string const &_filename, std::queue< OutgoingMessage > *_queue, ros::Time _time)
Definition: recorder.cpp:83
void startWriting()
Definition: recorder.cpp:423
ROSCPP_DECL const std::string & getName()
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
bool checkLogging()
Definition: recorder.cpp:761
topic_tools::ShapeShifter::ConstPtr msg
Definition: recorder.h:73
std::vector< TopicInfo > V_TopicInfo
bool isSubscribed(std::string const &topic) const
Definition: recorder.cpp:278
#define ROS_WARN(...)
void close()
std::string write_filename_
Definition: recorder.h:165
ros::Duration max_duration
Definition: recorder.h:111
uint64_t getSize() const
std::set< std::string > currently_recording_
set of currently recording topics
Definition: recorder.h:168
OutgoingMessage(std::string const &_topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr< ros::M_string > _connection_header, ros::Time _time)
Definition: recorder.cpp:76
void doCheckMaster(ros::TimerEvent const &e, ros::NodeHandle &node_handle)
Definition: recorder.cpp:635
static std::string timeToStr(T ros_t)
Definition: recorder.cpp:313
std::queue< OutgoingMessage > * queue
Definition: recorder.h:84
ros::WallTime warn_next_
Definition: recorder.h:190
void publish(const boost::shared_ptr< M > &message) const
SubscriptionCallbackHelperPtr helper
std::string prefix
Definition: recorder.h:102
void stopWriting()
Definition: recorder.cpp:446
WallDuration & fromSec(double t)
bool scheduledCheckDisk()
Definition: recorder.cpp:695
void doQueue(const ros::MessageEvent< topic_tools::ShapeShifter const > &msg_event, std::string const &topic, boost::shared_ptr< ros::Subscriber > subscriber, boost::shared_ptr< int > count)
Callback to be invoked to save messages into a queue.
Definition: recorder.cpp:327
#define ROS_INFO(...)
void setCompression(CompressionType compression)
ROSCPP_DECL bool execute(const std::string &method, const XmlRpc::XmlRpcValue &request, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload, bool wait_for_master)
RecorderOptions options_
Definition: recorder.h:160
std::list< std::string > current_files_
Definition: recorder.h:166
boost::condition_variable_any queue_condition_
conditional variable for queue
Definition: recorder.h:173
boost::shared_ptr< ros::Subscriber > subscribe(std::string const &topic)
Definition: recorder.cpp:254
std::string filename
Definition: recorder.h:83
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
ROSCPP_DECL void spin()
TransportHints transport_hints
unsigned long long min_space
Definition: recorder.h:113
boost::mutex check_disk_mutex_
Definition: recorder.h:188
Recorder(RecorderOptions const &options)
Definition: recorder.cpp:119
bool writing_enabled_
Definition: recorder.h:187
bool checkDuration(const ros::Time &)
Definition: recorder.cpp:490
static WallTime now()
void snapshotTrigger(std_msgs::Empty::ConstPtr trigger)
Callback to be invoked to actually do the recording.
Definition: recorder.cpp:407
uint64_t split_count_
split count
Definition: recorder.h:179
std::queue< OutgoingQueue > queue_queue_
queue of queues to be used by the snapshot recorders
Definition: recorder.h:181
static Time now()
std::string getFileName() const
ros::Publisher pub_begin_write
Definition: recorder.h:192
ros::Time start_time_
Definition: recorder.h:185
ROSCPP_DECL void shutdown()
void doRecord()
Thread that actually does writing to file.
Definition: recorder.cpp:517
int num_subscribers_
used for book-keeping of our number of subscribers
Definition: recorder.h:169
std::queue< OutgoingMessage > * queue_
queue for storing
Definition: recorder.h:175
CompressionType compression
Definition: recorder.h:101
#define ROS_ERROR_STREAM(args)
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
Uncompressed
bool ok() const
static bool waitForValid()
std::vector< std::string > topics
Definition: recorder.h:117
#define ROS_ERROR(...)
std::string min_space_str
Definition: recorder.h:114
bool shouldSubscribeToTopic(std::string const &topic, bool from_node=false)
Definition: recorder.cpp:282
void write(std::string const &topic, ros::MessageEvent< T > const &event)
std::string topic
Definition: recorder.h:72
boost::shared_ptr< ros::M_string > connection_header
Definition: recorder.h:74


rosbag
Author(s): Tim Field, Jeremy Leibs, James Bowman, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:34:21