rolling_recorder_test_base.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (c) 2020, Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License").
5 # You may not use this file except in compliance with the License.
6 # A copy of the License is located at
7 #
8 # http://aws.amazon.com/apache2.0
9 #
10 # or in the "license" file accompanying this file. This file is distributed
11 # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 # express or implied. See the License for the specific language governing
13 # permissions and limitations under the License.
14 
15 import os
16 import sys
17 import time
18 import unittest
19 
20 import rosbag
21 import rosnode
22 import rospy
23 import rostest
24 import rostopic
25 
26 from file_helpers import get_latest_bag_by_regex, get_latest_bags_by_regex
27 from std_msgs.msg import String
28 
29 TEST_NODE_NAME = 'test_rolling_recorder_client'
30 ROLLING_RECORDER_NODE_START_TIMEOUT = 5
31 
32 # Max time it should take for the current bag to go from active to inactive
33 # as bags are not immediately rolled switched because of polling delays
34 BAG_DEACTIVATE_TIME = 0.5
35 
36 class RollingRecorderTestBase(unittest.TestCase):
37  @classmethod
38  def setUpClass(cls):
39  rospy.init_node(TEST_NODE_NAME, log_level=rospy.DEBUG)
40 
41  @classmethod
42  def tearDownClass(cls):
43  pass
44 
45  def setUp(self):
46  self.bag_deactivate_time = BAG_DEACTIVATE_TIME
47  self.bag_rollover_time = rospy.get_param("~bag_rollover_time")
48  self.rosbag_directory = os.path.expanduser(rospy.get_param("~write_directory"))
49 
50  def tearDown(self):
51  pass
52 
53  def wait_for_rolling_recorder_nodes(self, timeout=5):
54  required_nodes = set(['/rolling_recorder'])
55  start_time = time.time()
56  while not required_nodes.issubset(rosnode.get_node_names()):
57  if time.time() > start_time + timeout:
58  raise Exception("Timed out waiting for rolling recorder nodes")
59  time.sleep(0.1)
60 
62  rostopic.wait_for_subscriber(self.test_publisher, ROLLING_RECORDER_NODE_START_TIMEOUT)
63 
64  def get_latest_bag_by_regex(self, regex_pattern):
65  return get_latest_bag_by_regex(self.rosbag_directory, regex_pattern)
66 
67  def get_latest_bags_by_regex(self, regex_pattern, count):
68  return get_latest_bags_by_regex(self.rosbag_directory, regex_pattern, count)


integ_tests
Author(s): AWS RoboMaker
autogenerated on Tue Jun 1 2021 02:51:32