40 from python_qt_binding.QtCore
import Qt, QTimer, qWarning, Signal
42 from python_qt_binding.QtGui
import QGraphicsScene, QMessageBox
44 from python_qt_binding.QtWidgets
import QGraphicsScene, QMessageBox
46 from .dynamic_timeline_frame
import DynamicTimelineFrame
48 from .message_loader_thread
import MessageLoaderThread
53 from .
import topic_helper
58 BagTimeline contains bag files, all information required to display the bag data visualization on the screen
61 status_bar_changed_signal = Signal()
62 selected_region_changed = Signal(rospy.Time, rospy.Time)
63 timeline_updated = Signal()
64 Topic = collections.namedtuple(
'Topic', [
'subscriber',
'queue'])
65 Message = collections.namedtuple(
'Message', [
'stamp',
'message'])
69 :param context: plugin context hook to enable adding rqt_bag plugin widgets as ROS_GUI snapin panes, ''PluginContext''
71 super(DynamicTimeline, self).
__init__()
116 self.setBackgroundBrush(Qt.white)
129 :returns: the ROS_GUI context, 'PluginContext'
144 Cleans up the timeline, subscribers and any threads
162 self.
_topics[topic][0].unregister()
174 start =
True if playhead <= self.
_timeline_frame.play_region[0]
else False
175 end =
True if playhead >= self.
_timeline_frame.play_region[1]
else False
198 Called whenever a message is received on any of the subscribed topics
200 :param topic: the topic on which the message was received
201 :param msg: the message received
204 message = self.
Message(stamp=rospy.Time.now(), message=msg)
206 self.
_topics[topic].queue.append(message)
216 """Creates an indexing thread for the new topic. Fixes the borders and notifies
217 the indexing thread to index the new items bags
219 :param topic: a topic to listen to
220 :param type: type of the topic to listen to
221 :param num_msgs: number of messages to retain on this topic. If this
222 value is exceeded, the oldest messages will be dropped
224 :return: false if topic already in the timeline, true otherwise
229 self.
_topics[topic] = self.
Topic(subscriber=rospy.Subscriber(topic, type, queue_size=1, callback=self.
topic_callback, callback_args=topic),
230 queue=collections.deque(maxlen=num_msgs))
231 self.
_datatypes.setdefault(type, []).append(topic)
259 :return: stamp of the first message received on any of the topics, or none if no messages have been received, ''rospy.Time''
263 for unused_topic_name, topic_tuple
in self.
_topics.items():
264 topic_start_stamp = topic_helper.get_start_stamp(topic_tuple)
265 if topic_start_stamp
is not None and (start_stamp
is None or topic_start_stamp < start_stamp):
266 start_stamp = topic_start_stamp
272 :return: stamp of the last message received on any of the topics, or none if no messages have been received, ''rospy.Time''
276 for unused_topic_name, topic_tuple
in self.
_topics.items():
277 topic_end_stamp = topic_helper.get_end_stamp(topic_tuple)
278 if topic_end_stamp
is not None and (end_stamp
is None or topic_end_stamp > end_stamp):
279 end_stamp = topic_end_stamp
284 :return: sorted list of topic names, ''list(str)''
290 return sorted(topics)
294 :return: dict of list of topics for each datatype, ''dict(datatype:list(topic))''
301 :return: datatype associated with a topic, ''str''
302 :raises: if there are multiple datatypes assigned to a single topic, ''Exception''
308 if len(topic_types) == 1:
309 raise Exception(
"Topic {0} had multiple datatypes ({1}) associated with it".format(topic, str(topic_types)))
310 topic_types.append(datatype._type)
315 return topic_types[0]
319 generator function for topic entries
320 :param topics: list of topics to query, ''list(str)''
321 :param start_stamp: stamp to start at, ''rospy.Time''
322 :param end_stamp: stamp to end at, ''rospy,Time''
323 :returns: messages on the given topics in chronological order, ''msg''
330 rospy.logwarn(
"Dynamic Timeline : Topic {0} was not in the topic list. Skipping.".format(topic))
334 topic_start_time = topic_helper.get_start_stamp(self.
_topics[topic])
335 if topic_start_time
is not None and topic_start_time > end_stamp:
338 topic_end_time = topic_helper.get_end_stamp(self.
_topics[topic])
339 if topic_end_time
is not None and topic_end_time < start_stamp:
342 topic_queue = self.
_topics[topic].queue
343 start_ind, first_entry = self.
_entry_at(start_stamp, topic_queue)
347 if first_entry.stamp < start_stamp:
352 end_ind, last_entry = self.
_entry_at(end_stamp, topic_queue)
354 topic_entries.extend(list(itertools.islice(topic_queue, start_ind, end_ind + 1)))
356 for entry
in sorted(topic_entries, key=
lambda x: x.stamp):
361 generator function for bag entries
362 :param topics: list of topics to query, ''list(str)''
363 :param start_stamp: stamp to start at, ''rospy.Time''
364 :param end_stamp: stamp to end at, ''rospy,Time''
365 :returns: tuple of (bag, entry) for the entries in the bag file, ''(rosbag.bag, msg)''
368 from rosbag
import bag
373 bag_start_time = bag_helper.get_start_stamp(b)
374 if bag_start_time
is not None and bag_start_time > end_stamp:
377 bag_end_time = bag_helper.get_end_stamp(b)
378 if bag_end_time
is not None and bag_end_time < start_stamp:
381 connections = list(b._get_connections(topic))
382 it = iter(b._get_entries(connections, start_stamp, end_stamp))
384 bag_entries.append(it)
386 for entry, it
in bag._mergesort(bag_entries, key=
lambda entry: entry.time):
387 yield bag_by_iter[it], entry
390 """Get the entry and index in the queue at the given time.
392 :param ``rospy.Time`` t: time to check
393 :param ``collections.deque`` queue: deque to look at
395 :return: (index, Message) tuple. If there is no message in the queue at
396 the exact time, the previous index is returned. If the time is
397 before or after the first and last message times, the first or last
398 index and message are returned
408 ind = next(i
for i, msg
in enumerate(queue)
if t < msg.stamp)
409 except StopIteration:
413 if ind == len(queue):
414 return (ind - 1, queue[-1])
423 return (ind - 1, queue[ind - 1])
426 """Get a message in the queues for a specific topic
427 :param ``rospy.Time`` t: time of the message to retrieve
428 :param str topic: the topic to be accessed
429 :return: message corresponding to time t and topic. If there is no
430 corresponding entry at exactly the given time, the latest entry
431 before the given time is returned. If the topic does not exist, or
432 there is no entry, None.
444 Get the latest message before the given time on any of the topics
445 :param t: time, ``rospy.Time``
446 :return: tuple of (topic, entry) corresponding to time t, ``(str, msg)``
449 entry_topic, entry =
None,
None
451 _, topic_entry = self.
_entry_at(t - rospy.Duration(0,1), self.
_topics[topic].queue)
452 if topic_entry
and (
not entry
or topic_entry.stamp > entry.stamp):
453 entry_topic, entry = topic, topic_entry
455 return entry_topic, entry
459 Get the earliest message on any topic after the given time
460 :param t: time, ''rospy.Time''
461 :return: tuple of (bag, entry) corresponding to time t, ''(rosbag.bag, msg)''
464 entry_topic, entry =
None,
None
470 ind = ind + 1
if ind + 1 < len(self.
_topics[topic].queue)
else ind
471 topic_entry = self.
_topics[topic].queue[ind]
472 if topic_entry
and (
not entry
or topic_entry.stamp < entry.stamp):
473 entry_topic, entry = topic, topic_entry
475 return entry_topic, entry
479 :return: time of the message after the current playhead position,''rospy.Time''
492 :return: time of the message before the current playhead position,''rospy.Time''
511 Verify that a background task is not currently running before starting a new one
512 :param background_task: name of the background task, ''str''
515 QMessageBox(QMessageBox.Warning,
'Exclamation',
'Background operation already running:\n\n%s' % self.
background_task, QMessageBox.Ok).exec_()
526 if len(self._bags) > 0:
531 Starts a thread to save the current selection to a new bag file
532 :param path: filesystem path to write to, ''str''
533 :param topics: topics to write to the file, ''list(str)''
534 :param start_stamp: start of area to save, ''rospy.Time''
535 :param end_stamp: end of area to save, ''rospy.Time''
546 total_messages = len(bag_entries)
549 if total_messages == 0:
550 QMessageBox(QMessageBox.Warning,
'rqt_bag',
'No messages found', QMessageBox.Ok).exec_()
558 QMessageBox(QMessageBox.Warning,
'rqt_bag',
'Error opening bag file [%s] for writing' % path, QMessageBox.Ok).exec_()
568 Threaded function that saves the current selection to a new bag file
569 :param export_bag: bagfile to write to, ''rosbag.bag''
570 :param topics: topics to write to the file, ''list(str)''
571 :param start_stamp: start of area to save, ''rospy.Time''
572 :param end_stamp: end of area to save, ''rospy.Time''
574 total_messages = len(bag_entries)
575 update_step = max(1, total_messages / 100)
579 for bag, entry
in bag_entries:
584 export_bag.write(topic, msg, t)
585 except Exception
as ex:
586 qWarning(
'Error exporting message at position %s: %s' % (str(entry.position), str(ex)))
591 if message_num % update_step == 0
or message_num == total_messages:
592 new_progress = int(100.0 * (float(message_num) / total_messages))
593 if new_progress != progress:
594 progress = new_progress
606 except Exception
as ex:
607 QMessageBox(QMessageBox.Warning,
'rqt_bag',
'Error closing bag file [%s]: %s' % (export_bag.filename, str(ex)), QMessageBox.Ok).exec_()
612 return self.
get_entry(position, topic).message
616 When the user clicks down in the timeline.
618 if event.buttons() == Qt.LeftButton:
620 elif event.buttons() == Qt.MidButton:
622 elif event.buttons() == Qt.RightButton:
675 self.
_player.start_clock_publishing()
676 except Exception
as ex:
677 qWarning(
'Error starting player; aborting publish: %s' % str(ex))
705 play_all = property(_get_play_all, _set_play_all)
716 moves the playhead to the next position based on the desired position
731 Moves the playhead a fixed distance into the future based on the current play speed
738 now = rospy.Time.from_sec(time.time())
742 new_playhead = self.end_stamp
748 if new_playhead > end_stamp:
751 new_playhead = start_stamp
753 new_playhead = end_stamp
755 new_playhead = end_stamp
760 elif new_playhead < start_stamp:
763 new_playhead = end_stamp
765 new_playhead = start_stamp
767 new_playhead = start_stamp
777 Move the playhead to the next message
788 delta = rospy.Time.from_sec(time.time()) - self.
last_frame
789 if delta > rospy.Duration.from_sec(0.1):
790 delta = rospy.Duration.from_sec(0.1)
801 self.
last_frame = rospy.Time.from_sec(time.time())
805 def record_bag(self, filename, all=True, topics=[], regex=False, limit=0):
807 self.
_recorder =
Recorder(filename, bag_lock=self._bag_lock, all=all, topics=topics, regex=regex, limit=limit)
808 except Exception
as ex:
809 qWarning(
'Error opening bag for recording [%s]: %s' % (filename, str(ex)))
855 listener.timeline_changed()
856 except Exception
as ex:
857 qWarning(
'Error calling timeline_changed on %s: %s' % (type(listener), str(ex)))
867 self.
_listeners.setdefault(topic, []).append(listener)
879 if topic_listeners
is not None and listener
in topic_listeners:
880 topic_listeners.remove(listener)
882 if len(topic_listeners) == 0:
905 elif play_speed < 0.0:
914 play_speed = property(_get_play_speed, _set_play_speed)
924 self.
last_frame = rospy.Time.from_sec(time.time())
946 new_play_speed = -1.0