test_snapshot.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2018, Open Source Robotics Foundation, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Open Source Robotics Foundation, Inc. nor the
18 # names of its contributors may be used to endorse or promote products
19 # derived from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import tempfile
37 import unittest
38 import rospy
39 from rosbag import Bag
40 from std_srvs.srv import SetBool
41 from rosbag_snapshot_msgs.msg import SnapshotStatus
42 from rosbag_snapshot_msgs.srv import TriggerSnapshot, TriggerSnapshotRequest, TriggerSnapshotResponse
43 
44 
45 class TestRosbagSnapshot(unittest.TestCase):
46  '''
47  Tests the "rosbag_snapshot" command.
48  Relies on the nodes launched in snapshot.test
49  '''
50  def __init__(self, *args):
51  self.params = rospy.get_param("snapshot")
52  self._parse_params(self.params)
53  self.last_status = None
54  self.status_sub = rospy.Subscriber("snapshot_status", SnapshotStatus, self._status_cb, queue_size=5)
55  self.trigger = rospy.ServiceProxy("trigger_snapshot", TriggerSnapshot)
56  self.enable = rospy.ServiceProxy("enable_snapshot", SetBool)
57  super(TestRosbagSnapshot, self).__init__(*args)
58 
59  def _parse_params(self, params):
60  '''
61  Parse launch parameters of snapshot to cache the topic limits in a map
62  so it is easier to check compliance in later tests.
63  '''
64  self.topic_limits = {}
65  self.default_duration_limit = params['default_duration_limit']
66  self.default_memory_limit = params['default_memory_limit']
67  for topic_obj in self.params['topics']:
68  duration = self.default_duration_limit
69  memory = self.default_memory_limit
70  if type(topic_obj) == dict:
71  topic = list(topic_obj.keys())[0]
72  duration = topic_obj[topic].get('duration', duration)
73  memory = topic_obj[topic].get('memory', memory)
74  else:
75  topic = topic_obj
76  topic = rospy.resolve_name(topic)
77  duration = rospy.Duration(duration)
78  memory = 1E6 * memory
79  self.topic_limits[topic] = (duration, memory)
80 
81  def _status_cb(self, msg):
82  self.last_status = msg
83 
84  def _assert_no_data(self, topics=[]):
85  '''
86  Asserts that calling TriggerWrite service with
87  specifed parameters responds non-success and did not create
88  a bag file.
89  '''
90  filename = tempfile.mktemp()
91  res = self.trigger(filename=filename, topics=topics)
92  self.assertFalse(res.success)
93  self.assertEqual(res.message, TriggerSnapshotResponse.NO_DATA_MESSAGE)
94  self.assertFalse(os.path.isfile(filename))
95 
96  def _assert_record_success(self, data):
97  '''
98  Assert that the recording SetBool service responds with success
99  '''
100  res = self.enable(data)
101  self.assertTrue(res.success)
102  self.assertEqual(res.message, "")
103 
104  def _pause(self):
105  self._assert_record_success(False)
106 
107  def _resume(self):
108  self._assert_record_success(True)
109 
110  def _assert_write_success(self, topics=[], prefix_mode=False, **kwargs):
111  '''
112  Asserts that the TriggerWrite services succeeds for the specified request arguments
113  and that the specified bag file is actually created
114  @param prefix_mode: If True, don't put .bag at the end of reqest to check prefix filename mode
115  '''
116  if prefix_mode:
117  d = tempfile.mkdtemp()
118  filename = tempfile.mktemp(dir=d)
119  else:
120  filename = tempfile.mktemp(suffix='.bag')
121  req = TriggerSnapshotRequest(filename=filename, topics=topics, **kwargs)
122  res = self.trigger(req)
123  self.assertTrue(res.success, msg="snapshot should have succeeded. message: {}".format(res.message))
124  self.assertTrue(res.message == "")
125  if prefix_mode:
126  dircontents = os.listdir(d)
127  self.assertEqual(len(dircontents), 1)
128  filename = os.path.join(d, dircontents[0])
129  self.assertEqual(filename[-4:], '.bag')
130  self.assertTrue(os.path.isfile(filename))
131  return filename
132 
133  def _assert_limits_enforced(self, test_topic, duration, memory):
134  '''
135  Asserts that the measured duration and memory for a topic comply with the launch parameters
136  @param topic: string
137  @param duration: rospy.Duration, age of buffer
138  @param memory: integer, bytes of memory used
139  '''
140  test_topic = rospy.resolve_name(test_topic)
141  self.assertIn(test_topic, self.topic_limits)
142  limits = self.topic_limits[test_topic]
143  if limits[0] > rospy.Duration():
144  self.assertLessEqual(duration, limits[0])
145  if limits[1] > 0:
146  self.assertLessEqual(memory, limits[1])
147 
149  '''
150  Check that a status message contains info on all subscribed topics
151  and reports that their buffer complies with the configured limits.
152  '''
153  self.assertIsNotNone(self.last_status) # A message was recieved
154  topics = [msg.topic for msg in self.last_status.topics]
155  # Oneliners :)
156  status_topics = [rospy.resolve_name(list(topic.keys())[0] if type(topic) == dict else topic)
157  for topic in self.params['topics']]
158  self.assertEquals(set(topics), set(status_topics)) # Topics from params are same as topics in status message
159  for topic in self.last_status.topics:
160  duration = topic.window_stop - topic.window_start
161  memory = topic.traffic
162  self._assert_limits_enforced(topic.topic, duration, memory)
163 
164  def _assert_bag_valid(self, filename, topics=None, start_time=None, stop_time=None):
165  '''
166  Open the bagfile at the specified filename and read it to ensure topic limits were
167  enforced and the optional topic list and start/stop times are also enforced.
168  '''
169  bag = Bag(filename)
170  topics_dict = bag.get_type_and_topic_info()[1]
171  bag_topics = set(topics_dict.keys())
172  param_topics = set(self.topic_limits.keys())
173  if topics:
174  self.assertEqual(bag_topics, set(topics))
175  self.assertTrue(bag_topics.issubset(param_topics))
176  for topic in topics_dict:
177  size = topics_dict[topic].message_count * 8 # Calculate stored message size as each message is 8 bytes
178  gen = bag.read_messages(topics=topic)
179  _, _, first_time = next(gen)
180  last_time = first_time # in case the next for loop does not execute
181  if start_time:
182  self.assertGreaterEqual(first_time, start_time)
183  for _, _, last_time in gen: # Read through all messages so last_time is valid
184  pass
185  if stop_time:
186  self.assertLessEqual(last_time, stop_time)
187  duration = last_time - first_time
188  self._assert_limits_enforced(topic, duration, size)
189 
191  '''
192  Check that both services provided by snapshot exist.
193  '''
194  self.trigger.wait_for_service()
195  self.enable.wait_for_service()
196 
197  def test_write_all(self):
198  '''
199  Wait long enough for memory & duration limits to need to be used
200  '''
201  rospy.sleep(3.0) # Give some time to fill buffers to maximums
202  self._assert_status_valid()
203  filename = self._assert_write_success(prefix_mode=True)
204  self._assert_bag_valid(filename)
205 
207  '''
208  Test the more advanced features: pausing and resuming, specific write times, and specific topic list.
209  '''
210  # Pause, resume, and pause again so buffer should only contain data from a known time internal
211  self._pause()
212  rospy.sleep(1.5)
213  start = rospy.Time.now()
214  self._resume()
215  rospy.sleep(3.0)
216  self._pause()
217  stop = rospy.Time.now()
218  rospy.sleep(1.0)
219 
220  # Write all buffer data, check that only data from resumed interval is present
221  filename = self._assert_write_success()
222  self._assert_bag_valid(filename, start_time=start, stop_time=stop)
223 
224  # With recording still paused (same buffer as last write), write only a shorter time range
225  cropped_start = start + rospy.Duration(0.5)
226  cropped_stop = stop - rospy.Duration(1.0)
227  filename = self._assert_write_success(start_time=cropped_start, stop_time=cropped_stop)
228  self._assert_bag_valid(filename, start_time=cropped_start, stop_time=cropped_stop)
229 
230  # Write only specific topics and ensure that only those are present in bag file
231  specific_topics = [rospy.resolve_name(topic) for topic in ['/test2', 'test1']]
232  filename = self._assert_write_success(topics=specific_topics)
233  self._assert_bag_valid(filename, topics=specific_topics)
234 
235  # Resume recording for other tests
236  self._resume()
237 
239  '''
240  Test that an invalid topic or one not subscribed to fails
241  '''
242  self._assert_no_data(['_invalid_graph_name'])
243  self._assert_no_data(['/test4'])
244 
245 
246 if __name__ == '__main__':
247  import rostest
248  PKG = 'rosbag'
249  rospy.init_node('test_rosbag_snapshot', anonymous=True)
250  rostest.run(PKG, 'TestRosbagSnapshot', TestRosbagSnapshot, sys.argv)
def _assert_no_data(self, topics=[])
def _assert_bag_valid(self, filename, topics=None, start_time=None, stop_time=None)
def _assert_limits_enforced(self, test_topic, duration, memory)
def _assert_write_success(self, topics=[], prefix_mode=False, kwargs)
def _assert_record_success(self, data)


rosbag_snapshot
Author(s): Kevin Allen
autogenerated on Mon Jan 18 2021 03:45:11