28 from s3_client
import S3Client
30 from recorder_msgs.msg
import RollingRecorderAction, RollingRecorderGoal
31 from rolling_recorder_test_base
import RollingRecorderTestBase
32 from std_msgs.msg
import String
# rostest package name and test name; both are handed to rostest.rosrun in
# the script entry point.
PKG = 'rosbag_uploader_ros1_integration_tests'
NAME = 'rolling_recorder_custom_topic'
# Presumably the action name the test's action client connects to; the line
# that consumes it is not visible in this listing -- confirm.
ACTION = '/rolling_recorder/RosbagRollingRecord'
# Maximum time, in seconds, to wait for the recorder goal to finish
# (converted with rospy.Duration.from_sec before wait_for_result).
GOAL_COMPLETION_TIMEOUT_SEC = 30.0
# NOTE(review): fragment only -- the enclosing def lines are not part of this
# listing, and the leading integers look like line numbers carried over from
# the original file (they are not Python).
# Run the parent class's setUp, then read the region parameter of the S3 file
# uploader via rospy.get_param and keep it on the test instance.
42 super(TestRollingRecorderUploadOnGoal, self).
setUp()
43 self.
s3_region = rospy.get_param(
'/s3_file_uploader/aws_client_configuration/region')
# NOTE(review): a second super().setUp() call, eight original lines after the
# first. If this one sits inside tearDown it presumably should call
# super().tearDown() instead -- confirm against the full file.
50 super(TestRollingRecorderUploadOnGoal, self).
setUp()
# NOTE(review): fragment only -- the enclosing def line is not part of this
# listing; active_rosbag and bag_finish_time are assigned on lines that are
# not shown. The leading integers look like original line numbers.
# Log the active bag and derive a start time one second before its ctime.
67 rospy.loginfo(
"Active rosbag: %s" % active_rosbag)
# NOTE(review): on Unix os.path.getctime is the last metadata change time,
# not the creation time -- confirm this is the intended reference point.
68 active_rosbag_start_time = os.path.getctime(active_rosbag)
# NOTE(review): start_time is not read again in the lines visible here.
69 start_time = rospy.Time.from_sec(active_rosbag_start_time - 1)
# Seconds left until bag_finish_time, measured against the wall clock.
73 bag_finish_time_remaining = bag_finish_time - time.time()
74 rospy.loginfo(
"Bag finish time remaining: %f" % bag_finish_time_remaining)
# Publish 10 test messages, spacing them so the whole burst takes half of
# the time remaining before the bag finishes.
77 total_test_messages = 10
78 sleep_between_message = (bag_finish_time_remaining * 0.5) / total_test_messages
79 rospy.loginfo(
"Sleep between messages: %f" % sleep_between_message)
80 for x
in range(total_test_messages):
81 self.test_publisher.publish(
"Test message %d" % x)
82 time.sleep(sleep_between_message)
# Recompute the remaining time after publishing, then sleep until half a
# second past bag_finish_time.
85 bag_finish_time_remaining = bag_finish_time - time.time()
86 rospy.loginfo(
"Bag finish time remaining after publish: %f" % bag_finish_time_remaining)
# NOTE(review): time.sleep raises ValueError for a negative argument, which
# happens here if bag_finish_time is already more than 0.5 s in the past.
89 time.sleep(bag_finish_time_remaining + 0.5)
# NOTE(review): fragment only -- the enclosing def line is not part of this
# listing. The leading integers look like original line numbers.
# Wait for the action server and fail the test if the wait reports failure.
96 res = self.action_client.wait_for_server()
97 self.assertTrue(res,
'Failed to connect to rolling recorder action server')
# Build the upload destination: 'test_rr/' joined with a random subfolder
# name of 8 ASCII letters/digits.
100 s3_folder =
'test_rr/' 101 s3_subfolder =
''.join([random.choice(string.ascii_letters + string.digits)
for _
in range(8)])
102 s3_destination = os.path.join(s3_folder, s3_subfolder)
# NOTE(review): end_time is not read again in the lines visible here; the
# goal below is built from the destination only.
103 end_time = rospy.Time.now()
# Send the goal and wait up to GOAL_COMPLETION_TIMEOUT_SEC seconds for a
# result; a falsy return from wait_for_result is reported as a timeout.
104 goal = RollingRecorderGoal(destination=s3_destination)
105 self.action_client.send_goal(goal)
106 res = self.action_client.wait_for_result(rospy.Duration.from_sec(GOAL_COMPLETION_TIMEOUT_SEC))
107 self.assertTrue(res,
"Rolling Recorder Goal timed out")
# Compare the nested result code with RESULT_SUCCESS (defined outside this
# listing).
108 result = self.action_client.get_result()
# NOTE(review): assertEquals is a deprecated alias of assertEqual and was
# removed in Python 3.12.
109 self.assertEquals(result.result.result, RESULT_SUCCESS)
# NOTE(review): fragment only -- original lines 113-114 are missing from this
# listing, so the code that uses the temporary file and assigns `bag` is not
# visible. latest_bag, s3_destination and total_test_messages are also
# assigned on lines not shown in this fragment.
# Object key: the upload destination joined with the bag's file name.
111 s3_key = os.path.join(s3_destination, os.path.basename(latest_bag))
112 with tempfile.NamedTemporaryFile()
as f:
# Count every message returned by bag.read_messages() and require the count
# to equal total_test_messages.
115 total_bag_messages = 0
116 for _, msg, _
in bag.read_messages():
117 total_bag_messages += 1
# NOTE(review): assertEquals is a deprecated alias of assertEqual and was
# removed in Python 3.12.
119 self.assertEquals(total_bag_messages, total_test_messages)
# Script entry point: hand the test case to rostest under the package and
# test names defined at module level, forwarding the command-line arguments.
if __name__ == '__main__':
    rostest.rosrun(PKG, NAME, TestRollingRecorderUploadOnGoal, sys.argv)
def get_latest_bag_by_regex(self, regex_pattern)
def wait_for_rolling_recorder_node_to_subscribe_to_topic(self)
def wait_for_rolling_recorder_nodes(self, timeout=5)
def test_record_custom_topic(self)