message_converter.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2013, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import roslib.message
35 import rospy
36 import base64
37 import sys
38 import copy
39 import collections
40 
41 python3 = (sys.hexversion > 0x03000000)
42 
43 python_list_types = [list, tuple]
44 
45 if python3:
46  python_string_types = [str, bytes]
47  python_int_types = [int]
48 else:
49  python_string_types = [str, unicode]
50  python_int_types = [int, long]
51 
52 python_float_types = [float]
53 
54 ros_to_python_type_map = {
55  'bool' : [bool],
56  'float32' : copy.deepcopy(python_float_types + python_int_types),
57  'float64' : copy.deepcopy(python_float_types + python_int_types),
58  'int8' : copy.deepcopy(python_int_types),
59  'int16' : copy.deepcopy(python_int_types),
60  'int32' : copy.deepcopy(python_int_types),
61  'int64' : copy.deepcopy(python_int_types),
62  'uint8' : copy.deepcopy(python_int_types),
63  'uint16' : copy.deepcopy(python_int_types),
64  'uint32' : copy.deepcopy(python_int_types),
65  'uint64' : copy.deepcopy(python_int_types),
66  'byte' : copy.deepcopy(python_int_types),
67  'char' : copy.deepcopy(python_int_types),
68  'string' : copy.deepcopy(python_string_types)
69 }
70 
71 try:
72  import numpy as np
73  _ros_to_numpy_type_map = {
74  'float32' : [np.float32, np.int8, np.int16, np.uint8, np.uint16],
75  # don't include int32, because conversion to float may change value: v = np.iinfo(np.int32).max; np.float32(v) != v
76  'float64' : [np.float32, np.float64, np.int8, np.int16, np.int32, np.uint8, np.uint16, np.uint32],
77  'int8' : [np.int8],
78  'int16' : [np.int8, np.int16, np.uint8],
79  'int32' : [np.int8, np.int16, np.int32, np.uint8, np.uint16],
80  'int64' : [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32],
81  'uint8' : [np.uint8],
82  'uint16' : [np.uint8, np.uint16],
83  'uint32' : [np.uint8, np.uint16, np.uint32],
84  'uint64' : [np.uint8, np.uint16, np.uint32, np.uint64],
85  'byte' : [np.int8],
86  'char' : [np.uint8],
87  }
88 
89  # merge type_maps
90  merged = collections.defaultdict(list, ros_to_python_type_map)
91  for k, v in _ros_to_numpy_type_map.items():
92  merged[k].extend(v)
93  ros_to_python_type_map = dict(merged)
94 except ImportError:
95  pass
96 
97 
98 ros_time_types = ['time', 'duration']
99 ros_primitive_types = ['bool', 'byte', 'char', 'int8', 'uint8', 'int16',
100  'uint16', 'int32', 'uint32', 'int64', 'uint64',
101  'float32', 'float64', 'string']
102 ros_header_types = ['Header', 'std_msgs/Header', 'roslib/Header']
103 
104 def convert_dictionary_to_ros_message(message_type, dictionary, kind='message', strict_mode=True,
105  check_missing_fields=False, check_types=True):
106  """
107  Takes in the message type and a Python dictionary and returns a ROS message.
108 
109  Example:
110  >>> msg_type = "std_msgs/String"
111  >>> dict_msg = { "data": "Hello, Robot" }
112  >>> convert_dictionary_to_ros_message(msg_type, dict_msg)
113  data: "Hello, Robot"
114 
115  >>> msg_type = "std_srvs/SetBool"
116  >>> dict_msg = { "data": True }
117  >>> kind = "request"
118  >>> convert_dictionary_to_ros_message(msg_type, dict_msg, kind)
119  data: True
120  """
121  if kind == 'message':
122  message_class = roslib.message.get_message_class(message_type)
123  message = message_class()
124  elif kind == 'request':
125  service_class = roslib.message.get_service_class(message_type)
126  message = service_class._request_class()
127  elif kind == 'response':
128  service_class = roslib.message.get_service_class(message_type)
129  message = service_class._response_class()
130  else:
131  raise ValueError('Unknown kind "%s".' % kind)
132  message_fields = dict(_get_message_fields(message))
133 
134  remaining_message_fields = copy.deepcopy(message_fields)
135 
136  for field_name, field_value in dictionary.items():
137  if field_name in message_fields:
138  field_type = message_fields[field_name]
139  field_value = _convert_to_ros_type(field_name, field_type, field_value, strict_mode, check_missing_fields,
140  check_types)
141  setattr(message, field_name, field_value)
142  del remaining_message_fields[field_name]
143  else:
144  error_message = 'ROS message type "{0}" has no field named "{1}"'\
145  .format(message_type, field_name)
146  if strict_mode:
147  raise ValueError(error_message)
148  else:
149  rospy.logerr('{}! It will be ignored.'.format(error_message))
150 
151  if check_missing_fields and remaining_message_fields:
152  error_message = 'Missing fields "{0}"'.format(remaining_message_fields)
153  raise ValueError(error_message)
154 
155  return message
156 
157 def _convert_to_ros_type(field_name, field_type, field_value, strict_mode=True, check_missing_fields=False,
158  check_types=True):
159  if _is_ros_binary_type(field_type):
160  field_value = _convert_to_ros_binary(field_type, field_value)
161  elif field_type in ros_time_types:
162  field_value = _convert_to_ros_time(field_type, field_value)
163  elif field_type in ros_primitive_types:
164  # Note: one could also use genpy.message.check_type() here, but:
165  # 1. check_type is "not designed to run fast and is meant only for error diagnosis"
166  # 2. it doesn't check floats (see ros/genpy#130)
167  # 3. it rejects numpy types, although they can be serialized
168  if check_types and type(field_value) not in ros_to_python_type_map[field_type]:
169  raise TypeError("Field '{0}' has wrong type {1} (valid types: {2})".format(field_name, type(field_value), ros_to_python_type_map[field_type]))
170  field_value = _convert_to_ros_primitive(field_type, field_value)
171  elif _is_field_type_a_primitive_array(field_type):
172  field_value = field_value
173  elif _is_field_type_an_array(field_type):
174  field_value = _convert_to_ros_array(field_name, field_type, field_value, strict_mode, check_missing_fields,
175  check_types)
176  else:
177  field_value = convert_dictionary_to_ros_message(field_type, field_value, strict_mode=strict_mode,
178  check_missing_fields=check_missing_fields,
179  check_types=check_types)
180  return field_value
181 
182 def _convert_to_ros_binary(field_type, field_value):
183  if type(field_value) in python_string_types:
184  if python3:
185  # base64 in python3 added the `validate` arg:
186  # If field_value is not properly base64 encoded and there are non-base64-alphabet characters in the input,
187  # a binascii.Error will be raised.
188  binary_value_as_string = base64.b64decode(field_value, validate=True)
189  else:
190  # base64 in python2 doesn't have the `validate` arg: characters that are not in the base-64 alphabet are
191  # silently discarded, resulting in garbage output.
192  binary_value_as_string = base64.b64decode(field_value)
193  else:
194  binary_value_as_string = bytes(bytearray(field_value))
195  return binary_value_as_string
196 
197 def _convert_to_ros_time(field_type, field_value):
198  time = None
199 
200  if field_type == 'time' and field_value == 'now':
201  time = rospy.get_rostime()
202  else:
203  if field_type == 'time':
204  time = rospy.rostime.Time()
205  elif field_type == 'duration':
206  time = rospy.rostime.Duration()
207  if 'secs' in field_value:
208  setattr(time, 'secs', field_value['secs'])
209  if 'nsecs' in field_value:
210  setattr(time, 'nsecs', field_value['nsecs'])
211 
212  return time
213 
214 def _convert_to_ros_primitive(field_type, field_value):
215  # std_msgs/msg/_String.py always calls encode() on python3, so don't do it here
216  if field_type == "string" and not python3:
217  field_value = field_value.encode('utf-8')
218  return field_value
219 
220 def _convert_to_ros_array(field_name, field_type, list_value, strict_mode=True, check_missing_fields=False,
221  check_types=True):
222  # use index to raise ValueError if '[' not present
223  list_type = field_type[:field_type.index('[')]
224  return [_convert_to_ros_type(field_name, list_type, value, strict_mode, check_missing_fields, check_types) for value
225  in list_value]
226 
227 def convert_ros_message_to_dictionary(message, binary_array_as_bytes=True):
228  """
229  Takes in a ROS message and returns a Python dictionary.
230 
231  Example:
232  >>> import std_msgs.msg
233  >>> ros_message = std_msgs.msg.UInt32(data=42)
234  >>> convert_ros_message_to_dictionary(ros_message)
235  {'data': 42}
236  """
237  dictionary = {}
238  message_fields = _get_message_fields(message)
239  for field_name, field_type in message_fields:
240  field_value = getattr(message, field_name)
241  dictionary[field_name] = _convert_from_ros_type(field_type, field_value, binary_array_as_bytes)
242 
243  return dictionary
244 
245 
246 def _convert_from_ros_type(field_type, field_value, binary_array_as_bytes=True):
247  if field_type in ros_primitive_types:
248  field_value = _convert_from_ros_primitive(field_type, field_value)
249  elif field_type in ros_time_types:
250  field_value = _convert_from_ros_time(field_type, field_value)
251  elif _is_ros_binary_type(field_type):
252  if binary_array_as_bytes:
253  field_value = _convert_from_ros_binary(field_type, field_value)
254  elif type(field_value) == str:
255  field_value = [ord(v) for v in field_value]
256  else:
257  field_value = list(field_value)
258  elif _is_field_type_a_primitive_array(field_type):
259  field_value = list(field_value)
260  elif _is_field_type_an_array(field_type):
261  field_value = _convert_from_ros_array(field_type, field_value, binary_array_as_bytes)
262  else:
263  field_value = convert_ros_message_to_dictionary(field_value, binary_array_as_bytes)
264 
265  return field_value
266 
267 def _is_ros_binary_type(field_type):
268  """ Checks if the field is a binary array one, fixed size or not
269 
270  >>> _is_ros_binary_type("uint8")
271  False
272  >>> _is_ros_binary_type("uint8[]")
273  True
274  >>> _is_ros_binary_type("uint8[3]")
275  True
276  >>> _is_ros_binary_type("char")
277  False
278  >>> _is_ros_binary_type("char[]")
279  True
280  >>> _is_ros_binary_type("char[3]")
281  True
282  """
283  return field_type.startswith('uint8[') or field_type.startswith('char[')
284 
285 def _convert_from_ros_binary(field_type, field_value):
286  field_value = base64.b64encode(field_value).decode('utf-8')
287  return field_value
288 
289 def _convert_from_ros_time(field_type, field_value):
290  field_value = {
291  'secs' : field_value.secs,
292  'nsecs' : field_value.nsecs
293  }
294  return field_value
295 
296 def _convert_from_ros_primitive(field_type, field_value):
297  # std_msgs/msg/_String.py always calls decode() on python3, so don't do it here
298  if field_type == "string" and not python3:
299  field_value = field_value.decode('utf-8')
300  return field_value
301 
302 def _convert_from_ros_array(field_type, field_value, binary_array_as_bytes=True):
303  # use index to raise ValueError if '[' not present
304  list_type = field_type[:field_type.index('[')]
305  return [_convert_from_ros_type(list_type, value, binary_array_as_bytes) for value in field_value]
306 
307 def _get_message_fields(message):
308  return zip(message.__slots__, message._slot_types)
309 
310 def _is_field_type_an_array(field_type):
311  return field_type.find('[') >= 0
312 
313 def _is_field_type_a_primitive_array(field_type):
314  bracket_index = field_type.find('[')
315  if bracket_index < 0:
316  return False
317  else:
318  list_type = field_type[:bracket_index]
319  return list_type in ros_primitive_types
320 
321 
322 if __name__ == "__main__":
323  import doctest
324  doctest.testmod()


rospy_message_converter
Author(s): Brandon Alexander
autogenerated on Tue Mar 2 2021 03:04:27