21 from std_msgs.msg
import String
29 from rolling_recorder_test_base
import RollingRecorderTestBase
31 PKG =
'rosbag_uploader_ros1_integration_tests' 32 NAME =
'rolling_recorder_default_topics' 43 topic_name =
'/my_random_topic_' +
''.join([random.choice(string.ascii_letters + string.digits)
for _
in range(8)])
49 rospy.loginfo(
"Active rosbag: %s" % active_rosbag)
50 active_rosbag_start_time = os.path.getctime(active_rosbag)
54 bag_finish_time_remaining = bag_finish_time - time.time()
55 rospy.loginfo(
"Bag finish time remaining: %f" % bag_finish_time_remaining)
58 total_test_messages = 10
59 sleep_between_message = (bag_finish_time_remaining * 0.5) / total_test_messages
60 rospy.loginfo(
"Sleep between messages: %f" % sleep_between_message)
61 for x
in range(total_test_messages):
62 self.test_publisher.publish(
"Test message %d" % x)
63 time.sleep(sleep_between_message)
68 bag_finish_time_remaining = bag_finish_time - time.time()
73 rospy.loginfo(
"Latest bags: %s " % latest_bags)
75 total_topic_messages = 0
76 for bag_path
in latest_bags:
78 for topic, msg, _
in bag.read_messages():
80 if topic == topic_name:
81 total_topic_messages += 1
84 self.assertEquals(total_topic_messages, total_test_messages)
89 self.assertTrue(total_messages > total_topic_messages)
91 if __name__ ==
'__main__':
92 rostest.rosrun(PKG, NAME, TestRollingRecorderDefaultTopics, sys.argv)
def get_latest_bag_by_regex(self, regex_pattern)
def wait_for_rolling_recorder_node_to_subscribe_to_topic(self)
def wait_for_rolling_recorder_nodes(self, timeout=5)
def test_record_default_topics(self)
def get_latest_bags_by_regex(self, regex_pattern, count)