22 from std_msgs.msg
import String
30 from rolling_recorder_test_base
import RollingRecorderTestBase
32 PKG =
'rosbag_uploader_ros1_integration_tests' 33 NAME =
'periodic_file_deleter' 37 super(TestPeriodicFileDeleter, self).
setUp()
51 rospy.loginfo(
"Active rosbag: %s" % active_rosbag)
52 active_rosbag_start_time = os.path.getctime(active_rosbag)
56 bag_finish_time_remaining = bag_finish_time - time.time()
57 rospy.loginfo(
"Bag finish time remaining: %f" % bag_finish_time_remaining)
60 total_test_messages = 10
61 sleep_between_message = (bag_finish_time_remaining * 0.5) / total_test_messages
62 rospy.loginfo(
"Sleep between messages: %f" % sleep_between_message)
63 for x
in range(total_test_messages):
64 self.test_publisher.publish(
"Test message %d" % x)
65 time.sleep(sleep_between_message)
68 bag_finish_time_remaining = bag_finish_time - time.time()
69 rospy.loginfo(
"Bag finish time remaining after publish: %f" % bag_finish_time_remaining)
71 time.sleep(bag_finish_time_remaining + 0.3)
73 rospy.loginfo(
"Latest rosbag: %s" % latest_bag)
77 rospy.loginfo(
"Checking latest rosbag still exists: %s" % latest_bag)
78 self.assertFalse(os.path.exists(latest_bag))
80 if __name__ ==
'__main__':
81 rostest.rosrun(PKG, NAME, TestPeriodicFileDeleter, sys.argv)
def get_latest_bag_by_regex(self, regex_pattern)
def wait_for_rolling_recorder_node_to_subscribe_to_topic(self)
periodic_deleter_interval
def wait_for_rolling_recorder_nodes(self, timeout=5)
def test_record_custom_topic(self)