snapshotter.h
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2018, Open Source Robotics Foundation, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of Open Source Robotics Foundation, Inc. nor the
18 * names of its contributors may be used to endorse or promote products
19 * derived from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 ********************************************************************/
34 #ifndef ROSBAG_SNAPSHOT_SNAPSHOTTER_H
35 #define ROSBAG_SNAPSHOT_SNAPSHOTTER_H
36 
37 #include <boost/atomic.hpp>
38 #include <boost/thread/mutex.hpp>
39 #include <boost/thread/shared_mutex.hpp>
40 #include <ros/ros.h>
41 #include <ros/time.h>
42 #include <rosbag_snapshot_msgs/TriggerSnapshot.h>
43 #include <std_srvs/SetBool.h>
45 #include <rosgraph_msgs/TopicStatistics.h>
46 #include <rosbag_snapshot_msgs/SnapshotStatus.h>
47 #include <rosbag/bag.h>
48 #include <rosbag/macros.h>
49 #include <deque>
50 #include <map>
51 #include <string>
52 #include <utility>
53 #include <vector>
54 
55 namespace rosbag_snapshot
56 {
58 
59 /* Configuration for a single topic in the Snapshotter node. Holds
60  * the buffer limits for a topic by duration (time difference between newest and oldest message)
61  * and memory usage, in bytes.
62  */
64 {
65  // When the value of duration_limit_, do not truncate the buffer no matter how large the duration is
66  static const ros::Duration NO_DURATION_LIMIT;
67  // When the value of memory_limit_, do not trunctate the buffer no matter how much memory it consumes (DANGROUS)
68  static const int32_t NO_MEMORY_LIMIT;
69  // When the value of count_limit_, do not trunctate the buffer no matter how many message it consumes
70  static const int32_t NO_COUNT_LIMIT;
71  // When the value of duration_limit_, inherit the limit from the node's configured default
72  static const ros::Duration INHERIT_DURATION_LIMIT;
73  // When the value of memory_limit_, inherit the limit from the node's configured default
74  static const int32_t INHERIT_MEMORY_LIMIT;
75  // When the value of count_limit_, inherit the limit from the node's configured default
76  static const int32_t INHERIT_COUNT_LIMIT;
77 
78  // Maximum difference in time from newest and oldest message in buffer before older messages are removed
79  ros::Duration duration_limit_;
80  // Maximum memory usage of the buffer before older messages are removed
81  int32_t memory_limit_;
82  // Maximum number of message in the buffer before older messages are removed
83  int32_t count_limit_;
84 
85  SnapshotterTopicOptions(ros::Duration duration_limit = INHERIT_DURATION_LIMIT,
86  int32_t memory_limit = INHERIT_MEMORY_LIMIT, int32_t count_limit = INHERIT_COUNT_LIMIT);
87 };
88 
89 /* Configuration for the Snapshotter node. Contains default limits for memory and duration
90  * and a map of topics to their limits which may override the defaults.
91  */
93 {
94  // Duration limit to use for a topic's buffer if one is not specified
95  ros::Duration default_duration_limit_;
96  // Memory limit to use for a topic's buffer if one is not specified
97  int32_t default_memory_limit_;
98  // Count limit to use for a topic's buffer if one is not specified
99  int32_t default_count_limit_;
100  // Period between publishing topic status messages. If <= ros::Duration(0), don't publish status
101  ros::Duration status_period_;
102  // Flag if all topics should be recorded
103  bool all_topics_;
104  // Flag for clearing buffers or not after writing to bag
105  bool clear_buffer_;
106  // Compression type
107  std::string compression_;
108  // Queue size for subscriptions
109  int queue_size_;
110 
111 
112  typedef std::map<std::string, SnapshotterTopicOptions> topics_t;
113  // Provides list of topics to snapshot and their limit configurations
114  topics_t topics_;
115 
116  SnapshotterOptions(ros::Duration default_duration_limit = ros::Duration(30), int32_t default_memory_limit = -1,
117  int32_t default_count_limit = -1, ros::Duration status_period = ros::Duration(1),
118  bool clear_buffer = true);
119 
120  // Add a new topic to the configuration, returns false if the topic was already present
121  bool addTopic(std::string const& topic,
123  int32_t memory_limit = SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT,
125 };
126 
127 /* Stores a buffered message of an ambiguous type and it's associated metadata (time of arrival, connection data),
128  * for later writing to disk
129  */
131 {
133  ros::Time _time);
136  // ROS time when messaged arrived (does not use header stamp)
137  ros::Time time;
138 };
139 
140 /* Stores a queue of buffered messages for a single topic ensuring
141  * that the duration and memory limits are respected by truncating
142  * as needed on push() operations.
143  */
145 {
146  friend Snapshotter;
147 
148 private:
149  // Locks access to size_ and queue_
150  boost::mutex lock;
151  // Stores limits on buffer size and duration
152  SnapshotterTopicOptions options_;
153  // Current total size of the queue, in bytes
154  int64_t size_;
155  typedef std::deque<SnapshotMessage> queue_t;
156  queue_t queue_;
157  // Subscriber to the callback which uses this queue
159 
160 public:
161  explicit MessageQueue(SnapshotterTopicOptions const& options);
162  // Add a new message to the internal queue if possible, truncating the front of the queue as needed to enforce limits
163  void push(SnapshotMessage const& msg);
164  // Removes the message at the front of the queue (oldest) and returns it
165  SnapshotMessage pop();
166  // Returns the time difference between back and front of queue, or 0 if size <= 1
167  ros::Duration duration() const;
168  // Clear internal buffer
169  void clear();
170  // Store the subscriber for this topic's queue internaly so it is not deleted
171  void setSubscriber(boost::shared_ptr<ros::Subscriber> sub);
172  // Put data about oldest/newest message time, message count, and buffersize into status message
173  void fillStatus(rosgraph_msgs::TopicStatistics& status);
174  typedef std::pair<queue_t::const_iterator, queue_t::const_iterator> range_t;
175  // Get a begin and end iterator into the buffer respecting the start and end timestamp constraints
176  range_t rangeFromTimes(ros::Time const& start, ros::Time const& end);
177 
178  // Return the total message size including the meta-information
179  int64_t getMessageSize(SnapshotMessage const& msg) const;
180 
181 private:
182  // Internal push whitch does not obtain lock
183  void _push(SnapshotMessage const& msg);
184  // Internal pop which does not obtain lock
185  SnapshotMessage _pop();
186  // Internal clear which does not obtain lock
187  void _clear();
188  // Truncate front of queue as needed to fit a new message of specified size and time. Returns False if this is
189  // impossible.
190  bool preparePush(int32_t size, ros::Time const& time);
191 };
192 
193 /* Snapshotter node. Maintains a circular buffer of the most recent messages from configured topics
194  * while enforcing limits on memory and duration. The node can be triggered to write some or all
195  * of these buffers to a bag file via a service call. Useful in live testing scenerios where interesting
196  * data may be produced before a user has the oppurtunity to "rosbag record" the data.
197  */
199 {
200 public:
201  explicit Snapshotter(SnapshotterOptions const& options);
202  ~Snapshotter();
203 
204  // Sets up callbacks and spins until node is killed
205  int run();
206 
207 private:
208  SnapshotterOptions options_;
209  typedef std::map<std::string, boost::shared_ptr<MessageQueue> > buffers_t;
210  buffers_t buffers_;
211  // Locks recording_ and writing_ states.
212  boost::upgrade_mutex state_lock_;
213  // True if new messages are being written to the internal buffer
214  bool recording_;
215  // True if currently writing buffers to a bag file
216  bool writing_;
217  ros::NodeHandle nh_;
218  ros::ServiceServer trigger_snapshot_server_;
219  ros::ServiceServer enable_server_;
220  ros::Publisher status_pub_;
221  ros::Timer status_timer_;
222  ros::Timer poll_topic_timer_;
223 
224  // Replace individual topic limits with node defaults if they are flagged for it (see SnapshotterTopicOptions)
225  void fixTopicOptions(SnapshotterTopicOptions& options);
226  // If file is "prefix" mode (doesn't end in .bag), append current datetime and .bag to end
227  bool postfixFilename(std::string& file);
229  std::string timeAsStr();
230  // Clear the internal buffers of all topics. Used when resuming after a pause to avoid time gaps
231  void clear();
232  // Subscribe to one of the topics, setting up the callback to add to the respective queue
233  void subscribe(std::string const& topic, boost::shared_ptr<MessageQueue> queue);
234  // Called on new message from any configured topic. Adds to queue for that topic
235  void topicCB(const ros::MessageEvent<topic_tools::ShapeShifter const>& msg_event,
237  // Service callback, write all of part of the internal buffers to a bag file according to request parameters
238  bool triggerSnapshotCb(rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
239  rosbag_snapshot_msgs::TriggerSnapshot::Response& res);
240  // Service callback, enable or disable recording (storing new messages into queue). Used to pause before writing
241  bool enableCB(std_srvs::SetBool::Request& req, std_srvs::SetBool::Response& res);
242  // Set recording_ to false and do nessesary cleaning, CALLER MUST OBTAIN LOCK
243  void pause();
244  // Set recording_ to true and do nesessary cleaning, CALLER MUST OBTAIN LOCK
245  void resume();
246  // Publish status containing statistics of currently buffered topics and other state
247  void publishStatus(ros::TimerEvent const& e);
248  // Poll master for new topics
249  void pollTopics(ros::TimerEvent const& e, rosbag_snapshot::SnapshotterOptions *options);
250  // Write the parts of message_queue within the time constraints of req to the queue
251  // If returns false, there was an error opening/writing the bag and an error message was written to res.message
252  bool writeTopic(rosbag::Bag& bag, MessageQueue& message_queue, std::string const& topic,
253  rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
254  rosbag_snapshot_msgs::TriggerSnapshot::Response& res);
255 };
256 
257 // Configuration for SnapshotterClient
259 {
261  enum Action
262  {
263  TRIGGER_WRITE,
264  PAUSE,
265  RESUME
266  };
267  // What to do when SnapshotterClient.run is called
268  Action action_;
269  // List of topics to write when action_ == TRIGGER_WRITE. If empty, write all buffered topics.
270  std::vector<std::string> topics_;
271  // Name of file to write to when action_ == TRIGGER_WRITE, relative to snapshot node. If empty, use prefix
272  std::string filename_;
273  // Prefix of the name of file written to when action_ == TRIGGER_WRITE.
274  std::string prefix_;
275 };
276 
277 // Node used to call services which interface with the snapshotter node to trigger write, pause, and resume
278 class ROSBAG_DECL SnapshotterClient
279 {
280 public:
281  SnapshotterClient();
282  int run(SnapshotterClientOptions const& opts);
283 
284 private:
285  ros::NodeHandle nh_;
286 };
287 
288 } // namespace rosbag_snapshot
289 
290 #endif // ROSBAG_SNAPSHOT_SNAPSHOTTER_H
rosbag::Bag
ros::Publisher
run
void run(class_loader::ClassLoader *loader)
rosbag_snapshot::SnapshotterTopicOptions
Definition: snapshotter.h:95
boost::shared_ptr< ShapeShifter const >
ros.h
time.h
rosbag_snapshot::SnapshotMessage
Definition: snapshotter.h:162
rosbag_snapshot::SnapshotterOptions
Definition: snapshotter.h:124
bag.h
rosbag_snapshot::SnapshotterClientOptions
Definition: snapshotter.h:290
ros::ServiceServer
rosbag_snapshot::SnapshotterTopicOptions::INHERIT_DURATION_LIMIT
static const ros::Duration INHERIT_DURATION_LIMIT
Definition: snapshotter.h:104
rosbag_snapshot
Definition: snapshotter.h:55
rosbag_snapshot::MessageQueue::range_t
std::pair< queue_t::const_iterator, queue_t::const_iterator > range_t
Definition: snapshotter.h:206
rosbag_snapshot::SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT
static const int32_t INHERIT_MEMORY_LIMIT
Definition: snapshotter.h:106
subscribe
void subscribe()
ros::TimerEvent
shape_shifter.h
ros::Time
ROSBAG_DECL
#define ROSBAG_DECL
rosbag_snapshot::SnapshotterTopicOptions::INHERIT_COUNT_LIMIT
static const int32_t INHERIT_COUNT_LIMIT
Definition: snapshotter.h:108
rosbag_snapshot::Snapshotter
class ROSBAG_DECL Snapshotter
Definition: snapshotter.h:89
macros.h
rosbag_snapshot::MessageQueue
Definition: snapshotter.h:176
ros::Duration
ros::Timer
ros::NodeHandle
ros::MessageEvent
rosbag_snapshot::Snapshotter
Definition: snapshotter.h:230


rosbag_snapshot
Author(s): Kevin Allen
autogenerated on Wed May 14 2025 02:13:18