launch_testing_ros package

Subpackages

Submodules

Module contents

class launch_testing_ros.DataRepublisher(node, listen_topic, publish_topic, msg_type, transform_fn)

Bases: object

Republish mesasges with a transform function applied.

get_num_received()

Get the number of messages received on the listen_topic.

get_num_republished()

Get the number of messages published on publish_topic.

This may be lower than get_num_received if the transform_fn indicated a message should be dropped

get_republished()

Get a list of all of the transformed messages republished.

shutdown()

Stop republishing messages.

class launch_testing_ros.LaunchTestRunner(*args: Any, **kwargs: Any)

Bases: LaunchTestRunner

class launch_testing_ros.MessagePump(node, context=None)

Bases: object

Calls rclpy.spin on a thread so tests don’t need to.

start()
stop()
class launch_testing_ros.WaitForTopics(topic_tuples, timeout=5.0, messages_received_buffer_length=10)

Bases: object

Wait to receive messages on supplied topics.

Example usage:

from std_msgs.msg import String

# Method 1, using the 'with' keyword
def method_1():
    topic_list = [('topic_1', String), ('topic_2', String)]
    with WaitForTopics(topic_list, timeout=5.0):
        # 'topic_1' and 'topic_2' received at least one message each
        print('Given topics are receiving messages !')

# Method 2, calling wait() and shutdown() manually
def method_2():
    topic_list = [('topic_1', String), ('topic_2', String)]
    wait_for_topics = WaitForTopics(topic_list, timeout=5.0)
    assert wait_for_topics.wait()
    print('Given topics are receiving messages !')
    print(wait_for_topics.topics_not_received()) # Should be an empty set
    print(wait_for_topics.topics_received()) # Should be {'topic_1', 'topic_2'}
    print(wait_for_topics.messages_received('topic_1')) # Should be [message_1, ...]
    wait_for_topics.shutdown()
received_messages(topic_name)

List of received messages of a specific topic.

shutdown()
topics_not_received()

Topics that did not receive any messages.

topics_received()

Topics that received at least one message.

wait()