35 from collections
import deque
36 from python_qt_binding.QtCore
import Signal, Slot, QObject
41 from diagnostic_msgs.msg
import DiagnosticArray
46 A class which represents the status history of diagnostics 47 It can be queried for a past history of diagnostics, and paused 49 message_updated = Signal(dict)
50 queue_updated = Signal()
51 pause_changed = Signal(bool)
52 position_changed = Signal(int)
54 def __init__(self, topic, topic_type, count=30):
73 Turn off this Timeline 74 Internally, this just shuts down the subscriber 81 Slot, to be called to change the pause status of the timeline 83 This is generally intended to be connected to the status signal 84 from a button or checkbox 101 """ True if this timeline is paused """ 107 ROS Callback for new diagnostic messages 109 Puts new msg into the queue, and emits a signal to let listeners know 110 that the timeline has been updated 112 If the timeline is paused, new messages are placed into a separate 113 queue and swapped back in when the timeline is unpaused 115 :type msg: Either DiagnosticArray or DiagnosticsStatus. Can be 116 determined by __init__'s arg "msg_callback". 119 dic = {status.name: status
for status
in msg.status}
133 True if this timeline has received any messages. 134 False if no messages have been received yet 137 return len(self.
_queue) > 0
141 """ Get the age (in seconds) of the most recent diagnostic message """ 142 current_time = rospy.get_time()
148 """ True is this timeline is stale. """ 156 index = len(self.
_queue) + index
157 if index == len(self.
_queue) - 1:
164 max_index = len(self.
_queue) - 1
165 min_index = -len(self.
_queue)
166 index = min(index, max_index)
167 index = max(index, min_index)
184 return [status[name]
for status
in list(self.
_queue)]
194 for msg
in list(self.
_queue):
def set_position(self, position)
def get_current_status_by_name(self, name)
def set_paused(self, pause)
def get_all_status_by_name(self, name)
def __init__(self, topic, topic_type, count=30)