19 from std_msgs.msg
import String
27 from rolling_recorder_test_base
import RollingRecorderTestBase
29 PKG =
'rosbag_uploader_ros1_integration_tests' 30 NAME =
'rolling_recorder_custom_topic' 46 rospy.loginfo(
"Active rosbag: %s" % active_rosbag)
47 active_rosbag_start_time = os.path.getctime(active_rosbag)
51 bag_finish_time_remaining = bag_finish_time - time.time()
52 rospy.loginfo(
"Bag finish time remaining: %f" % bag_finish_time_remaining)
55 total_test_messages = 10
56 sleep_between_message = (bag_finish_time_remaining * 0.5) / total_test_messages
57 rospy.loginfo(
"Sleep between messages: %f" % sleep_between_message)
58 for x
in range(total_test_messages):
59 self.test_publisher.publish(
"Test message %d" % x)
60 time.sleep(sleep_between_message)
63 bag_finish_time_remaining = bag_finish_time - time.time()
64 rospy.loginfo(
"Bag finish time remaining after publish: %f" % bag_finish_time_remaining)
67 time.sleep(bag_finish_time_remaining + 0.5)
71 rospy.loginfo(
"Latest bag: %s " % latest_bag)
73 total_bag_messages = 0
74 for _, msg, _
in bag.read_messages():
75 total_bag_messages += 1
77 self.assertEquals(total_bag_messages, total_test_messages)
79 if __name__ ==
'__main__':
80 rostest.rosrun(PKG, NAME, TestRollingRecorderCustomTopic, sys.argv)
def get_latest_bag_by_regex(self, regex_pattern)
def test_record_custom_topic(self)
def wait_for_rolling_recorder_node_to_subscribe_to_topic(self)
def wait_for_rolling_recorder_nodes(self, timeout=5)