test_rolling_recorder_upload.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (c) 2020, Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License").
5 # You may not use this file except in compliance with the License.
6 # A copy of the License is located at
7 #
8 # http://aws.amazon.com/apache2.0
9 #
10 # or in the "license" file accompanying this file. This file is distributed
11 # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 # express or implied. See the License for the specific language governing
13 # permissions and limitations under the License.
14 
15 import os
16 import random
17 import string
18 import sys
19 import tempfile
20 import time
21 
22 import actionlib
23 import rosbag
24 import rosnode
25 import rospy
26 import rostest
27 import rostopic
28 from s3_client import S3Client
29 
30 from recorder_msgs.msg import RollingRecorderAction, RollingRecorderGoal
31 from rolling_recorder_test_base import RollingRecorderTestBase
32 from std_msgs.msg import String
33 
34 PKG = 'rosbag_uploader_ros1_integration_tests'
35 NAME = 'rolling_recorder_custom_topic'
36 ACTION = '/rolling_recorder/RosbagRollingRecord'
37 RESULT_SUCCESS = 0
38 GOAL_COMPLETION_TIMEOUT_SEC = 30.0
39 
41  def setUp(self):
42  super(TestRollingRecorderUploadOnGoal, self).setUp()
43  self.s3_region = rospy.get_param('/s3_file_uploader/aws_client_configuration/region')
44  self.s3_bucket_name = rospy.get_param('/s3_file_uploader/s3_bucket')
46  self.s3_client.create_bucket(self.s3_bucket_name)
47  self.s3_client.wait_for_bucket_create(self.s3_bucket_name)
48 
49  def tearDown(self):
50  super(TestRollingRecorderUploadOnGoal, self).setUp()
51  self.s3_client.delete_all_objects(self.s3_bucket_name)
52  self.s3_client.delete_bucket(self.s3_bucket_name)
53 
55  # Get the custom topic we specified in the test file
56  self.topic_to_record = rospy.get_param("~topic_to_record")
57 
58  # Wait for rolling recorder node and action server to start
60 
61  # Create publishers
62  self.test_publisher = rospy.Publisher(self.topic_to_record, String, queue_size=10)
64 
65  # Find start time of active file
66  active_rosbag = self.get_latest_bag_by_regex("*.bag.active")
67  rospy.loginfo("Active rosbag: %s" % active_rosbag)
68  active_rosbag_start_time = os.path.getctime(active_rosbag)
69  start_time = rospy.Time.from_sec(active_rosbag_start_time - 1)
70 
71  # Calculate time active bag will roll over
72  bag_finish_time = active_rosbag_start_time + self.bag_rollover_time
73  bag_finish_time_remaining = bag_finish_time - time.time()
74  rospy.loginfo("Bag finish time remaining: %f" % bag_finish_time_remaining)
75 
76  # Emit some data to the test topic
77  total_test_messages = 10
78  sleep_between_message = (bag_finish_time_remaining * 0.5) / total_test_messages
79  rospy.loginfo("Sleep between messages: %f" % sleep_between_message)
80  for x in range(total_test_messages):
81  self.test_publisher.publish("Test message %d" % x)
82  time.sleep(sleep_between_message)
83 
84  # Wait for current bag to finish recording and roll over
85  bag_finish_time_remaining = bag_finish_time - time.time()
86  rospy.loginfo("Bag finish time remaining after publish: %f" % bag_finish_time_remaining)
87 
88  # Add 0.5s as it takes some time for bag rotation to occur
89  time.sleep(bag_finish_time_remaining + 0.5)
90 
91  latest_bag = self.get_latest_bag_by_regex("*.bag")
92 
93  # Send a goal to upload the bag data to S3
94  # Create an Action client to send the goal
95  self.action_client = actionlib.SimpleActionClient(ACTION, RollingRecorderAction)
96  res = self.action_client.wait_for_server()
97  self.assertTrue(res, 'Failed to connect to rolling recorder action server')
98 
99  # Create the goal and send through action client
100  s3_folder = 'test_rr/'
101  s3_subfolder = ''.join([random.choice(string.ascii_letters + string.digits) for _ in range(8)])
102  s3_destination = os.path.join(s3_folder, s3_subfolder)
103  end_time = rospy.Time.now()
104  goal = RollingRecorderGoal(destination=s3_destination)
105  self.action_client.send_goal(goal)
106  res = self.action_client.wait_for_result(rospy.Duration.from_sec(GOAL_COMPLETION_TIMEOUT_SEC))
107  self.assertTrue(res, "Rolling Recorder Goal timed out")
108  result = self.action_client.get_result()
109  self.assertEquals(result.result.result, RESULT_SUCCESS)
110 
111  s3_key = os.path.join(s3_destination, os.path.basename(latest_bag))
112  with tempfile.NamedTemporaryFile() as f:
113  self.s3_client.download_file(self.s3_bucket_name, s3_key, f.name)
114  bag = rosbag.Bag(f.name)
115  total_bag_messages = 0
116  for _, msg, _ in bag.read_messages():
117  total_bag_messages += 1
118 
119  self.assertEquals(total_bag_messages, total_test_messages)
120 
121 if __name__ == '__main__':
122  rostest.rosrun(PKG, NAME, TestRollingRecorderUploadOnGoal, sys.argv)


integ_tests
Author(s): AWS RoboMaker
autogenerated on Tue Jun 1 2021 02:51:32