39 from rosbag
import Bag
40 from std_srvs.srv
import SetBool
41 from rosbag_snapshot_msgs.msg
import SnapshotStatus
42 from rosbag_snapshot_msgs.srv
import TriggerSnapshot, TriggerSnapshotRequest, TriggerSnapshotResponse
47 Tests the "rosbag_snapshot" command. 48 Relies on the nodes launched in snapshot.test 52 self.
params = rospy.get_param(
"snapshot")
56 self.
trigger = rospy.ServiceProxy(
"trigger_snapshot", TriggerSnapshot)
57 self.
enable = rospy.ServiceProxy(
"enable_snapshot", SetBool)
58 super(TestRosbagSnapshot, self).
__init__(*args)
62 Parse launch parameters of snapshot to cache the topic limits in a map 63 so it is easier to check compliance in later tests. 69 for topic_obj
in self.
params[
'topics']:
73 if type(topic_obj) == dict:
74 topic = list(topic_obj.keys())[0]
75 duration = topic_obj[topic].get(
'duration', duration)
76 memory = topic_obj[topic].get(
'memory', memory)
77 count = topic_obj[topic].get(
'count', count)
80 topic = rospy.resolve_name(topic)
81 duration = rospy.Duration(duration)
90 Asserts that calling TriggerWrite service with 91 specifed parameters responds non-success and did not create 94 filename = tempfile.mktemp()
95 res = self.
trigger(filename=filename, topics=topics)
96 self.assertFalse(res.success)
97 self.assertEqual(res.message, TriggerSnapshotResponse.NO_DATA_MESSAGE)
98 self.assertFalse(os.path.isfile(filename))
102 Assert that the recording SetBool service responds with success 105 self.assertTrue(res.success)
106 self.assertEqual(res.message,
"")
116 Asserts that the TriggerWrite services succeeds for the specified request arguments 117 and that the specified bag file is actually created 118 @param prefix_mode: If True, don't put .bag at the end of reqest to check prefix filename mode 121 d = tempfile.mkdtemp()
122 filename = tempfile.mktemp(dir=d)
124 filename = tempfile.mktemp(suffix=
'.bag')
125 req = TriggerSnapshotRequest(filename=filename, topics=topics, **kwargs)
127 self.assertTrue(res.success, msg=
"snapshot should have succeeded. message: {}".format(res.message))
128 self.assertTrue(res.message ==
"")
130 dircontents = os.listdir(d)
131 self.assertEqual(len(dircontents), 1)
132 filename = os.path.join(d, dircontents[0])
133 self.assertEqual(filename[-4:],
'.bag')
134 self.assertTrue(os.path.isfile(filename))
139 Asserts that the measured duration and memory for a topic comply with the launch parameters 141 @param duration: rospy.Duration, age of buffer 142 @param memory: integer, bytes of memory used 143 @param count: integer, number of messages stored 145 test_topic = rospy.resolve_name(test_topic)
148 if limits[0] > rospy.Duration():
149 self.assertLessEqual(duration, limits[0])
151 self.assertLessEqual(memory, limits[1])
153 self.assertLessEqual(count, limits[2])
157 Check that a status message contains info on all subscribed topics 158 and reports that their buffer complies with the configured limits. 161 topics = [msg.topic
for msg
in self.
last_status.topics]
163 status_topics = [rospy.resolve_name(list(topic.keys())[0]
if type(topic) == dict
else topic)
164 for topic
in self.
params[
'topics']]
165 self.assertEquals(set(topics), set(status_topics))
167 duration = topic.window_stop - topic.window_start
168 memory = topic.traffic
169 count = topic.delivered_msgs
174 Open the bagfile at the specified filename and read it to ensure topic limits were 175 enforced and the optional topic list and start/stop times are also enforced. 178 topics_dict = bag.get_type_and_topic_info()[1]
179 bag_topics = set(topics_dict.keys())
182 self.assertEqual(bag_topics, set(topics))
183 self.assertTrue(bag_topics.issubset(param_topics))
184 for topic
in topics_dict:
185 size = topics_dict[topic].message_count * 8
186 count = topics_dict[topic].message_count
187 gen = bag.read_messages(topics=topic)
188 _, _, first_time = next(gen)
189 last_time = first_time
191 self.assertGreaterEqual(first_time, start_time)
192 for _, _, last_time
in gen:
195 self.assertLessEqual(last_time, stop_time)
196 duration = last_time - first_time
201 Check that both services provided by snapshot exist. 203 self.
trigger.wait_for_service()
204 self.
enable.wait_for_service()
208 Wait long enough for memory & duration limits to need to be used 217 Test the more advanced features: pausing and resuming, specific write times, and specific topic list. 222 start = rospy.Time.now()
226 stop = rospy.Time.now()
234 cropped_start = start + rospy.Duration(0.5)
235 cropped_stop = stop - rospy.Duration(1.0)
237 self.
_assert_bag_valid(filename, start_time=cropped_start, stop_time=cropped_stop)
240 specific_topics = [rospy.resolve_name(topic)
for topic
in [
'/test2',
'test1']]
249 Test that an invalid topic or one not subscribed to fails 255 if __name__ ==
'__main__':
258 rospy.init_node(
'test_rosbag_snapshot', anonymous=
True)
259 rostest.run(PKG,
'TestRosbagSnapshot', TestRosbagSnapshot, sys.argv)
def test_invalid_topics(self)
def test_write_advanced(self)
def _parse_params(self, params)
def _assert_no_data(self, topics=[])
def _assert_status_valid(self)
def test_1_service_connects(self)
def _assert_bag_valid(self, filename, topics=None, start_time=None, stop_time=None)
def _assert_limits_enforced(self, test_topic, duration, memory, count)
def _assert_write_success(self, topics=[], prefix_mode=False, kwargs)
def _assert_record_success(self, data)
def _status_cb(self, msg)