launch_testing_ros package
Subpackages
Submodules
Module contents
- class launch_testing_ros.DataRepublisher(node, listen_topic, publish_topic, msg_type, transform_fn)
Bases:
object
Republish mesasges with a transform function applied.
- get_num_received()
Get the number of messages received on the listen_topic.
- get_num_republished()
Get the number of messages published on publish_topic.
This may be lower than get_num_received if the transform_fn indicated a message should be dropped
- get_republished()
Get a list of all of the transformed messages republished.
- shutdown()
Stop republishing messages.
- class launch_testing_ros.LaunchTestRunner(*args: Any, **kwargs: Any)
Bases:
LaunchTestRunner
- class launch_testing_ros.MessagePump(node, context=None)
Bases:
object
Calls rclpy.spin on a thread so tests don’t need to.
- start()
- stop()
- class launch_testing_ros.WaitForTopics(topic_tuples, timeout=5.0, messages_received_buffer_length=10)
Bases:
object
Wait to receive messages on supplied topics.
Example usage:
from std_msgs.msg import String
# Method 1, using the 'with' keyword def method_1(): topic_list = [('topic_1', String), ('topic_2', String)] with WaitForTopics(topic_list, timeout=5.0): # 'topic_1' and 'topic_2' received at least one message each print('Given topics are receiving messages !') # Method 2, calling wait() and shutdown() manually def method_2(): topic_list = [('topic_1', String), ('topic_2', String)] wait_for_topics = WaitForTopics(topic_list, timeout=5.0) assert wait_for_topics.wait() print('Given topics are receiving messages !') print(wait_for_topics.topics_not_received()) # Should be an empty set print(wait_for_topics.topics_received()) # Should be {'topic_1', 'topic_2'} print(wait_for_topics.messages_received('topic_1')) # Should be [message_1, ...] wait_for_topics.shutdown()
- received_messages(topic_name)
List of received messages of a specific topic.
- shutdown()
- topics_not_received()
Topics that did not receive any messages.
- topics_received()
Topics that received at least one message.
- wait()