test_rolling_recorder_custom_topic.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (c) 2020, Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License").
5 # You may not use this file except in compliance with the License.
6 # A copy of the License is located at
7 #
8 # http://aws.amazon.com/apache2.0
9 #
10 # or in the "license" file accompanying this file. This file is distributed
11 # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 # express or implied. See the License for the specific language governing
13 # permissions and limitations under the License.
14 
15 import os
16 import sys
17 import time
18 
19 from std_msgs.msg import String
20 
21 import rosbag
22 import rosnode
23 import rospy
24 import rostest
25 import rostopic
26 
27 from rolling_recorder_test_base import RollingRecorderTestBase
28 
29 PKG = 'rosbag_uploader_ros1_integration_tests'
30 NAME = 'rolling_recorder_custom_topic'
31 
34  # Get the custom topic we specified in the test file
35  self.topic_to_record = rospy.get_param("~topic_to_record")
36 
37  # Wait for rolling recorder node to start
39 
40  # Create publishers
41  self.test_publisher = rospy.Publisher(self.topic_to_record, String, queue_size=10)
43 
44  # Find start time of active file
45  active_rosbag = self.get_latest_bag_by_regex("*.bag.active")
46  rospy.loginfo("Active rosbag: %s" % active_rosbag)
47  active_rosbag_start_time = os.path.getctime(active_rosbag)
48 
49  # Calculate time active bag will roll over
50  bag_finish_time = active_rosbag_start_time + self.bag_rollover_time
51  bag_finish_time_remaining = bag_finish_time - time.time()
52  rospy.loginfo("Bag finish time remaining: %f" % bag_finish_time_remaining)
53 
54  # Emit some data to the test topic
55  total_test_messages = 10
56  sleep_between_message = (bag_finish_time_remaining * 0.5) / total_test_messages
57  rospy.loginfo("Sleep between messages: %f" % sleep_between_message)
58  for x in range(total_test_messages):
59  self.test_publisher.publish("Test message %d" % x)
60  time.sleep(sleep_between_message)
61 
62  # Wait for current bag to finish recording and roll over
63  bag_finish_time_remaining = bag_finish_time - time.time()
64  rospy.loginfo("Bag finish time remaining after publish: %f" % bag_finish_time_remaining)
65 
66  # Add 0.5s as it takes some time for bag rotation to occur
67  time.sleep(bag_finish_time_remaining + 0.5)
68 
69  # Check that the data is inside the latest rosbag
70  latest_bag = self.get_latest_bag_by_regex("*.bag")
71  rospy.loginfo("Latest bag: %s " % latest_bag)
72  bag = rosbag.Bag(latest_bag)
73  total_bag_messages = 0
74  for _, msg, _ in bag.read_messages():
75  total_bag_messages += 1
76 
77  self.assertEquals(total_bag_messages, total_test_messages)
78 
79 if __name__ == '__main__':
80  rostest.rosrun(PKG, NAME, TestRollingRecorderCustomTopic, sys.argv)


integ_tests
Author(s): AWS RoboMaker
autogenerated on Tue Jun 1 2021 02:51:32