15 from functools
import partial
27 from s3_client
import S3Client
29 from std_msgs.msg
import String
30 from recorder_msgs.msg
import DurationRecorderResult, DurationRecorderGoal
31 from duration_recorder_test_base
import DurationRecorderTestBase
33 PKG =
'rosbag_uploader_ros1_integration_tests' 34 NAME =
'test_duration_record_general' 35 RESULT_CODE_SUCCESS = 0
36 RESULT_CODE_SKIPPED = 2
40 start_time = time.time()
42 self.assertEquals(action_result.result.result, RESULT_CODE_SUCCESS)
46 start_time = time.time()
48 for _
in range(total_recordings):
50 self.assertEquals(action_result.result.result, RESULT_CODE_SUCCESS)
56 When attempting to record while it's already recording, ensure it 57 rejects the request gracefully 60 start_time = time.time()
62 total_attempted_recordings = 25
63 time_between_attempts = (record_time / total_attempted_recordings) / 2
66 goal = DurationRecorderGoal(
67 duration=rospy.Duration.from_sec(record_time),
68 topics_to_record=[
'/rosout']
70 primary_goal_action_client.send_goal(goal)
73 for _
in range(total_attempted_recordings):
75 self.assertEquals(action_result.result.result, RESULT_CODE_SKIPPED)
76 time.sleep(time_between_attempts)
78 res = primary_goal_action_client.wait_for_result(rospy.Duration.from_sec(15.0))
79 self.assertTrue(res,
'Timed out waiting for result after sending Duration Recorder Goal')
85 start_time = time.time()
89 total_test_messages = 10
96 goal = DurationRecorderGoal(
97 duration=rospy.Duration.from_sec(duration),
98 topics_to_record=[topic_name]
100 self.action_client.send_goal(goal)
109 self.action_client.wait_for_result(rospy.Duration.from_sec(10.0))
110 action_result = self.action_client.get_result()
117 self.assertEquals(action_result.result.result, RESULT_CODE_SUCCESS)
122 total_topic_messages = 0
124 for topic, msg, _
in bag.read_messages():
125 if topic == topic_name:
126 total_topic_messages += 1
127 self.assertEquals(total_topic_messages, total_test_messages)
130 s3_region = rospy.get_param(
'/s3_file_uploader/aws_client_configuration/region')
132 s3_bucket_name = rospy.get_param(
'/s3_file_uploader/s3_bucket')
133 s3_key = os.path.basename(latest_bag)
134 with tempfile.NamedTemporaryFile()
as f:
135 s3_client.download_file(s3_bucket_name, s3_key, f.name)
136 total_topic_messages = 0
138 for topic, msg, _
in bag.read_messages():
139 if topic == topic_name:
140 total_topic_messages += 1
141 self.assertEquals(total_topic_messages, total_test_messages)
144 publisher = rospy.Publisher(topic, String, queue_size=total_messages)
145 for _
in range(total_messages):
147 publisher.publish(msg)
151 return ''.join([random.choice(string.ascii_letters + string.digits)
for _
in range(length)])
154 if __name__ ==
'__main__':
155 rostest.rosrun(PKG, NAME, TestDurationRecorderGeneral, sys.argv)
def create_random_word(length)
def test_record_duration(self)
def test_record_overlapping_times(self)
def test_record_multiple_times(self)
def get_latest_bag_by_regex(self, regex_pattern)
def publish_periodic_data_to_topic(self, topic, interval, total_messages)
def test_record_specific_topic(self)
def check_rosbags_were_recorded(self, start_time, total_bags)
def record_for_duration(self, duration=5, topics=None)
def _create_duration_recorder_action_client(self)