23 from s3_client
import S3Client
25 from file_helpers
import get_latest_bag_by_regex, get_latest_bags_by_regex
26 from recorder_msgs.msg
import DurationRecorderAction, DurationRecorderGoal
28 ACTION =
'/duration_recorder/RosbagDurationRecord' 29 TEST_NODE_NAME =
'duration_record_action_client' 30 AWS_DEFAULT_REGION =
'us-west-2' 36 rospy.init_node(TEST_NODE_NAME, log_level=rospy.DEBUG)
37 s3_region = rospy.get_param(
'/s3_file_uploader/aws_client_configuration/region')
39 s3_bucket_name = rospy.get_param(
'/s3_file_uploader/s3_bucket')
40 s3.create_bucket(s3_bucket_name)
41 s3.wait_for_bucket_create(s3_bucket_name)
45 s3_region = rospy.get_param(
'/s3_file_uploader/aws_client_configuration/region')
47 s3_bucket_name = rospy.get_param(
'/s3_file_uploader/s3_bucket')
48 s3.delete_all_objects(s3_bucket_name)
49 s3.delete_bucket(s3_bucket_name)
60 res = action_client.wait_for_server()
61 self.assertTrue(res,
'Failed to connect to action server')
67 goal = DurationRecorderGoal(
68 duration=rospy.Duration.from_sec(duration),
69 topics_to_record=topics
71 self.action_client.send_goal(goal)
72 res = self.action_client.wait_for_result(rospy.Duration.from_sec(30.0))
73 self.assertTrue(res,
'Timed out waiting for result after sending Duration Recorder Goal')
74 return self.action_client.get_result()
78 total_bags_found = len(latest_bags)
79 self.assertEquals(total_bags_found, total_bags,
"Expected %d bags but only found %d" % (total_bags, total_bags_found))
80 for bag_path
in latest_bags:
81 bag_create_time = os.path.getctime(bag_path)
82 self.assertGreater(bag_create_time, start_time)
86 for bag_path
in all_bags:
def get_latest_bags_by_regex(self, regex_pattern, count)
def get_latest_bag_by_regex(self, regex_pattern)
def check_rosbags_were_recorded(self, start_time, total_bags)
def record_for_duration(self, duration=5, topics=None)
def _create_duration_recorder_action_client(self)
def delete_all_rosbags(self)