test_snapshot.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2018, Open Source Robotics Foundation, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Open Source Robotics Foundation, Inc. nor the
18 # names of its contributors may be used to endorse or promote products
19 # derived from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import tempfile
37 import unittest
38 import rospy
39 from rosbag import Bag
40 from std_srvs.srv import SetBool
41 from rosbag_snapshot_msgs.msg import SnapshotStatus
42 from rosbag_snapshot_msgs.srv import TriggerSnapshot, TriggerSnapshotRequest, TriggerSnapshotResponse
43 
44 
45 class TestRosbagSnapshot(unittest.TestCase):
46  '''
47  Tests the "rosbag_snapshot" command.
48  Relies on the nodes launched in snapshot.test
49  '''
50 
51  def __init__(self, *args):
52  self.params = rospy.get_param("snapshot")
53  self._parse_params(self.params)
54  self.last_status = None
55  self.status_sub = rospy.Subscriber("snapshot_status", SnapshotStatus, self._status_cb, queue_size=5)
56  self.trigger = rospy.ServiceProxy("trigger_snapshot", TriggerSnapshot)
57  self.enable = rospy.ServiceProxy("enable_snapshot", SetBool)
58  super(TestRosbagSnapshot, self).__init__(*args)
59 
60  def _parse_params(self, params):
61  '''
62  Parse launch parameters of snapshot to cache the topic limits in a map
63  so it is easier to check compliance in later tests.
64  '''
65  self.topic_limits = {}
66  self.default_duration_limit = params['default_duration_limit']
67  self.default_memory_limit = params['default_memory_limit']
68  self.default_count_limit = params['default_count_limit']
69  for topic_obj in self.params['topics']:
70  duration = self.default_duration_limit
71  memory = self.default_memory_limit
72  count = self.default_count_limit
73  if type(topic_obj) == dict:
74  topic = list(topic_obj.keys())[0]
75  duration = topic_obj[topic].get('duration', duration)
76  memory = topic_obj[topic].get('memory', memory)
77  count = topic_obj[topic].get('count', count)
78  else:
79  topic = topic_obj
80  topic = rospy.resolve_name(topic)
81  duration = rospy.Duration(duration)
82  memory = 1E6 * memory
83  self.topic_limits[topic] = (duration, memory, count)
84 
85  def _status_cb(self, msg):
86  self.last_status = msg
87 
88  def _assert_no_data(self, topics=[]):
89  '''
90  Asserts that calling TriggerWrite service with
91  specifed parameters responds non-success and did not create
92  a bag file.
93  '''
94  filename = tempfile.mktemp()
95  res = self.trigger(filename=filename, topics=topics)
96  self.assertFalse(res.success)
97  self.assertEqual(res.message, TriggerSnapshotResponse.NO_DATA_MESSAGE)
98  self.assertFalse(os.path.isfile(filename))
99 
100  def _assert_record_success(self, data):
101  '''
102  Assert that the recording SetBool service responds with success
103  '''
104  res = self.enable(data)
105  self.assertTrue(res.success)
106  self.assertEqual(res.message, "")
107 
108  def _pause(self):
109  self._assert_record_success(False)
110 
111  def _resume(self):
112  self._assert_record_success(True)
113 
114  def _assert_write_success(self, topics=[], prefix_mode=False, **kwargs):
115  '''
116  Asserts that the TriggerWrite services succeeds for the specified request arguments
117  and that the specified bag file is actually created
118  @param prefix_mode: If True, don't put .bag at the end of reqest to check prefix filename mode
119  '''
120  if prefix_mode:
121  d = tempfile.mkdtemp()
122  filename = tempfile.mktemp(dir=d)
123  else:
124  filename = tempfile.mktemp(suffix='.bag')
125  req = TriggerSnapshotRequest(filename=filename, topics=topics, **kwargs)
126  res = self.trigger(req)
127  self.assertTrue(res.success, msg="snapshot should have succeeded. message: {}".format(res.message))
128  self.assertTrue(res.message == "")
129  if prefix_mode:
130  dircontents = os.listdir(d)
131  self.assertEqual(len(dircontents), 1)
132  filename = os.path.join(d, dircontents[0])
133  self.assertEqual(filename[-4:], '.bag')
134  self.assertTrue(os.path.isfile(filename))
135  return filename
136 
137  def _assert_limits_enforced(self, test_topic, duration, memory, count):
138  '''
139  Asserts that the measured duration and memory for a topic comply with the launch parameters
140  @param topic: string
141  @param duration: rospy.Duration, age of buffer
142  @param memory: integer, bytes of memory used
143  @param count: integer, number of messages stored
144  '''
145  test_topic = rospy.resolve_name(test_topic)
146  self.assertIn(test_topic, self.topic_limits)
147  limits = self.topic_limits[test_topic]
148  if limits[0] > rospy.Duration():
149  self.assertLessEqual(duration, limits[0])
150  if limits[1] > 0:
151  self.assertLessEqual(memory, limits[1])
152  if limits[2] > 0:
153  self.assertLessEqual(count, limits[2])
154 
156  '''
157  Check that a status message contains info on all subscribed topics
158  and reports that their buffer complies with the configured limits.
159  '''
160  self.assertIsNotNone(self.last_status) # A message was recieved
161  topics = [msg.topic for msg in self.last_status.topics]
162  # Oneliners :)
163  status_topics = [rospy.resolve_name(list(topic.keys())[0] if type(topic) == dict else topic)
164  for topic in self.params['topics']]
165  self.assertEquals(set(topics), set(status_topics)) # Topics from params are same as topics in status message
166  for topic in self.last_status.topics:
167  duration = topic.window_stop - topic.window_start
168  memory = topic.traffic
169  count = topic.delivered_msgs
170  self._assert_limits_enforced(topic.topic, duration, memory, count)
171 
172  def _assert_bag_valid(self, filename, topics=None, start_time=None, stop_time=None):
173  '''
174  Open the bagfile at the specified filename and read it to ensure topic limits were
175  enforced and the optional topic list and start/stop times are also enforced.
176  '''
177  bag = Bag(filename)
178  topics_dict = bag.get_type_and_topic_info()[1]
179  bag_topics = set(topics_dict.keys())
180  param_topics = set(self.topic_limits.keys())
181  if topics:
182  self.assertEqual(bag_topics, set(topics))
183  self.assertTrue(bag_topics.issubset(param_topics))
184  for topic in topics_dict:
185  size = topics_dict[topic].message_count * 8 # Calculate stored message size as each message is 8 bytes
186  count = topics_dict[topic].message_count
187  gen = bag.read_messages(topics=topic)
188  _, _, first_time = next(gen)
189  last_time = first_time # in case the next for loop does not execute
190  if start_time:
191  self.assertGreaterEqual(first_time, start_time)
192  for _, _, last_time in gen: # Read through all messages so last_time is valid
193  pass
194  if stop_time:
195  self.assertLessEqual(last_time, stop_time)
196  duration = last_time - first_time
197  self._assert_limits_enforced(topic, duration, size, count)
198 
200  '''
201  Check that both services provided by snapshot exist.
202  '''
203  self.trigger.wait_for_service()
204  self.enable.wait_for_service()
205 
206  def test_write_all(self):
207  '''
208  Wait long enough for memory & duration limits to need to be used
209  '''
210  rospy.sleep(3.0) # Give some time to fill buffers to maximums
211  self._assert_status_valid()
212  filename = self._assert_write_success(prefix_mode=True)
213  self._assert_bag_valid(filename)
214 
216  '''
217  Test the more advanced features: pausing and resuming, specific write times, and specific topic list.
218  '''
219  # Pause, resume, and pause again so buffer should only contain data from a known time internal
220  self._pause()
221  rospy.sleep(1.5)
222  start = rospy.Time.now()
223  self._resume()
224  rospy.sleep(3.0)
225  self._pause()
226  stop = rospy.Time.now()
227  rospy.sleep(1.0)
228 
229  # Write all buffer data, check that only data from resumed interval is present
230  filename = self._assert_write_success()
231  self._assert_bag_valid(filename, start_time=start, stop_time=stop)
232 
233  # With recording still paused (same buffer as last write), write only a shorter time range
234  cropped_start = start + rospy.Duration(0.5)
235  cropped_stop = stop - rospy.Duration(1.0)
236  filename = self._assert_write_success(start_time=cropped_start, stop_time=cropped_stop)
237  self._assert_bag_valid(filename, start_time=cropped_start, stop_time=cropped_stop)
238 
239  # Write only specific topics and ensure that only those are present in bag file
240  specific_topics = [rospy.resolve_name(topic) for topic in ['/test2', 'test1']]
241  filename = self._assert_write_success(topics=specific_topics)
242  self._assert_bag_valid(filename, topics=specific_topics)
243 
244  # Resume recording for other tests
245  self._resume()
246 
248  '''
249  Test that an invalid topic or one not subscribed to fails
250  '''
251  self._assert_no_data(['_invalid_graph_name'])
252  self._assert_no_data(['/test5'])
253 
254 
255 if __name__ == '__main__':
256  import rostest
257  PKG = 'rosbag'
258  rospy.init_node('test_rosbag_snapshot', anonymous=True)
259  rostest.run(PKG, 'TestRosbagSnapshot', TestRosbagSnapshot, sys.argv)
def _assert_no_data(self, topics=[])
def _assert_bag_valid(self, filename, topics=None, start_time=None, stop_time=None)
def _assert_limits_enforced(self, test_topic, duration, memory, count)
def _assert_write_success(self, topics=[], prefix_mode=False, kwargs)
def _assert_record_success(self, data)


rosbag_snapshot
Author(s): Kevin Allen
autogenerated on Mon May 22 2023 02:10:47