launch_testing_ros.wait_for_topics module

class launch_testing_ros.wait_for_topics.WaitForTopics(topic_tuples, timeout=5.0, messages_received_buffer_length=10)

Bases: object

Wait to receive messages on supplied topics.

Example usage:

from std_msgs.msg import String

# Method 1, using the 'with' keyword
def method_1():
    topic_list = [('topic_1', String), ('topic_2', String)]
    with WaitForTopics(topic_list, timeout=5.0):
        # 'topic_1' and 'topic_2' received at least one message each
        print('Given topics are receiving messages !')

# Method 2, calling wait() and shutdown() manually
def method_2():
    topic_list = [('topic_1', String), ('topic_2', String)]
    wait_for_topics = WaitForTopics(topic_list, timeout=5.0)
    assert wait_for_topics.wait()
    print('Given topics are receiving messages !')
    print(wait_for_topics.topics_not_received()) # Should be an empty set
    print(wait_for_topics.topics_received()) # Should be {'topic_1', 'topic_2'}
    print(wait_for_topics.messages_received('topic_1')) # Should be [message_1, ...]
    wait_for_topics.shutdown()
received_messages(topic_name)

List of received messages of a specific topic.

shutdown()
topics_not_received()

Topics that did not receive any messages.

topics_received()

Topics that received at least one message.

wait()