41 python3 = (sys.hexversion > 0x03000000)
43 python_list_types = [list, tuple]
46 python_string_types = [str, bytes]
47 python_int_types = [int]
49 python_string_types = [str, unicode]
50 python_int_types = [int, long]
52 python_float_types = [float]
54 ros_to_python_type_map = {
56 'float32' : copy.deepcopy(python_float_types + python_int_types),
57 'float64' : copy.deepcopy(python_float_types + python_int_types),
58 'int8' : copy.deepcopy(python_int_types),
59 'int16' : copy.deepcopy(python_int_types),
60 'int32' : copy.deepcopy(python_int_types),
61 'int64' : copy.deepcopy(python_int_types),
62 'uint8' : copy.deepcopy(python_int_types),
63 'uint16' : copy.deepcopy(python_int_types),
64 'uint32' : copy.deepcopy(python_int_types),
65 'uint64' : copy.deepcopy(python_int_types),
66 'byte' : copy.deepcopy(python_int_types),
67 'char' : copy.deepcopy(python_int_types),
68 'string' : copy.deepcopy(python_string_types)
73 _ros_to_numpy_type_map = {
74 'float32' : [np.float32, np.int8, np.int16, np.uint8, np.uint16],
76 'float64' : [np.float32, np.float64, np.int8, np.int16, np.int32, np.uint8, np.uint16, np.uint32],
78 'int16' : [np.int8, np.int16, np.uint8],
79 'int32' : [np.int8, np.int16, np.int32, np.uint8, np.uint16],
80 'int64' : [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32],
82 'uint16' : [np.uint8, np.uint16],
83 'uint32' : [np.uint8, np.uint16, np.uint32],
84 'uint64' : [np.uint8, np.uint16, np.uint32, np.uint64],
90 merged = collections.defaultdict(list, ros_to_python_type_map)
91 for k, v
in _ros_to_numpy_type_map.items():
93 ros_to_python_type_map = dict(merged)
98 ros_time_types = [
'time',
'duration']
99 ros_primitive_types = [
'bool',
'byte',
'char',
'int8',
'uint8',
'int16',
100 'uint16',
'int32',
'uint32',
'int64',
'uint64',
101 'float32',
'float64',
'string']
102 ros_header_types = [
'Header',
'std_msgs/Header',
'roslib/Header']
104 def convert_dictionary_to_ros_message(message_type, dictionary, kind='message', strict_mode=True,
105 check_missing_fields=
False, check_types=
True):
107 Takes in the message type and a Python dictionary and returns a ROS message. 110 >>> msg_type = "std_msgs/String" 111 >>> dict_msg = { "data": "Hello, Robot" } 112 >>> convert_dictionary_to_ros_message(msg_type, dict_msg) 115 >>> msg_type = "std_srvs/SetBool" 116 >>> dict_msg = { "data": True } 118 >>> convert_dictionary_to_ros_message(msg_type, dict_msg, kind) 121 if kind ==
'message':
122 message_class = roslib.message.get_message_class(message_type)
123 message = message_class()
124 elif kind ==
'request':
125 service_class = roslib.message.get_service_class(message_type)
126 message = service_class._request_class()
127 elif kind ==
'response':
128 service_class = roslib.message.get_service_class(message_type)
129 message = service_class._response_class()
131 raise ValueError(
'Unknown kind "%s".' % kind)
132 message_fields = dict(_get_message_fields(message))
134 remaining_message_fields = copy.deepcopy(message_fields)
136 for field_name, field_value
in dictionary.items():
137 if field_name
in message_fields:
138 field_type = message_fields[field_name]
139 field_value = _convert_to_ros_type(field_name, field_type, field_value, strict_mode, check_missing_fields,
141 setattr(message, field_name, field_value)
142 del remaining_message_fields[field_name]
144 error_message =
'ROS message type "{0}" has no field named "{1}"'\
145 .format(message_type, field_name)
147 raise ValueError(error_message)
149 rospy.logerr(
'{}! It will be ignored.'.format(error_message))
151 if check_missing_fields
and remaining_message_fields:
152 error_message =
'Missing fields "{0}"'.format(remaining_message_fields)
153 raise ValueError(error_message)
157 def _convert_to_ros_type(field_name, field_type, field_value, strict_mode=True, check_missing_fields=False,
159 if _is_ros_binary_type(field_type):
160 field_value = _convert_to_ros_binary(field_type, field_value)
161 elif field_type
in ros_time_types:
162 field_value = _convert_to_ros_time(field_type, field_value)
163 elif field_type
in ros_primitive_types:
168 if check_types
and type(field_value)
not in ros_to_python_type_map[field_type]:
169 raise TypeError(
"Field '{0}' has wrong type {1} (valid types: {2})".format(field_name, type(field_value), ros_to_python_type_map[field_type]))
170 field_value = _convert_to_ros_primitive(field_type, field_value)
171 elif _is_field_type_a_primitive_array(field_type):
172 field_value = field_value
173 elif _is_field_type_an_array(field_type):
174 field_value = _convert_to_ros_array(field_name, field_type, field_value, strict_mode, check_missing_fields,
177 field_value = convert_dictionary_to_ros_message(field_type, field_value, strict_mode=strict_mode,
178 check_missing_fields=check_missing_fields,
179 check_types=check_types)
182 def _convert_to_ros_binary(field_type, field_value):
183 if type(field_value)
in python_string_types:
188 binary_value_as_string = base64.b64decode(field_value, validate=
True)
192 binary_value_as_string = base64.b64decode(field_value)
194 binary_value_as_string = bytes(bytearray(field_value))
195 return binary_value_as_string
197 def _convert_to_ros_time(field_type, field_value):
200 if field_type ==
'time' and field_value ==
'now':
201 time = rospy.get_rostime()
203 if field_type ==
'time':
204 time = rospy.rostime.Time()
205 elif field_type ==
'duration':
206 time = rospy.rostime.Duration()
207 if 'secs' in field_value:
208 setattr(time,
'secs', field_value[
'secs'])
209 if 'nsecs' in field_value:
210 setattr(time,
'nsecs', field_value[
'nsecs'])
214 def _convert_to_ros_primitive(field_type, field_value):
216 if field_type ==
"string" and not python3:
217 field_value = field_value.encode(
'utf-8')
220 def _convert_to_ros_array(field_name, field_type, list_value, strict_mode=True, check_missing_fields=False,
223 list_type = field_type[:field_type.index(
'[')]
224 return [_convert_to_ros_type(field_name, list_type, value, strict_mode, check_missing_fields, check_types)
for value
227 def convert_ros_message_to_dictionary(message, binary_array_as_bytes=True):
229 Takes in a ROS message and returns a Python dictionary. 232 >>> import std_msgs.msg 233 >>> ros_message = std_msgs.msg.UInt32(data=42) 234 >>> convert_ros_message_to_dictionary(ros_message) 238 message_fields = _get_message_fields(message)
239 for field_name, field_type
in message_fields:
240 field_value = getattr(message, field_name)
241 dictionary[field_name] = _convert_from_ros_type(field_type, field_value, binary_array_as_bytes)
246 def _convert_from_ros_type(field_type, field_value, binary_array_as_bytes=True):
247 if field_type
in ros_primitive_types:
248 field_value = _convert_from_ros_primitive(field_type, field_value)
249 elif field_type
in ros_time_types:
250 field_value = _convert_from_ros_time(field_type, field_value)
251 elif _is_ros_binary_type(field_type):
252 if binary_array_as_bytes:
253 field_value = _convert_from_ros_binary(field_type, field_value)
254 elif type(field_value) == str:
255 field_value = [ord(v)
for v
in field_value]
257 field_value = list(field_value)
258 elif _is_field_type_a_primitive_array(field_type):
259 field_value = list(field_value)
260 elif _is_field_type_an_array(field_type):
261 field_value = _convert_from_ros_array(field_type, field_value, binary_array_as_bytes)
263 field_value = convert_ros_message_to_dictionary(field_value, binary_array_as_bytes)
267 def _is_ros_binary_type(field_type):
268 """ Checks if the field is a binary array one, fixed size or not 270 >>> _is_ros_binary_type("uint8") 272 >>> _is_ros_binary_type("uint8[]") 274 >>> _is_ros_binary_type("uint8[3]") 276 >>> _is_ros_binary_type("char") 278 >>> _is_ros_binary_type("char[]") 280 >>> _is_ros_binary_type("char[3]") 283 return field_type.startswith(
'uint8[')
or field_type.startswith(
'char[')
285 def _convert_from_ros_binary(field_type, field_value):
286 field_value = base64.b64encode(field_value).decode(
'utf-8')
289 def _convert_from_ros_time(field_type, field_value):
291 'secs' : field_value.secs,
292 'nsecs' : field_value.nsecs
296 def _convert_from_ros_primitive(field_type, field_value):
298 if field_type ==
"string" and not python3:
299 field_value = field_value.decode(
'utf-8')
302 def _convert_from_ros_array(field_type, field_value, binary_array_as_bytes=True):
304 list_type = field_type[:field_type.index(
'[')]
305 return [_convert_from_ros_type(list_type, value, binary_array_as_bytes)
for value
in field_value]
307 def _get_message_fields(message):
308 return zip(message.__slots__, message._slot_types)
310 def _is_field_type_an_array(field_type):
311 return field_type.find(
'[') >= 0
313 def _is_field_type_a_primitive_array(field_type):
314 bracket_index = field_type.find(
'[')
315 if bracket_index < 0:
318 list_type = field_type[:bracket_index]
319 return list_type
in ros_primitive_types
322 if __name__ ==
"__main__":