40 from python_qt_binding.QtCore
import Qt, QTimer, qWarning, Signal
42 from python_qt_binding.QtGui
import QGraphicsScene, QMessageBox
44 from python_qt_binding.QtWidgets
import QGraphicsScene, QMessageBox
48 from .dynamic_timeline_frame
import DynamicTimelineFrame
50 from .message_loader_thread
import MessageLoaderThread
58 BagTimeline contains bag files, all information required to display the bag data visualization on the screen 61 status_bar_changed_signal = Signal()
62 selected_region_changed = Signal(rospy.Time, rospy.Time)
63 timeline_updated = Signal()
64 Topic = collections.namedtuple(
'Topic', [
'subscriber',
'queue'])
65 Message = collections.namedtuple(
'Message', [
'stamp',
'message'])
69 :param context: plugin context hook to enable adding rqt_bag plugin widgets as ROS_GUI snapin panes, ''PluginContext'' 71 super(DynamicTimeline, self).
__init__()
103 self._play_timer.timeout.connect(self.
on_idle)
104 self._play_timer.setInterval(3)
116 self.setBackgroundBrush(Qt.white)
118 self._timeline_frame.setPos(0, 0)
129 :returns: the ROS_GUI context, 'PluginContext' 139 self._redraw_timer.shutdown()
144 Cleans up the timeline, subscribers and any threads 150 self._play_timer.stop()
157 self._recorder.stop()
160 self._timeline_frame.handle_close()
162 self.
_topics[topic][0].unregister()
165 self._context.remove_widget(frame)
169 playhead = self._timeline_frame._playhead
170 end =
True if playhead >= self._timeline_frame.play_region[1]
else False 171 start =
True if playhead <= self._timeline_frame.play_region[0]
else False 177 self._timeline_frame.reset_zoom()
183 self._timeline_frame._set_playhead(self._timeline_frame.play_region[1])
185 self._timeline_frame._set_playhead(self._timeline_frame.play_region[0])
188 self._timeline_frame._set_playhead(playhead)
190 self.timeline_updated.emit()
194 Called whenever a message is received on any of the subscribed topics 196 :param topic: the topic on which the message was received 197 :param msg: the message received 200 message = self.
Message(stamp=rospy.Time.now(), message=msg)
202 self.
_topics[topic].queue.append(message)
205 with self._timeline_frame.index_cache_cv:
206 self._timeline_frame.invalidated_caches.add(topic)
209 self._timeline_frame.index_cache_cv.notify()
212 """Creates an indexing thread for the new topic. Fixes the borders and notifies 213 the indexing thread to index the new items bags 215 :param topic: a topic to listen to 216 :param type: type of the topic to listen to 217 :param num_msgs: number of messages to retain on this topic. If this 218 value is exceeded, the oldest messages will be dropped 220 :return: false if topic already in the timeline, true otherwise 225 self.
_topics[topic] = self.
Topic(subscriber=rospy.Subscriber(topic, type, queue_size=1, callback=self.
topic_callback, callback_args=topic),
226 queue=collections.deque(maxlen=num_msgs))
227 self._datatypes.setdefault(type, []).append(topic)
240 self._timeline_frame.reset_zoom()
243 with self._timeline_frame.index_cache_cv:
244 self._timeline_frame.invalidated_caches.add(topic)
245 if topic
in self._timeline_frame.index_cache:
246 del self._timeline_frame.index_cache[topic]
247 self._timeline_frame.index_cache_cv.notify()
255 :return: stamp of the first message received on any of the topics, or none if no messages have been received, ''rospy.Time'' 259 for topic_name, topic_tuple
in self._topics.iteritems():
260 topic_start_stamp = topic_helper.get_start_stamp(topic_tuple)
261 if topic_start_stamp
is not None and (start_stamp
is None or topic_start_stamp < start_stamp):
262 start_stamp = topic_start_stamp
269 :return: stamp of the last message received on any of the topics, or none if no messages have been received, ''rospy.Time'' 273 for topic_name, topic_tuple
in self._topics.iteritems():
274 topic_end_stamp = topic_helper.get_end_stamp(topic_tuple)
275 if topic_end_stamp
is not None and (end_stamp
is None or topic_end_stamp > end_stamp):
276 end_stamp = topic_end_stamp
281 :return: sorted list of topic names, ''list(str)'' 287 return sorted(topics)
291 :return: dict of list of topics for each datatype, ''dict(datatype:list(topic))'' 298 :return: datatype associated with a topic, ''str'' 299 :raises: if there are multiple datatypes assigned to a single topic, ''Exception'' 305 if len(topic_types) == 1:
306 raise Exception(
"Topic {0} had multiple datatypes ({1}) associated with it".format(topic, str(topic_types)))
307 topic_types.append(datatype._type)
312 return topic_types[0]
316 generator function for topic entries 317 :param topics: list of topics to query, ''list(str)'' 318 :param start_stamp: stamp to start at, ''rospy.Time'' 319 :param end_stamp: stamp to end at, ''rospy,Time'' 320 :returns: messages on the given topics in chronological order, ''msg'' 327 rospy.logwarn(
"Dynamic Timeline : Topic {0} was not in the topic list. Skipping.".format(topic))
331 topic_start_time = topic_helper.get_start_stamp(self.
_topics[topic])
332 if topic_start_time
is not None and topic_start_time > end_stamp:
335 topic_end_time = topic_helper.get_end_stamp(self.
_topics[topic])
336 if topic_end_time
is not None and topic_end_time < start_stamp:
339 topic_queue = self.
_topics[topic].queue
340 start_ind, first_entry = self.
_entry_at(start_stamp, topic_queue)
344 if first_entry.stamp < start_stamp:
349 end_ind, last_entry = self.
_entry_at(end_stamp, topic_queue)
351 topic_entries.extend(list(itertools.islice(topic_queue, start_ind, end_ind + 1)))
353 for entry
in sorted(topic_entries, key=
lambda x: x.stamp):
358 generator function for bag entries 359 :param topics: list of topics to query, ''list(str)'' 360 :param start_stamp: stamp to start at, ''rospy.Time'' 361 :param end_stamp: stamp to end at, ''rospy,Time'' 362 :returns: tuple of (bag, entry) for the entries in the bag file, ''(rosbag.bag, msg)'' 365 from rosbag
import bag
370 bag_start_time = bag_helper.get_start_stamp(b)
371 if bag_start_time
is not None and bag_start_time > end_stamp:
374 bag_end_time = bag_helper.get_end_stamp(b)
375 if bag_end_time
is not None and bag_end_time < start_stamp:
378 connections = list(b._get_connections(topic))
379 it = iter(b._get_entries(connections, start_stamp, end_stamp))
381 bag_entries.append(it)
383 for entry, it
in bag._mergesort(bag_entries, key=
lambda entry: entry.time):
384 yield bag_by_iter[it], entry
387 """Get the entry and index in the queue at the given time. 389 :param ``rospy.Time`` t: time to check 390 :param ``collections.deque`` queue: deque to look at 392 :return: (index, Message) tuple. If there is no message in the queue at 393 the exact time, the previous index is returned. If the time is 394 before or after the first and last message times, the first or last 395 index and message are returned 400 ind = bisect.bisect(queue, self.
Message(stamp=t, message=
''))
403 if ind == len(queue):
404 return (ind - 1, queue[-1])
413 return (ind - 1, queue[ind - 1])
416 """Get a message in the queues for a specific topic 417 :param ``rospy.Time`` t: time of the message to retrieve 418 :param str topic: the topic to be accessed 419 :return: message corresponding to time t and topic. If there is no 420 corresponding entry at exactly the given time, the latest entry 421 before the given time is returned. If the topic does not exist, or 422 there is no entry, None. 434 Get the latest message before the given time on any of the topics 435 :param t: time, ``rospy.Time`` 436 :return: tuple of (topic, entry) corresponding to time t, ``(str, msg)`` 439 entry_topic, entry =
None,
None 441 _, topic_entry = self.
_entry_at(t - rospy.Duration(0,1), self.
_topics[topic].queue)
442 if topic_entry
and (
not entry
or topic_entry.stamp > entry.stamp):
443 entry_topic, entry = topic, topic_entry
445 return entry_topic, entry
449 Get the earliest message on any topic after the given time 450 :param t: time, ''rospy.Time'' 451 :return: tuple of (bag, entry) corresponding to time t, ''(rosbag.bag, msg)'' 454 entry_topic, entry =
None,
None 460 ind = ind + 1
if ind + 1 < len(self.
_topics[topic].queue)
else ind
461 topic_entry = self.
_topics[topic].queue[ind]
462 if topic_entry
and (
not entry
or topic_entry.stamp < entry.stamp):
463 entry_topic, entry = topic, topic_entry
465 return entry_topic, entry
469 :return: time of the message after the current playhead position,''rospy.Time'' 471 if self._timeline_frame.playhead
is None:
476 return self._timeline_frame._end_stamp
482 :return: time of the message before the current playhead position,''rospy.Time'' 484 if self._timeline_frame.playhead
is None:
489 return self._timeline_frame._start_stamp
495 self._player.resume()
501 Verify that a background task is not currently running before starting a new one 502 :param background_task: name of the background task, ''str'' 505 QMessageBox(QMessageBox.Warning,
'Exclamation',
'Background operation already running:\n\n%s' % self.
background_task, QMessageBox.Ok).exec_()
516 if len(self._bags) > 0:
517 self.
_export_region(filename, self._timeline_frame.topics, self._timeline_frame.play_region[0], self._timeline_frame.play_region[1])
521 Starts a thread to save the current selection to a new bag file 522 :param path: filesystem path to write to, ''str'' 523 :param topics: topics to write to the file, ''list(str)'' 524 :param start_stamp: start of area to save, ''rospy.Time'' 525 :param end_stamp: end of area to save, ''rospy.Time'' 536 total_messages = len(bag_entries)
539 if total_messages == 0:
540 QMessageBox(QMessageBox.Warning,
'rqt_bag',
'No messages found', QMessageBox.Ok).exec_()
548 QMessageBox(QMessageBox.Warning,
'rqt_bag',
'Error opening bag file [%s] for writing' % path, QMessageBox.Ok).exec_()
554 self._export_thread.start()
558 Threaded function that saves the current selection to a new bag file 559 :param export_bag: bagfile to write to, ''rosbag.bag'' 560 :param topics: topics to write to the file, ''list(str)'' 561 :param start_stamp: start of area to save, ''rospy.Time'' 562 :param end_stamp: end of area to save, ''rospy.Time'' 564 total_messages = len(bag_entries)
565 update_step = max(1, total_messages / 100)
569 for bag, entry
in bag_entries:
574 export_bag.write(topic, msg, t)
575 except Exception
as ex:
576 qWarning(
'Error exporting message at position %s: %s' % (str(entry.position), str(ex)))
581 if message_num % update_step == 0
or message_num == total_messages:
582 new_progress = int(100.0 * (float(message_num) / total_messages))
583 if new_progress != progress:
584 progress = new_progress
587 self.status_bar_changed_signal.emit()
594 self.status_bar_changed_signal.emit()
596 except Exception
as ex:
597 QMessageBox(QMessageBox.Warning,
'rqt_bag',
'Error closing bag file [%s]: %s' % (export_bag.filename, str(ex)), QMessageBox.Ok).exec_()
602 return self.
get_entry(position, topic).message
606 When the user clicks down in the timeline. 608 if event.buttons() == Qt.LeftButton:
609 self._timeline_frame.on_left_down(event)
610 elif event.buttons() == Qt.MidButton:
611 self._timeline_frame.on_middle_down(event)
612 elif event.buttons() == Qt.RightButton:
613 topic = self._timeline_frame.map_y_to_topic(event.y())
617 self._timeline_frame.on_mouse_up(event)
620 self._timeline_frame.on_mouse_move(event)
623 self._timeline_frame.on_mousewheel(event)
628 self._timeline_frame.zoom_in()
631 self._timeline_frame.zoom_out()
634 self._timeline_frame.reset_zoom()
637 self._timeline_frame.translate_timeline_left()
640 self._timeline_frame.translate_timeline_right()
644 return self.
_player and self._player.is_publishing(topic)
650 self._player.start_publishing(topic)
657 self._player.stop_publishing(topic)
665 self._player.start_clock_publishing()
666 except Exception
as ex:
667 qWarning(
'Error starting player; aborting publish: %s' % str(ex))
674 for topic
in self._timeline_frame.topics:
678 for topic
in self._timeline_frame.topics:
695 play_all = property(_get_play_all, _set_play_all)
706 moves the playhead to the next position based on the desired position 721 Moves the playhead a fixed distance into the future based on the current play speed 723 if self.
play_speed == 0.0
or not self._timeline_frame.playhead:
728 now = rospy.Time.from_sec(time.time())
732 new_playhead = self.end_stamp
734 new_playhead = self._timeline_frame.playhead + rospy.Duration.from_sec((now - self.
last_frame).to_sec() * self.
play_speed)
736 start_stamp, end_stamp = self._timeline_frame.play_region
738 if new_playhead > end_stamp:
741 new_playhead = start_stamp
743 new_playhead = end_stamp
745 new_playhead = end_stamp
750 elif new_playhead < start_stamp:
753 new_playhead = end_stamp
755 new_playhead = start_stamp
757 new_playhead = start_stamp
760 self._timeline_frame.playhead = new_playhead
767 Move the playhead to the next message 769 if self.
play_speed <= 0.0
or not self._timeline_frame.playhead:
778 delta = rospy.Time.from_sec(time.time()) - self.
last_frame 779 if delta > rospy.Duration.from_sec(0.1):
780 delta = rospy.Duration.from_sec(0.1)
787 self._timeline_frame.playhead = next_message_time
791 self.
last_frame = rospy.Time.from_sec(time.time())
796 def record_bag(self, filename, all=True, topics=[], regex=False, limit=0):
798 self.
_recorder =
Recorder(filename, bag_lock=self._bag_lock, all=all, topics=topics, regex=regex, limit=limit)
799 except Exception, ex:
800 qWarning(
'Error opening bag for recording [%s]: %s' % (filename, str(ex)))
805 self.add_bag(self._recorder.bag)
807 self._recorder.start()
810 self._timeline_frame._index_cache_thread.period = 0.1
816 self._recorder.toggle_paused()
820 if self._timeline_frame._start_stamp
is None:
821 self._timeline_frame._start_stamp = t
822 self._timeline_frame._end_stamp = t
823 self._timeline_frame._playhead = t
824 elif self._timeline_frame._end_stamp
is None or t > self._timeline_frame._end_stamp:
825 self._timeline_frame._end_stamp = t
827 if not self._timeline_frame.topics
or topic
not in self._timeline_frame.topics:
835 if self._timeline_frame._stamp_left
is None:
839 with self._timeline_frame.index_cache_cv:
840 self._timeline_frame.invalidated_caches.add(topic)
841 self._timeline_frame.index_cache_cv.notify()
846 listener.timeline_changed()
847 except Exception, ex:
848 qWarning(
'Error calling timeline_changed on %s: %s' % (type(listener), str(ex)))
852 self._views.append(frame)
858 self._listeners.setdefault(topic, []).append(listener)
869 topic_listeners = self._listeners.get(topic)
870 if topic_listeners
is not None and listener
in topic_listeners:
871 topic_listeners.remove(listener)
873 if len(topic_listeners) == 0:
886 if self._timeline_frame._paused:
896 elif play_speed < 0.0:
905 play_speed = property(_get_play_speed, _set_play_speed)
915 self.
last_frame = rospy.Time.from_sec(time.time())
917 self._play_timer.start()
921 self._play_timer.stop()
937 new_play_speed = -1.0
954 self._timeline_frame.playhead = self._timeline_frame.play_region[0]
957 self._timeline_frame.playhead = self._timeline_frame.play_region[1]
def _stop_redraw_timer(self)
def remove_listener(self, topic, listener)
def start_publishing(self, topic)
def get_entries_with_bags(self, topic, start_stamp, end_stamp)
def _set_play_all(self, play_all)
def on_mouse_move(self, event)
def _set_play_speed(self, play_speed)
_message_listener_threads
def on_mouse_up(self, event)
def on_idle(self)
Playing.
def record_bag(self, filename, all=True, topics=[], regex=False, limit=0)
Recording.
def on_mouse_down(self, event)
def get_datatype(self, topic)
def add_topic(self, topic, type, num_msgs=20)
def translate_timeline_right(self)
def toggle_recording(self)
def _get_start_stamp(self)
def is_publishing(self, topic)
Publishing.
def _message_recorded(self, topic, msg, t)
def copy_region_to_bag(self, filename)
def step_next_message(self)
def get_next_message_time(self)
def navigate_fastforward(self)
def add_listener(self, topic, listener)
def get_entry_before(self, t)
def _redraw_timeline(self, timer)
def get_entry_after(self, t)
def get_previous_message_time(self)
def _start_redraw_timer(self)
def _export_region(self, path, topics, start_stamp, end_stamp)
def set_publishing_state(self, start_publishing)
def on_mousewheel(self, event)
def start_background_task(self, background_task)
Copy messages to...
def _entry_at(self, t, queue)
def translate_timeline_left(self)
def __init__(self, context, publish_clock)
def _get_topics_by_datatype(self)
def _get_play_speed(self)
Playhead.
def navigate_previous(self)
def topic_callback(self, msg, topic)
def read_message(self, topic, position)
def stop_publishing(self, topic)
def toggle_play_all(self)
def navigate_rewind(self)
def has_listeners(self, topic)
def _run_export_region(self, export_bag, topics, start_stamp, end_stamp, bag_entries)
def stop_background_task(self)
def get_entry(self, t, topic)
def get_entries(self, topics, start_stamp, end_stamp)
def add_view(self, topic, frame)
Views / listeners.