greenwave_monitor.test_utils module

greenwave_monitor.test_utils.call_manage_topic_service(node: rclpy.node.Node, service_client, add: bool, topic: str, timeout_sec: float = 8.0) greenwave_monitor_interfaces.srv.ManageTopic.Response | None

Call the manage_topic service with given parameters.

greenwave_monitor.test_utils.call_set_frequency_service(node: rclpy.node.Node, service_client, topic_name: str, expected_hz: float = 0.0, tolerance_percent: float = 0.0, clear: bool = False, add_if_missing: bool = True, timeout_sec: float = 8.0) greenwave_monitor_interfaces.srv.SetExpectedFrequency.Response | None

Call the set_expected_frequency service with given parameters.

greenwave_monitor.test_utils.collect_diagnostics_for_topic(node: rclpy.node.Node, topic_name: str, expected_count: int = 5, timeout_sec: float = 10.0) List[diagnostic_msgs.msg.DiagnosticStatus]

Collect diagnostic messages for a specific topic.

greenwave_monitor.test_utils.create_minimal_publisher(topic: str, frequency_hz: float, message_type: str, id_suffix: str = '')

Create a minimal publisher node with the given parameters.

greenwave_monitor.test_utils.create_monitor_node(namespace: str = 'test_namespace', node_name: str = 'test_greenwave_monitor', parameters: List[dict[str, Any]] = None)

Create a greenwave_monitor node for testing.

greenwave_monitor.test_utils.create_service_clients(node: rclpy.node.Node, namespace: str = 'test_namespace', node_name: str = 'test_greenwave_monitor')

Create service clients for the monitor node.

greenwave_monitor.test_utils.find_best_diagnostic(diagnostics: List[diagnostic_msgs.msg.DiagnosticStatus], expected_frequency: float, message_type: str) Tuple[diagnostic_msgs.msg.DiagnosticStatus | None, Tuple[float, float, float] | None]

Find the diagnostic message with frequency closest to expected.

greenwave_monitor.test_utils.verify_diagnostic_values(status: diagnostic_msgs.msg.DiagnosticStatus, values: Tuple[float, float, float], expected_frequency: float, message_type: str, tolerance_hz: float) List[str]

Verify diagnostic values and return list of assertion errors.

greenwave_monitor.test_utils.wait_for_service_connection(node: rclpy.node.Node, service_client, timeout_sec: float = 3.0, service_name: str = 'service') bool

Wait for a service to become available.