message_converter.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 #
3 # Software License Agreement (BSD License)
4 #
5 # Copyright (c) 2019-2022, Martin Günther (DFKI GmbH) and others
6 # Copyright (c) 2013-2016, Brandon Alexander
7 #
8 # All rights reserved.
9 #
10 # Redistribution and use in source and binary forms, with or without
11 # modification, are permitted provided that the following conditions
12 # are met:
13 #
14 # * Redistributions of source code must retain the above copyright
15 # notice, this list of conditions and the following disclaimer.
16 # * Redistributions in binary form must reproduce the above
17 # copyright notice, this list of conditions and the following
18 # disclaimer in the documentation and/or other materials provided
19 # with the distribution.
20 # * Neither the name of this project nor the names of its
21 # contributors may be used to endorse or promote products derived
22 # from this software without specific prior written permission.
23 #
24 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
27 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
28 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
29 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
30 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
31 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
32 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
33 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
34 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
35 # POSSIBILITY OF SUCH DAMAGE.
36 
37 import logging
38 import roslib.message
39 import rospy
40 import base64
41 import sys
42 import copy
43 import collections
44 
45 python3 = sys.hexversion > 0x03000000
46 
47 python_list_types = [list, tuple]
48 
49 if python3:
50  python_string_types = [str, bytes]
51  python_int_types = [int]
52 else:
53  python_string_types = [str, unicode] # noqa
54  python_int_types = [int, long] # noqa
55 
56 python_float_types = [float]
57 
58 ros_to_python_type_map = {
59  'bool': [bool],
60  'float32': copy.deepcopy(python_float_types + python_int_types),
61  'float64': copy.deepcopy(python_float_types + python_int_types),
62  'int8': copy.deepcopy(python_int_types),
63  'int16': copy.deepcopy(python_int_types),
64  'int32': copy.deepcopy(python_int_types),
65  'int64': copy.deepcopy(python_int_types),
66  'uint8': copy.deepcopy(python_int_types),
67  'uint16': copy.deepcopy(python_int_types),
68  'uint32': copy.deepcopy(python_int_types),
69  'uint64': copy.deepcopy(python_int_types),
70  'byte': copy.deepcopy(python_int_types),
71  'char': copy.deepcopy(python_int_types),
72  'string': copy.deepcopy(python_string_types),
73 }
74 
75 try:
76  import numpy as np
77 
78  _ros_to_numpy_type_map = {
79  'float32': [np.float32, np.int8, np.int16, np.uint8, np.uint16],
80  # don't include int32, because conversion to float may change value:
81  # v = np.iinfo(np.int32).max; np.float32(v) != v
82  'float64': [np.float32, np.float64, np.int8, np.int16, np.int32, np.uint8, np.uint16, np.uint32],
83  'int8': [np.int8],
84  'int16': [np.int8, np.int16, np.uint8],
85  'int32': [np.int8, np.int16, np.int32, np.uint8, np.uint16],
86  'int64': [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32],
87  'uint8': [np.uint8],
88  'uint16': [np.uint8, np.uint16],
89  'uint32': [np.uint8, np.uint16, np.uint32],
90  'uint64': [np.uint8, np.uint16, np.uint32, np.uint64],
91  'byte': [np.int8],
92  'char': [np.uint8],
93  }
94 
95  # merge type_maps
96  merged = collections.defaultdict(list, ros_to_python_type_map)
97  for k, v in _ros_to_numpy_type_map.items():
98  merged[k].extend(v)
99  ros_to_python_type_map = dict(merged)
100 except ImportError:
101  pass
102 
103 
104 ros_time_types = ['time', 'duration']
105 ros_primitive_types = [
106  'bool',
107  'byte',
108  'char',
109  'int8',
110  'uint8',
111  'int16',
112  'uint16',
113  'int32',
114  'uint32',
115  'int64',
116  'uint64',
117  'float32',
118  'float64',
119  'string',
120 ]
121 ros_header_types = ['Header', 'std_msgs/Header', 'roslib/Header']
122 
123 
124 def convert_dictionary_to_ros_message(
125  message_type,
126  dictionary,
127  kind='message',
128  strict_mode=True,
129  check_missing_fields=False,
130  check_types=True,
131  log_level='error',
132 ):
133  """
134  Takes in the message type and a Python dictionary and returns a ROS message.
135 
136  Example:
137  >>> msg_type = "std_msgs/String"
138  >>> dict_msg = { "data": "Hello, Robot" }
139  >>> convert_dictionary_to_ros_message(msg_type, dict_msg)
140  data: "Hello, Robot"
141 
142  >>> msg_type = "std_srvs/SetBool"
143  >>> dict_msg = { "data": True }
144  >>> kind = "request"
145  >>> convert_dictionary_to_ros_message(msg_type, dict_msg, kind)
146  data: True
147  """
148  if kind == 'message':
149  message_class = roslib.message.get_message_class(message_type)
150  message = message_class()
151  elif kind == 'request':
152  service_class = roslib.message.get_service_class(message_type)
153  message = service_class._request_class()
154  elif kind == 'response':
155  service_class = roslib.message.get_service_class(message_type)
156  message = service_class._response_class()
157  else:
158  raise ValueError('Unknown kind "%s".' % kind)
159  message_fields = dict(_get_message_fields(message))
160 
161  remaining_message_fields = copy.deepcopy(message_fields)
162 
163  if dictionary is None:
164  dictionary = {}
165  for field_name, field_value in dictionary.items():
166  if field_name in message_fields:
167  field_type = message_fields[field_name]
168  if field_value is not None:
169  field_value = _convert_to_ros_type(
170  field_name, field_type, field_value, strict_mode, check_missing_fields, check_types, log_level
171  )
172  setattr(message, field_name, field_value)
173  del remaining_message_fields[field_name]
174  else:
175  error_message = 'ROS message type "{0}" has no field named "{1}"'.format(message_type, field_name)
176  if strict_mode:
177  raise ValueError(error_message)
178  else:
179  if log_level not in ["debug", "info", "warning", "error", "critical"]:
180  log_level = "error"
181  logger = logging.getLogger('rosout')
182  log_func = getattr(logger, log_level)
183 
184  log_func('{}! It will be ignored.'.format(error_message))
185 
186  if check_missing_fields and remaining_message_fields:
187  error_message = 'Missing fields "{0}"'.format(remaining_message_fields)
188  raise ValueError(error_message)
189 
190  return message
191 
192 
193 def _convert_to_ros_type(
194  field_name,
195  field_type,
196  field_value,
197  strict_mode=True,
198  check_missing_fields=False,
199  check_types=True,
200  log_level='error',
201 ):
202  if _is_ros_binary_type(field_type):
203  field_value = _convert_to_ros_binary(field_type, field_value)
204  elif field_type in ros_time_types:
205  field_value = _convert_to_ros_time(field_type, field_value)
206  elif field_type in ros_primitive_types:
207  # Note: one could also use genpy.message.check_type() here, but:
208  # 1. check_type is "not designed to run fast and is meant only for error diagnosis"
209  # 2. it doesn't check floats (see ros/genpy#130)
210  # 3. it rejects numpy types, although they can be serialized
211  if check_types and type(field_value) not in ros_to_python_type_map[field_type]:
212  raise TypeError(
213  "Field '{0}' has wrong type {1} (valid types: {2})".format(
214  field_name, type(field_value), ros_to_python_type_map[field_type]
215  )
216  )
217  field_value = _convert_to_ros_primitive(field_type, field_value)
218  elif _is_field_type_a_primitive_array(field_type):
219  field_value = field_value
220  elif _is_field_type_an_array(field_type):
221  field_value = _convert_to_ros_array(
222  field_name, field_type, field_value, strict_mode, check_missing_fields, check_types, log_level
223  )
224  else:
225  field_value = convert_dictionary_to_ros_message(
226  field_type,
227  field_value,
228  strict_mode=strict_mode,
229  check_missing_fields=check_missing_fields,
230  check_types=check_types,
231  log_level=log_level,
232  )
233  return field_value
234 
235 
236 def _convert_to_ros_binary(field_type, field_value):
237  if type(field_value) in python_string_types:
238  if python3:
239  # base64 in python3 added the `validate` arg:
240  # If field_value is not properly base64 encoded and there are non-base64-alphabet characters in the input,
241  # a binascii.Error will be raised.
242  binary_value_as_string = base64.b64decode(field_value, validate=True)
243  else:
244  # base64 in python2 doesn't have the `validate` arg: characters that are not in the base-64 alphabet are
245  # silently discarded, resulting in garbage output.
246  binary_value_as_string = base64.b64decode(field_value)
247  else:
248  binary_value_as_string = bytes(bytearray(field_value))
249  return binary_value_as_string
250 
251 
252 def _convert_to_ros_time(field_type, field_value):
253  time = None
254 
255  if field_type == 'time' and field_value == 'now':
256  time = rospy.get_rostime()
257  else:
258  if field_type == 'time':
259  time = rospy.rostime.Time()
260  elif field_type == 'duration':
261  time = rospy.rostime.Duration()
262  if 'secs' in field_value and field_value['secs'] is not None:
263  setattr(time, 'secs', field_value['secs'])
264  if 'nsecs' in field_value and field_value['nsecs'] is not None:
265  setattr(time, 'nsecs', field_value['nsecs'])
266 
267  return time
268 
269 
270 def _convert_to_ros_primitive(field_type, field_value):
271  # std_msgs/msg/_String.py always calls encode() on python3, so don't do it here
272  if field_type == "string" and not python3:
273  field_value = field_value.encode('utf-8')
274  return field_value
275 
276 
277 def _convert_to_ros_array(
278  field_name,
279  field_type,
280  list_value,
281  strict_mode=True,
282  check_missing_fields=False,
283  check_types=True,
284  log_level='error',
285 ):
286  # use index to raise ValueError if '[' not present
287  list_type = field_type[: field_type.index('[')]
288  return [
289  _convert_to_ros_type(field_name, list_type, value, strict_mode, check_missing_fields, check_types, log_level)
290  for value in list_value
291  ]
292 
293 
294 def convert_ros_message_to_dictionary(message, binary_array_as_bytes=True):
295  """
296  Takes in a ROS message and returns a Python dictionary.
297 
298  Example:
299  >>> import std_msgs.msg
300  >>> ros_message = std_msgs.msg.UInt32(data=42)
301  >>> convert_ros_message_to_dictionary(ros_message)
302  {'data': 42}
303  """
304  dictionary = {}
305  message_fields = _get_message_fields(message)
306  for field_name, field_type in message_fields:
307  field_value = getattr(message, field_name)
308  dictionary[field_name] = _convert_from_ros_type(field_type, field_value, binary_array_as_bytes)
309 
310  return dictionary
311 
312 
313 def _convert_from_ros_type(field_type, field_value, binary_array_as_bytes=True):
314  if field_type in ros_primitive_types:
315  field_value = _convert_from_ros_primitive(field_type, field_value)
316  elif field_type in ros_time_types:
317  field_value = _convert_from_ros_time(field_type, field_value)
318  elif _is_ros_binary_type(field_type):
319  if binary_array_as_bytes:
320  field_value = _convert_from_ros_binary(field_type, field_value)
321  elif type(field_value) == str:
322  field_value = [ord(v) for v in field_value]
323  else:
324  field_value = list(field_value)
325  elif _is_field_type_a_primitive_array(field_type):
326  field_value = list(field_value)
327  elif _is_field_type_an_array(field_type):
328  field_value = _convert_from_ros_array(field_type, field_value, binary_array_as_bytes)
329  else:
330  field_value = convert_ros_message_to_dictionary(field_value, binary_array_as_bytes)
331 
332  return field_value
333 
334 
335 def _is_ros_binary_type(field_type):
336  """Checks if the field is a binary array one, fixed size or not
337 
338  >>> _is_ros_binary_type("uint8")
339  False
340  >>> _is_ros_binary_type("uint8[]")
341  True
342  >>> _is_ros_binary_type("uint8[3]")
343  True
344  >>> _is_ros_binary_type("char")
345  False
346  >>> _is_ros_binary_type("char[]")
347  True
348  >>> _is_ros_binary_type("char[3]")
349  True
350  """
351  return field_type.startswith('uint8[') or field_type.startswith('char[')
352 
353 
354 def _convert_from_ros_binary(field_type, field_value):
355  field_value = base64.b64encode(field_value).decode('utf-8')
356  return field_value
357 
358 
359 def _convert_from_ros_time(field_type, field_value):
360  field_value = {'secs': field_value.secs, 'nsecs': field_value.nsecs}
361  return field_value
362 
363 
364 def _convert_from_ros_primitive(field_type, field_value):
365  # std_msgs/msg/_String.py always calls decode() on python3, so don't do it here
366  if field_type == "string" and not python3:
367  field_value = field_value.decode('utf-8')
368  return field_value
369 
370 
371 def _convert_from_ros_array(field_type, field_value, binary_array_as_bytes=True):
372  # use index to raise ValueError if '[' not present
373  list_type = field_type[: field_type.index('[')]
374  return [_convert_from_ros_type(list_type, value, binary_array_as_bytes) for value in field_value]
375 
376 
377 def _get_message_fields(message):
378  return zip(message.__slots__, message._slot_types)
379 
380 
381 def _is_field_type_an_array(field_type):
382  return field_type.find('[') >= 0
383 
384 
385 def _is_field_type_a_primitive_array(field_type):
386  bracket_index = field_type.find('[')
387  if bracket_index < 0:
388  return False
389  else:
390  list_type = field_type[:bracket_index]
391  return list_type in ros_primitive_types
392 
393 
394 if __name__ == "__main__":
395  import doctest
396 
397  doctest.testmod()


rospy_message_converter
Author(s): Brandon Alexander
autogenerated on Thu Dec 22 2022 03:33:21