test_duration_recorder_general.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (c) 2020, Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License").
5 # You may not use this file except in compliance with the License.
6 # A copy of the License is located at
7 #
8 # http://aws.amazon.com/apache2.0
9 #
10 # or in the "license" file accompanying this file. This file is distributed
11 # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 # express or implied. See the License for the specific language governing
13 # permissions and limitations under the License.
14 
15 from functools import partial
16 import os
17 import random
18 import string
19 import sys
20 import threading
21 import tempfile
22 import time
23 
24 import rosbag
25 import rospy
26 import rostest
27 from s3_client import S3Client
28 
29 from std_msgs.msg import String
30 from recorder_msgs.msg import DurationRecorderResult, DurationRecorderGoal
31 from duration_recorder_test_base import DurationRecorderTestBase
32 
33 PKG = 'rosbag_uploader_ros1_integration_tests'
34 NAME = 'test_duration_record_general'
35 RESULT_CODE_SUCCESS = 0
36 RESULT_CODE_SKIPPED = 2
37 
40  start_time = time.time()
41  action_result = self.record_for_duration(1)
42  self.assertEquals(action_result.result.result, RESULT_CODE_SUCCESS)
43  self.check_rosbags_were_recorded(start_time, 1)
44 
46  start_time = time.time()
47  total_recordings = 25
48  for _ in range(total_recordings):
49  action_result = self.record_for_duration(1)
50  self.assertEquals(action_result.result.result, RESULT_CODE_SUCCESS)
51  self.check_rosbags_were_recorded(start_time, total_recordings)
52 
53 
55  """
56  When attempting to record while it's already recording, ensure it
57  rejects the request gracefully
58  """
59  primary_goal_action_client = self._create_duration_recorder_action_client()
60  start_time = time.time()
61  record_time = 5
62  total_attempted_recordings = 25
63  time_between_attempts = (record_time / total_attempted_recordings) / 2
64 
65  # Start the duration recorder for `record_time` seconds
66  goal = DurationRecorderGoal(
67  duration=rospy.Duration.from_sec(record_time),
68  topics_to_record=['/rosout']
69  )
70  primary_goal_action_client.send_goal(goal)
71 
72  # Try to record multiple times while it's already recording using default action client
73  for _ in range(total_attempted_recordings):
74  action_result = self.record_for_duration(1)
75  self.assertEquals(action_result.result.result, RESULT_CODE_SKIPPED)
76  time.sleep(time_between_attempts)
77 
78  res = primary_goal_action_client.wait_for_result(rospy.Duration.from_sec(15.0))
79  self.assertTrue(res, 'Timed out waiting for result after sending Duration Recorder Goal')
80 
81  # Check only one bag was recorded since the start
82  self.check_rosbags_were_recorded(start_time, 1)
83 
85  start_time = time.time()
86  topic_name = '/my_random_topic_' + create_random_word(8)
87  duration = 5
88  interval = 0.1
89  total_test_messages = 10
90 
91  # Publish some data to the topic before recording is not started.
92  # This data SHOULD NOT be recorded into the rosbag
93  self.publish_periodic_data_to_topic(topic_name, interval, total_test_messages)
94 
95  # Start the duration recorder for `duration` seconds
96  goal = DurationRecorderGoal(
97  duration=rospy.Duration.from_sec(duration),
98  topics_to_record=[topic_name]
99  )
100  self.action_client.send_goal(goal)
101 
102  # Wait for duration recorder to start recording
103  time.sleep(0.5)
104 
105  # Publish some data to that topic
106  self.publish_periodic_data_to_topic(topic_name, interval, total_test_messages)
107 
108  # Wait for the duration recorder to finish
109  self.action_client.wait_for_result(rospy.Duration.from_sec(10.0))
110  action_result = self.action_client.get_result()
111 
112  # Publish some data to the topic after recording has finished
113  # This data SHOULD NOT be recorded into the rosbag
114  self.publish_periodic_data_to_topic(topic_name, interval, total_test_messages)
115 
116  # Ensure the duration recorder created the bag correctly
117  self.assertEquals(action_result.result.result, RESULT_CODE_SUCCESS)
118  self.check_rosbags_were_recorded(start_time, 1)
119 
120  # Ensure that the rosbag contains all the test messages
121  latest_bag = self.get_latest_bag_by_regex("*.bag")
122  total_topic_messages = 0
123  bag = rosbag.Bag(latest_bag)
124  for topic, msg, _ in bag.read_messages():
125  if topic == topic_name:
126  total_topic_messages += 1
127  self.assertEquals(total_topic_messages, total_test_messages)
128 
129  # Ensure that the rosbag uploaded to S3 contains all the test messages
130  s3_region = rospy.get_param('/s3_file_uploader/aws_client_configuration/region')
131  s3_client = S3Client(s3_region)
132  s3_bucket_name = rospy.get_param('/s3_file_uploader/s3_bucket')
133  s3_key = os.path.basename(latest_bag)
134  with tempfile.NamedTemporaryFile() as f:
135  s3_client.download_file(s3_bucket_name, s3_key, f.name)
136  total_topic_messages = 0
137  bag = rosbag.Bag(f.name)
138  for topic, msg, _ in bag.read_messages():
139  if topic == topic_name:
140  total_topic_messages += 1
141  self.assertEquals(total_topic_messages, total_test_messages)
142 
143  def publish_periodic_data_to_topic(self, topic, interval, total_messages):
144  publisher = rospy.Publisher(topic, String, queue_size=total_messages)
145  for _ in range(total_messages):
146  msg = create_random_word(64)
147  publisher.publish(msg)
148  time.sleep(interval)
149 
150 def create_random_word(length):
151  return ''.join([random.choice(string.ascii_letters + string.digits) for _ in range(length)])
152 
153 
154 if __name__ == '__main__':
155  rostest.rosrun(PKG, NAME, TestDurationRecorderGeneral, sys.argv)
def publish_periodic_data_to_topic(self, topic, interval, total_messages)


integ_tests
Author(s): AWS RoboMaker
autogenerated on Tue Jun 1 2021 02:51:32