message_converter.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2013, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 from __future__ import absolute_import
35 
36 import roslib.message
37 import rospy
38 import base64
39 import sys
40 import copy
41 import collections
42 
43 from .custom_mappings import map_api2ros, map_ros2api
44 
45 python3 = (sys.hexversion > 0x03000000)
46 
47 python_list_types = [list, tuple]
48 
49 if python3:
50  python_string_types = [str, bytes]
51  python_int_types = [int]
52 else:
53  python_string_types = [str, unicode]
54  python_int_types = [int, long]
55 
56 python_float_types = [float]
57 
58 ros_to_python_type_map = {
59  'bool' : [bool],
60  'float32' : copy.deepcopy(python_float_types + python_int_types),
61  'float64' : copy.deepcopy(python_float_types + python_int_types),
62  'int8' : copy.deepcopy(python_int_types),
63  'int16' : copy.deepcopy(python_int_types),
64  'int32' : copy.deepcopy(python_int_types),
65  'int64' : copy.deepcopy(python_int_types),
66  'uint8' : copy.deepcopy(python_int_types),
67  'uint16' : copy.deepcopy(python_int_types),
68  'uint32' : copy.deepcopy(python_int_types),
69  'uint64' : copy.deepcopy(python_int_types),
70  'byte' : copy.deepcopy(python_int_types),
71  'char' : copy.deepcopy(python_int_types),
72  'string' : copy.deepcopy(python_string_types)
73 }
74 
75 try:
76  import numpy as np
77  _ros_to_numpy_type_map = {
78  'float32' : [np.float32, np.int8, np.int16, np.uint8, np.uint16],
79  # don't include int32, because conversion to float may change value: v = np.iinfo(np.int32).max; np.float32(v) != v
80  'float64' : [np.float32, np.float64, np.int8, np.int16, np.int32, np.uint8, np.uint16, np.uint32],
81  'int8' : [np.int8],
82  'int16' : [np.int8, np.int16, np.uint8],
83  'int32' : [np.int8, np.int16, np.int32, np.uint8, np.uint16],
84  'int64' : [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32],
85  'uint8' : [np.uint8],
86  'uint16' : [np.uint8, np.uint16],
87  'uint32' : [np.uint8, np.uint16, np.uint32],
88  'uint64' : [np.uint8, np.uint16, np.uint32, np.uint64],
89  'byte' : [np.int8],
90  'char' : [np.uint8],
91  }
92 
93  # merge type_maps
94  merged = collections.defaultdict(list, ros_to_python_type_map)
95  for k, v in _ros_to_numpy_type_map.items():
96  merged[k].extend(v)
97  ros_to_python_type_map = dict(merged)
98 except ImportError:
99  pass
100 
101 
102 ros_time_types = ['time', 'duration']
103 ros_primitive_types = ['bool', 'byte', 'char', 'int8', 'uint8', 'int16',
104  'uint16', 'int32', 'uint32', 'int64', 'uint64',
105  'float32', 'float64', 'string']
106 ros_header_types = ['Header', 'std_msgs/Header', 'roslib/Header']
107 
108 def convert_dictionary_to_ros_message(message_type, dictionary, kind='message', strict_mode=True,
109  check_missing_fields=False, check_types=True):
110  """
111  Takes in the message type and a Python dictionary and returns a ROS message.
112 
113  Example:
114  >>> msg_type = "std_msgs/String"
115  >>> dict_msg = { "data": "Hello, Robot" }
116  >>> convert_dictionary_to_ros_message(msg_type, dict_msg)
117  data: "Hello, Robot"
118 
119  >>> msg_type = "std_srvs/SetBool"
120  >>> dict_msg = { "data": True }
121  >>> kind = "request"
122  >>> convert_dictionary_to_ros_message(msg_type, dict_msg, kind)
123  data: True
124  """
125  if hasattr(message_type, '_type'):
126  message = message_type
127  message_type = message._type
128  elif type(message_type) in python_string_types:
129  if kind == 'message':
130  message_class = roslib.message.get_message_class(message_type)
131  message = message_class()
132  elif kind == 'request':
133  service_class = roslib.message.get_service_class(message_type)
134  message = service_class._request_class()
135  elif kind == 'response':
136  service_class = roslib.message.get_service_class(message_type)
137  message = service_class._response_class()
138  else:
139  raise ValueError('Unknown kind "%s".' % kind)
140  else:
141  raise ValueError('message_type is neither a ROS message instance nor type string')
142 
143  # do our custom mappings if required
144  mapped_dict = map_api2ros(dictionary, message_type)
145 
146  message_fields = dict(_get_message_fields(message))
147  remaining_message_fields = copy.deepcopy(message_fields)
148 
149  for field_name, field_value in mapped_dict.items():
150  if field_name in message_fields:
151  field_type = message_fields[field_name]
152  field_value = _convert_to_ros_type(field_name, field_type, field_value, strict_mode, check_missing_fields,
153  check_types)
154  setattr(message, field_name, field_value)
155  del remaining_message_fields[field_name]
156  else:
157  error_message = 'ROS message type "{0}" has no field named "{1}"'\
158  .format(message_type, field_name)
159  if strict_mode:
160  raise ValueError(error_message)
161  else:
162  rospy.logerr('{}! It will be ignored.'.format(error_message))
163 
164  if check_missing_fields and remaining_message_fields:
165  error_message = 'Missing fields "{0}"'.format(remaining_message_fields)
166  raise ValueError(error_message)
167 
168  return message
169 
170 def _convert_to_ros_type(field_name, field_type, field_value, strict_mode=True, check_missing_fields=False,
171  check_types=True):
172  if _is_ros_binary_type(field_type):
173  field_value = _convert_to_ros_binary(field_type, field_value)
174  elif field_type in ros_time_types:
175  field_value = _convert_to_ros_time(field_type, field_value)
176  elif field_type in ros_primitive_types:
177  # Note: one could also use genpy.message.check_type() here, but:
178  # 1. check_type is "not designed to run fast and is meant only for error diagnosis"
179  # 2. it doesn't check floats (see ros/genpy#130)
180  # 3. it rejects numpy types, although they can be serialized
181  if check_types and type(field_value) not in ros_to_python_type_map[field_type]:
182  raise TypeError("Field '{0}' has wrong type {1} (valid types: {2})".format(field_name, type(field_value), ros_to_python_type_map[field_type]))
183  field_value = _convert_to_ros_primitive(field_type, field_value)
184  elif _is_field_type_a_primitive_array(field_type):
185  field_value = field_value
186  elif _is_field_type_an_array(field_type):
187  field_value = _convert_to_ros_array(field_name, field_type, field_value, strict_mode, check_missing_fields,
188  check_types)
189  else:
190  field_value = convert_dictionary_to_ros_message(field_type, field_value, strict_mode=strict_mode,
191  check_missing_fields=check_missing_fields,
192  check_types=check_types)
193  return field_value
194 
195 def _convert_to_ros_binary(field_type, field_value):
196  if type(field_value) in python_string_types:
197  if python3:
198  # base64 in python3 added the `validate` arg:
199  # If field_value is not properly base64 encoded and there are non-base64-alphabet characters in the input,
200  # a binascii.Error will be raised.
201  binary_value_as_string = base64.b64decode(field_value, validate=True)
202  else:
203  # base64 in python2 doesn't have the `validate` arg: characters that are not in the base-64 alphabet are
204  # silently discarded, resulting in garbage output.
205  binary_value_as_string = base64.b64decode(field_value)
206  else:
207  binary_value_as_string = bytes(bytearray(field_value))
208  return binary_value_as_string
209 
210 def _convert_to_ros_time(field_type, field_value):
211  time = None
212 
213  if field_type == 'time' and field_value == 'now':
214  time = rospy.get_rostime()
215  else:
216  if field_type == 'time':
217  time = rospy.rostime.Time()
218  elif field_type == 'duration':
219  time = rospy.rostime.Duration()
220  if 'sec' in field_value:
221  setattr(time, 'secs', field_value['sec'])
222  if 'nsec' in field_value:
223  setattr(time, 'nsecs', field_value['nsec'])
224 
225  return time
226 
227 def _convert_to_ros_primitive(field_type, field_value):
228  # std_msgs/msg/_String.py always calls encode() on python3, so don't do it here
229  if field_type == "string" and not python3:
230  field_value = field_value.encode('utf-8')
231  return field_value
232 
233 def _convert_to_ros_array(field_name, field_type, list_value, strict_mode=True, check_missing_fields=False,
234  check_types=True):
235  # use index to raise ValueError if '[' not present
236  list_type = field_type[:field_type.index('[')]
237  return [_convert_to_ros_type(field_name, list_type, value, strict_mode, check_missing_fields, check_types) for value
238  in list_value]
239 
240 def convert_ros_message_to_dictionary(message, binary_array_as_bytes=True):
241  """
242  Takes in a ROS message and returns a Python dictionary.
243 
244  Example:
245  >>> import std_msgs.msg
246  >>> ros_message = std_msgs.msg.UInt32(data=42)
247  >>> convert_ros_message_to_dictionary(ros_message)
248  {'data': 42}
249  """
250  dictionary = {}
251  message_fields = _get_message_fields(message)
252  for field_name, field_type in message_fields:
253  field_value = getattr(message, field_name)
254  dictionary[field_name] = _convert_from_ros_type(field_type, field_value, binary_array_as_bytes)
255 
256  # do our custom mappings if required
257  mapped_dict = map_ros2api(dictionary, message._type)
258  return mapped_dict
259 
260 
261 def _convert_from_ros_type(field_type, field_value, binary_array_as_bytes=True):
262  if field_type in ros_primitive_types:
263  field_value = _convert_from_ros_primitive(field_type, field_value)
264  elif field_type in ros_time_types:
265  field_value = _convert_from_ros_time(field_type, field_value)
266  elif _is_ros_binary_type(field_type):
267  if binary_array_as_bytes:
268  field_value = _convert_from_ros_binary(field_type, field_value)
269  elif isinstance(field_value, str):
270  field_value = [ord(v) for v in field_value]
271  else:
272  field_value = list(field_value)
273  elif _is_field_type_a_primitive_array(field_type):
274  field_value = list(field_value)
275  elif _is_field_type_an_array(field_type):
276  field_value = _convert_from_ros_array(field_type, field_value, binary_array_as_bytes)
277  else:
278  field_value = convert_ros_message_to_dictionary(field_value, binary_array_as_bytes)
279 
280  return field_value
281 
282 def _is_ros_binary_type(field_type):
283  """ Checks if the field is a binary array one, fixed size or not
284 
285  >>> _is_ros_binary_type("uint8")
286  False
287  >>> _is_ros_binary_type("uint8[]")
288  True
289  >>> _is_ros_binary_type("uint8[3]")
290  True
291  >>> _is_ros_binary_type("char")
292  False
293  >>> _is_ros_binary_type("char[]")
294  True
295  >>> _is_ros_binary_type("char[3]")
296  True
297  """
298  return field_type.startswith('uint8[') or field_type.startswith('char[')
299 
300 def _convert_from_ros_binary(field_type, field_value):
301  field_value = base64.b64encode(field_value).decode('utf-8')
302  return field_value
303 
304 def _convert_from_ros_time(field_type, field_value):
305  field_value = {
306  'sec' : field_value.secs,
307  'nsec' : field_value.nsecs
308  }
309  return field_value
310 
311 def _convert_from_ros_primitive(field_type, field_value):
312  # std_msgs/msg/_String.py always calls decode() on python3, so don't do it here
313  if field_type == "string" and not python3:
314  field_value = field_value.decode('utf-8')
315  return field_value
316 
317 def _convert_from_ros_array(field_type, field_value, binary_array_as_bytes=True):
318  # use index to raise ValueError if '[' not present
319  list_type = field_type[:field_type.index('[')]
320  return [_convert_from_ros_type(list_type, value, binary_array_as_bytes) for value in field_value]
321 
322 def _get_message_fields(message):
323  return zip(message.__slots__, message._slot_types)
324 
325 def _is_field_type_an_array(field_type):
326  return field_type.find('[') >= 0
327 
328 def _is_field_type_a_primitive_array(field_type):
329  bracket_index = field_type.find('[')
330  if bracket_index < 0:
331  return False
332  else:
333  list_type = field_type[:bracket_index]
334  return list_type in ros_primitive_types
rc_reason_clients.custom_mappings.map_ros2api
def map_ros2api(msg, rostype)
Definition: custom_mappings.py:32
rc_reason_clients.custom_mappings.map_api2ros
def map_api2ros(msg, rostype)
Definition: custom_mappings.py:123


rc_reason_clients
Author(s):
autogenerated on Sat Nov 23 2024 03:19:24