launch_testing_ros.wait_for_topics module

class launch_testing_ros.wait_for_topics.WaitForTopics(topic_tuples, timeout=5.0, messages_received_buffer_length=10, trigger=None, node_namespace=None)

Bases: object

Wait to receive messages on supplied topics.

Example usage:

from std_msgs.msg import String

# Method 1, using the 'with' keyword
def method_1():
    topic_list = [('topic_1', String), ('topic_2', String)]
    with WaitForTopics(topic_list, timeout=5.0):
        # 'topic_1' and 'topic_2' received at least one message each
        print('Given topics are receiving messages !')

# Method 2, calling wait() and shutdown() manually
def method_2():
    topic_list = [('topic_1', String), ('topic_2', String)]
    wait_for_topics = WaitForTopics(topic_list, timeout=5.0)
    assert wait_for_topics.wait()
    print('Given topics are receiving messages !')
    print(wait_for_topics.topics_not_received()) # Should be an empty set
    print(wait_for_topics.topics_received()) # Should be {'topic_1', 'topic_2'}
    print(wait_for_topics.messages_received('topic_1')) # Should be [message_1, ...]
    wait_for_topics.shutdown()

# Method3, calling a trigger function before the wait. The trigger function takes
# the WaitForTopics node object as the first argument. Any additional arguments have
# to be passed to the wait(*args, **kwargs) method directly.
def trigger_function(node, arg=""):
    node.get_logger().info('Trigger function called with argument: ' + arg)

def method_3():
    topic_list = [('topic_1', String), ('topic_2', String)]
    wait_for_topics = WaitForTopics(topic_list, timeout=5.0, trigger=trigger_function)
    # The trigger function will be called inside the wait() method after the
    # subscribers are created and before the publishers are connected.
    assert wait_for_topics.wait("Hello World!")
    print('Given topics are receiving messages !')
    wait_for_topics.shutdown()
received_messages(topic_name)

List of received messages of a specific topic.

shutdown()
topics_not_received()

Topics that did not receive any messages.

topics_received()

Topics that received at least one message.

wait(*args, **kwargs)