recorder.cpp
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2008, Willow Garage, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of Willow Garage, Inc. nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 ********************************************************************/
34 
35 /*
36  * This source code is adapted from https://github.com/ros/ros_comm/tree/melodic-devel/tools/rosbag
37  * from commit https://github.com/ros/ros_comm/commit/550608510089600f95f31c10cdbdc1e41087d9ab.
38  * This copy was needed in order to resolve the issue in https://github.com/aws-robotics/rosbag-uploader-ros1/issues/75
39  */
40 
41 
42 
44 
45 #include <sys/stat.h>
46 #include <boost/filesystem.hpp>
47 // Boost filesystem v3 is default in 1.46.0 and above
48 // Fallback to original posix code (*nix only) if this is not true
49 #if BOOST_FILESYSTEM_VERSION < 3
50  #include <sys/statvfs.h>
51 #endif
52 #include <ctime>
53 
54 #include <queue>
55 #include <sstream>
56 #include <string>
57 
58 #include <boost/lexical_cast.hpp>
59 #include <boost/regex.hpp>
60 #include <boost/thread.hpp>
61 #include <boost/thread/xtime.hpp>
62 #include <boost/date_time/local_time/local_time.hpp>
63 
64 #include <ros/ros.h>
66 
67 #include "ros/network.h"
68 #include "ros/xmlrpc_manager.h"
69 #include "xmlrpcpp/XmlRpc.h"
70 
71 using std::cout;
72 using std::endl;
73 using std::string;
74 using std::vector;
75 using boost::shared_ptr;
76 using ros::Time;
77 
78 namespace Aws
79 {
80 namespace Rosbag
81 {
82 namespace Utils
83 {
84 // OutgoingMessage
85 
87  topic(std::move(_topic)), msg(std::move(_msg)), connection_header(std::move(_connection_header)), time(_time)
88 {
89 }
90 
91 // OutgoingQueue
92 
93 OutgoingQueue::OutgoingQueue(string _filename, std::shared_ptr<std::queue<OutgoingMessage>> _queue, Time _time) :
94  filename(std::move(_filename)), queue(std::move(_queue)), time(_time)
95 {
96 }
97 
98 
99 // Recorder
100 
101 Recorder::Recorder(RecorderOptions options) :
102  options_(std::move(options)),
103  exit_code_(0),
104  queue_size_(0),
105  split_count_(0),
106  writing_enabled_(true)
107 {
108 }
109 
111  if (options_.trigger) {
112  DoTrigger();
113  return 0;
114  }
115 
116  if (options_.topics.empty()) {
117  // Make sure limit is not specified with automatic topic subscription
118  if (options_.limit > 0) {
119  fprintf(stderr, "Specifing a count is not valid with automatic topic subscription.\n");
120  return 1;
121  }
122 
123  // Make sure topics are specified
124  if (!options_.record_all && (options_.node == std::string(""))) {
125  fprintf(stderr, "No topics specified.\n");
126  return 1;
127  }
128  }
129 
130  if (!nh_.ok())
131  {
132  return 0;
133  }
134 
135  if (options_.publish)
136  {
137  pub_begin_write_ = nh_.advertise<std_msgs::String>("begin_write", 1, true);
138  }
139 
141  queue_ = std::make_shared<std::queue<OutgoingMessage>>();
142  // Subscribe to each topic
143  if (!options_.regex) {
144  for (string const& topic : options_.topics) {
146  currently_recording_.insert(topic);
147  subscribers_.push_back(sub);
148  }
149  }
150 
152  {
153  ROS_WARN("/use_sim_time set to true and no clock published. Still waiting for valid time...");
154  }
156 
158 
159  // Don't bother doing anything if we never got a valid time
160  if (!nh_.ok())
161  {
162  return 0;
163  }
164 
165  ros::Subscriber trigger_sub;
166 
167  // Spin up a thread for writing to the file
168  boost::thread record_thread;
169  if (options_.snapshot)
170  {
171  record_thread = boost::thread(boost::bind(&Recorder::DoRecordSnapshotter, this));
172 
173  // Subscribe to the snapshot trigger
174  trigger_sub = nh_.subscribe<std_msgs::Empty>("snapshot_trigger", 100, boost::bind(&Recorder::SnapshotTrigger, this, _1));
175  }
176  else
177  {
178  record_thread = boost::thread(boost::bind(&Recorder::DoRecord, this));
179  }
180 
181  ros::Timer check_master_timer;
182  if (options_.record_all || options_.regex || (options_.node != std::string("")))
183  {
184  // check for master first
186  check_master_timer = nh_.createTimer(ros::Duration(1.0), boost::bind(&Recorder::DoCheckMaster, this, _1, boost::ref(nh_)));
187  }
188 
189  ros::AsyncSpinner s(10);
190  s.start();
191 
192  record_thread.join();
193  queue_condition_.notify_all();
194 
195  check_master_timer.stop();
196  subscribers_.clear();
197  currently_recording_.clear();
198  queue_.reset();
199 
200  return exit_code_;
201 }
202 
204  ROS_INFO("Subscribing to %s", topic.c_str());
205 
206  shared_ptr<int> count(boost::make_shared<int>(options_.limit));
207  shared_ptr<ros::Subscriber> sub(boost::make_shared<ros::Subscriber>());
208 
210  ops.topic = topic;
211  ops.queue_size = 100;
212  ops.md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
213  ops.datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
214  ops.helper = boost::make_shared<ros::SubscriptionCallbackHelperT<
216  boost::bind(&Recorder::DoQueue, this, _1, topic, sub.get(), count));
218  *sub = nh.subscribe(ops);
219 
220  return sub;
221 }
222 
223 bool Recorder::IsSubscribed(string const& topic) const {
224  return currently_recording_.find(topic) != currently_recording_.end();
225 }
226 
227 bool Recorder::ShouldSubscribeToTopic(std::string const& topic, bool from_node) {
228  // ignore already known topics
229  if (IsSubscribed(topic)) {
230  return false;
231  }
232 
233  // subtract exclusion regex, if any
234  if(options_.do_exclude && boost::regex_match(topic, options_.exclude_regex)) {
235  return false;
236  }
237 
238  if(options_.record_all || from_node) {
239  return true;
240  }
241 
242  if (options_.regex) {
243  // Treat the topics as regular expressions
244  return std::any_of(
245  std::begin(options_.topics), std::end(options_.topics),
246  [&topic] (string const& regex_str){
247  boost::regex e(regex_str);
248  boost::smatch what;
249  return boost::regex_match(topic, what, e, boost::match_extra);
250  });
251  }
252 
253  return std::find(std::begin(options_.topics), std::end(options_.topics), topic)
254  != std::end(options_.topics);
255 }
256 
257 template<class T>
258 std::string Recorder::TimeToStr(T ros_t)
259 {
260  (void)ros_t;
261  std::stringstream msg;
262  const boost::posix_time::ptime now=
263  boost::posix_time::second_clock::local_time();
264  boost::posix_time::time_facet *const f=
265  new boost::posix_time::time_facet("%Y-%m-%d-%H-%M-%S");
266  msg.imbue(std::locale(msg.getloc(),f));
267  msg << now;
268  return msg.str();
269 }
270 
273  string const& topic,
274  ros::Subscriber * subscriber,
275  const shared_ptr<int> & count) {
276  //void Recorder::DoQueue(topic_tools::ShapeShifter::ConstPtr msg, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count) {
277  Time rectime = Time::now();
278 
279  if (options_.verbose)
280  {
281  cout << "Received message on topic " << subscriber->getTopic() << endl;
282  }
283 
284  OutgoingMessage out(topic, msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), rectime);
285 
286  {
287  boost::mutex::scoped_lock lock(queue_mutex_);
288 
289  queue_->push(out);
290  queue_size_ += out.msg->size();
291 
292  // Check to see if buffer has been exceeded
294  OutgoingMessage drop = queue_->front();
295  queue_->pop();
296  queue_size_ -= drop.msg->size();
297 
298  if (!options_.snapshot) {
299  Time now = Time::now();
300  if (now > last_buffer_warn_ + ros::Duration(5.0)) {
301  ROS_WARN("rosbag record buffer exceeded. Dropping oldest queued message.");
302  last_buffer_warn_ = now;
303  }
304  }
305  }
306  }
307 
308  if (!options_.snapshot)
309  {
310  queue_condition_.notify_all();
311  }
312 
313  // If we are book-keeping count, decrement and possibly shutdown
314  if ((*count) > 0) {
315  (*count)--;
316  if ((*count) == 0) {
317  subscriber->shutdown();
318  }
319  }
320 }
321 
323  vector<string> parts;
324 
325  std::string prefix = options_.prefix;
326  size_t ind = prefix.rfind(".bag");
327 
328  if (ind != std::string::npos && ind == prefix.size() - 4)
329  {
330  prefix.erase(ind);
331  }
332 
333  if (prefix.length() > 0)
334  {
335  parts.push_back(prefix);
336  }
337  if (options_.append_date)
338  {
339  parts.push_back(TimeToStr(ros::WallTime::now()));
340  }
341  if (options_.split)
342  {
343  parts.push_back(boost::lexical_cast<string>(split_count_));
344  }
345  if (parts.empty())
346  {
347  throw rosbag::BagException("Bag filename is empty (neither of these was specified: prefix, append_date, split)");
348  }
349 
350  target_filename_ = parts[0];
351  for (unsigned int i = 1; i < parts.size(); i++)
352  {
353  target_filename_ += string("_") + parts[i];
354  }
355 
356  target_filename_ += string(".bag");
357  write_filename_ = target_filename_ + string(".active");
358 }
359 
361 void Recorder::SnapshotTrigger(std_msgs::Empty::ConstPtr trigger) {
362  (void)trigger;
363  UpdateFilenames();
364 
365  ROS_INFO("Triggered snapshot recording with name '%s'.", target_filename_.c_str());
366 
367  {
368  boost::mutex::scoped_lock lock(queue_mutex_);
369  queue_queue_.push(OutgoingQueue(target_filename_, std::move(queue_), Time::now()));
370  queue_ = std::make_shared<std::queue<OutgoingMessage>>();
371  queue_size_ = 0;
372  }
373 
374  queue_condition_.notify_all();
375 }
376 
380 
381  UpdateFilenames();
382  try {
384  }
385  catch (const rosbag::BagException& e) {
386  ROS_ERROR("Error writing: %s", e.what());
387  exit_code_ = 1;
388  return;
389  }
390  ROS_INFO("Recording to '%s'.", target_filename_.c_str());
391 
392  if (options_.publish)
393  {
394  std_msgs::String msg;
395  msg.data = target_filename_;
397  }
398 }
399 
401  ROS_INFO("Closing '%s'.", target_filename_.c_str());
402  bag_.close();
403  (void) rename(write_filename_.c_str(), target_filename_.c_str());
404 }
405 
407 {
408  if(options_.max_splits>0)
409  {
410  current_files_.push_back(target_filename_);
412  {
413  int err = unlink(current_files_.front().c_str());
414  if(err != 0)
415  {
416  ROS_ERROR("Unable to remove %s: %s", current_files_.front().c_str(), strerror(errno));
417  }
418  current_files_.pop_front();
419  }
420  }
421 }
422 
424 {
425  if (options_.max_size > 0)
426  {
427  if (bag_.getSize() > options_.max_size)
428  {
429  if (options_.split)
430  {
431  StopWriting();
432  split_count_++;
433  CheckNumSplits();
434  StartWriting();
435  } else {
436  return true;
437  }
438  }
439  }
440  return false;
441 }
442 
444 {
446  {
448  {
449  if (options_.split)
450  {
451  while (start_time_ + options_.max_duration < t)
452  {
453  StopWriting();
454  split_count_++;
455  CheckNumSplits();
457  StartWriting();
458  }
459  } else {
460  return true;
461  }
462  }
463  }
464  return false;
465 }
466 
467 
470  // Open bag file for writing
471  StartWriting();
472 
473  // Schedule the disk space check
475 
476  try
477  {
478  CheckDisk();
479  }
480  catch (const rosbag::BagException& ex)
481  {
482  ROS_ERROR_STREAM(ex.what());
483  exit_code_ = 1;
484  StopWriting();
485  return;
486  }
487 
489 
490  // Technically the queue_mutex_ should be locked while checking empty.
491  // Except it should only get checked if the node is not ok, and thus
492  // it shouldn't be in contention.
493  ros::NodeHandle nh;
494  while (nh.ok() || !queue_->empty()) {
495  boost::unique_lock<boost::mutex> lock(queue_mutex_);
496 
497  bool finished = false;
498  while (queue_->empty()) {
499  if (!nh.ok()) {
500  lock.release()->unlock();
501  finished = true;
502  break;
503  }
504  boost::xtime xt {};
505 #if BOOST_VERSION >= 105000
506  boost::xtime_get(&xt, boost::TIME_UTC_);
507 #else
508  boost::xtime_get(&xt, boost::TIME_UTC);
509 #endif
510  xt.nsec += 250000000;
511  queue_condition_.timed_wait(lock, xt);
513  {
514  finished = true;
515  break;
516  }
517  }
518  if (finished)
519  {
520  break;
521  }
522 
523  OutgoingMessage out = queue_->front();
524  queue_->pop();
525  queue_size_ -= out.msg->size();
526 
527  lock.release()->unlock();
528 
529  if (CheckSize())
530  {
531  break;
532  }
533 
534  if (CheckDuration(out.time))
535  {
536  break;
537  }
538 
539  try
540  {
541  if (ScheduledCheckDisk() && CheckLogging())
542  {
543  bag_.write(out.topic, out.time, *out.msg, out.connection_header);
544  }
545  }
546  catch (const rosbag::BagException& ex)
547  {
548  ROS_ERROR_STREAM(ex.what());
549  exit_code_ = 1;
550  break;
551  }
552  }
553 
554  StopWriting();
555 }
556 
558  ros::NodeHandle nh;
559 
560  while (nh.ok() || !queue_queue_.empty()) {
561  boost::unique_lock<boost::mutex> lock(queue_mutex_);
562  while (queue_queue_.empty()) {
563  if (!nh.ok())
564  {
565  return;
566  }
567  queue_condition_.wait(lock);
568  }
569 
570  OutgoingQueue out_queue = queue_queue_.front();
571  queue_queue_.pop();
572 
573  lock.release()->unlock();
574 
575  string target_filename = out_queue.filename;
576  string write_filename = target_filename + string(".active");
577 
578  try {
579  bag_.open(write_filename, rosbag::bagmode::Write);
580  }
581  catch (const rosbag::BagException& ex) {
582  ROS_ERROR("Error writing: %s", ex.what());
583  return;
584  }
585 
586  while (!out_queue.queue->empty()) {
587  OutgoingMessage out = out_queue.queue->front();
588  out_queue.queue->pop();
589 
590  bag_.write(out.topic, out.time, *out.msg);
591  }
592 
593  StopWriting();
594  }
595 }
596 
598  (void) e;
600  if (ros::master::getTopics(topics)) {
601  for (ros::master::TopicInfo const& t : topics) {
602  if (ShouldSubscribeToTopic(t.name)) {
603  shared_ptr<ros::Subscriber> sub = Subscribe(node_handle, t.name);
604  currently_recording_.insert(t.name);
605  subscribers_.push_back(sub);
606  }
607  }
608  }
609 
610  if (options_.node != std::string(""))
611  {
612 
614  req[0] = ros::this_node::getName();
615  req[1] = options_.node;
616  XmlRpc::XmlRpcValue resp;
617  XmlRpc::XmlRpcValue payload;
618 
619  if (ros::master::execute("lookupNode", req, resp, payload, true))
620  {
621  std::string peer_host;
622  uint32_t peer_port;
623 
624  if (!ros::network::splitURI(static_cast<std::string>(resp[2]), peer_host, peer_port))
625  {
626  ROS_ERROR("Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
627  } else {
628 
629  XmlRpc::XmlRpcClient c(peer_host.c_str(), peer_port, "/");
630  XmlRpc::XmlRpcValue req2;
631  XmlRpc::XmlRpcValue resp2;
632  req2[0] = ros::this_node::getName();
633  c.execute("getSubscriptions", req2, resp2);
634 
635  if (!c.isFault() && resp2.valid() && resp2.size() > 0 && static_cast<int>(resp2[0]) == 1)
636  {
637  for(int i = 0; i < resp2[2].size(); i++)
638  {
639  if (ShouldSubscribeToTopic(resp2[2][i][0], true))
640  {
641  shared_ptr<ros::Subscriber> sub = Subscribe(node_handle, resp2[2][i][0]);
642  currently_recording_.insert(resp2[2][i][0]);
643  subscribers_.push_back(sub);
644  }
645  }
646  } else {
647  ROS_ERROR("Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
648  }
649  }
650  }
651  }
652 }
653 
655  ros::NodeHandle nh;
656  ros::Publisher pub = nh.advertise<std_msgs::Empty>("snapshot_trigger", 1, true);
657  pub.publish(std_msgs::Empty());
658 
659  ros::Timer terminate_timer = nh.createTimer(ros::Duration(1.0), boost::bind(&ros::shutdown));
660  ros::spin();
661 }
662 
664  boost::mutex::scoped_lock lock(check_disk_mutex_);
665 
667  {
668  return true;
669  }
670 
672  return CheckDisk();
673 }
674 
676 #if BOOST_FILESYSTEM_VERSION < 3
677  struct statvfs fiData;
678  if ((statvfs(bag_.getFileName().c_str(), &fiData)) < 0)
679  {
680  ROS_WARN("Failed to check filesystem stats.");
681  return true;
682  }
683  unsigned long long free_space = 0;
684  free_space = (unsigned long long) (fiData.f_bsize) * (unsigned long long) (fiData.f_bavail);
685  if (free_space < options_.min_space)
686  {
687  ROS_ERROR("Less than %s of space free on disk with '%s'. Disabling recording.", options_.min_space_str.c_str(), bag_.getFileName().c_str());
688  writing_enabled_ = false;
689  return false;
690  }
691  else if (free_space < 5 * options_.min_space)
692  {
693  ROS_WARN("Less than 5 x %s of space free on disk with '%s'.", options_.min_space_str.c_str(), bag_.getFileName().c_str());
694  }
695  else
696  {
697  writing_enabled_ = true;
698  }
699 #else
700  boost::filesystem::path p(boost::filesystem::system_complete(bag_.getFileName().c_str()));
701  p = p.parent_path();
702  boost::filesystem::space_info info {};
703  try
704  {
705  info = boost::filesystem::space(p);
706  }
707  catch (const boost::filesystem::filesystem_error& e)
708  {
709  ROS_WARN("Failed to check filesystem stats [%s].", e.what());
710  writing_enabled_ = false;
711  return false;
712  }
713  if ( info.available < options_.min_space)
714  {
715  writing_enabled_ = false;
716  throw rosbag::BagException("Less than " + options_.min_space_str + " of space free on disk with " + bag_.getFileName() + ". Disabling recording.");
717  }
718  else if (info.available < 5 * options_.min_space)
719  {
720  ROS_WARN("Less than 5 x %s of space free on disk with '%s'.", options_.min_space_str.c_str(), bag_.getFileName().c_str());
721  writing_enabled_ = true;
722  }
723  else
724  {
725  writing_enabled_ = true;
726  }
727 #endif
728  return true;
729 }
730 
732  if (writing_enabled_)
733  {
734  return true;
735  }
736 
738  if (now >= warn_next_) {
739  warn_next_ = now + ros::WallDuration().fromSec(5.0);
740  ROS_WARN("Not logging message because logging disabled. Most likely cause is a full disk.");
741  }
742  return false;
743 }
744 
745 } // namespace Utils
746 } // namespace Rosbag
747 } // namespace Aws
ros::WallTime check_disk_next_
Definition: recorder.h:197
filename
uint64_t split_count_
split count
Definition: recorder.h:187
topic_tools::ShapeShifter::ConstPtr msg
Definition: recorder.h:78
void publish(const boost::shared_ptr< M > &message) const
void open(std::string const &filename, uint32_t mode=bagmode::Read)
f
void DoRecord()
Thread that actually does writing to file.
Definition: recorder.cpp:469
std::list< std::string > current_files_
Definition: recorder.h:175
std::string getFileName() const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
std::shared_ptr< std::queue< OutgoingMessage > > queue
Definition: recorder.h:89
int size() const
RecorderOptions options_
Definition: recorder.h:168
void setChunkThreshold(uint32_t chunk_threshold)
XmlRpcServer s
bool valid() const
void stop()
ROSCPP_DECL bool splitURI(const std::string &uri, std::string &host, uint32_t &port)
ROSCPP_DECL const std::string & getName()
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
int exit_code_
eventual exit code
Definition: recorder.h:180
std::string target_filename_
Definition: recorder.h:173
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
std::vector< TopicInfo > V_TopicInfo
#define ROS_WARN(...)
stderr
void close()
boost::shared_ptr< ros::Subscriber > Subscribe(ros::NodeHandle &nh, std::string const &topic)
Definition: recorder.cpp:203
boost::mutex queue_mutex_
mutex for queue
Definition: recorder.h:183
bool IsSubscribed(std::string const &topic) const
Definition: recorder.cpp:223
std::set< std::string > currently_recording_
set of currenly recording topics
Definition: recorder.h:177
boost::condition_variable_any queue_condition_
conditional variable for queue
Definition: recorder.h:182
SubscriptionCallbackHelperPtr helper
WallDuration & fromSec(double t)
uint64_t queue_size_
queue size
Definition: recorder.h:185
#define ROS_INFO(...)
void setCompression(CompressionType compression)
static std::string TimeToStr(T ros_t)
Definition: recorder.cpp:258
std::string getTopic() const
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
ROSCPP_DECL bool execute(const std::string &method, const XmlRpc::XmlRpcValue &request, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload, bool wait_for_master)
rosbag::CompressionType compression
Definition: recorder.h:106
bool ShouldSubscribeToTopic(std::string const &topic, bool from_node=false)
Definition: recorder.cpp:227
void DoCheckMaster(ros::TimerEvent const &e, ros::NodeHandle &node_handle)
Definition: recorder.cpp:597
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
ROSCPP_DECL void spin()
std::queue< OutgoingQueue > queue_queue_
queue of queues to be used by the snapshot recorders
Definition: recorder.h:189
TransportHints transport_hints
std::vector< boost::shared_ptr< ros::Subscriber > > subscribers_
Definition: recorder.h:178
ros::Publisher pub_begin_write_
Definition: recorder.h:200
std::shared_ptr< std::queue< OutgoingMessage > > queue_
queue for storing
Definition: recorder.h:184
static WallTime now()
boost::mutex check_disk_mutex_
Definition: recorder.h:196
std::vector< std::string > topics
Definition: recorder.h:122
ros::WallTime warn_next_
Definition: recorder.h:198
static Time now()
ROSCPP_DECL void shutdown()
uint64_t getSize() const
bool ok() const
bool CheckDuration(const ros::Time &t)
Definition: recorder.cpp:443
#define ROS_ERROR_STREAM(args)
boost::shared_ptr< ros::M_string > connection_header
Definition: recorder.h:79
static bool waitForValid()
#define ROS_ERROR(...)
void write(std::string const &topic, ros::MessageEvent< T > const &event)
void SnapshotTrigger(std_msgs::Empty::ConstPtr trigger)
Callback to be invoked to actually do the recording.
Definition: recorder.cpp:361
void DoQueue(const ros::MessageEvent< topic_tools::ShapeShifter const > &msg_event, std::string const &topic, ros::Subscriber *subscriber, const boost::shared_ptr< int > &count)
Callback to be invoked to save messages into a queue.
Definition: recorder.cpp:272
boost::shared_ptr< M > getMessage() const
ros::TransportHints transport_hints
Definition: recorder.h:120


rosbag_cloud_recorders
Author(s): AWS RoboMaker
autogenerated on Tue Jun 1 2021 02:51:27