test_periodic_file_deleter.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (c) 2020, Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License").
5 # You may not use this file except in compliance with the License.
6 # A copy of the License is located at
7 #
8 # http://aws.amazon.com/apache2.0
9 #
10 # or in the "license" file accompanying this file. This file is distributed
11 # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 # express or implied. See the License for the specific language governing
13 # permissions and limitations under the License.
14 
15 
16 import glob
17 import os
18 import sys
19 import time
20 import unittest
21 
22 from std_msgs.msg import String
23 
24 import rosbag
25 import rosnode
26 import rospy
27 import rostest
28 import rostopic
29 
30 from rolling_recorder_test_base import RollingRecorderTestBase
31 
# Package name and test name handed to rostest.rosrun() in the __main__ guard.
32 PKG = 'rosbag_uploader_ros1_integration_tests'
33 NAME = 'periodic_file_deleter'
34 
36  def setUp(self):
# Per-test fixture: runs the base-class setUp(), then reads the private ROS
# parameter "~max_record_time" into self.max_record_time.
# NOTE(review): this rendered listing jumps from embedded line 38 to 40, and
# self.periodic_deleter_interval (read later by the test body) is not assigned
# in any visible line -- confirm against the original file where it is set.
37  super(TestPeriodicFileDeleter, self).setUp()
38  self.max_record_time = rospy.get_param("~max_record_time")
40 
# Test body: publishes messages while a bag is active, waits for the bag to
# finish, then asserts that the finished bag file no longer exists on disk.
# NOTE(review): the method's `def` line (embedded line 41) and the statements
# at embedded lines 43 and 47 are absent from this rendered listing; the two
# comments below that have no statement after them are a symptom of that.
42  # Wait for rolling recorder node to start
44 
45  # Create publishers
46  self.test_publisher = rospy.Publisher('/some_topic', String, queue_size=10)
48 
49  # Find start time of active file
# NOTE(review): os.path.getctime() is the metadata-change time on Unix, not
# the creation time -- presumably close enough for a freshly opened bag; confirm.
50  active_rosbag = self.get_latest_bag_by_regex("*.bag.active")
51  rospy.loginfo("Active rosbag: %s" % active_rosbag)
52  active_rosbag_start_time = os.path.getctime(active_rosbag)
53 
54  # Calculate time active bag will be deleted
# Computed as the active bag's start time plus self.periodic_deleter_interval;
# the "remaining" value is that deadline minus the current wall-clock time.
55  bag_finish_time = active_rosbag_start_time + self.periodic_deleter_interval
56  bag_finish_time_remaining = bag_finish_time - time.time()
57  rospy.loginfo("Bag finish time remaining: %f" % bag_finish_time_remaining)
58 
59  # Emit some data to the test topic
# The 10 messages are spread evenly over half of the remaining time.
# NOTE(review): if bag_finish_time_remaining is already negative here,
# sleep_between_message is negative and time.sleep() raises ValueError.
60  total_test_messages = 10
61  sleep_between_message = (bag_finish_time_remaining * 0.5) / total_test_messages
62  rospy.loginfo("Sleep between messages: %f" % sleep_between_message)
63  for x in range(total_test_messages):
64  self.test_publisher.publish("Test message %d" % x)
65  time.sleep(sleep_between_message)
66 
67  # Wait for current bag to finish recording and roll over
# The remaining time is recomputed because the publish loop above consumed
# part of it.
68  bag_finish_time_remaining = bag_finish_time - time.time()
69  rospy.loginfo("Bag finish time remaining after publish: %f" % bag_finish_time_remaining)
70  # Add 0.3s as it takes some time for bag rotation to occur
# NOTE(review): a remaining time below -0.3 makes this sleep argument negative
# (ValueError); the 0.3 s margin is a fixed constant, not derived from a parameter.
71  time.sleep(bag_finish_time_remaining + 0.3)
72  latest_bag = self.get_latest_bag_by_regex("*.bag")
73  rospy.loginfo("Latest rosbag: %s" % latest_bag)
74 
75  # Check that the bag has been deleted once at least one deleter interval plus the max record time has passed
76  time.sleep(self.periodic_deleter_interval + self.max_record_time)
# NOTE(review): the log text below says "still exists", but the assertion that
# follows expects the file to be gone -- the message wording looks inverted.
77  rospy.loginfo("Checking latest rosbag still exists: %s" % latest_bag)
78  self.assertFalse(os.path.exists(latest_bag))
79 
# Script entry point: hands the test case class to rostest.rosrun() together
# with the package name, test name and the command-line arguments.
80 if __name__ == '__main__':
81  rostest.rosrun(PKG, NAME, TestPeriodicFileDeleter, sys.argv)


integ_tests
Author(s): AWS RoboMaker
autogenerated on Tue Jun 1 2021 02:51:32