topic_monitor.scripts.topic_monitor module

class topic_monitor.scripts.topic_monitor.DataReceivingThread(topic_monitor: TopicMonitor, options: Namespace)

Bases: Thread

run() → None

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with positional and keyword arguments taken from the args and kwargs arguments, respectively.
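The override pattern described above can be sketched as follows. This is a minimal illustration, not the DataReceivingThread implementation: the class name PollingThread, the poll_period parameter, the iterations counter, and the use of a threading.Event for stopping are all assumptions made for the example.

```python
import threading
import time


class PollingThread(threading.Thread):
    """Hypothetical worker thread that overrides run() and adds stop()."""

    def __init__(self, poll_period: float = 0.01) -> None:
        super().__init__(daemon=True)
        self._stop_event = threading.Event()  # set by stop() to end the loop
        self._poll_period = poll_period
        self.iterations = 0

    def run(self) -> None:
        # Replaces the default behaviour of calling the `target` callable.
        while not self._stop_event.is_set():
            self.iterations += 1  # placeholder for real work
            # Sleep until the period elapses or stop() is called.
            self._stop_event.wait(self._poll_period)

    def stop(self) -> None:
        # Ask the loop in run() to exit at its next check.
        self._stop_event.set()


worker = PollingThread()
worker.start()            # start() runs run() in the new thread
time.sleep(0.05)
worker.stop()
worker.join(timeout=1.0)  # wait for run() to return
```

Signalling through an Event lets the thread finish its current iteration and exit cleanly, rather than being interrupted mid-operation.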

stop() → None

class topic_monitor.scripts.topic_monitor.MonitoredTopic(topic_id: str, stale_time: float, lock: lock)

Bases: object

Monitor for the statistics and status of a single topic.

allowed_latency_timer_callback() → None
check_status(current_time: float | None = None) → bool
current_reception_rate(window_size: int) → float | None
get_data_from_msg(msg: std_msgs.msg.Header) → int
increment_expected_value() → None
topic_data_callback(msg: std_msgs.msg.Header, logger_: rclpy.impl.rcutils_logger.RcutilsLogger = rclpy.logging.get_logger) → None

class topic_monitor.scripts.topic_monitor.TopicMonitor(window_size: int)

Bases: object

Monitor of a set of topics that match a specified topic name pattern.

add_monitored_topic(topic_type: type[std_msgs.msg.Header], topic_name: str, node: rclpy.node.Node, qos_profile: rclpy.qos.QoSProfile, expected_period: float = 1.0, allowed_latency: float = 1.0, stale_time: float = 1.0) → None
calculate_statistics() → None
check_status() → bool
get_topic_info(topic_name: str) → dict[str, str] | None

Infer topic info (e.g. QoS reliability) from the topic name.
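As a rough illustration of inferring information from a topic name, the sketch below assumes a hypothetical naming convention of the form `<data_name>_data_<reliability>`. The function name infer_topic_info, the regular expression, and the dictionary keys are assumptions for the example and may not match what get_topic_info actually does.

```python
import re
from typing import Optional

# Assumed convention: "<data_name>_data_<reliability>", e.g. "small_data_best_effort".
_TOPIC_PATTERN = re.compile(
    r'(?P<data_name>\w+)_data_(?P<reliability>best_effort|reliable)'
)


def infer_topic_info(topic_name: str) -> Optional[dict[str, str]]:
    """Return the parts encoded in the topic name, or None if it does not match."""
    match = _TOPIC_PATTERN.fullmatch(topic_name.lstrip('/'))
    if match is None:
        return None
    return match.groupdict()


info = infer_topic_info('small_data_best_effort')
unmatched = infer_topic_info('chatter')
```

Returning None for names that do not follow the convention lets a caller skip unrelated topics without raising an error.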

get_window_size() → int
is_supported_type(type_name: str) → bool
output_status() → None
update_topic_statuses() → bool

class topic_monitor.scripts.topic_monitor.TopicMonitorDisplay(topic_monitor: TopicMonitor, update_period: float)

Bases: object

Display of the monitored topic reception rates.

add_monitored_topic(topic_name: str) → None
make_plot() → None
update_display() → None

topic_monitor.scripts.topic_monitor.main() → None

topic_monitor.scripts.topic_monitor.run_topic_listening(node: rclpy.node.Node, topic_monitor: TopicMonitor, options: Namespace) → None

Subscribe to relevant topics and manage the data received from subscriptions.