39 from rosbag
import Bag
40 from std_srvs.srv
import SetBool
41 from rosbag_snapshot_msgs.msg
import SnapshotStatus
42 from rosbag_snapshot_msgs.srv
import TriggerSnapshot, TriggerSnapshotRequest, TriggerSnapshotResponse
47 Tests the "rosbag_snapshot" command. 48 Relies on the nodes launched in snapshot.test 51 self.
params = rospy.get_param(
"snapshot")
55 self.
trigger = rospy.ServiceProxy(
"trigger_snapshot", TriggerSnapshot)
56 self.
enable = rospy.ServiceProxy(
"enable_snapshot", SetBool)
57 super(TestRosbagSnapshot, self).
__init__(*args)
61 Parse launch parameters of snapshot to cache the topic limits in a map 62 so it is easier to check compliance in later tests. 67 for topic_obj
in self.
params[
'topics']:
70 if type(topic_obj) == dict:
71 topic = list(topic_obj.keys())[0]
72 duration = topic_obj[topic].get(
'duration', duration)
73 memory = topic_obj[topic].get(
'memory', memory)
76 topic = rospy.resolve_name(topic)
77 duration = rospy.Duration(duration)
86 Asserts that calling TriggerWrite service with 87 specifed parameters responds non-success and did not create 90 filename = tempfile.mktemp()
91 res = self.
trigger(filename=filename, topics=topics)
92 self.assertFalse(res.success)
93 self.assertEqual(res.message, TriggerSnapshotResponse.NO_DATA_MESSAGE)
94 self.assertFalse(os.path.isfile(filename))
98 Assert that the recording SetBool service responds with success 101 self.assertTrue(res.success)
102 self.assertEqual(res.message,
"")
112 Asserts that the TriggerWrite services succeeds for the specified request arguments 113 and that the specified bag file is actually created 114 @param prefix_mode: If True, don't put .bag at the end of reqest to check prefix filename mode 117 d = tempfile.mkdtemp()
118 filename = tempfile.mktemp(dir=d)
120 filename = tempfile.mktemp(suffix=
'.bag')
121 req = TriggerSnapshotRequest(filename=filename, topics=topics, **kwargs)
123 self.assertTrue(res.success, msg=
"snapshot should have succeeded. message: {}".format(res.message))
124 self.assertTrue(res.message ==
"")
126 dircontents = os.listdir(d)
127 self.assertEqual(len(dircontents), 1)
128 filename = os.path.join(d, dircontents[0])
129 self.assertEqual(filename[-4:],
'.bag')
130 self.assertTrue(os.path.isfile(filename))
135 Asserts that the measured duration and memory for a topic comply with the launch parameters 137 @param duration: rospy.Duration, age of buffer 138 @param memory: integer, bytes of memory used 140 test_topic = rospy.resolve_name(test_topic)
143 if limits[0] > rospy.Duration():
144 self.assertLessEqual(duration, limits[0])
146 self.assertLessEqual(memory, limits[1])
150 Check that a status message contains info on all subscribed topics 151 and reports that their buffer complies with the configured limits. 154 topics = [msg.topic
for msg
in self.last_status.topics]
156 status_topics = [rospy.resolve_name(list(topic.keys())[0]
if type(topic) == dict
else topic)
157 for topic
in self.
params[
'topics']]
158 self.assertEquals(set(topics), set(status_topics))
159 for topic
in self.last_status.topics:
160 duration = topic.window_stop - topic.window_start
161 memory = topic.traffic
166 Open the bagfile at the specified filename and read it to ensure topic limits were 167 enforced and the optional topic list and start/stop times are also enforced. 170 topics_dict = bag.get_type_and_topic_info()[1]
171 bag_topics = set(topics_dict.keys())
172 param_topics = set(self.topic_limits.keys())
174 self.assertEqual(bag_topics, set(topics))
175 self.assertTrue(bag_topics.issubset(param_topics))
176 for topic
in topics_dict:
177 size = topics_dict[topic].message_count * 8
178 gen = bag.read_messages(topics=topic)
179 _, _, first_time = next(gen)
180 last_time = first_time
182 self.assertGreaterEqual(first_time, start_time)
183 for _, _, last_time
in gen:
186 self.assertLessEqual(last_time, stop_time)
187 duration = last_time - first_time
192 Check that both services provided by snapshot exist. 194 self.trigger.wait_for_service()
195 self.enable.wait_for_service()
199 Wait long enough for memory & duration limits to need to be used 208 Test the more advanced features: pausing and resuming, specific write times, and specific topic list. 213 start = rospy.Time.now()
217 stop = rospy.Time.now()
225 cropped_start = start + rospy.Duration(0.5)
226 cropped_stop = stop - rospy.Duration(1.0)
228 self.
_assert_bag_valid(filename, start_time=cropped_start, stop_time=cropped_stop)
231 specific_topics = [rospy.resolve_name(topic)
for topic
in [
'/test2',
'test1']]
240 Test that an invalid topic or one not subscribed to fails 246 if __name__ ==
'__main__':
249 rospy.init_node(
'test_rosbag_snapshot', anonymous=
True)
250 rostest.run(PKG,
'TestRosbagSnapshot', TestRosbagSnapshot, sys.argv)
def test_invalid_topics(self)
def test_write_advanced(self)
def _parse_params(self, params)
def _assert_no_data(self, topics=[])
def _assert_status_valid(self)
def test_1_service_connects(self)
def _assert_bag_valid(self, filename, topics=None, start_time=None, stop_time=None)
def _assert_limits_enforced(self, test_topic, duration, memory)
def _assert_write_success(self, topics=[], prefix_mode=False, kwargs)
def _assert_record_success(self, data)
def _status_cb(self, msg)