45 python3 = sys.hexversion > 0x03000000
47 python_list_types = [list, tuple]
50 python_string_types = [str, bytes]
51 python_int_types = [int]
53 python_string_types = [str, unicode]
54 python_int_types = [int, long]
56 python_float_types = [float]
58 ros_to_python_type_map = {
60 'float32': copy.deepcopy(python_float_types + python_int_types),
61 'float64': copy.deepcopy(python_float_types + python_int_types),
62 'int8': copy.deepcopy(python_int_types),
63 'int16': copy.deepcopy(python_int_types),
64 'int32': copy.deepcopy(python_int_types),
65 'int64': copy.deepcopy(python_int_types),
66 'uint8': copy.deepcopy(python_int_types),
67 'uint16': copy.deepcopy(python_int_types),
68 'uint32': copy.deepcopy(python_int_types),
69 'uint64': copy.deepcopy(python_int_types),
70 'byte': copy.deepcopy(python_int_types),
71 'char': copy.deepcopy(python_int_types),
72 'string': copy.deepcopy(python_string_types),
78 _ros_to_numpy_type_map = {
79 'float32': [np.float32, np.int8, np.int16, np.uint8, np.uint16],
82 'float64': [np.float32, np.float64, np.int8, np.int16, np.int32, np.uint8, np.uint16, np.uint32],
84 'int16': [np.int8, np.int16, np.uint8],
85 'int32': [np.int8, np.int16, np.int32, np.uint8, np.uint16],
86 'int64': [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32],
88 'uint16': [np.uint8, np.uint16],
89 'uint32': [np.uint8, np.uint16, np.uint32],
90 'uint64': [np.uint8, np.uint16, np.uint32, np.uint64],
96 merged = collections.defaultdict(list, ros_to_python_type_map)
97 for k, v
in _ros_to_numpy_type_map.items():
99 ros_to_python_type_map = dict(merged)
104 ros_time_types = [
'time',
'duration']
105 ros_primitive_types = [
121 ros_header_types = [
'Header',
'std_msgs/Header',
'roslib/Header']
124 def convert_dictionary_to_ros_message(
129 check_missing_fields=False,
134 Takes in the message type and a Python dictionary and returns a ROS message. 137 >>> msg_type = "std_msgs/String" 138 >>> dict_msg = { "data": "Hello, Robot" } 139 >>> convert_dictionary_to_ros_message(msg_type, dict_msg) 142 >>> msg_type = "std_srvs/SetBool" 143 >>> dict_msg = { "data": True } 145 >>> convert_dictionary_to_ros_message(msg_type, dict_msg, kind) 148 if kind ==
'message':
149 message_class = roslib.message.get_message_class(message_type)
150 message = message_class()
151 elif kind ==
'request':
152 service_class = roslib.message.get_service_class(message_type)
153 message = service_class._request_class()
154 elif kind ==
'response':
155 service_class = roslib.message.get_service_class(message_type)
156 message = service_class._response_class()
158 raise ValueError(
'Unknown kind "%s".' % kind)
159 message_fields = dict(_get_message_fields(message))
161 remaining_message_fields = copy.deepcopy(message_fields)
163 if dictionary
is None:
165 for field_name, field_value
in dictionary.items():
166 if field_name
in message_fields:
167 field_type = message_fields[field_name]
168 if field_value
is not None:
169 field_value = _convert_to_ros_type(
170 field_name, field_type, field_value, strict_mode, check_missing_fields, check_types, log_level
172 setattr(message, field_name, field_value)
173 del remaining_message_fields[field_name]
175 error_message =
'ROS message type "{0}" has no field named "{1}"'.format(message_type, field_name)
177 raise ValueError(error_message)
179 if log_level
not in [
"debug",
"info",
"warning",
"error",
"critical"]:
181 logger = logging.getLogger(
'rosout')
182 log_func = getattr(logger, log_level)
184 log_func(
'{}! It will be ignored.'.format(error_message))
186 if check_missing_fields
and remaining_message_fields:
187 error_message =
'Missing fields "{0}"'.format(remaining_message_fields)
188 raise ValueError(error_message)
193 def _convert_to_ros_type(
198 check_missing_fields=False,
202 if _is_ros_binary_type(field_type):
203 field_value = _convert_to_ros_binary(field_type, field_value)
204 elif field_type
in ros_time_types:
205 field_value = _convert_to_ros_time(field_type, field_value)
206 elif field_type
in ros_primitive_types:
211 if check_types
and type(field_value)
not in ros_to_python_type_map[field_type]:
213 "Field '{0}' has wrong type {1} (valid types: {2})".format(
214 field_name, type(field_value), ros_to_python_type_map[field_type]
217 field_value = _convert_to_ros_primitive(field_type, field_value)
218 elif _is_field_type_a_primitive_array(field_type):
219 field_value = field_value
220 elif _is_field_type_an_array(field_type):
221 field_value = _convert_to_ros_array(
222 field_name, field_type, field_value, strict_mode, check_missing_fields, check_types, log_level
225 field_value = convert_dictionary_to_ros_message(
228 strict_mode=strict_mode,
229 check_missing_fields=check_missing_fields,
230 check_types=check_types,
236 def _convert_to_ros_binary(field_type, field_value):
237 if type(field_value)
in python_string_types:
242 binary_value_as_string = base64.b64decode(field_value, validate=
True)
246 binary_value_as_string = base64.b64decode(field_value)
248 binary_value_as_string = bytes(bytearray(field_value))
249 return binary_value_as_string
252 def _convert_to_ros_time(field_type, field_value):
255 if field_type ==
'time' and field_value ==
'now':
256 time = rospy.get_rostime()
258 if field_type ==
'time':
259 time = rospy.rostime.Time()
260 elif field_type ==
'duration':
261 time = rospy.rostime.Duration()
262 if 'secs' in field_value
and field_value[
'secs']
is not None:
263 setattr(time,
'secs', field_value[
'secs'])
264 if 'nsecs' in field_value
and field_value[
'nsecs']
is not None:
265 setattr(time,
'nsecs', field_value[
'nsecs'])
270 def _convert_to_ros_primitive(field_type, field_value):
272 if field_type ==
"string" and not python3:
273 field_value = field_value.encode(
'utf-8')
277 def _convert_to_ros_array(
282 check_missing_fields=False,
287 list_type = field_type[: field_type.index(
'[')]
289 _convert_to_ros_type(field_name, list_type, value, strict_mode, check_missing_fields, check_types, log_level)
290 for value
in list_value
294 def convert_ros_message_to_dictionary(message, binary_array_as_bytes=True):
296 Takes in a ROS message and returns a Python dictionary. 299 >>> import std_msgs.msg 300 >>> ros_message = std_msgs.msg.UInt32(data=42) 301 >>> convert_ros_message_to_dictionary(ros_message) 305 message_fields = _get_message_fields(message)
306 for field_name, field_type
in message_fields:
307 field_value = getattr(message, field_name)
308 dictionary[field_name] = _convert_from_ros_type(field_type, field_value, binary_array_as_bytes)
313 def _convert_from_ros_type(field_type, field_value, binary_array_as_bytes=True):
314 if field_type
in ros_primitive_types:
315 field_value = _convert_from_ros_primitive(field_type, field_value)
316 elif field_type
in ros_time_types:
317 field_value = _convert_from_ros_time(field_type, field_value)
318 elif _is_ros_binary_type(field_type):
319 if binary_array_as_bytes:
320 field_value = _convert_from_ros_binary(field_type, field_value)
321 elif type(field_value) == str:
322 field_value = [ord(v)
for v
in field_value]
324 field_value = list(field_value)
325 elif _is_field_type_a_primitive_array(field_type):
326 field_value = list(field_value)
327 elif _is_field_type_an_array(field_type):
328 field_value = _convert_from_ros_array(field_type, field_value, binary_array_as_bytes)
330 field_value = convert_ros_message_to_dictionary(field_value, binary_array_as_bytes)
335 def _is_ros_binary_type(field_type):
336 """Checks if the field is a binary array one, fixed size or not 338 >>> _is_ros_binary_type("uint8") 340 >>> _is_ros_binary_type("uint8[]") 342 >>> _is_ros_binary_type("uint8[3]") 344 >>> _is_ros_binary_type("char") 346 >>> _is_ros_binary_type("char[]") 348 >>> _is_ros_binary_type("char[3]") 351 return field_type.startswith(
'uint8[')
or field_type.startswith(
'char[')
354 def _convert_from_ros_binary(field_type, field_value):
355 field_value = base64.b64encode(field_value).decode(
'utf-8')
359 def _convert_from_ros_time(field_type, field_value):
360 field_value = {
'secs': field_value.secs,
'nsecs': field_value.nsecs}
364 def _convert_from_ros_primitive(field_type, field_value):
366 if field_type ==
"string" and not python3:
367 field_value = field_value.decode(
'utf-8')
371 def _convert_from_ros_array(field_type, field_value, binary_array_as_bytes=True):
373 list_type = field_type[: field_type.index(
'[')]
374 return [_convert_from_ros_type(list_type, value, binary_array_as_bytes)
for value
in field_value]
377 def _get_message_fields(message):
378 return zip(message.__slots__, message._slot_types)
381 def _is_field_type_an_array(field_type):
382 return field_type.find(
'[') >= 0
385 def _is_field_type_a_primitive_array(field_type):
386 bracket_index = field_type.find(
'[')
387 if bracket_index < 0:
390 list_type = field_type[:bracket_index]
391 return list_type
in ros_primitive_types
394 if __name__ ==
"__main__":