35 from collections
import deque
36 from python_qt_binding.QtCore
import Signal, Slot, QObject
40 from diagnostic_msgs.msg
import DiagnosticArray
45 A class which represents the status history of diagnostics 46 It can be queried for a past history of diagnostics, and paused 48 message_updated = Signal(dict)
49 queue_updated = Signal()
50 pause_changed = Signal(bool)
51 position_changed = Signal(int)
53 def __init__(self, topic, topic_type, count=30):
71 Turn off this Timeline 72 Internally, this just shuts down the subscriber 74 self._subscriber.unregister()
79 Slot, to be called to change the pause status of the timeline 81 This is generally intended to be connected to the status signal 82 from a button or checkbox 94 self.pause_changed.emit(pause)
98 """ True if this timeline is paused """ 103 ROS Callback for new diagnostic messages 105 Puts new msg into the queue, and emits a signal to let listeners know 106 that the timeline has been updated 108 If the timeline is paused, new messages are placed into a separate 109 queue and swapped back in when the timeline is unpaused 111 :type msg: Either DiagnosticArray or DiagnosticStatus. Can be 112 determined by __init__'s arg "topic_type". 115 dic = {status.name: status
for status
in msg.status}
118 self._paused_queue.append(dic)
120 self._queue.append(dic)
121 self.queue_updated.emit()
123 self.message_updated.emit(dic)
128 True if this timeline has received any messages. 129 False if no messages have been received yet 131 return len(self.
_queue) > 0
135 """ Get the age (in seconds) of the most recent diagnostic message """ 136 current_time = rospy.get_time()
142 """ True if this timeline is stale. """ 149 index = len(self.
_queue) + index
150 if index == len(self.
_queue) - 1:
156 max_index = len(self.
_queue) - 1
157 min_index = -len(self.
_queue)
158 index = min(index, max_index)
159 index = max(index, min_index)
162 self.message_updated.emit(self.
_queue[index])
167 self.position_changed.emit(position)
173 return [status[name]
for status
in list(self.
_queue)]
179 for msg
in list(self.
_queue):
def set_position(self, position)
def get_current_status_by_name(self, name)
def set_paused(self, pause)
def get_all_status_by_name(self, name)
def __init__(self, topic, topic_type, count=30)