multi_topic_view.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2018-2019, Locus Robotics
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of the copyright holder nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 from collections import namedtuple
34 
35 import rospy
36 
37 from rqt_bag import TopicMessageView
38 
39 """
40  The topic is a string
41  The datatype can either be a string message type (i.e. 'nav_msgs/Path') or list of string message types
42  The callback will take the same arguments as message_viewed
43 """
44 ExtraTopic = namedtuple('ExtraTopic', 'topic datatype callback')
45 
46 
47 class MultiTopicView(TopicMessageView):
48  """Generic tool for visualizing multiple topics in one TopicMessageView.
49 
50  Visualization is triggered from one main_topic (whose datatype is determined
51  by Plugin.get_message_types) and then the other topics are determined by calling
52  get_extra_topics. The other messages visualized are those appearing in a time window
53  surrounding the message from the main topic.
54  """
55 
56  def __init__(self, timeline, parent, main_topic, window=0.5):
57  """Constructor.
58 
59  main_topic provides the topic of the messages to focus on.
60  window provides the number of seconds for the surrounding time window.
61  """
62  super(MultiTopicView, self).__init__(timeline, parent, main_topic)
63  self.main_topic = self.topic # Equivalent to main_topic
64  self.window = rospy.Duration(window)
65 
66  # confirm extra topics are in the bag
68  found_topics = timeline._get_topics()
69  missing_topics = []
70  for extra_topic, datatype, callback in self.get_extra_topics():
71  if extra_topic not in found_topics:
72  missing_topics.append(extra_topic)
73 
74  found_datatype = timeline.get_datatype(extra_topic)
75  if type(datatype) == list:
76  if found_datatype not in datatype:
77  rospy.logwarn('The type of extra topic {} ({}) does not match the declared types: {}'.format(
78  extra_topic, found_datatype, ', '.join(map(str, datatype))))
79  continue
80  elif datatype != found_datatype:
81  rospy.logwarn('The type of extra topic {} ({}) does not match the declared type {}'.format(
82  extra_topic, found_datatype, datatype))
83  continue
84 
85  self.extra_topic_callbacks[extra_topic] = callback
86 
87  if missing_topics:
88  rospy.logwarn('The following extra_topics were not found in the bag: ' + ', '.join(missing_topics))
89 
90  def get_extra_topics(self):
91  """Return a list of ExtraTopic tuples with extra topics to visualize. Should be overridden."""
92  return []
93 
94  def get_window_messages(self, topic, ts):
95  """Get all the messages within the time window."""
96  return self.timeline.get_entries([topic], ts - self.window, ts + self.window)
97 
98  def get_closest_message(self, topic, ts):
99  """Get the entry of the message closest to the given timestamp."""
100  best = None
101  best_t = None
102  for entry in self.get_window_messages(topic, ts):
103  if best is None or abs(entry.time - ts) < best_t:
104  best = entry
105  best_t = abs(entry.time - ts)
106  return best
107 
108  def message_viewed(self, bag, msg_details):
109  """Call the callbacks associated with the other topics.
110 
111  Automatically called whenever a new single message is focused on.
112  Classes extending this class should call this super-version of the method first.
113  """
114  super(MultiTopicView, self).message_viewed(bag, msg_details)
115  main_topic, msg, ts = msg_details
116  for extra_topic, callback in self.extra_topic_callbacks.iteritems():
117  best_entry = self.get_closest_message(extra_topic, ts)
118  if best_entry:
119  extra_msg_details = self.timeline.read_message(bag, best_entry.position)
120  callback(bag, extra_msg_details)
def __init__(self, timeline, parent, main_topic, window=0.5)


rqt_dwb_plugin
Author(s):
autogenerated on Mon Feb 28 2022 23:33:43