00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 import roslib
00032 import roslib.msgs
00033 import roslib.message
00034 from rostopic import get_topic_type
00035 from python_qt_binding.QtCore import qDebug
00036
00037
def get_type_class(type_name):
    """
    Resolve a ROS type name to the Python class that represents it.

    Names accepted by ``roslib.msgs.is_valid_constant_type`` are mapped to a builtin Python type;
    every other name is handed to ``roslib.message.get_message_class``.

    :param type_name: name of the type to resolve, ``str``, e.g. 'string', 'bool' or a message type name
    :returns: ``str`` for 'string', ``bool`` for 'bool', the type of a converted sample value for other
        constant types, otherwise the result of ``roslib.message.get_message_class(type_name)``
    """
    if not roslib.msgs.is_valid_constant_type(type_name):
        # not a constant type: treat the name as a message type
        return roslib.message.get_message_class(type_name)

    if type_name == 'string':
        return str
    if type_name == 'bool':
        return bool

    # remaining constant types: convert a sample value (0) and report the type of the result
    sample_value = roslib.msgs._convert_val(type_name, 0)
    return type(sample_value)
00048
00049
def get_field_type(topic_name):
    """
    Determine the Python type of one field of a registered topic.

    The topic part of ``topic_name`` is resolved with ``get_topic_type``; whatever follows the resolved
    topic name is treated as the slot path inside the topic's message class.
    For array fields the type of the array's values is returned and the is_array flag is set to True.
    This is a static type check, so it works for unpublished topics and with empty arrays.

    :param topic_name: name of field of a registered topic, ``str``, i.e. '/rosout/file'
    :returns: field_type, is_array; ``(None, False)`` if the topic type or its message class
        cannot be determined
    """
    msg_type_name, base_topic, _unused = get_topic_type(topic_name)
    if msg_type_name is None:
        return None, False

    msg_class = roslib.message.get_message_class(msg_type_name)
    if msg_class is not None:
        # the remainder after the resolved topic name is the path to the slot
        return get_slot_type(msg_class, topic_name[len(base_topic):])

    qDebug('topic_helpers.get_field_type(%s): get_message_class(%s) failed' % (topic_name, msg_type_name))
    return None, False
00072
00073
def get_slot_type(message_class, slot_path):
    """
    Determine the Python type of a slot addressed by a path inside a message class.

    The path is walked one '/'-separated segment at a time (empty segments are ignored); each segment
    must name an entry of the current class' ``__slots__``.
    For array slots the type of the array's values is returned and the is_array flag is set to True,
    unless the last path segment carries an index.
    This is a static type check, so it works for unpublished topics and with empty arrays.

    :param message_class: message class type, ``type``, usually inherits from genpy.message.Message
    :param slot_path: path to the slot inside the message class, ``str``, i.e. 'header/seq'
    :returns: field_type, is_array; ``(None, False)`` if a segment cannot be parsed or is not a slot
        of the class reached so far
    """
    current_class = message_class
    is_array = False

    for segment in filter(None, slot_path.split('/')):
        try:
            slot_name, _, slot_index = roslib.msgs.parse_type(segment)
        except roslib.msgs.MsgSpecException:
            return None, False

        # classes without __slots__ (e.g. builtin types or None) have no addressable slots
        known_slots = getattr(current_class, '__slots__', [])
        if slot_name not in known_slots:
            return None, False

        declared_type = current_class._slot_types[known_slots.index(slot_name)]
        base_type, declared_as_array, _ = roslib.msgs.parse_type(declared_type)
        # an explicit index in the path selects a single element, so the result is not an array
        is_array = declared_as_array and slot_index is None

        current_class = get_type_class(base_type)

    return current_class, is_array
00100
00101
def is_slot_numeric(topic_name):
    """
    Check if a slot in the given topic is numeric, or an array of numeric values.

    A slot counts as numeric when ``get_field_type`` reports ``int`` or ``float`` for it.
    This is a static type check, so it works for unpublished topics and with empty arrays.

    :param topic_name: name of field of a registered topic, ``str``, i.e. '/rosout/file'
    :returns: is_numeric, is_array, description
    """
    field_type, is_array = get_field_type(topic_name)

    if field_type not in (int, float):
        return False, is_array, 'topic "%s" is NOT numeric: %s' % (topic_name, field_type)

    template = 'topic "%s" is numeric array: %s[]' if is_array else 'topic "%s" is numeric: %s'
    return True, is_array, template % (topic_name, field_type)
00119
00120
def find_slots_by_type_dfs(msg_class, slot_type):
    """
    Collect the paths of all slots of type slot_type inside msg_class.
    Nested message classes are traversed depth first, in slot declaration order.

    Array slots appear in the path with '[]' appended to their name. If msg_class itself
    equals slot_type, the single result is the empty path ''.

    :param msg_class: The class to search in.
    :param slot_type: The type name or class to search for (e.g. 'float64' or Quaternion).
    :return: List of paths to slots of type slot_type inside msg_class (e.g. ['header/frame_id']).
    """

    def _walk(current_class, prefix):
        # yields every matching path (as a list of segments) below current_class
        if current_class == slot_type:
            yield prefix
            return

        for name, type_spec in zip(current_class.__slots__, current_class._slot_types):
            base_type, is_array, _ = roslib.msgs.parse_type(type_spec)
            segment = name + '[]' if is_array else name

            if roslib.msgs.is_valid_constant_type(base_type):
                # constant types are leaves: they either match by name or are skipped
                if base_type == slot_type:
                    yield prefix + [segment]
                continue

            nested_class = roslib.message.get_message_class(base_type)
            if nested_class is not None:
                for found in _walk(nested_class, prefix + [segment]):
                    yield found

    return ['/'.join(segments) for segments in _walk(msg_class, [])]
00154
00155
def find_slots_by_type_bfs(msg_class, slot_type):
    """
    Search inside msg_class for all slots of type slot_type and return their paths.
    Nested message classes are visited in breadth first order.

    Array slots appear in the path with '[]' appended to their name. If msg_class itself
    equals slot_type, the single result is the empty path ''.

    :param msg_class: The class to search in.
    :param slot_type: The type name or class to search for (e.g. 'float64' or Quaternion).
    :return: List of paths to slots of type slot_type inside msg_class (e.g. ['header/frame_id']).
    """
    # function-scope import so this block does not depend on a change to the module's import section
    from collections import deque

    paths = []
    # deque gives O(1) removal at the front; list.pop(0) shifts every remaining element
    pending = deque([(msg_class, [])])
    while pending:
        current_class, path = pending.popleft()
        if current_class == slot_type:
            paths.append(path)
            continue

        for slot_name, slot_type_name in zip(current_class.__slots__, current_class._slot_types):
            slot_type_name, is_array, _ = roslib.msgs.parse_type(slot_type_name)
            if is_array:
                slot_name += '[]'
            if roslib.msgs.is_valid_constant_type(slot_type_name):
                # constant types are leaves: they either match by name or are skipped
                if slot_type_name == slot_type:
                    paths.append(path + [slot_name])
                continue

            slot_class = roslib.message.get_message_class(slot_type_name)
            if slot_class is not None:
                # nested message class: examine it after everything already queued
                pending.append((slot_class, path + [slot_name]))

    return ['/'.join(path) for path in paths]