topic_helpers.py
Go to the documentation of this file.
1 # Copyright (c) 2011, Dorian Scholz, TU Darmstadt
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions
6 # are met:
7 #
8 # * Redistributions of source code must retain the above copyright
9 # notice, this list of conditions and the following disclaimer.
10 # * Redistributions in binary form must reproduce the above
11 # copyright notice, this list of conditions and the following
12 # disclaimer in the documentation and/or other materials provided
13 # with the distribution.
14 # * Neither the name of the TU Darmstadt nor the names of its
15 # contributors may be used to endorse or promote products derived
16 # from this software without specific prior written permission.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
21 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
22 # COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
23 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
24 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 # POSSIBILITY OF SUCH DAMAGE.
30 
31 import roslib
32 import roslib.msgs
33 import roslib.message
34 from rostopic import get_topic_type
35 from python_qt_binding.QtCore import qDebug
36 
37 
def get_type_class(type_name):
    """
    Map a type name to a Python class.

    Names rejected by ``roslib.msgs.is_valid_constant_type`` are looked up with
    ``roslib.message.get_message_class``. Accepted names are mapped to built-in
    classes: 'string' gives ``str``, 'bool' gives ``bool``, and every other
    accepted name gives the class of the value ``roslib.msgs._convert_val``
    produces for that name and the input ``0``.

    :param type_name: name of the type to resolve, ``str``, i.e. 'string' or 'bool'
    :returns: the matching class, or whatever ``get_message_class`` returns for
        a name that is not a constant type
    """
    # Anything that is not a constant type is delegated to the message class lookup.
    if not roslib.msgs.is_valid_constant_type(type_name):
        return roslib.message.get_message_class(type_name)

    if type_name == 'string':
        return str
    if type_name == 'bool':
        return bool

    # For the remaining constant types, convert a sample value and report its class.
    sample_value = roslib.msgs._convert_val(type_name, 0)
    return type(sample_value)
48 
49 
def get_field_type(topic_name):
    """
    Resolve the Python type of one field of a registered topic.

    For array fields the type of the array's elements is returned together with
    an is_array flag set to True.
    The lookup is static: it relies on the topic's type, not on received messages.

    :param topic_name: name of field of a registered topic, ``str``, i.e. '/rosout/file'
    :returns: field_type, is_array; ``(None, False)`` when the topic type or its
        message class cannot be determined
    """
    msg_type_name, base_topic, _msg_eval = get_topic_type(topic_name)
    if msg_type_name is None:
        # An unresolvable topic type is reported to the caller without a debug message.
        return None, False

    msg_class = roslib.message.get_message_class(msg_type_name)
    if msg_class is not None:
        # Whatever follows the real topic name is the path of the slot inside the message.
        return get_slot_type(msg_class, topic_name[len(base_topic):])

    qDebug('topic_helpers.get_field_type(%s): get_message_class(%s) failed' %
           (topic_name, msg_type_name))
    return None, False
73 
74 
def get_slot_type(message_class, slot_path):
    """
    Resolve the Python type of one slot inside the given message class.

    The path is walked segment by segment; empty segments (leading, trailing or
    doubled slashes) are ignored. For array slots the type of the array's
    elements is returned together with an is_array flag set to True.
    The lookup is static: it only reads ``__slots__`` and ``_slot_types`` of the classes.

    :param message_class: message class type, ``type``, usually inherits from genpy.message.Message
    :param slot_path: path to the slot inside the message class, ``str``, i.e. 'header/seq'
    :returns: field_type, is_array; ``(None, False)`` when a segment cannot be
        parsed or names a slot the current class does not have
    """
    current_class = message_class
    array_flag = False

    for segment in filter(None, slot_path.split('/')):
        try:
            slot_name, _, slot_index = roslib.msgs.parse_type(segment)
        except roslib.msgs.MsgSpecException:
            return None, False

        # Classes without __slots__ (e.g. built-in types) have no sub-slots to descend into.
        known_slots = getattr(current_class, '__slots__', [])
        if slot_name not in known_slots:
            return None, False

        declared_type = current_class._slot_types[known_slots.index(slot_name)]
        base_type, declared_array, _ = roslib.msgs.parse_type(declared_type)

        # The flag always describes the segment handled last. It is only set when the
        # slot is declared as an array and the third value parsed from the path
        # segment (presumably an explicit element index - verify against
        # roslib.msgs.parse_type) is None.
        array_flag = declared_array and slot_index is None
        current_class = get_type_class(base_type)

    return current_class, array_flag
