snapshotter.cpp
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2018, Open Source Robotics Foundation, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of Open Source Robotics Foundation, Inc. nor the
18 * names of its contributors may be used to endorse or promote products
19 * derived from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 ********************************************************************/
34 #include <queue>
35 #include <string>
36 #include <time.h>
37 #include <vector>
38 #include <boost/filesystem.hpp>
39 #include <boost/scope_exit.hpp>
40 #include <boost/thread/xtime.hpp>
41 #include <boost/thread/shared_mutex.hpp>
42 #include <boost/date_time/local_time/local_time.hpp>
43 #include <ros/ros.h>
44 #include <ros/assert.h>
46 #include <rosbag_snapshot_msgs/SnapshotStatus.h>
48 #include <memory>
49 #include <utility>
50 
51 using std::string;
52 using boost::shared_ptr;
53 using ros::Time;
54 
55 namespace rosbag_snapshot
56 {
61 
63  : duration_limit_(duration_limit), memory_limit_(memory_limit)
64 {
65 }
66 
67 SnapshotterOptions::SnapshotterOptions(ros::Duration default_duration_limit, int32_t default_memory_limit,
68  ros::Duration status_period)
69  : default_duration_limit_(default_duration_limit)
70  , default_memory_limit_(default_memory_limit)
71  , status_period_(status_period)
72  , topics_()
73 {
74 }
75 
76 bool SnapshotterOptions::addTopic(std::string const& topic, ros::Duration duration, int32_t memory)
77 {
78  SnapshotterTopicOptions ops(duration, memory);
79  std::pair<topics_t::iterator, bool> ret;
80  ret = topics_.insert(topics_t::value_type(topic, ops));
81  return ret.second;
82 }
83 
85 {
86 }
87 
89  boost::shared_ptr<ros::M_string> _connection_header, Time _time)
90  : msg(_msg), connection_header(_connection_header), time(_time)
91 {
92 }
93 
94 MessageQueue::MessageQueue(SnapshotterTopicOptions const& options) : options_(options), size_(0)
95 {
96 }
97 
99 {
100  sub_ = sub;
101 }
102 
103 void MessageQueue::fillStatus(rosgraph_msgs::TopicStatistics& status)
104 {
105  boost::mutex::scoped_lock l(lock);
106  if (!queue_.size())
107  return;
108  status.traffic = size_;
109  status.delivered_msgs = queue_.size();
110  status.window_start = queue_.front().time;
111  status.window_stop = queue_.back().time;
112 }
113 
115 {
116  boost::mutex::scoped_lock l(lock);
117  _clear();
118 }
119 
121 {
122  queue_.clear();
123  size_ = 0;
124 }
125 
127 {
128  // No duration if 0 or 1 messages
129  if (queue_.size() <= 1)
130  return ros::Duration();
131  return queue_.back().time - queue_.front().time;
132 }
133 
134 bool MessageQueue::preparePush(int32_t size, ros::Time const& time)
135 {
136  // If new message is older than back of queue, time has gone backwards and buffer must be cleared
137  if (!queue_.empty() && time < queue_.back().time)
138  {
139  ROS_WARN("Time has gone backwards. Clearing buffer for this topic.");
140  _clear();
141  }
142 
143  // The only case where message cannot be addded is if size is greater than limit
144  if (options_.memory_limit_ > SnapshotterTopicOptions::NO_MEMORY_LIMIT && size > options_.memory_limit_)
145  return false;
146 
147  // If memory limit is enforced, remove elements from front of queue until limit would be met once message is added
148  if (options_.memory_limit_ > SnapshotterTopicOptions::NO_MEMORY_LIMIT)
149  while (queue_.size() != 0 && size_ + size > options_.memory_limit_)
150  _pop();
151 
152  // If duration limit is encforced, remove elements from front of queue until duration limit would be met once message
153  // is added
154  if (options_.duration_limit_ > SnapshotterTopicOptions::NO_DURATION_LIMIT && queue_.size() != 0)
155  {
156  ros::Duration dt = time - queue_.front().time;
157  while (dt > options_.duration_limit_)
158  {
159  _pop();
160  if (queue_.empty())
161  break;
162  dt = time - queue_.front().time;
163  }
164  }
165  return true;
166 }
168 {
169  boost::mutex::scoped_try_lock l(lock);
170  if (!l.owns_lock())
171  {
172  ROS_ERROR("Failed to lock. Time %f", _out.time.toSec());
173  return;
174  }
175  _push(_out);
176 }
177 
179 {
180  boost::mutex::scoped_lock l(lock);
181  return _pop();
182 }
184 {
185  int32_t size = _out.msg->size();
186  // If message cannot be added without violating limits, it must be dropped
187  if (!preparePush(size, _out.time))
188  return;
189  queue_.push_back(_out);
190  // Add size of new message to running count to maintain correctness
191  size_ += _out.msg->size();
192 }
193 
195 {
196  SnapshotMessage tmp = queue_.front();
197  queue_.pop_front();
198  // Remove size of popped message to maintain correctness of size_
199  size_ -= tmp.msg->size();
200  return tmp;
201 }
202 
204 {
205  range_t::first_type begin = queue_.begin();
206  range_t::second_type end = queue_.end();
207 
208  // Increment / Decrement iterators until time contraints are met
209  if (!start.isZero())
210  {
211  while (begin != end && (*begin).time < start)
212  ++begin;
213  }
214  if (!stop.isZero())
215  {
216  while (end != begin && (*(end - 1)).time > stop)
217  --end;
218  }
219  return range_t(begin, end);
220 }
221 
222 const int Snapshotter::QUEUE_SIZE = 10;
223 
224 Snapshotter::Snapshotter(SnapshotterOptions const& options) : options_(options), recording_(true), writing_(false)
225 {
226  status_pub_ = nh_.advertise<rosbag_snapshot_msgs::SnapshotStatus>("snapshot_status", 10);
227 }
228 
230 {
231  // Each buffer contains a pointer to the subscriber and vice versa, so we need to
232  // shutdown the subscriber to allow garbage collection to happen
233  for (std::pair<const std::string, boost::shared_ptr<MessageQueue>>& buffer : buffers_)
234  {
235  buffer.second->sub_->shutdown();
236  }
237 }
238 
240 {
241  if (options.duration_limit_ == SnapshotterTopicOptions::INHERIT_DURATION_LIMIT)
243  if (options.memory_limit_ == SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT)
245 }
246 
248 {
249  size_t ind = file.rfind(".bag");
250  // If requested ends in .bag, this is literal name do not append date
251  if (ind != string::npos && ind == file.size() - 4)
252  {
253  return true;
254  }
255  // Otherwise treat as prefix and append datetime and extension
256  file += timeAsStr() + ".bag";
257  return true;
258 }
259 
261 {
262  std::stringstream msg;
263  const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
264  boost::posix_time::time_facet* const f = new boost::posix_time::time_facet("%Y-%m-%d-%H-%M-%S");
265  msg.imbue(std::locale(msg.getloc(), f));
266  msg << now;
267  return msg.str();
268 }
269 
272 {
273  // If recording is paused (or writing), exit
274  {
275  boost::shared_lock<boost::upgrade_mutex> lock(state_lock_);
276  if (!recording_)
277  {
278  return;
279  }
280  }
281 
282  // Pack message and metadata into SnapshotMessage holder
283  SnapshotMessage out(msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), msg_event.getReceiptTime());
284  queue->push(out);
285 }
286 
288 {
289  ROS_INFO("Subscribing to %s", topic.c_str());
290 
291  shared_ptr<ros::Subscriber> sub(boost::make_shared<ros::Subscriber>());
293  ops.topic = topic;
294  ops.queue_size = QUEUE_SIZE;
295  ops.md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
296  ops.datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
297  ops.helper =
298  boost::make_shared<ros::SubscriptionCallbackHelperT<const ros::MessageEvent<topic_tools::ShapeShifter const>&> >(
299  boost::bind(&Snapshotter::topicCB, this, _1, queue));
300  *sub = nh_.subscribe(ops);
301  queue->setSubscriber(sub);
302 }
303 
304 bool Snapshotter::writeTopic(rosbag::Bag& bag, MessageQueue& message_queue, string const& topic,
305  rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
306  rosbag_snapshot_msgs::TriggerSnapshot::Response& res)
307 {
308  // acquire lock for this queue
309  boost::mutex::scoped_lock l(message_queue.lock);
310 
311  MessageQueue::range_t range = message_queue.rangeFromTimes(req.start_time, req.stop_time);
312 
313  // open bag if this the first valid topic and there is data
314  if (!bag.isOpen() && range.second > range.first)
315  {
316  try
317  {
318  bag.open(req.filename, rosbag::bagmode::Write);
319  }
320  catch (rosbag::BagException const& err)
321  {
322  res.success = false;
323  res.message = string("failed to open bag: ") + err.what();
324  return false;
325  }
326  ROS_INFO("Writing snapshot to %s", req.filename.c_str());
327  }
328 
329  // write queue
330  try
331  {
332  for (MessageQueue::range_t::first_type msg_it = range.first; msg_it != range.second; ++msg_it)
333  {
334  SnapshotMessage const& msg = *msg_it;
335  bag.write(topic, msg.time, msg.msg, msg.connection_header);
336  }
337  }
338  catch (rosbag::BagException const& err)
339  {
340  res.success = false;
341  res.message = string("failed to write bag: ") + err.what();
342  }
343  return true;
344 }
345 
346 bool Snapshotter::triggerSnapshotCb(rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
347  rosbag_snapshot_msgs::TriggerSnapshot::Response& res)
348 {
349  if (!postfixFilename(req.filename))
350  {
351  res.success = false;
352  res.message = "invalid";
353  return true;
354  }
355  bool recording_prior; // Store if we were recording prior to write to restore this state after write
356  {
357  boost::upgrade_lock<boost::upgrade_mutex> read_lock(state_lock_);
358  recording_prior = recording_;
359  if (writing_)
360  {
361  res.success = false;
362  res.message = "Already writing";
363  return true;
364  }
365  boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
366  if (recording_prior)
367  pause();
368  writing_ = true;
369  }
370 
371  // Ensure that state is updated when function exits, regardlesss of branch path / exception events
372  BOOST_SCOPE_EXIT(&state_lock_, &writing_, recording_prior, this_)
373  {
374  // Clear buffers beacuase time gaps (skipped messages) may have occured while paused
375  boost::unique_lock<boost::upgrade_mutex> write_lock(state_lock_);
376  // Turn off writing flag and return recording to its state before writing
377  writing_ = false;
378  if (recording_prior)
379  this_->resume();
380  }
381  BOOST_SCOPE_EXIT_END
382 
383  // Create bag
384  rosbag::Bag bag;
385 
386  // Write each selected topic's queue to bag file
387  if (req.topics.size() && req.topics.at(0).size())
388  {
389  for (std::string& topic : req.topics)
390  {
391  // Resolve and clean topic
392  try
393  {
394  topic = ros::names::resolve(nh_.getNamespace(), topic);
395  }
396  catch (ros::InvalidNameException const& err)
397  {
398  ROS_WARN("Requested topic %s is invalid, skipping.", topic.c_str());
399  continue;
400  }
401 
402  // Find the message queue for this topic if it exsists
403  buffers_t::iterator found = buffers_.find(topic);
404  // If topic not found, error and exit
405  if (found == buffers_.end())
406  {
407  ROS_WARN("Requested topic %s is not subscribed, skipping.", topic.c_str());
408  continue;
409  }
410  MessageQueue& message_queue = *(*found).second;
411  if (!writeTopic(bag, message_queue, topic, req, res))
412  return true;
413  }
414  }
415  // If topic list empty, record all buffered topics
416  else
417  {
418  for (const buffers_t::value_type& pair : buffers_)
419  {
420  MessageQueue& message_queue = *(pair.second);
421  std::string const& topic = pair.first;
422  if (!writeTopic(bag, message_queue, topic, req, res))
423  return true;
424  }
425  }
426 
427  // If no topics were subscribed/valid/contained data, this is considered a non-success
428  if (!bag.isOpen())
429  {
430  res.success = false;
431  res.message = res.NO_DATA_MESSAGE;
432  return true;
433  }
434 
435  res.success = true;
436  return true;
437 }
438 
440 {
441  for (const buffers_t::value_type& pair : buffers_)
442  {
443  pair.second->clear();
444  }
445 }
446 
448 {
449  ROS_INFO("Buffering paused");
450  recording_ = false;
451 }
452 
454 {
455  clear();
456  recording_ = true;
457  ROS_INFO("Buffering resumed and old data cleared.");
458 }
459 
460 bool Snapshotter::enableCB(std_srvs::SetBool::Request& req, std_srvs::SetBool::Response& res)
461 {
462  boost::upgrade_lock<boost::upgrade_mutex> read_lock(state_lock_);
463  if (req.data && writing_) // Cannot enable while writing
464  {
465  res.success = false;
466  res.message = "cannot enable recording while writing.";
467  return true;
468  }
469  // Obtain write lock and update state if requested state is different from current
470  if (req.data && !recording_)
471  {
472  boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
473  resume();
474  }
475  else if (!req.data && recording_)
476  {
477  boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
478  pause();
479  }
480  res.success = true;
481  return true;
482 }
483 
485 {
486  (void)e; // Make your "unused variable" warnings a thing of the past with cast to void!
487  // Don't bother doing computing if no one is subscribed
489  return;
490 
491  // TODO(any): consider options to make this faster
492  // (caching and updating last status, having queues track their own status)
493  rosbag_snapshot_msgs::SnapshotStatus msg;
494  {
495  boost::shared_lock<boost::upgrade_mutex> lock(state_lock_);
496  msg.enabled = recording_;
497  }
498  std::string node_id = ros::this_node::getName();
499  for (const buffers_t::value_type& pair : buffers_)
500  {
501  rosgraph_msgs::TopicStatistics status;
502  status.node_sub = node_id;
503  status.topic = pair.first;
504  pair.second->fillStatus(status);
505  msg.topics.push_back(status);
506  }
507 
508  status_pub_.publish(msg);
509 }
510 
512 {
513  (void)e;
515  if (ros::master::getTopics(topics))
516  {
517  for (ros::master::TopicInfo const& t : topics)
518  {
519  std::string topic = t.name;
520  if (options->addTopic(topic))
521  {
522  SnapshotterTopicOptions topic_options;
523  fixTopicOptions(topic_options);
525  queue.reset(new MessageQueue(topic_options));
526  std::pair<buffers_t::iterator, bool> res = buffers_.insert(buffers_t::value_type(topic, queue));
527  ROS_ASSERT_MSG(res.second, "failed to add %s to topics. Perhaps it is a duplicate?", topic.c_str());
528  subscribe(topic, queue);
529  }
530  }
531  }
532  else
533  {
534  ROS_WARN_THROTTLE(5, "Failed to get topics from the ROS master");
535  }
536 }
537 
539 {
540  if (!nh_.ok())
541  return 0;
542 
543  // Create the queue for each topic and set up the subscriber to add to it on new messages
544  for (SnapshotterOptions::topics_t::value_type& pair : options_.topics_)
545  {
546  string topic = ros::names::resolve(nh_.getNamespace(), pair.first);
547  fixTopicOptions(pair.second);
549  queue.reset(new MessageQueue(pair.second));
550  std::pair<buffers_t::iterator, bool> res = buffers_.insert(buffers_t::value_type(topic, queue));
551  ROS_ASSERT_MSG(res.second, "failed to add %s to topics. Perhaps it is a duplicate?", topic.c_str());
552  subscribe(topic, queue);
553  }
554 
555  // Now that subscriptions are setup, setup service servers for writing and pausing
557  enable_server_ = nh_.advertiseService("enable_snapshot", &Snapshotter::enableCB, this);
558 
559  // Start timer to publish status regularly
562 
563  // Start timer to poll ROS master for topics
564  if (options_.all_topics_)
566 
567  // Use multiple callback threads
568  ros::MultiThreadedSpinner spinner(4); // Use 4 threads
569  spinner.spin(); // spin() will not return until the node has been shutdown
570  return 0;
571 }
572 
574 {
575 }
576 
578 {
580  {
581  ros::ServiceClient client = nh_.serviceClient<rosbag_snapshot_msgs::TriggerSnapshot>("trigger_snapshot");
582  if (!client.exists())
583  {
584  ROS_ERROR("Service %s does not exist. Is snapshot running in this namespace?", "trigger_snapshot");
585  return 1;
586  }
587  rosbag_snapshot_msgs::TriggerSnapshotRequest req;
588  req.topics = opts.topics_;
589  // Prefix mode
590  if (opts.filename_.empty())
591  {
592  req.filename = opts.prefix_;
593  size_t ind = req.filename.rfind(".bag");
594  if (ind != string::npos && ind == req.filename.size() - 4)
595  req.filename.erase(ind);
596  }
597  else
598  {
599  req.filename = opts.filename_;
600  size_t ind = req.filename.rfind(".bag");
601  if (ind == string::npos || ind != req.filename.size() - 4)
602  req.filename += ".bag";
603  }
604 
605  // Resolve filename relative to clients working directory to avoid confusion
606  if (req.filename.empty()) // Special case of no specified file, ensure still in working directory of client
607  req.filename = "./";
608  boost::filesystem::path p(boost::filesystem::system_complete(req.filename));
609  req.filename = p.string();
610 
611  rosbag_snapshot_msgs::TriggerSnapshotResponse res;
612  if (!client.call(req, res))
613  {
614  ROS_ERROR("Failed to call service");
615  return 1;
616  }
617  if (!res.success)
618  {
619  ROS_ERROR("%s", res.message.c_str());
620  return 1;
621  }
622  return 0;
623  }
625  {
626  ros::ServiceClient client = nh_.serviceClient<std_srvs::SetBool>("enable_snapshot");
627  if (!client.exists())
628  {
629  ROS_ERROR("Service %s does not exist. Is snapshot running in this namespace?", "enable_snapshot");
630  return 1;
631  }
632  std_srvs::SetBoolRequest req;
633  req.data = (opts.action_ == SnapshotterClientOptions::RESUME);
634  std_srvs::SetBoolResponse res;
635  if (!client.call(req, res))
636  {
637  ROS_ERROR("Failed to call service.");
638  return 1;
639  }
640  if (!res.success)
641  {
642  ROS_ERROR("%s", res.message.c_str());
643  return 1;
644  }
645  return 0;
646  }
647  else
648  {
649  ROS_ASSERT_MSG(false, "Invalid value of enum.");
650  return 1;
651  }
652 }
653 
654 } // namespace rosbag_snapshot
void subscribe(std::string const &topic, boost::shared_ptr< MessageQueue > queue)
ServiceClient serviceClient(const std::string &service_name, bool persistent=false, const M_string &header_values=M_string())
bool postfixFilename(std::string &file)
void topicCB(const ros::MessageEvent< topic_tools::ShapeShifter const > &msg_event, boost::shared_ptr< MessageQueue > queue)
boost::shared_ptr< ros::M_string > connection_header
Definition: snapshotter.h:118
void pollTopics(ros::TimerEvent const &e, rosbag_snapshot::SnapshotterOptions *options)
ros::Time getReceiptTime() const
#define ROS_WARN_THROTTLE(rate,...)
boost::shared_ptr< ros::Subscriber > sub_
Definition: snapshotter.h:141
void publishStatus(ros::TimerEvent const &e)
std::string timeAsStr()
Return current local datetime as a string such as 2018-05-22-14-28-51. Used to generate bag filenames...
void publish(const boost::shared_ptr< M > &message) const
bool isOpen() const
bool writeTopic(rosbag::Bag &bag, MessageQueue &message_queue, std::string const &topic, rosbag_snapshot_msgs::TriggerSnapshot::Request &req, rosbag_snapshot_msgs::TriggerSnapshot::Response &res)
void open(std::string const &filename, uint32_t mode=bagmode::Read)
f
bool enableCB(std_srvs::SetBool::Request &req, std_srvs::SetBool::Response &res)
ros::ServiceServer enable_server_
Definition: snapshotter.h:201
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
SnapshotterTopicOptions(ros::Duration duration_limit=INHERIT_DURATION_LIMIT, int32_t memory_limit=INHERIT_MEMORY_LIMIT)
Definition: snapshotter.cpp:62
static const int32_t INHERIT_MEMORY_LIMIT
Definition: snapshotter.h:72
bool call(MReq &req, MRes &res)
void push(SnapshotMessage const &msg)
static const ros::Duration NO_DURATION_LIMIT
Definition: snapshotter.h:66
ROSCPP_DECL const std::string & getName()
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
ROSCPP_DECL std::string resolve(const std::string &name, bool remap=true)
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
topic_tools::ShapeShifter::ConstPtr msg
Definition: snapshotter.h:117
void fixTopicOptions(SnapshotterTopicOptions &options)
std::vector< TopicInfo > V_TopicInfo
#define ROS_WARN(...)
boost::upgrade_mutex state_lock_
Definition: snapshotter.h:194
bool preparePush(int32_t size, ros::Time const &time)
bool addTopic(std::string const &topic, ros::Duration duration_limit=SnapshotterTopicOptions::INHERIT_DURATION_LIMIT, int32_t memory_limit=SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT)
Definition: snapshotter.cpp:76
SnapshotterOptions options_
Definition: snapshotter.h:190
SubscriptionCallbackHelperPtr helper
std::pair< queue_t::const_iterator, queue_t::const_iterator > range_t
Definition: snapshotter.h:157
void _push(SnapshotMessage const &msg)
SnapshotterTopicOptions options_
Definition: snapshotter.h:135
std::vector< std::string > topics_
Definition: snapshotter.h:252
#define ROS_INFO(...)
#define ROS_ASSERT_MSG(cond,...)
static const int QUEUE_SIZE
Definition: snapshotter.h:189
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
const std::string & getNamespace() const
static const ros::Duration INHERIT_DURATION_LIMIT
Definition: snapshotter.h:70
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
virtual void spin(CallbackQueue *queue=0)
MessageQueue(SnapshotterTopicOptions const &options)
Definition: snapshotter.cpp:94
uint32_t getNumSubscribers() const
bool triggerSnapshotCb(rosbag_snapshot_msgs::TriggerSnapshot::Request &req, rosbag_snapshot_msgs::TriggerSnapshot::Response &res)
ros::Duration duration() const
void fillStatus(rosgraph_msgs::TopicStatistics &status)
SnapshotterOptions(ros::Duration default_duration_limit=ros::Duration(30), int32_t default_memory_limit=-1, ros::Duration status_period=ros::Duration(1))
Definition: snapshotter.cpp:67
ros::ServiceServer trigger_snapshot_server_
Definition: snapshotter.h:200
bool ok() const
SnapshotMessage(topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr< ros::M_string > _connection_header, ros::Time _time)
Definition: snapshotter.cpp:88
void setSubscriber(boost::shared_ptr< ros::Subscriber > sub)
Definition: snapshotter.cpp:98
#define ROS_ERROR(...)
int run(SnapshotterClientOptions const &opts)
void write(std::string const &topic, ros::MessageEvent< T > const &event)
range_t rangeFromTimes(ros::Time const &start, ros::Time const &end)
boost::shared_ptr< M > getMessage() const
Snapshotter(SnapshotterOptions const &options)


rosbag_snapshot
Author(s): Kevin Allen
autogenerated on Mon Jan 18 2021 03:45:11