39 from rosbag 
import Bag
    40 from std_srvs.srv 
import SetBool
    41 from rosbag_snapshot_msgs.msg 
import SnapshotStatus
    42 from rosbag_snapshot_msgs.srv 
import TriggerSnapshot, TriggerSnapshotRequest, TriggerSnapshotResponse
    47     Tests the "rosbag_snapshot" command.    48     Relies on the nodes launched in snapshot.test    51         self.
params = rospy.get_param(
"snapshot")
    55         self.
trigger = rospy.ServiceProxy(
"trigger_snapshot", TriggerSnapshot)
    56         self.
enable = rospy.ServiceProxy(
"enable_snapshot", SetBool)
    57         super(TestRosbagSnapshot, self).
__init__(*args)
    61         Parse launch parameters of snapshot to cache the topic limits in a map    62         so it is easier to check compliance in later tests.    67         for topic_obj 
in self.
params[
'topics']:
    70             if type(topic_obj) == dict:
    71                 topic = list(topic_obj.keys())[0]
    72                 duration = topic_obj[topic].get(
'duration', duration)
    73                 memory = topic_obj[topic].get(
'memory', memory)
    76             topic = rospy.resolve_name(topic)
    77             duration = rospy.Duration(duration)
    86         Asserts that calling TriggerWrite service with    87         specifed parameters responds non-success and did not create    90         filename = tempfile.mktemp()
    91         res = self.
trigger(filename=filename, topics=topics)
    92         self.assertFalse(res.success)
    93         self.assertEqual(res.message, TriggerSnapshotResponse.NO_DATA_MESSAGE)
    94         self.assertFalse(os.path.isfile(filename))
    98         Assert that the recording SetBool service responds with success   101         self.assertTrue(res.success)
   102         self.assertEqual(res.message, 
"")
   112         Asserts that the TriggerWrite services succeeds for the specified request arguments   113         and that the specified bag file is actually created   114         @param prefix_mode: If True, don't put .bag at the end of reqest to check prefix filename mode   117             d = tempfile.mkdtemp()
   118             filename = tempfile.mktemp(dir=d)
   120             filename = tempfile.mktemp(suffix=
'.bag')
   121         req = TriggerSnapshotRequest(filename=filename, topics=topics, **kwargs)
   123         self.assertTrue(res.success, msg=
"snapshot should have succeeded. message: {}".format(res.message))
   124         self.assertTrue(res.message == 
"")
   126             dircontents = os.listdir(d)
   127             self.assertEqual(len(dircontents), 1)
   128             filename = os.path.join(d, dircontents[0])
   129             self.assertEqual(filename[-4:], 
'.bag')
   130         self.assertTrue(os.path.isfile(filename))
   135         Asserts that the measured duration and memory for a topic comply with the launch parameters   137         @param duration: rospy.Duration, age of buffer   138         @param memory: integer, bytes of memory used   140         test_topic = rospy.resolve_name(test_topic)
   143         if limits[0] > rospy.Duration():
   144             self.assertLessEqual(duration, limits[0])
   146             self.assertLessEqual(memory, limits[1])
   150         Check that a status message contains info on all subscribed topics   151         and reports that their buffer complies with the configured limits.   154         topics = [msg.topic 
for msg 
in self.last_status.topics]
   156         status_topics = [rospy.resolve_name(list(topic.keys())[0] 
if type(topic) == dict 
else topic)
   157                          for topic 
in self.
params[
'topics']]
   158         self.assertEquals(set(topics), set(status_topics))  
   159         for topic 
in self.last_status.topics:
   160             duration = topic.window_stop - topic.window_start
   161             memory = topic.traffic
   166         Open the bagfile at the specified filename and read it to ensure topic limits were   167         enforced and the optional topic list and start/stop times are also enforced.   170         topics_dict = bag.get_type_and_topic_info()[1]
   171         bag_topics = set(topics_dict.keys())
   172         param_topics = set(self.topic_limits.keys())
   174             self.assertEqual(bag_topics, set(topics))
   175         self.assertTrue(bag_topics.issubset(param_topics))
   176         for topic 
in topics_dict:
   177             size = topics_dict[topic].message_count * 8  
   178             gen = bag.read_messages(topics=topic)
   179             _, _, first_time = next(gen)
   180             last_time = first_time  
   182                 self.assertGreaterEqual(first_time, start_time)
   183             for _, _, last_time 
in gen:  
   186                 self.assertLessEqual(last_time, stop_time)
   187             duration = last_time - first_time
   192         Check that both services provided by snapshot exist.   194         self.trigger.wait_for_service()
   195         self.enable.wait_for_service()
   199         Wait long enough for memory & duration limits to need to be used   208         Test the more advanced features: pausing and resuming, specific write times, and specific topic list.   213         start = rospy.Time.now()
   217         stop = rospy.Time.now()
   225         cropped_start = start + rospy.Duration(0.5)
   226         cropped_stop = stop - rospy.Duration(1.0)
   228         self.
_assert_bag_valid(filename, start_time=cropped_start, stop_time=cropped_stop)
   231         specific_topics = [rospy.resolve_name(topic) 
for topic 
in [
'/test2', 
'test1']]
   240         Test that an invalid topic or one not subscribed to fails   246 if __name__ == 
'__main__':
   249     rospy.init_node(
'test_rosbag_snapshot', anonymous=
True)
   250     rostest.run(PKG, 
'TestRosbagSnapshot', TestRosbagSnapshot, sys.argv)
 
def test_invalid_topics(self)
def test_write_advanced(self)
def _parse_params(self, params)
def _assert_no_data(self, topics=[])
def _assert_status_valid(self)
def test_1_service_connects(self)
def _assert_bag_valid(self, filename, topics=None, start_time=None, stop_time=None)
def _assert_limits_enforced(self, test_topic, duration, memory)
def _assert_write_success(self, topics=[], prefix_mode=False, kwargs)
def _assert_record_success(self, data)
def _status_cb(self, msg)