35 from collections
import deque
36 from python_qt_binding.QtCore
import Signal, Slot, QObject
41 from diagnostic_msgs.msg
import DiagnosticArray
46 A class which represents the status history of diagnostics 47 It can be queried for a past history of diagnostics, and paused 49 message_updated = Signal(dict)
50 queue_updated = Signal()
51 pause_changed = Signal(bool)
52 position_changed = Signal(int)
54 def __init__(self, topic, topic_type, count=30):
73 Turn off this Timeline 74 Internally, this just shuts down the subscriber 76 self._subscriber.unregister()
81 Slot, to be called to change the pause status of the timeline 83 This is generally intended to be connected to the status signal 84 from a button or checkbox 97 self.pause_changed.emit(pause)
101 """ True if this timeline is paused """ 107 ROS Callback for new diagnostic messages 109 Puts new msg into the queue, and emits a signal to let listeners know 110 that the timeline has been updated 112 If the timeline is paused, new messages are placed into a separate 113 queue and swapped back in when the timeline is unpaused 115 :type msg: Either DiagnosticArray or DiagnosticsStatus. Can be 116 determined by __init__'s arg "msg_callback". 119 dic = {status.name: status
for status
in msg.status}
123 self._paused_queue.append(dic)
125 self._queue.append(dic)
126 self.queue_updated.emit()
128 self.message_updated.emit(dic)
133 True if this timeline has received any messages. 134 False if no messages have been received yet 137 return len(self.
_queue) > 0
141 """ Get the age (in seconds) of the most recent diagnostic message """ 142 current_time = rospy.get_time()
148 """ True is this timeline is stale. """ 156 index = len(self.
_queue) + index
157 if index == len(self.
_queue) - 1:
164 max_index = len(self.
_queue) - 1
165 min_index = -len(self.
_queue)
166 index = min(index, max_index)
167 index = max(index, min_index)
170 self.message_updated.emit(self.
_queue[index])
175 self.position_changed.emit(position)
184 return [status[name]
for status
in list(self.
_queue)]
194 for msg
in list(self.
_queue):
def set_position(self, position)
def get_current_status_by_name(self, name)
def set_paused(self, pause)
def get_all_status_by_name(self, name)
def __init__(self, topic, topic_type, count=30)