26 from file_helpers
import get_latest_bag_by_regex, get_latest_bags_by_regex
27 from std_msgs.msg
import String
29 TEST_NODE_NAME =
'test_rolling_recorder_client' 30 ROLLING_RECORDER_NODE_START_TIMEOUT = 5
34 BAG_DEACTIVATE_TIME = 0.5
39 rospy.init_node(TEST_NODE_NAME, log_level=rospy.DEBUG)
54 required_nodes = set([
'/rolling_recorder'])
55 start_time = time.time()
56 while not required_nodes.issubset(rosnode.get_node_names()):
57 if time.time() > start_time + timeout:
58 raise Exception(
"Timed out waiting for rolling recorder nodes")
62 rostopic.wait_for_subscriber(self.test_publisher, ROLLING_RECORDER_NODE_START_TIMEOUT)
def get_latest_bag_by_regex(self, regex_pattern)
def wait_for_rolling_recorder_node_to_subscribe_to_topic(self)
def wait_for_rolling_recorder_nodes(self, timeout=5)
def get_latest_bags_by_regex(self, regex_pattern, count)