snapshotter.cpp
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2018, Open Source Robotics Foundation, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of Open Source Robotics Foundation, Inc. nor the
18 * names of its contributors may be used to endorse or promote products
19 * derived from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 ********************************************************************/
34 #include <queue>
35 #include <string>
36 #include <time.h>
37 #include <vector>
38 #include <boost/filesystem.hpp>
39 #include <boost/scope_exit.hpp>
40 #include <boost/thread/xtime.hpp>
41 #include <boost/thread/shared_mutex.hpp>
42 #include <boost/date_time/local_time/local_time.hpp>
43 #include <ros/ros.h>
44 #include <ros/assert.h>
46 #include <rosbag/stream.h>
47 #include <rosbag_snapshot_msgs/SnapshotStatus.h>
49 #include <memory>
50 #include <utility>
51 
52 using std::string;
53 using boost::shared_ptr;
54 using ros::Time;
55 
56 namespace rosbag_snapshot
57 {
64 
66  int32_t count_limit)
67  : duration_limit_(duration_limit), memory_limit_(memory_limit), count_limit_(count_limit)
68 {
69 }
70 
71 SnapshotterOptions::SnapshotterOptions(ros::Duration default_duration_limit, int32_t default_memory_limit,
72  int32_t default_count_limit, ros::Duration status_period)
73  : default_duration_limit_(default_duration_limit)
74  , default_memory_limit_(default_memory_limit)
75  , default_count_limit_(default_count_limit)
76  , status_period_(status_period)
77  , topics_()
78 {
79 }
80 
81 bool SnapshotterOptions::addTopic(std::string const& topic, ros::Duration duration, int32_t memory, int32_t count)
82 {
83  SnapshotterTopicOptions ops(duration, memory, count);
84  std::pair<topics_t::iterator, bool> ret;
85  ret = topics_.insert(topics_t::value_type(topic, ops));
86  return ret.second;
87 }
88 
90 {
91 }
92 
94  boost::shared_ptr<ros::M_string> _connection_header, Time _time)
95  : msg(_msg), connection_header(_connection_header), time(_time)
96 {
97 }
98 
99 MessageQueue::MessageQueue(SnapshotterTopicOptions const& options) : options_(options), size_(0)
100 {
101 }
102 
104 {
105  sub_ = sub;
106 }
107 
108 void MessageQueue::fillStatus(rosgraph_msgs::TopicStatistics& status)
109 {
110  boost::mutex::scoped_lock l(lock);
111  if (!queue_.size())
112  return;
113  status.traffic = size_;
114  status.delivered_msgs = queue_.size();
115  status.window_start = queue_.front().time;
116  status.window_stop = queue_.back().time;
117 }
118 
120 {
121  boost::mutex::scoped_lock l(lock);
122  _clear();
123 }
124 
126 {
127  queue_.clear();
128  size_ = 0;
129 }
130 
132 {
133  // No duration if 0 or 1 messages
134  if (queue_.size() <= 1)
135  return ros::Duration();
136  return queue_.back().time - queue_.front().time;
137 }
138 
139 bool MessageQueue::preparePush(int32_t size, ros::Time const& time)
140 {
141  // If new message is older than back of queue, time has gone backwards and buffer must be cleared
142  if (!queue_.empty() && time < queue_.back().time)
143  {
144  ROS_WARN("Time has gone backwards. Clearing buffer for this topic.");
145  _clear();
146  }
147 
148  // The only case where message cannot be addded is if size is greater than limit
149  if (options_.memory_limit_ > SnapshotterTopicOptions::NO_MEMORY_LIMIT && size > options_.memory_limit_)
150  return false;
151 
152  // If memory limit is enforced, remove elements from front of queue until limit would be met once message is added
153  if (options_.memory_limit_ > SnapshotterTopicOptions::NO_MEMORY_LIMIT)
154  while (queue_.size() != 0 && size_ + size > options_.memory_limit_)
155  _pop();
156 
157  // If duration limit is encforced, remove elements from front of queue until duration limit would be met once message
158  // is added
159  if (options_.duration_limit_ > SnapshotterTopicOptions::NO_DURATION_LIMIT && queue_.size() != 0)
160  {
161  ros::Duration dt = time - queue_.front().time;
162  while (dt > options_.duration_limit_)
163  {
164  _pop();
165  if (queue_.empty())
166  break;
167  dt = time - queue_.front().time;
168  }
169  }
170 
171  // If count limit is enforced, remove elements from front of queue until the the count is below the limit
172  if (options_.count_limit_ > SnapshotterTopicOptions::NO_COUNT_LIMIT && queue_.size() != 0)
173  while (queue_.size() != 0 && queue_.size() >= options_.count_limit_)
174  _pop();
175 
176  return true;
177 }
179 {
180  boost::mutex::scoped_try_lock l(lock);
181  if (!l.owns_lock())
182  {
183  ROS_ERROR("Failed to lock. Time %f", _out.time.toSec());
184  return;
185  }
186  _push(_out);
187 }
188 
190 {
191  boost::mutex::scoped_lock l(lock);
192  return _pop();
193 }
194 
195 int64_t MessageQueue::getMessageSize(SnapshotMessage const& snapshot_msg) const
196 {
197  return snapshot_msg.msg->size() +
198  snapshot_msg.connection_header->size() +
199  snapshot_msg.msg->getDataType().size() +
200  snapshot_msg.msg->getMD5Sum().size() +
201  snapshot_msg.msg->getMessageDefinition().size() +
202  sizeof(SnapshotMessage);
203 }
204 
206 {
207  int32_t size = _out.msg->size();
208  // If message cannot be added without violating limits, it must be dropped
209  if (!preparePush(size, _out.time))
210  return;
211  queue_.push_back(_out);
212  // Add size of new message to running count to maintain correctness
213  size_ += getMessageSize(_out);
214 }
215 
217 {
218  SnapshotMessage tmp = queue_.front();
219  queue_.pop_front();
220  // Remove size of popped message to maintain correctness of size_
221  size_ -= getMessageSize(tmp);
222  return tmp;
223 }
224 
226 {
227  range_t::first_type begin = queue_.begin();
228  range_t::second_type end = queue_.end();
229 
230  // Increment / Decrement iterators until time contraints are met
231  if (!start.isZero())
232  {
233  while (begin != end && (*begin).time < start)
234  ++begin;
235  }
236  if (!stop.isZero())
237  {
238  while (end != begin && (*(end - 1)).time > stop)
239  --end;
240  }
241  return range_t(begin, end);
242 }
243 
244 const int Snapshotter::QUEUE_SIZE = 10;
245 
246 Snapshotter::Snapshotter(SnapshotterOptions const& options) : options_(options), recording_(true), writing_(false)
247 {
248  status_pub_ = nh_.advertise<rosbag_snapshot_msgs::SnapshotStatus>("snapshot_status", 10);
249 }
250 
252 {
253  // Each buffer contains a pointer to the subscriber and vice versa, so we need to
254  // shutdown the subscriber to allow garbage collection to happen
255  for (std::pair<const std::string, boost::shared_ptr<MessageQueue>>& buffer : buffers_)
256  {
257  buffer.second->sub_->shutdown();
258  }
259 }
260 
262 {
263  if (options.duration_limit_ == SnapshotterTopicOptions::INHERIT_DURATION_LIMIT)
265  if (options.memory_limit_ == SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT)
267  if (options.count_limit_ == SnapshotterTopicOptions::INHERIT_COUNT_LIMIT)
269 }
270 
272 {
273  size_t ind = file.rfind(".bag");
274  // If requested ends in .bag, this is literal name do not append date
275  if (ind != string::npos && ind == file.size() - 4)
276  {
277  return true;
278  }
279  // Otherwise treat as prefix and append datetime and extension
280  file += timeAsStr() + ".bag";
281  return true;
282 }
283 
285 {
286  std::stringstream msg;
287  const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
288  boost::posix_time::time_facet* const f = new boost::posix_time::time_facet("%Y-%m-%d-%H-%M-%S");
289  msg.imbue(std::locale(msg.getloc(), f));
290  msg << now;
291  return msg.str();
292 }
293 
296 {
297  // If recording is paused (or writing), exit
298  {
299  boost::shared_lock<boost::upgrade_mutex> lock(state_lock_);
300  if (!recording_)
301  {
302  return;
303  }
304  }
305 
306  // Pack message and metadata into SnapshotMessage holder
307  SnapshotMessage out(msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), msg_event.getReceiptTime());
308  queue->push(out);
309 }
310 
312 {
313  ROS_INFO("Subscribing to %s", topic.c_str());
314 
315  shared_ptr<ros::Subscriber> sub(boost::make_shared<ros::Subscriber>());
317  ops.topic = topic;
318  ops.queue_size = QUEUE_SIZE;
319  ops.md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
320  ops.datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
321  ops.helper =
322  boost::make_shared<ros::SubscriptionCallbackHelperT<const ros::MessageEvent<topic_tools::ShapeShifter const>&> >(
323  boost::bind(&Snapshotter::topicCB, this, boost::placeholders::_1, queue));
324  *sub = nh_.subscribe(ops);
325  queue->setSubscriber(sub);
326 }
327 
328 bool Snapshotter::writeTopic(rosbag::Bag& bag, MessageQueue& message_queue, string const& topic,
329  rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
330  rosbag_snapshot_msgs::TriggerSnapshot::Response& res)
331 {
332  // acquire lock for this queue
333  boost::mutex::scoped_lock l(message_queue.lock);
334 
335  MessageQueue::range_t range = message_queue.rangeFromTimes(req.start_time, req.stop_time);
336 
337  // open bag if this the first valid topic and there is data
338  if (!bag.isOpen() && range.second > range.first)
339  {
340  try
341  {
342  bag.open(req.filename, rosbag::bagmode::Write);
343  }
344  catch (rosbag::BagException const& err)
345  {
346  res.success = false;
347  res.message = string("failed to open bag: ") + err.what();
348  return false;
349  }
350  ROS_INFO("Writing snapshot to %s", req.filename.c_str());
351 
352  // Setting compression type
353  if (options_.compression_ == "LZ4")
354  {
355  ROS_INFO("Bag compression type LZ4");
357  }
358  else if (options_.compression_ == "BZ2")
359  {
360  ROS_INFO("Bag compression type BZ2");
362  }
363  else
364  {
366  }
367  }
368 
369  // write queue
370  try
371  {
372  for (MessageQueue::range_t::first_type msg_it = range.first; msg_it != range.second; ++msg_it)
373  {
374  SnapshotMessage const& msg = *msg_it;
375  bag.write(topic, msg.time, msg.msg, msg.connection_header);
376  }
377  }
378  catch (rosbag::BagException const& err)
379  {
380  res.success = false;
381  res.message = string("failed to write bag: ") + err.what();
382  }
383  return true;
384 }
385 
386 bool Snapshotter::triggerSnapshotCb(rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
387  rosbag_snapshot_msgs::TriggerSnapshot::Response& res)
388 {
389  if (!postfixFilename(req.filename))
390  {
391  res.success = false;
392  res.message = "invalid";
393  return true;
394  }
395  bool recording_prior; // Store if we were recording prior to write to restore this state after write
396  {
397  boost::upgrade_lock<boost::upgrade_mutex> read_lock(state_lock_);
398  recording_prior = recording_;
399  if (writing_)
400  {
401  res.success = false;
402  res.message = "Already writing";
403  return true;
404  }
405  boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
406  if (recording_prior)
407  pause();
408  writing_ = true;
409  }
410 
411  // Ensure that state is updated when function exits, regardlesss of branch path / exception events
412  BOOST_SCOPE_EXIT(&state_lock_, &writing_, recording_prior, this_)
413  {
414  // Clear buffers beacuase time gaps (skipped messages) may have occured while paused
415  boost::unique_lock<boost::upgrade_mutex> write_lock(state_lock_);
416  // Turn off writing flag and return recording to its state before writing
417  writing_ = false;
418  if (recording_prior)
419  this_->resume();
420  }
421  BOOST_SCOPE_EXIT_END
422 
423  // Create bag
424  rosbag::Bag bag;
425 
426  // Write each selected topic's queue to bag file
427  if (req.topics.size() && req.topics.at(0).size())
428  {
429  for (std::string& topic : req.topics)
430  {
431  // Resolve and clean topic
432  try
433  {
434  topic = ros::names::resolve(nh_.getNamespace(), topic);
435  }
436  catch (ros::InvalidNameException const& err)
437  {
438  ROS_WARN("Requested topic %s is invalid, skipping.", topic.c_str());
439  continue;
440  }
441 
442  // Find the message queue for this topic if it exsists
443  buffers_t::iterator found = buffers_.find(topic);
444  // If topic not found, error and exit
445  if (found == buffers_.end())
446  {
447  ROS_WARN("Requested topic %s is not subscribed, skipping.", topic.c_str());
448  continue;
449  }
450  MessageQueue& message_queue = *(*found).second;
451  if (!writeTopic(bag, message_queue, topic, req, res))
452  return true;
453  }
454  }
455  // If topic list empty, record all buffered topics
456  else
457  {
458  for (const buffers_t::value_type& pair : buffers_)
459  {
460  MessageQueue& message_queue = *(pair.second);
461  std::string const& topic = pair.first;
462  if (!writeTopic(bag, message_queue, topic, req, res))
463  return true;
464  }
465  }
466 
467  // If no topics were subscribed/valid/contained data, this is considered a non-success
468  if (!bag.isOpen())
469  {
470  res.success = false;
471  res.message = res.NO_DATA_MESSAGE;
472  return true;
473  }
474 
475  res.success = true;
476  return true;
477 }
478 
480 {
481  for (const buffers_t::value_type& pair : buffers_)
482  {
483  pair.second->clear();
484  }
485 }
486 
488 {
489  ROS_INFO("Buffering paused");
490  recording_ = false;
491 }
492 
494 {
495  clear();
496  recording_ = true;
497  ROS_INFO("Buffering resumed and old data cleared.");
498 }
499 
500 bool Snapshotter::enableCB(std_srvs::SetBool::Request& req, std_srvs::SetBool::Response& res)
501 {
502  boost::upgrade_lock<boost::upgrade_mutex> read_lock(state_lock_);
503  if (req.data && writing_) // Cannot enable while writing
504  {
505  res.success = false;
506  res.message = "cannot enable recording while writing.";
507  return true;
508  }
509  // Obtain write lock and update state if requested state is different from current
510  if (req.data && !recording_)
511  {
512  boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
513  resume();
514  }
515  else if (!req.data && recording_)
516  {
517  boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
518  pause();
519  }
520  res.success = true;
521  return true;
522 }
523 
525 {
526  (void)e; // Make your "unused variable" warnings a thing of the past with cast to void!
527  // Don't bother doing computing if no one is subscribed
529  return;
530 
531  // TODO(any): consider options to make this faster
532  // (caching and updating last status, having queues track their own status)
533  rosbag_snapshot_msgs::SnapshotStatus msg;
534  {
535  boost::shared_lock<boost::upgrade_mutex> lock(state_lock_);
536  msg.enabled = recording_;
537  }
538  std::string node_id = ros::this_node::getName();
539  for (const buffers_t::value_type& pair : buffers_)
540  {
541  rosgraph_msgs::TopicStatistics status;
542  status.node_sub = node_id;
543  status.topic = pair.first;
544  pair.second->fillStatus(status);
545  msg.topics.push_back(status);
546  }
547 
548  status_pub_.publish(msg);
549 }
550 
552 {
553  (void)e;
555  if (ros::master::getTopics(topics))
556  {
557  for (ros::master::TopicInfo const& t : topics)
558  {
559  std::string topic = t.name;
560  if (options->addTopic(topic))
561  {
562  SnapshotterTopicOptions topic_options;
563  fixTopicOptions(topic_options);
565  queue.reset(new MessageQueue(topic_options));
566  std::pair<buffers_t::iterator, bool> res = buffers_.insert(buffers_t::value_type(topic, queue));
567  ROS_ASSERT_MSG(res.second, "failed to add %s to topics. Perhaps it is a duplicate?", topic.c_str());
568  subscribe(topic, queue);
569  }
570  }
571  }
572  else
573  {
574  ROS_WARN_THROTTLE(5, "Failed to get topics from the ROS master");
575  }
576 }
577 
579 {
580  if (!nh_.ok())
581  return 0;
582 
583  // Create the queue for each topic and set up the subscriber to add to it on new messages
584  for (SnapshotterOptions::topics_t::value_type& pair : options_.topics_)
585  {
586  string topic = ros::names::resolve(nh_.getNamespace(), pair.first);
587  fixTopicOptions(pair.second);
589  queue.reset(new MessageQueue(pair.second));
590  std::pair<buffers_t::iterator, bool> res = buffers_.insert(buffers_t::value_type(topic, queue));
591  ROS_ASSERT_MSG(res.second, "failed to add %s to topics. Perhaps it is a duplicate?", topic.c_str());
592  subscribe(topic, queue);
593  }
594 
595  // Now that subscriptions are setup, setup service servers for writing and pausing
597  enable_server_ = nh_.advertiseService("enable_snapshot", &Snapshotter::enableCB, this);
598 
599  // Start timer to publish status regularly
602 
603  // Start timer to poll ROS master for topics
604  if (options_.all_topics_)
606  boost::bind(&Snapshotter::pollTopics, this,
607  boost::placeholders::_1, &options_));
608 
609  // Use multiple callback threads
610  ros::MultiThreadedSpinner spinner(4); // Use 4 threads
611  spinner.spin(); // spin() will not return until the node has been shutdown
612  return 0;
613 }
614 
616 {
617 }
618 
620 {
622  {
623  ros::ServiceClient client = nh_.serviceClient<rosbag_snapshot_msgs::TriggerSnapshot>("trigger_snapshot");
624  if (!client.exists())
625  {
626  ROS_ERROR("Service %s does not exist. Is snapshot running in this namespace?", "trigger_snapshot");
627  return 1;
628  }
629  rosbag_snapshot_msgs::TriggerSnapshotRequest req;
630  req.topics = opts.topics_;
631  // Prefix mode
632  if (opts.filename_.empty())
633  {
634  req.filename = opts.prefix_;
635  size_t ind = req.filename.rfind(".bag");
636  if (ind != string::npos && ind == req.filename.size() - 4)
637  req.filename.erase(ind);
638  }
639  else
640  {
641  req.filename = opts.filename_;
642  size_t ind = req.filename.rfind(".bag");
643  if (ind == string::npos || ind != req.filename.size() - 4)
644  req.filename += ".bag";
645  }
646 
647  // Resolve filename relative to clients working directory to avoid confusion
648  if (req.filename.empty()) // Special case of no specified file, ensure still in working directory of client
649  req.filename = "./";
650  boost::filesystem::path p(boost::filesystem::system_complete(req.filename));
651  req.filename = p.string();
652 
653  rosbag_snapshot_msgs::TriggerSnapshotResponse res;
654  if (!client.call(req, res))
655  {
656  ROS_ERROR("Failed to call service");
657  return 1;
658  }
659  if (!res.success)
660  {
661  ROS_ERROR("%s", res.message.c_str());
662  return 1;
663  }
664  return 0;
665  }
667  {
668  ros::ServiceClient client = nh_.serviceClient<std_srvs::SetBool>("enable_snapshot");
669  if (!client.exists())
670  {
671  ROS_ERROR("Service %s does not exist. Is snapshot running in this namespace?", "enable_snapshot");
672  return 1;
673  }
674  std_srvs::SetBoolRequest req;
675  req.data = (opts.action_ == SnapshotterClientOptions::RESUME);
676  std_srvs::SetBoolResponse res;
677  if (!client.call(req, res))
678  {
679  ROS_ERROR("Failed to call service.");
680  return 1;
681  }
682  if (!res.success)
683  {
684  ROS_ERROR("%s", res.message.c_str());
685  return 1;
686  }
687  return 0;
688  }
689  else
690  {
691  ROS_ASSERT_MSG(false, "Invalid value of enum.");
692  return 1;
693  }
694 }
695 
696 } // namespace rosbag_snapshot
void subscribe(std::string const &topic, boost::shared_ptr< MessageQueue > queue)
ServiceClient serviceClient(const std::string &service_name, bool persistent=false, const M_string &header_values=M_string())
bool postfixFilename(std::string &file)
boost::shared_ptr< ros::M_string > connection_header
Definition: snapshotter.h:129
void pollTopics(ros::TimerEvent const &e, rosbag_snapshot::SnapshotterOptions *options)
boost::shared_ptr< ros::Subscriber > sub_
Definition: snapshotter.h:152
void publishStatus(ros::TimerEvent const &e)
std::string timeAsStr()
Return current local datetime as a string such as 2018-05-22-14-28-51. Used to generate bag filenames...
bool writeTopic(rosbag::Bag &bag, MessageQueue &message_queue, std::string const &topic, rosbag_snapshot_msgs::TriggerSnapshot::Request &req, rosbag_snapshot_msgs::TriggerSnapshot::Response &res)
void open(std::string const &filename, uint32_t mode=bagmode::Read)
f
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
bool enableCB(std_srvs::SetBool::Request &req, std_srvs::SetBool::Response &res)
ros::ServiceServer enable_server_
Definition: snapshotter.h:215
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
boost::shared_ptr< M > getMessage() const
static const int32_t INHERIT_MEMORY_LIMIT
Definition: snapshotter.h:74
void topicCB(const ros::MessageEvent< topic_tools::ShapeShifter const > &msg_event, boost::shared_ptr< MessageQueue > queue)
bool call(MReq &req, MRes &res)
void push(SnapshotMessage const &msg)
static const ros::Duration NO_DURATION_LIMIT
Definition: snapshotter.h:66
ROSCPP_DECL const std::string & getName()
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
ROSCPP_DECL std::string resolve(const std::string &name, bool remap=true)
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
topic_tools::ShapeShifter::ConstPtr msg
Definition: snapshotter.h:128
void fixTopicOptions(SnapshotterTopicOptions &options)
std::vector< TopicInfo > V_TopicInfo
#define ROS_WARN(...)
boost::upgrade_mutex state_lock_
Definition: snapshotter.h:208
bool preparePush(int32_t size, ros::Time const &time)
int64_t getMessageSize(SnapshotMessage const &msg) const
SnapshotterOptions(ros::Duration default_duration_limit=ros::Duration(30), int32_t default_memory_limit=-1, int32_t default_count_limit=-1, ros::Duration status_period=ros::Duration(1))
Definition: snapshotter.cpp:71
SnapshotterOptions options_
Definition: snapshotter.h:204
bool isOpen() const
void publish(const boost::shared_ptr< M > &message) const
SubscriptionCallbackHelperPtr helper
static const int32_t INHERIT_COUNT_LIMIT
Definition: snapshotter.h:76
std::pair< queue_t::const_iterator, queue_t::const_iterator > range_t
Definition: snapshotter.h:168
void _push(SnapshotMessage const &msg)
#define ROS_WARN_THROTTLE(period,...)
SnapshotterTopicOptions options_
Definition: snapshotter.h:146
std::vector< std::string > topics_
Definition: snapshotter.h:266
#define ROS_INFO(...)
#define ROS_ASSERT_MSG(cond,...)
static const int QUEUE_SIZE
Definition: snapshotter.h:203
void setCompression(CompressionType compression)
ros::Duration duration() const
static const ros::Duration INHERIT_DURATION_LIMIT
Definition: snapshotter.h:72
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
virtual void spin(CallbackQueue *queue=0)
MessageQueue(SnapshotterTopicOptions const &options)
Definition: snapshotter.cpp:99
ros::Time getReceiptTime() const
SnapshotterTopicOptions(ros::Duration duration_limit=INHERIT_DURATION_LIMIT, int32_t memory_limit=INHERIT_MEMORY_LIMIT, int32_t count_limit=INHERIT_COUNT_LIMIT)
Definition: snapshotter.cpp:65
const std::string & getNamespace() const
bool triggerSnapshotCb(rosbag_snapshot_msgs::TriggerSnapshot::Request &req, rosbag_snapshot_msgs::TriggerSnapshot::Response &res)
void fillStatus(rosgraph_msgs::TopicStatistics &status)
ros::ServiceServer trigger_snapshot_server_
Definition: snapshotter.h:214
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
SnapshotMessage(topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr< ros::M_string > _connection_header, ros::Time _time)
Definition: snapshotter.cpp:93
uint32_t getNumSubscribers() const
bool ok() const
void setSubscriber(boost::shared_ptr< ros::Subscriber > sub)
#define ROS_ERROR(...)
int run(SnapshotterClientOptions const &opts)
void write(std::string const &topic, ros::MessageEvent< T > const &event)
range_t rangeFromTimes(ros::Time const &start, ros::Time const &end)
Snapshotter(SnapshotterOptions const &options)
bool addTopic(std::string const &topic, ros::Duration duration_limit=SnapshotterTopicOptions::INHERIT_DURATION_LIMIT, int32_t memory_limit=SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT, int32_t count_limit=SnapshotterTopicOptions::INHERIT_COUNT_LIMIT)
Definition: snapshotter.cpp:81


rosbag_snapshot
Author(s): Kevin Allen
autogenerated on Mon May 22 2023 02:10:47