102 
103 
def is_slot_numeric(topic_name):
    """
    Check whether a slot in the given topic is numeric, or an array of numeric values.

    A slot counts as numeric when ``get_field_type`` reports ``int`` or ``float`` for it.

    :param topic_name: name of field of a registered topic, ``str``, i.e. '/rosout/file'
    :returns: is_numeric, is_array, description
    """
    field_type, is_array = get_field_type(topic_name)

    if field_type not in (int, float):
        return False, is_array, 'topic "%s" is NOT numeric: %s' % (topic_name, field_type)

    # Same arguments either way; only the wording differs for arrays.
    if is_array:
        template = 'topic "%s" is numeric array: %s[]'
    else:
        template = 'topic "%s" is numeric: %s'
    return True, is_array, template % (topic_name, field_type)
121 
122 
def find_slots_by_type_dfs(msg_class, slot_type):
    """
    Collect the paths of all slots of type slot_type inside msg_class.

    The message class is traversed depth first. Array slots appear in the
    returned paths with a '[]' suffix. If msg_class itself equals slot_type the
    result is a single empty path.

    :param msg_class: The class to search in.
    :param slot_type: The type name or class to search for (e.g. 'float64' or Quaternion).
    :return: List of paths to slots of type slot_type inside msg_class (e.g. ['header/frame_id']).
    """

    def _walk(current_class, prefix):
        # Yield every matching path below current_class as a list of path segments.
        if current_class == slot_type:
            yield prefix
            return

        for name, declared_type in zip(current_class.__slots__, current_class._slot_types):
            base_type, is_array, _ = roslib.msgs.parse_type(declared_type)
            label = name + '[]' if is_array else name

            if roslib.msgs.is_valid_constant_type(base_type):
                # Constant types are leaves: either they match by name or they are skipped.
                if base_type == slot_type:
                    yield prefix + [label]
                continue

            nested_class = roslib.message.get_message_class(base_type)
            if nested_class is not None:
                for match in _walk(nested_class, prefix + [label]):
                    yield match

    return ['/'.join(segments) for segments in _walk(msg_class, [])]
156 
157 
def find_slots_by_type_bfs(msg_class, slot_type):
    """
    Search inside msg_class for all slots of type slot_type and return their paths.
    Uses a breadth first search, so it will find the most shallow matches first.

    Array slots appear in the returned paths with a '[]' suffix. If msg_class
    itself equals slot_type the result is a single empty path.

    :param msg_class: The class to search in.
    :param slot_type: The type name or class to search for (e.g. 'float64' or Quaternion).
    :return: List of paths to slots of type slot_type inside msg_class (e.g. ['header/frame_id']).
    """
    # Imported locally so this function does not depend on the module's import block.
    from collections import deque

    found = []
    # A deque gives O(1) removal from the front; list.pop(0) shifts every remaining entry.
    pending = deque([(msg_class, [])])
    while pending:
        current_class, prefix = pending.popleft()
        if current_class == slot_type:
            found.append(prefix)
            continue

        for slot_name, slot_type_name in zip(current_class.__slots__, current_class._slot_types):
            base_type, is_array, _ = roslib.msgs.parse_type(slot_type_name)
            if is_array:
                slot_name += '[]'

            if roslib.msgs.is_valid_constant_type(base_type):
                # Constant types are leaves: record a match by name, never descend.
                if base_type == slot_type:
                    found.append(prefix + [slot_name])
                continue

            slot_class = roslib.message.get_message_class(base_type)
            if slot_class is not None:
                # Nested message classes are examined after everything already queued.
                pending.append((slot_class, prefix + [slot_name]))

    return ['/'.join(segments) for segments in found]
def get_type_class(type_name)
def get_field_type(topic_name)
def is_slot_numeric(topic_name)
def find_slots_by_type_bfs(msg_class, slot_type)
def get_slot_type(message_class, slot_path)
def find_slots_by_type_dfs(msg_class, slot_type)


rqt_py_common
Author(s): Dorian Scholz , Isaac Saito, Dirk Thomas
autogenerated on Mon May 16 2022 02:49:40