duration_recorder_test_base.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (c) 2020, Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License").
5 # You may not use this file except in compliance with the License.
6 # A copy of the License is located at
7 #
8 # http://aws.amazon.com/apache2.0
9 #
10 # or in the "license" file accompanying this file. This file is distributed
11 # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 # express or implied. See the License for the specific language governing
13 # permissions and limitations under the License.
14 
15 import os
16 import sys
17 import unittest
18 
19 import actionlib
20 import rospy
21 import rostest
22 
23 from s3_client import S3Client
24 
25 from file_helpers import get_latest_bag_by_regex, get_latest_bags_by_regex
26 from recorder_msgs.msg import DurationRecorderAction, DurationRecorderGoal
27 
# Action name handed to actionlib.SimpleActionClient when the test's action client is built.
28 ACTION = '/duration_recorder/RosbagDurationRecord'
# Node name passed to rospy.init_node in setUpClass.
29 TEST_NODE_NAME = 'duration_record_action_client'
# Not referenced in the visible part of this file; presumably a fallback AWS region
# for code that imports this module -- TODO confirm.
30 AWS_DEFAULT_REGION = 'us-west-2'
31 
32 
class DurationRecorderTestBase(unittest.TestCase):
    """Common fixture for duration recorder integration tests.

    Class-level setup initialises the ROS node and creates the S3 bucket named
    by the ``/s3_file_uploader/s3_bucket`` parameter; class-level teardown
    empties and deletes that bucket. Each test gets an action client connected
    to ``ACTION`` and has every ``*.bag`` file in the write directory removed
    after it finishes.
    """

    @classmethod
    def _s3_client_and_bucket(cls):
        """Return ``(s3_client, bucket_name)`` built from the uploader's ROS parameters."""
        s3_region = rospy.get_param('/s3_file_uploader/aws_client_configuration/region')
        s3 = S3Client(s3_region)
        s3_bucket_name = rospy.get_param('/s3_file_uploader/s3_bucket')
        return s3, s3_bucket_name

    @classmethod
    def setUpClass(cls):
        """Initialise the test node and create the S3 bucket, waiting until it exists."""
        rospy.init_node(TEST_NODE_NAME, log_level=rospy.DEBUG)
        s3, s3_bucket_name = cls._s3_client_and_bucket()
        s3.create_bucket(s3_bucket_name)
        s3.wait_for_bucket_create(s3_bucket_name)

    @classmethod
    def tearDownClass(cls):
        """Delete every object in the S3 bucket, then the bucket itself."""
        s3, s3_bucket_name = cls._s3_client_and_bucket()
        # Objects are removed first, before the bucket delete is issued.
        s3.delete_all_objects(s3_bucket_name)
        s3.delete_bucket(s3_bucket_name)

    def setUp(self):
        """Connect the action client and resolve the rosbag write directory."""
        # record_for_duration() relies on self.action_client being set here.
        self.action_client = self._create_duration_recorder_action_client()
        self.rosbag_directory = os.path.expanduser(rospy.get_param('~write_directory'))

    def tearDown(self):
        """Remove all rosbags so bags from one test are not counted by the next."""
        self.delete_all_rosbags()

    # NOTE(review): the header of this helper was missing from the rendered
    # listing; its name is reconstructed -- confirm against the repository.
    def _create_duration_recorder_action_client(self):
        """Create a SimpleActionClient for ``ACTION`` and assert the server is reachable.

        Returns:
            The connected ``actionlib.SimpleActionClient``.
        """
        action_client = actionlib.SimpleActionClient(ACTION, DurationRecorderAction)
        res = action_client.wait_for_server()
        self.assertTrue(res, 'Failed to connect to action server')
        return action_client

    def record_for_duration(self, duration=5, topics=None, timeout=30.0):
        """Send a duration recorder goal and return its result.

        Args:
            duration: Recording length in seconds (converted with
                ``rospy.Duration.from_sec``).
            topics: Topics to record; defaults to ``['/rosout']``.
            timeout: Seconds to wait for the action result before the test
                fails. Should exceed ``duration``.

        Returns:
            The result reported by the action client.
        """
        if topics is None:
            topics = ['/rosout']
        goal = DurationRecorderGoal(
            duration=rospy.Duration.from_sec(duration),
            topics_to_record=topics
        )
        self.action_client.send_goal(goal)
        res = self.action_client.wait_for_result(rospy.Duration.from_sec(timeout))
        self.assertTrue(res, 'Timed out waiting for result after sending Duration Recorder Goal')
        return self.action_client.get_result()

    def check_rosbags_were_recorded(self, start_time, total_bags):
        """Assert that ``total_bags`` bags exist and each was created after ``start_time``.

        Args:
            start_time: Timestamp compared against ``os.path.getctime`` of each
                bag (i.e. seconds since the epoch).
            total_bags: Exact number of bags expected.
        """
        latest_bags = self.get_latest_bags_by_regex("*.bag", total_bags)
        total_bags_found = len(latest_bags)
        # assertEqual, not the deprecated assertEquals alias (removed in Python 3.12).
        self.assertEqual(total_bags_found, total_bags, "Expected %d bags but only found %d" % (total_bags, total_bags_found))
        for bag_path in latest_bags:
            bag_create_time = os.path.getctime(bag_path)
            self.assertGreater(bag_create_time, start_time)

    def delete_all_rosbags(self):
        """Delete every file matching ``*.bag`` in the rosbag write directory."""
        all_bags = get_latest_bags_by_regex(self.rosbag_directory, "*.bag")
        for bag_path in all_bags:
            os.remove(bag_path)

    def get_latest_bag_by_regex(self, regex_pattern):
        """Return the latest bag in the write directory matching ``regex_pattern``."""
        return get_latest_bag_by_regex(self.rosbag_directory, regex_pattern)

    def get_latest_bags_by_regex(self, regex_pattern, count):
        """Return the latest ``count`` bags in the write directory matching ``regex_pattern``."""
        return get_latest_bags_by_regex(self.rosbag_directory, regex_pattern, count)


integ_tests
Author(s): AWS RoboMaker
autogenerated on Tue Jun 1 2021 02:51:32