snapshotter.h
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2018, Open Source Robotics Foundation, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of Open Source Robotics Foundation, Inc. nor the
18 * names of its contributors may be used to endorse or promote products
19 * derived from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 ********************************************************************/
34 #ifndef ROSBAG_SNAPSHOT_SNAPSHOTTER_H
35 #define ROSBAG_SNAPSHOT_SNAPSHOTTER_H
36 
37 #include <boost/atomic.hpp>
38 #include <boost/thread/mutex.hpp>
39 #include <boost/thread/shared_mutex.hpp>
40 #include <ros/ros.h>
41 #include <ros/time.h>
42 #include <rosbag_snapshot_msgs/TriggerSnapshot.h>
43 #include <std_srvs/SetBool.h>
45 #include <rosgraph_msgs/TopicStatistics.h>
46 #include <rosbag_snapshot_msgs/SnapshotStatus.h>
47 #include <rosbag/bag.h>
48 #include <rosbag/macros.h>
49 #include <deque>
50 #include <map>
51 #include <string>
52 #include <utility>
53 #include <vector>
54 
55 namespace rosbag_snapshot
56 {
58 
59 /* Configuration for a single topic in the Snapshotter node. Holds
60  * the buffer limits for a topic by duration (time difference between newest and oldest message)
61  * and memory usage, in bytes.
62  */
64 {
65  // When the value of duration_limit_, do not truncate the buffer no matter how large the duration is
67  // When the value of memory_limit_, do not trunctate the buffer no matter how much memory it consumes (DANGROUS)
68  static const int32_t NO_MEMORY_LIMIT;
69  // When the value of duration_limit_, inherit the limit from the node's configured default
71  // When the value of memory_limit_, inherit the limit from the node's configured default
72  static const int32_t INHERIT_MEMORY_LIMIT;
73 
74  // Maximum difference in time from newest and oldest message in buffer before older messages are removed
76  // Maximum memory usage of the buffer before older messages are removed
77  int32_t memory_limit_;
78 
79  SnapshotterTopicOptions(ros::Duration duration_limit = INHERIT_DURATION_LIMIT,
80  int32_t memory_limit = INHERIT_MEMORY_LIMIT);
81 };
82 
83 /* Configuration for the Snapshotter node. Contains default limits for memory and duration
84  * and a map of topics to their limits which may override the defaults.
85  */
87 {
88  // Duration limit to use for a topic's buffer if one is not specified
90  // Memory limit to use for a topic's buffer if one is not specified
92  // Period between publishing topic status messages. If <= ros::Duration(0), don't publish status
94  // Flag if all topics should be recorded
96 
97  typedef std::map<std::string, SnapshotterTopicOptions> topics_t;
98  // Provides list of topics to snapshot and their limit configurations
99  topics_t topics_;
100 
101  SnapshotterOptions(ros::Duration default_duration_limit = ros::Duration(30), int32_t default_memory_limit = -1,
102  ros::Duration status_period = ros::Duration(1));
103 
104  // Add a new topic to the configuration, returns false if the topic was already present
105  bool addTopic(std::string const& topic,
107  int32_t memory_limit = SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT);
108 };
109 
110 /* Stores a buffered message of an ambiguous type and it's associated metadata (time of arrival, connection data),
111  * for later writing to disk
112  */
114 {
116  ros::Time _time);
119  // ROS time when messaged arrived (does not use header stamp)
121 };
122 
123 /* Stores a queue of buffered messages for a single topic ensuring
124  * that the duration and memory limits are respected by truncating
125  * as needed on push() operations.
126  */
128 {
129  friend Snapshotter;
130 
131 private:
132  // Locks access to size_ and queue_
133  boost::mutex lock;
134  // Stores limits on buffer size and duration
136  // Current total size of the queue, in bytes
137  int64_t size_;
138  typedef std::deque<SnapshotMessage> queue_t;
139  queue_t queue_;
140  // Subscriber to the callback which uses this queue
142 
143 public:
144  explicit MessageQueue(SnapshotterTopicOptions const& options);
145  // Add a new message to the internal queue if possible, truncating the front of the queue as needed to enforce limits
146  void push(SnapshotMessage const& msg);
147  // Removes the message at the front of the queue (oldest) and returns it
148  SnapshotMessage pop();
149  // Returns the time difference between back and front of queue, or 0 if size <= 1
150  ros::Duration duration() const;
151  // Clear internal buffer
152  void clear();
153  // Store the subscriber for this topic's queue internaly so it is not deleted
154  void setSubscriber(boost::shared_ptr<ros::Subscriber> sub);
155  // Put data about oldest/newest message time, message count, and buffersize into status message
156  void fillStatus(rosgraph_msgs::TopicStatistics& status);
157  typedef std::pair<queue_t::const_iterator, queue_t::const_iterator> range_t;
158  // Get a begin and end iterator into the buffer respecting the start and end timestamp constraints
159  range_t rangeFromTimes(ros::Time const& start, ros::Time const& end);
160 
161 private:
162  // Internal push whitch does not obtain lock
163  void _push(SnapshotMessage const& msg);
164  // Internal pop which does not obtain lock
165  SnapshotMessage _pop();
166  // Internal clear which does not obtain lock
167  void _clear();
168  // Truncate front of queue as needed to fit a new message of specified size and time. Returns False if this is
169  // impossible.
170  bool preparePush(int32_t size, ros::Time const& time);
171 };
172 
173 /* Snapshotter node. Maintains a circular buffer of the most recent messages from configured topics
174  * while enforcing limits on memory and duration. The node can be triggered to write some or all
175  * of these buffers to a bag file via a service call. Useful in live testing scenerios where interesting
176  * data may be produced before a user has the oppurtunity to "rosbag record" the data.
177  */
179 {
180 public:
181  explicit Snapshotter(SnapshotterOptions const& options);
182  ~Snapshotter();
183 
184  // Sets up callbacks and spins until node is killed
185  int run();
186 
187 private:
188  // Subscribe queue size for each topic
189  static const int QUEUE_SIZE;
191  typedef std::map<std::string, boost::shared_ptr<MessageQueue> > buffers_t;
192  buffers_t buffers_;
193  // Locks recording_ and writing_ states.
194  boost::upgrade_mutex state_lock_;
195  // True if new messages are being written to the internal buffer
197  // True if currently writing buffers to a bag file
198  bool writing_;
205 
206  // Replace individual topic limits with node defaults if they are flagged for it (see SnapshotterTopicOptions)
207  void fixTopicOptions(SnapshotterTopicOptions& options);
208  // If file is "prefix" mode (doesn't end in .bag), append current datetime and .bag to end
209  bool postfixFilename(std::string& file);
211  std::string timeAsStr();
212  // Clear the internal buffers of all topics. Used when resuming after a pause to avoid time gaps
213  void clear();
214  // Subscribe to one of the topics, setting up the callback to add to the respective queue
215  void subscribe(std::string const& topic, boost::shared_ptr<MessageQueue> queue);
216  // Called on new message from any configured topic. Adds to queue for that topic
217  void topicCB(const ros::MessageEvent<topic_tools::ShapeShifter const>& msg_event,
219  // Service callback, write all of part of the internal buffers to a bag file according to request parameters
220  bool triggerSnapshotCb(rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
221  rosbag_snapshot_msgs::TriggerSnapshot::Response& res);
222  // Service callback, enable or disable recording (storing new messages into queue). Used to pause before writing
223  bool enableCB(std_srvs::SetBool::Request& req, std_srvs::SetBool::Response& res);
224  // Set recording_ to false and do nessesary cleaning, CALLER MUST OBTAIN LOCK
225  void pause();
226  // Set recording_ to true and do nesessary cleaning, CALLER MUST OBTAIN LOCK
227  void resume();
228  // Publish status containing statistics of currently buffered topics and other state
229  void publishStatus(ros::TimerEvent const& e);
230  // Poll master for new topics
231  void pollTopics(ros::TimerEvent const& e, rosbag_snapshot::SnapshotterOptions *options);
232  // Write the parts of message_queue within the time constraints of req to the queue
233  // If returns false, there was an error opening/writing the bag and an error message was written to res.message
234  bool writeTopic(rosbag::Bag& bag, MessageQueue& message_queue, std::string const& topic,
235  rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
236  rosbag_snapshot_msgs::TriggerSnapshot::Response& res);
237 };
238 
239 // Configuration for SnapshotterClient
241 {
243  enum Action
244  {
247  RESUME
248  };
249  // What to do when SnapshotterClient.run is called
251  // List of topics to write when action_ == TRIGGER_WRITE. If empty, write all buffered topics.
252  std::vector<std::string> topics_;
253  // Name of file to write to when action_ == TRIGGER_WRITE, relative to snapshot node. If empty, use prefix
254  std::string filename_;
255  // Prefix of the name of file written to when action_ == TRIGGER_WRITE.
256  std::string prefix_;
257 };
258 
259 // Node used to call services which interface with the snapshotter node to trigger write, pause, and resume
261 {
262 public:
264  int run(SnapshotterClientOptions const& opts);
265 
266 private:
268 };
269 
270 } // namespace rosbag_snapshot
271 
272 #endif // ROSBAG_SNAPSHOT_SNAPSHOTTER_H
boost::shared_ptr< ros::M_string > connection_header
Definition: snapshotter.h:118
boost::shared_ptr< ros::Subscriber > sub_
Definition: snapshotter.h:141
ros::ServiceServer enable_server_
Definition: snapshotter.h:201
std::map< std::string, boost::shared_ptr< MessageQueue > > buffers_t
Definition: snapshotter.h:191
static const int32_t INHERIT_MEMORY_LIMIT
Definition: snapshotter.h:72
static const ros::Duration NO_DURATION_LIMIT
Definition: snapshotter.h:66
topic_tools::ShapeShifter::ConstPtr msg
Definition: snapshotter.h:117
boost::upgrade_mutex state_lock_
Definition: snapshotter.h:194
SnapshotterOptions options_
Definition: snapshotter.h:190
void subscribe()
std::map< std::string, SnapshotterTopicOptions > topics_t
Definition: snapshotter.h:97
std::pair< queue_t::const_iterator, queue_t::const_iterator > range_t
Definition: snapshotter.h:157
#define ROSBAG_DECL
SnapshotterTopicOptions options_
Definition: snapshotter.h:135
std::vector< std::string > topics_
Definition: snapshotter.h:252
static const int QUEUE_SIZE
Definition: snapshotter.h:189
static const ros::Duration INHERIT_DURATION_LIMIT
Definition: snapshotter.h:70
std::deque< SnapshotMessage > queue_t
Definition: snapshotter.h:138
class ROSBAG_DECL Snapshotter
Definition: snapshotter.h:57
ros::ServiceServer trigger_snapshot_server_
Definition: snapshotter.h:200


rosbag_snapshot
Author(s): Kevin Allen
autogenerated on Mon Jan 18 2021 03:45:11