snapshotter.cpp
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2018, Open Source Robotics Foundation, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of Open Source Robotics Foundation, Inc. nor the
18 * names of its contributors may be used to endorse or promote products
19 * derived from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 ********************************************************************/
34 #include <queue>
35 #include <string>
36 #include <time.h>
37 #include <vector>
38 #include <boost/filesystem.hpp>
39 #include <boost/scope_exit.hpp>
40 #include <boost/thread/xtime.hpp>
41 #include <boost/thread/shared_mutex.hpp>
42 #include <boost/date_time/local_time/local_time.hpp>
43 #include <ros/ros.h>
44 #include <ros/assert.h>
46 #include <rosbag/stream.h>
47 #include <rosbag_snapshot_msgs/SnapshotStatus.h>
49 #include <memory>
50 #include <utility>
51 
52 using std::string;
53 using boost::shared_ptr;
54 using ros::Time;
55 
56 namespace rosbag_snapshot
57 {
64 
66  int32_t count_limit)
67  : duration_limit_(duration_limit), memory_limit_(memory_limit), count_limit_(count_limit)
68 {
69 }
70 
71 SnapshotterOptions::SnapshotterOptions(ros::Duration default_duration_limit, int32_t default_memory_limit,
72  int32_t default_count_limit, ros::Duration status_period, bool clear_buffer)
73  : default_duration_limit_(default_duration_limit)
74  , default_memory_limit_(default_memory_limit)
75  , default_count_limit_(default_count_limit)
76  , status_period_(status_period)
77  , clear_buffer_(clear_buffer)
78  , topics_()
79 {
80 }
81 
82 bool SnapshotterOptions::addTopic(std::string const& topic, ros::Duration duration, int32_t memory, int32_t count)
83 {
84  SnapshotterTopicOptions ops(duration, memory, count);
85  std::pair<topics_t::iterator, bool> ret;
86  ret = topics_.insert(topics_t::value_type(topic, ops));
87  return ret.second;
88 }
89 
91 {
92 }
93 
95  boost::shared_ptr<ros::M_string> _connection_header, Time _time)
96  : msg(_msg), connection_header(_connection_header), time(_time)
97 {
98 }
99 
100 MessageQueue::MessageQueue(SnapshotterTopicOptions const& options) : options_(options), size_(0)
101 {
102 }
103 
105 {
106  sub_ = sub;
107 }
108 
109 void MessageQueue::fillStatus(rosgraph_msgs::TopicStatistics& status)
110 {
111  boost::mutex::scoped_lock l(lock);
112  if (!queue_.size())
113  return;
114  status.traffic = size_;
115  status.delivered_msgs = queue_.size();
116  status.window_start = queue_.front().time;
117  status.window_stop = queue_.back().time;
118 }
119 
121 {
122  boost::mutex::scoped_lock l(lock);
123  _clear();
124 }
125 
127 {
128  queue_.clear();
129  size_ = 0;
130 }
131 
133 {
134  // No duration if 0 or 1 messages
135  if (queue_.size() <= 1)
136  return ros::Duration();
137  return queue_.back().time - queue_.front().time;
138 }
139 
140 bool MessageQueue::preparePush(int32_t size, ros::Time const& time)
141 {
142  // If new message is older than back of queue, time has gone backwards and buffer must be cleared
143  if (!queue_.empty() && time < queue_.back().time)
144  {
145  ROS_WARN("Time has gone backwards. Clearing buffer for this topic.");
146  _clear();
147  }
148 
149  // The only case where message cannot be addded is if size is greater than limit
151  return false;
152 
153  // If memory limit is enforced, remove elements from front of queue until limit would be met once message is added
155  while (queue_.size() != 0 && size_ + size > options_.memory_limit_)
156  _pop();
157 
158  // If duration limit is encforced, remove elements from front of queue until duration limit would be met once message
159  // is added
161  {
162  ros::Duration dt = time - queue_.front().time;
163  while (dt > options_.duration_limit_)
164  {
165  _pop();
166  if (queue_.empty())
167  break;
168  dt = time - queue_.front().time;
169  }
170  }
171 
172  // If count limit is enforced, remove elements from front of queue until the the count is below the limit
174  while (queue_.size() != 0 && queue_.size() >= options_.count_limit_)
175  _pop();
176 
177  return true;
178 }
180 {
181  boost::mutex::scoped_try_lock l(lock);
182  if (!l.owns_lock())
183  {
184  ROS_ERROR("Failed to lock. Time %f", _out.time.toSec());
185  return;
186  }
187  _push(_out);
188 }
189 
191 {
192  boost::mutex::scoped_lock l(lock);
193  return _pop();
194 }
195 
196 int64_t MessageQueue::getMessageSize(SnapshotMessage const& snapshot_msg) const
197 {
198  return snapshot_msg.msg->size() +
199  snapshot_msg.connection_header->size() +
200  snapshot_msg.msg->getDataType().size() +
201  snapshot_msg.msg->getMD5Sum().size() +
202  snapshot_msg.msg->getMessageDefinition().size() +
203  sizeof(SnapshotMessage);
204 }
205 
207 {
208  int32_t size = _out.msg->size();
209  // If message cannot be added without violating limits, it must be dropped
210  if (!preparePush(size, _out.time))
211  return;
212  queue_.push_back(_out);
213  // Add size of new message to running count to maintain correctness
214  size_ += getMessageSize(_out);
215 }
216 
218 {
219  SnapshotMessage tmp = queue_.front();
220  queue_.pop_front();
221  // Remove size of popped message to maintain correctness of size_
222  size_ -= getMessageSize(tmp);
223  return tmp;
224 }
225 
227 {
228  range_t::first_type begin = queue_.begin();
229  range_t::second_type end = queue_.end();
230 
231  // Increment / Decrement iterators until time contraints are met
232  if (!start.isZero())
233  {
234  while (begin != end && (*begin).time < start)
235  ++begin;
236  }
237  if (!stop.isZero())
238  {
239  while (end != begin && (*(end - 1)).time > stop)
240  --end;
241  }
242  return range_t(begin, end);
243 }
244 
245 Snapshotter::Snapshotter(SnapshotterOptions const& options) : options_(options), recording_(true), writing_(false)
246 {
247  status_pub_ = nh_.advertise<rosbag_snapshot_msgs::SnapshotStatus>("snapshot_status", 10);
248 }
249 
251 {
252  // Each buffer contains a pointer to the subscriber and vice versa, so we need to
253  // shutdown the subscriber to allow garbage collection to happen
254  for (std::pair<const std::string, boost::shared_ptr<MessageQueue>>& buffer : buffers_)
255  {
256  buffer.second->sub_->shutdown();
257  }
258 }
259 
261 {
268 }
269 
271 {
272  size_t ind = file.rfind(".bag");
273  // If requested ends in .bag, this is literal name do not append date
274  if (ind != string::npos && ind == file.size() - 4)
275  {
276  return true;
277  }
278  // Otherwise treat as prefix and append datetime and extension
279  file += timeAsStr() + ".bag";
280  return true;
281 }
282 
284 {
285  std::stringstream msg;
286  const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
287  boost::posix_time::time_facet* const f = new boost::posix_time::time_facet("%Y-%m-%d-%H-%M-%S");
288  msg.imbue(std::locale(msg.getloc(), f));
289  msg << now;
290  return msg.str();
291 }
292 
295 {
296  // If recording is paused (or writing), exit
297  {
298  boost::shared_lock<boost::upgrade_mutex> lock(state_lock_);
299  if (!recording_)
300  {
301  return;
302  }
303  }
304 
305  // Pack message and metadata into SnapshotMessage holder
306  SnapshotMessage out(msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), msg_event.getReceiptTime());
307  queue->push(out);
308 }
309 
311 {
312  ROS_INFO("Subscribing to %s", topic.c_str());
313 
314  shared_ptr<ros::Subscriber> sub(boost::make_shared<ros::Subscriber>());
316  ops.topic = topic;
318  ops.md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
319  ops.datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
320  ops.helper =
321  boost::make_shared<ros::SubscriptionCallbackHelperT<const ros::MessageEvent<topic_tools::ShapeShifter const>&> >(
322  std::bind(&Snapshotter::topicCB, this, std::placeholders::_1, queue));
323  *sub = nh_.subscribe(ops);
324  queue->setSubscriber(sub);
325 }
326 
327 bool Snapshotter::writeTopic(rosbag::Bag& bag, MessageQueue& message_queue, string const& topic,
328  rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
329  rosbag_snapshot_msgs::TriggerSnapshot::Response& res)
330 {
331  // acquire lock for this queue
332  boost::mutex::scoped_lock l(message_queue.lock);
333 
334  MessageQueue::range_t range = message_queue.rangeFromTimes(req.start_time, req.stop_time);
335 
336  // open bag if this the first valid topic and there is data
337  if (!bag.isOpen() && range.second > range.first)
338  {
339  try
340  {
341  bag.open(req.filename, rosbag::bagmode::Write);
342  }
343  catch (rosbag::BagException const& err)
344  {
345  res.success = false;
346  res.message = string("failed to open bag: ") + err.what();
347  return false;
348  }
349  ROS_INFO("Writing snapshot to %s", req.filename.c_str());
350 
351  // Setting compression type
352  if (options_.compression_ == "LZ4")
353  {
354  ROS_INFO("Bag compression type LZ4");
356  }
357  else if (options_.compression_ == "BZ2")
358  {
359  ROS_INFO("Bag compression type BZ2");
361  }
362  else
363  {
365  }
366  }
367 
368  // write queue
369  try
370  {
371  for (MessageQueue::range_t::first_type msg_it = range.first; msg_it != range.second; ++msg_it)
372  {
373  SnapshotMessage const& msg = *msg_it;
374  bag.write(topic, msg.time, msg.msg, msg.connection_header);
375  }
376  }
377  catch (rosbag::BagException const& err)
378  {
379  res.success = false;
380  res.message = string("failed to write bag: ") + err.what();
381  }
382  return true;
383 }
384 
385 bool Snapshotter::triggerSnapshotCb(rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
386  rosbag_snapshot_msgs::TriggerSnapshot::Response& res)
387 {
388  if (!postfixFilename(req.filename))
389  {
390  res.success = false;
391  res.message = "invalid";
392  return true;
393  }
394  bool recording_prior; // Store if we were recording prior to write to restore this state after write
395  {
396  boost::upgrade_lock<boost::upgrade_mutex> read_lock(state_lock_);
397  recording_prior = recording_;
398  if (writing_)
399  {
400  res.success = false;
401  res.message = "Already writing";
402  return true;
403  }
404  boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
405  if (recording_prior)
406  pause();
407  writing_ = true;
408  }
409 
410  // Ensure that state is updated when function exits, regardlesss of branch path / exception events
411  BOOST_SCOPE_EXIT(&state_lock_, &writing_, recording_prior, this_)
412  {
413  // Clear buffers beacuase time gaps (skipped messages) may have occured while paused
414  boost::unique_lock<boost::upgrade_mutex> write_lock(state_lock_);
415  // Turn off writing flag and return recording to its state before writing
416  writing_ = false;
417  if (recording_prior)
418  this_->resume();
419  }
420  BOOST_SCOPE_EXIT_END
421 
422  // Create bag
423  rosbag::Bag bag;
424 
425  // Write each selected topic's queue to bag file
426  if (req.topics.size() && req.topics.at(0).size())
427  {
428  for (std::string& topic : req.topics)
429  {
430  // Resolve and clean topic
431  try
432  {
433  topic = ros::names::resolve(nh_.getNamespace(), topic);
434  }
435  catch (ros::InvalidNameException const& err)
436  {
437  ROS_WARN("Requested topic %s is invalid, skipping.", topic.c_str());
438  continue;
439  }
440 
441  // Find the message queue for this topic if it exsists
442  buffers_t::iterator found = buffers_.find(topic);
443  // If topic not found, error and exit
444  if (found == buffers_.end())
445  {
446  ROS_WARN("Requested topic %s is not subscribed, skipping.", topic.c_str());
447  continue;
448  }
449  MessageQueue& message_queue = *(*found).second;
450  if (!writeTopic(bag, message_queue, topic, req, res))
451  return true;
452  }
453  }
454  // If topic list empty, record all buffered topics
455  else
456  {
457  for (const buffers_t::value_type& pair : buffers_)
458  {
459  MessageQueue& message_queue = *(pair.second);
460  std::string const& topic = pair.first;
461  if (!writeTopic(bag, message_queue, topic, req, res))
462  return true;
463  }
464  }
465 
466  // If no topics were subscribed/valid/contained data, this is considered a non-success
467  if (!bag.isOpen())
468  {
469  res.success = false;
470  res.message = res.NO_DATA_MESSAGE;
471  return true;
472  }
473 
474  res.success = true;
475  return true;
476 }
477 
479 {
480  for (const buffers_t::value_type& pair : buffers_)
481  {
482  pair.second->clear();
483  }
484 }
485 
487 {
488  ROS_INFO("Buffering paused");
489  recording_ = false;
490 }
491 
493 {
495  {
496  clear();
497  ROS_INFO("Old data cleared");
498  }
499  recording_ = true;
500  ROS_INFO("Buffering resumed");
501 }
502 
503 bool Snapshotter::enableCB(std_srvs::SetBool::Request& req, std_srvs::SetBool::Response& res)
504 {
505  boost::upgrade_lock<boost::upgrade_mutex> read_lock(state_lock_);
506  if (req.data && writing_) // Cannot enable while writing
507  {
508  res.success = false;
509  res.message = "cannot enable recording while writing.";
510  return true;
511  }
512  // Obtain write lock and update state if requested state is different from current
513  if (req.data && !recording_)
514  {
515  boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
516  resume();
517  }
518  else if (!req.data && recording_)
519  {
520  boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
521  pause();
522  }
523  res.success = true;
524  return true;
525 }
526 
528 {
529  (void)e; // Make your "unused variable" warnings a thing of the past with cast to void!
530  // Don't bother doing computing if no one is subscribed
532  return;
533 
534  // TODO(any): consider options to make this faster
535  // (caching and updating last status, having queues track their own status)
536  rosbag_snapshot_msgs::SnapshotStatus msg;
537  {
538  boost::shared_lock<boost::upgrade_mutex> lock(state_lock_);
539  msg.enabled = recording_;
540  }
541  std::string node_id = ros::this_node::getName();
542  for (const buffers_t::value_type& pair : buffers_)
543  {
544  rosgraph_msgs::TopicStatistics status;
545  status.node_sub = node_id;
546  status.topic = pair.first;
547  pair.second->fillStatus(status);
548  msg.topics.push_back(status);
549  }
550 
551  status_pub_.publish(msg);
552 }
553 
555 {
556  (void)e;
558  if (ros::master::getTopics(topics))
559  {
560  for (ros::master::TopicInfo const& t : topics)
561  {
562  std::string topic = t.name;
563  if (options->addTopic(topic))
564  {
565  SnapshotterTopicOptions topic_options;
566  fixTopicOptions(topic_options);
568  queue.reset(new MessageQueue(topic_options));
569  std::pair<buffers_t::iterator, bool> res = buffers_.insert(buffers_t::value_type(topic, queue));
570  ROS_ASSERT_MSG(res.second, "failed to add %s to topics. Perhaps it is a duplicate?", topic.c_str());
571  subscribe(topic, queue);
572  }
573  }
574  }
575  else
576  {
577  ROS_WARN_THROTTLE(5, "Failed to get topics from the ROS master");
578  }
579 }
580 
582 {
583  if (!nh_.ok())
584  return 0;
585 
586  // Create the queue for each topic and set up the subscriber to add to it on new messages
587  for (SnapshotterOptions::topics_t::value_type& pair : options_.topics_)
588  {
589  string topic = ros::names::resolve(nh_.getNamespace(), pair.first);
590  fixTopicOptions(pair.second);
592  queue.reset(new MessageQueue(pair.second));
593  std::pair<buffers_t::iterator, bool> res = buffers_.insert(buffers_t::value_type(topic, queue));
594  ROS_ASSERT_MSG(res.second, "failed to add %s to topics. Perhaps it is a duplicate?", topic.c_str());
595  subscribe(topic, queue);
596  }
597 
598  // Now that subscriptions are setup, setup service servers for writing and pausing
600  enable_server_ = nh_.advertiseService("enable_snapshot", &Snapshotter::enableCB, this);
601 
602  // Start timer to publish status regularly
605 
606  // Start timer to poll ROS master for topics
607  if (options_.all_topics_)
609  std::bind(&Snapshotter::pollTopics, this,
610  std::placeholders::_1, &options_));
611 
612  // Use multiple callback threads
613  ros::MultiThreadedSpinner spinner(4); // Use 4 threads
614  spinner.spin(); // spin() will not return until the node has been shutdown
615  return 0;
616 }
617 
619 {
620 }
621 
623 {
625  {
626  ros::ServiceClient client = nh_.serviceClient<rosbag_snapshot_msgs::TriggerSnapshot>("trigger_snapshot");
627  if (!client.exists())
628  {
629  ROS_ERROR("Service %s does not exist. Is snapshot running in this namespace?", "trigger_snapshot");
630  return 1;
631  }
632  rosbag_snapshot_msgs::TriggerSnapshotRequest req;
633  req.topics = opts.topics_;
634  // Prefix mode
635  if (opts.filename_.empty())
636  {
637  req.filename = opts.prefix_;
638  size_t ind = req.filename.rfind(".bag");
639  if (ind != string::npos && ind == req.filename.size() - 4)
640  req.filename.erase(ind);
641  }
642  else
643  {
644  req.filename = opts.filename_;
645  size_t ind = req.filename.rfind(".bag");
646  if (ind == string::npos || ind != req.filename.size() - 4)
647  req.filename += ".bag";
648  }
649 
650  // Resolve filename relative to clients working directory to avoid confusion
651  if (req.filename.empty()) // Special case of no specified file, ensure still in working directory of client
652  req.filename = "./";
653  boost::filesystem::path p(boost::filesystem::system_complete(req.filename));
654  req.filename = p.string();
655 
656  rosbag_snapshot_msgs::TriggerSnapshotResponse res;
657  if (!client.call(req, res))
658  {
659  ROS_ERROR("Failed to call service");
660  return 1;
661  }
662  if (!res.success)
663  {
664  ROS_ERROR("%s", res.message.c_str());
665  return 1;
666  }
667  return 0;
668  }
670  {
671  ros::ServiceClient client = nh_.serviceClient<std_srvs::SetBool>("enable_snapshot");
672  if (!client.exists())
673  {
674  ROS_ERROR("Service %s does not exist. Is snapshot running in this namespace?", "enable_snapshot");
675  return 1;
676  }
677  std_srvs::SetBoolRequest req;
678  req.data = (opts.action_ == SnapshotterClientOptions::RESUME);
679  std_srvs::SetBoolResponse res;
680  if (!client.call(req, res))
681  {
682  ROS_ERROR("Failed to call service.");
683  return 1;
684  }
685  if (!res.success)
686  {
687  ROS_ERROR("%s", res.message.c_str());
688  return 1;
689  }
690  return 0;
691  }
692  else
693  {
694  ROS_ASSERT_MSG(false, "Invalid value of enum.");
695  return 1;
696  }
697 }
698 
699 } // namespace rosbag_snapshot
rosbag_snapshot::Snapshotter::pause
void pause()
Definition: snapshotter.cpp:486
rosbag_snapshot::Snapshotter::buffers_
buffers_t buffers_
Definition: snapshotter.h:242
rosbag_snapshot::SnapshotterClientOptions::filename_
std::string filename_
Definition: snapshotter.h:304
ros::MessageEvent::getReceiptTime
ros::Time getReceiptTime() const
ros::MultiThreadedSpinner
snapshotter.h
rosbag_snapshot::SnapshotterOptions::topics_
topics_t topics_
Definition: snapshotter.h:146
rosbag_snapshot::Snapshotter::nh_
ros::NodeHandle nh_
Definition: snapshotter.h:249
rosbag::Bag
rosbag_snapshot::SnapshotterClient::run
int run(SnapshotterClientOptions const &opts)
Definition: snapshotter.cpp:622
rosbag_snapshot::SnapshotterOptions::all_topics_
bool all_topics_
Definition: snapshotter.h:135
rosbag_snapshot::SnapshotterTopicOptions
Definition: snapshotter.h:95
rosbag_snapshot::SnapshotterTopicOptions::NO_DURATION_LIMIT
static const ros::Duration NO_DURATION_LIMIT
Definition: snapshotter.h:98
boost::shared_ptr
rosbag_snapshot::SnapshotterTopicOptions::NO_COUNT_LIMIT
static const int32_t NO_COUNT_LIMIT
Definition: snapshotter.h:102
ros::ServiceClient::exists
bool exists()
rosbag_snapshot::MessageQueue::size_
int64_t size_
Definition: snapshotter.h:186
rosbag_snapshot::Snapshotter::publishStatus
void publishStatus(ros::TimerEvent const &e)
Definition: snapshotter.cpp:527
ROS_WARN_THROTTLE
#define ROS_WARN_THROTTLE(period,...)
ros.h
time.h
rosbag_snapshot::Snapshotter::enableCB
bool enableCB(std_srvs::SetBool::Request &req, std_srvs::SetBool::Response &res)
Definition: snapshotter.cpp:503
rosbag_snapshot::MessageQueue::fillStatus
void fillStatus(rosgraph_msgs::TopicStatistics &status)
Definition: snapshotter.cpp:109
rosbag_snapshot::MessageQueue::setSubscriber
void setSubscriber(boost::shared_ptr< ros::Subscriber > sub)
Definition: snapshotter.cpp:104
rosbag_snapshot::SnapshotterClientOptions::TRIGGER_WRITE
@ TRIGGER_WRITE
Definition: snapshotter.h:295
ros::SubscribeOptions::topic
std::string topic
rosbag_snapshot::MessageQueue::pop
SnapshotMessage pop()
Definition: snapshotter.cpp:190
rosbag_snapshot::MessageQueue::clear
void clear()
Definition: snapshotter.cpp:120
ros::NodeHandle::serviceClient
ServiceClient serviceClient(const std::string &service_name, bool persistent=false, const M_string &header_values=M_string())
rosbag_snapshot::MessageQueue::options_
SnapshotterTopicOptions options_
Definition: snapshotter.h:184
rosbag::compression::LZ4
LZ4
TimeBase< Time, Duration >::toSec
double toSec() const
ros::NodeHandle::advertiseService
ServiceServer advertiseService(AdvertiseServiceOptions &ops)
rosbag_snapshot::SnapshotterTopicOptions::duration_limit_
ros::Duration duration_limit_
Definition: snapshotter.h:111
rosbag_snapshot::SnapshotMessage
Definition: snapshotter.h:162
rosbag_snapshot::MessageQueue::preparePush
bool preparePush(int32_t size, ros::Time const &time)
Definition: snapshotter.cpp:140
ros::MultiThreadedSpinner::spin
virtual void spin(CallbackQueue *queue=0)
rosbag_snapshot::SnapshotterOptions
Definition: snapshotter.h:124
rosbag::compression::BZ2
BZ2
rosbag_snapshot::SnapshotterClient::SnapshotterClient
SnapshotterClient()
Definition: snapshotter.cpp:618
rosbag_snapshot::SnapshotterOptions::SnapshotterOptions
SnapshotterOptions(ros::Duration default_duration_limit=ros::Duration(30), int32_t default_memory_limit=-1, int32_t default_count_limit=-1, ros::Duration status_period=ros::Duration(1), bool clear_buffer=true)
Definition: snapshotter.cpp:71
ros::master::getTopics
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
rosbag_snapshot::Snapshotter::options_
SnapshotterOptions options_
Definition: snapshotter.h:240
ros::Publisher::publish
void publish(const boost::shared_ptr< M > &message) const
rosbag_snapshot::MessageQueue::getMessageSize
int64_t getMessageSize(SnapshotMessage const &msg) const
Definition: snapshotter.cpp:196
rosbag_snapshot::SnapshotterClientOptions
Definition: snapshotter.h:290
ros::NodeHandle::advertise
Publisher advertise(AdvertiseOptions &ops)
ros::names::resolve
ROSCPP_DECL std::string resolve(const std::string &name, bool remap=true)
f
f
rosbag_snapshot::SnapshotterOptions::status_period_
ros::Duration status_period_
Definition: snapshotter.h:133
rosbag::BagException
rosbag_snapshot::SnapshotterTopicOptions::memory_limit_
int32_t memory_limit_
Definition: snapshotter.h:113
rosbag_snapshot::SnapshotterTopicOptions::INHERIT_DURATION_LIMIT
static const ros::Duration INHERIT_DURATION_LIMIT
Definition: snapshotter.h:104
rosbag_snapshot::SnapshotterClientOptions::PAUSE
@ PAUSE
Definition: snapshotter.h:296
ros::SubscribeOptions::md5sum
std::string md5sum
rosbag_snapshot::SnapshotterClientOptions::topics_
std::vector< std::string > topics_
Definition: snapshotter.h:302
rosbag_snapshot::Snapshotter::~Snapshotter
~Snapshotter()
Definition: snapshotter.cpp:250
rosbag_snapshot::Snapshotter::subscribe
void subscribe(std::string const &topic, boost::shared_ptr< MessageQueue > queue)
Definition: snapshotter.cpp:310
rosbag_snapshot::MessageQueue::rangeFromTimes
range_t rangeFromTimes(ros::Time const &start, ros::Time const &end)
Definition: snapshotter.cpp:226
rosbag_snapshot
Definition: snapshotter.h:55
rosbag_snapshot::SnapshotterTopicOptions::NO_MEMORY_LIMIT
static const int32_t NO_MEMORY_LIMIT
Definition: snapshotter.h:100
rosbag_snapshot::MessageQueue::_push
void _push(SnapshotMessage const &msg)
Definition: snapshotter.cpp:206
rosbag_snapshot::SnapshotMessage::connection_header
boost::shared_ptr< ros::M_string > connection_header
Definition: snapshotter.h:167
rosbag_snapshot::MessageQueue::range_t
std::pair< queue_t::const_iterator, queue_t::const_iterator > range_t
Definition: snapshotter.h:206
rosbag::Bag::setCompression
void setCompression(CompressionType compression)
ros::master::V_TopicInfo
std::vector< TopicInfo > V_TopicInfo
TimeBase< Time, Duration >::isZero
bool isZero() const
rosbag_snapshot::SnapshotterOptions::addTopic
bool addTopic(std::string const &topic, ros::Duration duration_limit=SnapshotterTopicOptions::INHERIT_DURATION_LIMIT, int32_t memory_limit=SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT, int32_t count_limit=SnapshotterTopicOptions::INHERIT_COUNT_LIMIT)
Definition: snapshotter.cpp:82
rosbag_snapshot::Snapshotter::pollTopics
void pollTopics(ros::TimerEvent const &e, rosbag_snapshot::SnapshotterOptions *options)
Definition: snapshotter.cpp:554
rosbag_snapshot::SnapshotterClient::nh_
ros::NodeHandle nh_
Definition: snapshotter.h:317
rosbag::Bag::isOpen
bool isOpen() const
rosbag_snapshot::SnapshotterOptions::default_duration_limit_
ros::Duration default_duration_limit_
Definition: snapshotter.h:127
rosbag_snapshot::SnapshotterOptions::queue_size_
int queue_size_
Definition: snapshotter.h:141
rosbag_snapshot::Snapshotter::recording_
bool recording_
Definition: snapshotter.h:246
rosbag_snapshot::Snapshotter::poll_topic_timer_
ros::Timer poll_topic_timer_
Definition: snapshotter.h:254
rosbag_snapshot::Snapshotter::trigger_snapshot_server_
ros::ServiceServer trigger_snapshot_server_
Definition: snapshotter.h:250
rosbag_snapshot::SnapshotMessage::msg
topic_tools::ShapeShifter::ConstPtr msg
Definition: snapshotter.h:166
ROS_ASSERT_MSG
#define ROS_ASSERT_MSG(cond,...)
ros::ServiceClient
rosbag_snapshot::Snapshotter::enable_server_
ros::ServiceServer enable_server_
Definition: snapshotter.h:251
rosbag_snapshot::Snapshotter::run
int run()
Definition: snapshotter.cpp:581
rosbag_snapshot::SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT
static const int32_t INHERIT_MEMORY_LIMIT
Definition: snapshotter.h:106
ROS_WARN
#define ROS_WARN(...)
rosbag::bagmode::Write
Write
rosbag_snapshot::SnapshotterClientOptions::RESUME
@ RESUME
Definition: snapshotter.h:297
rosbag_snapshot::Snapshotter::Snapshotter
Snapshotter(SnapshotterOptions const &options)
Definition: snapshotter.cpp:245
rosbag_snapshot::MessageQueue::_clear
void _clear()
Definition: snapshotter.cpp:126
ros::NodeHandle::subscribe
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
ros::TimerEvent
rosbag_snapshot::SnapshotterClientOptions::action_
Action action_
Definition: snapshotter.h:300
rosbag_snapshot::Snapshotter::writing_
bool writing_
Definition: snapshotter.h:248
rosbag_snapshot::Snapshotter::writeTopic
bool writeTopic(rosbag::Bag &bag, MessageQueue &message_queue, std::string const &topic, rosbag_snapshot_msgs::TriggerSnapshot::Request &req, rosbag_snapshot_msgs::TriggerSnapshot::Response &res)
Definition: snapshotter.cpp:327
rosbag_snapshot::Snapshotter::resume
void resume()
Definition: snapshotter.cpp:492
rosbag_snapshot::SnapshotterTopicOptions::count_limit_
int32_t count_limit_
Definition: snapshotter.h:115
rosbag_snapshot::SnapshotterClientOptions::SnapshotterClientOptions
SnapshotterClientOptions()
Definition: snapshotter.cpp:90
rosbag_snapshot::MessageQueue::push
void push(SnapshotMessage const &msg)
Definition: snapshotter.cpp:179
shape_shifter.h
ros::this_node::getName
const ROSCPP_DECL std::string & getName()
start
ROSCPP_DECL void start()
ros::NodeHandle::ok
bool ok() const
rosbag::Bag::write
void write(std::string const &topic, ros::MessageEvent< T > const &event)
rosbag_snapshot::Snapshotter::status_timer_
ros::Timer status_timer_
Definition: snapshotter.h:253
rosbag_snapshot::Snapshotter::status_pub_
ros::Publisher status_pub_
Definition: snapshotter.h:252
rosbag_snapshot::Snapshotter::triggerSnapshotCb
bool triggerSnapshotCb(rosbag_snapshot_msgs::TriggerSnapshot::Request &req, rosbag_snapshot_msgs::TriggerSnapshot::Response &res)
Definition: snapshotter.cpp:385
ros::SubscribeOptions
rosbag_snapshot::Snapshotter::fixTopicOptions
void fixTopicOptions(SnapshotterTopicOptions &options)
Definition: snapshotter.cpp:260
ros::Time
rosbag_snapshot::Snapshotter::clear
void clear()
Definition: snapshotter.cpp:478
rosbag_snapshot::Snapshotter::topicCB
void topicCB(const ros::MessageEvent< topic_tools::ShapeShifter const > &msg_event, boost::shared_ptr< MessageQueue > queue)
Definition: snapshotter.cpp:293
ros::SubscribeOptions::queue_size
uint32_t queue_size
rosbag_snapshot::MessageQueue::_pop
SnapshotMessage _pop()
Definition: snapshotter.cpp:217
rosbag_snapshot::Snapshotter::state_lock_
boost::upgrade_mutex state_lock_
Definition: snapshotter.h:244
rosbag_snapshot::MessageQueue::sub_
boost::shared_ptr< ros::Subscriber > sub_
Definition: snapshotter.h:190
rosbag_snapshot::SnapshotterTopicOptions::SnapshotterTopicOptions
SnapshotterTopicOptions(ros::Duration duration_limit=INHERIT_DURATION_LIMIT, int32_t memory_limit=INHERIT_MEMORY_LIMIT, int32_t count_limit=INHERIT_COUNT_LIMIT)
Definition: snapshotter.cpp:65
rosbag_snapshot::SnapshotMessage::SnapshotMessage
SnapshotMessage(topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr< ros::M_string > _connection_header, ros::Time _time)
Definition: snapshotter.cpp:94
rosbag_snapshot::MessageQueue::lock
boost::mutex lock
Definition: snapshotter.h:182
rosbag_snapshot::SnapshotterOptions::clear_buffer_
bool clear_buffer_
Definition: snapshotter.h:137
rosbag_snapshot::SnapshotterTopicOptions::INHERIT_COUNT_LIMIT
static const int32_t INHERIT_COUNT_LIMIT
Definition: snapshotter.h:108
ROS_ERROR
#define ROS_ERROR(...)
rosbag::compression::Uncompressed
Uncompressed
ros::ServiceClient::call
bool call(const MReq &req, MRes &resp, const std::string &service_md5sum)
rosbag_snapshot::Snapshotter::timeAsStr
std::string timeAsStr()
Return current local datetime as a string such as 2018-05-22-14-28-51. Used to generate bag filenames...
Definition: snapshotter.cpp:283
rosbag_snapshot::SnapshotterClientOptions::prefix_
std::string prefix_
Definition: snapshotter.h:306
ros::InvalidNameException
ros::MessageEvent::getConnectionHeaderPtr
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
rosbag_snapshot::MessageQueue::MessageQueue
MessageQueue(SnapshotterTopicOptions const &options)
Definition: snapshotter.cpp:100
rosbag_snapshot::SnapshotterOptions::compression_
std::string compression_
Definition: snapshotter.h:139
ros::MessageEvent::getMessage
boost::shared_ptr< M > getMessage() const
ros::SubscribeOptions::datatype
std::string datatype
ros::Publisher::getNumSubscribers
uint32_t getNumSubscribers() const
ros::NodeHandle::getNamespace
const std::string & getNamespace() const
rosbag::Bag::open
void open(std::string const &filename, uint32_t mode=bagmode::Read)
ros::SubscribeOptions::helper
SubscriptionCallbackHelperPtr helper
assert.h
ROS_INFO
#define ROS_INFO(...)
ros::master::TopicInfo
rosbag_snapshot::MessageQueue
Definition: snapshotter.h:176
ros::NodeHandle::createTimer
Timer createTimer(Duration period, const TimerCallback &callback, bool oneshot=false, bool autostart=true) const
ros::Duration
rosbag_snapshot::SnapshotterOptions::default_memory_limit_
int32_t default_memory_limit_
Definition: snapshotter.h:129
ros::MessageEvent
rosbag_snapshot::SnapshotMessage::time
ros::Time time
Definition: snapshotter.h:169
rosbag_snapshot::MessageQueue::duration
ros::Duration duration() const
Definition: snapshotter.cpp:132
rosbag_snapshot::MessageQueue::queue_
queue_t queue_
Definition: snapshotter.h:188
rosbag_snapshot::Snapshotter::postfixFilename
bool postfixFilename(std::string &file)
Definition: snapshotter.cpp:270
stream.h


rosbag_snapshot
Author(s): Kevin Allen
autogenerated on Wed May 14 2025 02:13:18