test_rolling_recorder_default_topics.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (c) 2020, Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License").
5 # You may not use this file except in compliance with the License.
6 # A copy of the License is located at
7 #
8 # http://aws.amazon.com/apache2.0
9 #
10 # or in the "license" file accompanying this file. This file is distributed
11 # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 # express or implied. See the License for the specific language governing
13 # permissions and limitations under the License.
14 
15 import os
16 import random
17 import string
18 import sys
19 import time
20 
21 from std_msgs.msg import String
22 
23 import rosbag
24 import rosnode
25 import rospy
26 import rostest
27 import rostopic
28 
29 from rolling_recorder_test_base import RollingRecorderTestBase
30 
31 PKG = 'rosbag_uploader_ros1_integration_tests'
32 NAME = 'rolling_recorder_default_topics'
33 
35  # Curently by default the rolling_recorder records on all topics.
36  # This test ensures that when posting to a random topic with default settings
37  # the messages will be recorded correctly
39  # Wait for rolling recorder node to start
41 
42  # Create publishers
43  topic_name = '/my_random_topic_' + ''.join([random.choice(string.ascii_letters + string.digits) for _ in range(8)])
44  self.test_publisher = rospy.Publisher(topic_name, String, queue_size=10)
46 
47  # Find start time of active file
48  active_rosbag = self.get_latest_bag_by_regex("*.bag.active")
49  rospy.loginfo("Active rosbag: %s" % active_rosbag)
50  active_rosbag_start_time = os.path.getctime(active_rosbag)
51 
52  # Calculate time active bag will roll over
53  bag_finish_time = active_rosbag_start_time + self.bag_rollover_time
54  bag_finish_time_remaining = bag_finish_time - time.time()
55  rospy.loginfo("Bag finish time remaining: %f" % bag_finish_time_remaining)
56 
57  # Emit some data to the test topic
58  total_test_messages = 10
59  sleep_between_message = (bag_finish_time_remaining * 0.5) / total_test_messages
60  rospy.loginfo("Sleep between messages: %f" % sleep_between_message)
61  for x in range(total_test_messages):
62  self.test_publisher.publish("Test message %d" % x)
63  time.sleep(sleep_between_message)
64 
65  # Wait for all messages to be saved to bags.
66  # Because of buffering some messages may go into a second bag, which is
67  # why we're waiting for it to roll over twice
68  bag_finish_time_remaining = bag_finish_time - time.time()
69  time.sleep(bag_finish_time_remaining + self.bag_rollover_time + self.bag_deactivate_time)
70 
71  # Check that the data is inside the latest rosbags
72  latest_bags = self.get_latest_bags_by_regex("*.bag", 2)
73  rospy.loginfo("Latest bags: %s " % latest_bags)
74  total_messages = 0
75  total_topic_messages = 0
76  for bag_path in latest_bags:
77  bag = rosbag.Bag(bag_path)
78  for topic, msg, _ in bag.read_messages():
79  total_messages += 1
80  if topic == topic_name:
81  total_topic_messages += 1
82 
83  # Ensure that all messages published to this topic are recorded
84  self.assertEquals(total_topic_messages, total_test_messages)
85  # Ensure that more topics than just this topic are recorded
86  # The additional messages will be logs about what the rosbag recorder is doing
87  # published by the rosbag recorder itself, such as Opening/Closing rosbags
88  # as well as any logs this system has written to /rosout.
89  self.assertTrue(total_messages > total_topic_messages)
90 
91 if __name__ == '__main__':
92  rostest.rosrun(PKG, NAME, TestRollingRecorderDefaultTopics, sys.argv)


integ_tests
Author(s): AWS RoboMaker
autogenerated on Tue Jun 1 2021 02:51:32