launch_testing_ros.wait_for_topics module
- class launch_testing_ros.wait_for_topics.WaitForTopics(topic_tuples, timeout=5.0, messages_received_buffer_length=10)
Bases:
object
Wait to receive messages on supplied topics.
Example usage:
from std_msgs.msg import String
# Method 1, using the 'with' keyword def method_1(): topic_list = [('topic_1', String), ('topic_2', String)] with WaitForTopics(topic_list, timeout=5.0): # 'topic_1' and 'topic_2' received at least one message each print('Given topics are receiving messages !') # Method 2, calling wait() and shutdown() manually def method_2(): topic_list = [('topic_1', String), ('topic_2', String)] wait_for_topics = WaitForTopics(topic_list, timeout=5.0) assert wait_for_topics.wait() print('Given topics are receiving messages !') print(wait_for_topics.topics_not_received()) # Should be an empty set print(wait_for_topics.topics_received()) # Should be {'topic_1', 'topic_2'} print(wait_for_topics.messages_received('topic_1')) # Should be [message_1, ...] wait_for_topics.shutdown()
- received_messages(topic_name)
List of received messages of a specific topic.
- shutdown()
- topics_not_received()
Topics that did not receive any messages.
- topics_received()
Topics that received at least one message.
- wait()