00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import roslib.message
00035 import rospy
00036 import re
00037 import base64
00038 import sys
00039 python3 = True if sys.hexversion > 0x03000000 else False
00040 python_to_ros_type_map = {
00041 'bool' : ['bool'],
00042 'int' : ['int8', 'byte', 'uint8', 'char',
00043 'int16', 'uint16', 'int32', 'uint32',
00044 'int64', 'uint64', 'float32', 'float64'],
00045 'float' : ['float32', 'float64'],
00046 'str' : ['string'],
00047 'unicode' : ['string'],
00048 'long' : ['uint64']
00049 }
00050 if python3:
00051 python_string_types = [str]
00052 else:
00053 python_string_types = [str, unicode]
00054 python_list_types = [list, tuple]
00055
00056 ros_time_types = ['time', 'duration']
00057 ros_primitive_types = ['bool', 'byte', 'char', 'int8', 'uint8', 'int16',
00058 'uint16', 'int32', 'uint32', 'int64', 'uint64',
00059 'float32', 'float64', 'string']
00060 ros_header_types = ['Header', 'std_msgs/Header', 'roslib/Header']
00061 ros_binary_types_regexp = re.compile(r'(uint8|char)\[[^\]]*\]')
00062
00063 list_brackets = re.compile(r'\[[^\]]*\]')
00064
00065 def convert_dictionary_to_ros_message(message_type, dictionary, kind='message'):
00066 """
00067 Takes in the message type and a Python dictionary and returns a ROS message.
00068
00069 Example:
00070 message_type = "std_msgs/String"
00071 dict_message = { "data": "Hello, Robot" }
00072 ros_message = convert_dictionary_to_ros_message(message_type, dict_message)
00073
00074 message_type = "std_srvs/SetBool"
00075 dict_message = { "data": True }
00076 kind = "request"
00077 ros_message = convert_dictionary_to_ros_message(message_type, dict_message, kind)
00078 """
00079 if kind == 'message':
00080 message_class = roslib.message.get_message_class(message_type)
00081 message = message_class()
00082 elif kind == 'request':
00083 service_class = roslib.message.get_service_class(message_type)
00084 message = service_class._request_class()
00085 elif kind == 'response':
00086 service_class = roslib.message.get_service_class(message_type)
00087 message = service_class._response_class()
00088 else:
00089 raise ValueError('Unknown kind "%s".' % kind)
00090 message_fields = dict(_get_message_fields(message))
00091
00092 for field_name, field_value in dictionary.items():
00093 if field_name in message_fields:
00094 field_type = message_fields[field_name]
00095 field_value = _convert_to_ros_type(field_type, field_value)
00096 setattr(message, field_name, field_value)
00097 else:
00098 error_message = 'ROS message type "{0}" has no field named "{1}"'\
00099 .format(message_type, field_name)
00100 raise ValueError(error_message)
00101
00102 return message
00103
00104 def _convert_to_ros_type(field_type, field_value):
00105 if is_ros_binary_type(field_type, field_value):
00106 field_value = _convert_to_ros_binary(field_type, field_value)
00107 elif field_type in ros_time_types:
00108 field_value = _convert_to_ros_time(field_type, field_value)
00109 elif field_type in ros_primitive_types:
00110 field_value = _convert_to_ros_primitive(field_type, field_value)
00111 elif _is_field_type_an_array(field_type):
00112 field_value = _convert_to_ros_array(field_type, field_value)
00113 else:
00114 field_value = convert_dictionary_to_ros_message(field_type, field_value)
00115
00116 return field_value
00117
00118 def _convert_to_ros_binary(field_type, field_value):
00119 if type(field_value) in python_string_types:
00120 binary_value_as_string = base64.standard_b64decode(field_value)
00121 else:
00122 binary_value_as_string = str(bytearray(field_value))
00123
00124 return binary_value_as_string
00125
00126 def _convert_to_ros_time(field_type, field_value):
00127 time = None
00128
00129 if field_type == 'time' and field_value == 'now':
00130 time = rospy.get_rostime()
00131 else:
00132 if field_type == 'time':
00133 time = rospy.rostime.Time()
00134 elif field_type == 'duration':
00135 time = rospy.rostime.Duration()
00136 if 'secs' in field_value:
00137 setattr(time, 'secs', field_value['secs'])
00138 if 'nsecs' in field_value:
00139 setattr(time, 'nsecs', field_value['nsecs'])
00140
00141 return time
00142
00143 def _convert_to_ros_primitive(field_type, field_value):
00144 if field_type == "string":
00145 field_value = field_value.encode('utf-8')
00146 return field_value
00147
00148 def _convert_to_ros_array(field_type, list_value):
00149 list_type = list_brackets.sub('', field_type)
00150 return [_convert_to_ros_type(list_type, value) for value in list_value]
00151
00152 def convert_ros_message_to_dictionary(message):
00153 """
00154 Takes in a ROS message and returns a Python dictionary.
00155
00156 Example:
00157 ros_message = std_msgs.msg.String(data="Hello, Robot")
00158 dict_message = convert_ros_message_to_dictionary(ros_message)
00159 """
00160 dictionary = {}
00161 message_fields = _get_message_fields(message)
00162 for field_name, field_type in message_fields:
00163 field_value = getattr(message, field_name)
00164 dictionary[field_name] = _convert_from_ros_type(field_type, field_value)
00165
00166 return dictionary
00167
00168 def _convert_from_ros_type(field_type, field_value):
00169 if is_ros_binary_type(field_type, field_value):
00170 field_value = _convert_from_ros_binary(field_type, field_value)
00171 elif field_type in ros_time_types:
00172 field_value = _convert_from_ros_time(field_type, field_value)
00173 elif field_type in ros_primitive_types:
00174 field_value = field_value
00175 elif _is_field_type_an_array(field_type):
00176 field_value = _convert_from_ros_array(field_type, field_value)
00177 else:
00178 field_value = convert_ros_message_to_dictionary(field_value)
00179
00180 return field_value
00181
00182
00183 def is_ros_binary_type(field_type, field_value):
00184 """ Checks if the field is a binary array one, fixed size or not
00185
00186 is_ros_binary_type("uint8", 42)
00187 >>> False
00188 is_ros_binary_type("uint8[]", [42, 18])
00189 >>> True
00190 is_ros_binary_type("uint8[3]", [42, 18, 21]
00191 >>> True
00192 is_ros_binary_type("char", 42)
00193 >>> False
00194 is_ros_binary_type("char[]", [42, 18])
00195 >>> True
00196 is_ros_binary_type("char[3]", [42, 18, 21]
00197 >>> True
00198 """
00199 return re.search(ros_binary_types_regexp, field_type) is not None
00200
00201 def _convert_from_ros_binary(field_type, field_value):
00202 field_value = base64.standard_b64encode(field_value)
00203 return field_value
00204
00205 def _convert_from_ros_time(field_type, field_value):
00206 field_value = {
00207 'secs' : field_value.secs,
00208 'nsecs' : field_value.nsecs
00209 }
00210 return field_value
00211
00212 def _convert_from_ros_primitive(field_type, field_value):
00213 return field_value
00214
00215 def _convert_from_ros_array(field_type, field_value):
00216 list_type = list_brackets.sub('', field_type)
00217 return [_convert_from_ros_type(list_type, value) for value in field_value]
00218
00219 def _get_message_fields(message):
00220 return zip(message.__slots__, message._slot_types)
00221
00222 def _is_field_type_an_array(field_type):
00223 return list_brackets.search(field_type) is not None