34 from __future__
import absolute_import
43 from .custom_mappings
import map_api2ros, map_ros2api
# True when running on Python 3; selects which builtin types count as strings/ints below.
python3 = (sys.hexversion > 0x03000000)

python_list_types = [list, tuple]

if python3:
    python_string_types = [str, bytes]
    python_int_types = [int]
else:
    # `unicode` and `long` only exist on Python 2; this branch is never evaluated on Python 3.
    python_string_types = [str, unicode]  # noqa: F821
    python_int_types = [int, long]  # noqa: F821

python_float_types = [float]

# For each ROS primitive type: the Python types accepted for a field of that type
# when type checking is enabled. Every primitive listed in `ros_primitive_types`
# needs an entry here, because the type check indexes this map directly.
# Each value is a private copy because the numpy merge below extends the lists in place.
ros_to_python_type_map = {
    'bool': [bool],
    'float32': copy.deepcopy(python_float_types + python_int_types),
    'float64': copy.deepcopy(python_float_types + python_int_types),
    'int8': copy.deepcopy(python_int_types),
    'int16': copy.deepcopy(python_int_types),
    'int32': copy.deepcopy(python_int_types),
    'int64': copy.deepcopy(python_int_types),
    'uint8': copy.deepcopy(python_int_types),
    'uint16': copy.deepcopy(python_int_types),
    'uint32': copy.deepcopy(python_int_types),
    'uint64': copy.deepcopy(python_int_types),
    'byte': copy.deepcopy(python_int_types),
    'char': copy.deepcopy(python_int_types),
    'string': copy.deepcopy(python_string_types),
}

# NOTE(review): numpy is treated as optional here; without it only the builtin
# Python types above are accepted. Confirm this matches the package's dependencies.
try:
    import numpy as np

    # numpy scalar types additionally accepted per ROS type. Only types whose
    # whole value range fits the target are listed (e.g. no int32 for float32).
    _ros_to_numpy_type_map = {
        'float32': [np.float32, np.int8, np.int16, np.uint8, np.uint16],
        'float64': [np.float32, np.float64, np.int8, np.int16, np.int32, np.uint8, np.uint16, np.uint32],
        'int8': [np.int8],
        'int16': [np.int8, np.int16, np.uint8],
        'int32': [np.int8, np.int16, np.int32, np.uint8, np.uint16],
        'int64': [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32],
        'uint8': [np.uint8],
        'uint16': [np.uint8, np.uint16],
        'uint32': [np.uint8, np.uint16, np.uint32],
        'uint64': [np.uint8, np.uint16, np.uint32, np.uint64],
    }

    # Merge the numpy types into the builtin-type lists.
    merged = collections.defaultdict(list, ros_to_python_type_map)
    for k, v in _ros_to_numpy_type_map.items():
        merged[k].extend(v)
    ros_to_python_type_map = dict(merged)
except ImportError:
    pass

# ROS field types that carry a time stamp or a time span.
ros_time_types = ['time', 'duration']

# ROS builtin scalar field types.
ros_primitive_types = [
    'bool', 'byte', 'char',
    'int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'int64', 'uint64',
    'float32', 'float64',
    'string',
]

# Names under which the standard header message type may appear.
ros_header_types = ['Header', 'std_msgs/Header', 'roslib/Header']
def convert_dictionary_to_ros_message(message_type, dictionary, kind='message', strict_mode=True,
                                      check_missing_fields=False, check_types=True):
    """
    Takes in the message type and a Python dictionary and returns a ROS message.

    :param message_type: either a ROS type string (e.g. "std_msgs/String") or an
        object exposing ``_type``; such an object is populated in place and returned.
    :param dictionary: field values; passed through ``map_api2ros`` before conversion.
    :param kind: 'message', 'request' or 'response'; only used when ``message_type``
        is a string.
    :param strict_mode: if true, an unknown field raises ValueError; otherwise it is
        logged with ``rospy.logerr`` and skipped.
    :param check_missing_fields: if true, raise ValueError when message fields were
        not present in the (mapped) dictionary.
    :param check_types: forwarded to the per-field conversion, which raises TypeError
        for primitive values of an unexpected Python type.
    :return: the populated message.

    Example:
        >>> msg_type = "std_msgs/String"
        >>> dict_msg = { "data": "Hello, Robot" }
        >>> convert_dictionary_to_ros_message(msg_type, dict_msg)
        data: "Hello, Robot"

        >>> msg_type = "std_srvs/SetBool"
        >>> dict_msg = { "data": True }
        >>> kind = "request"
        >>> convert_dictionary_to_ros_message(msg_type, dict_msg, kind)
        data: True
    """
    if hasattr(message_type, '_type'):
        # An instance was handed in: fill it directly and keep its type name.
        message, message_type = message_type, message_type._type
    elif type(message_type) in python_string_types:
        if kind == 'message':
            message = roslib.message.get_message_class(message_type)()
        elif kind == 'request':
            message = roslib.message.get_service_class(message_type)._request_class()
        elif kind == 'response':
            message = roslib.message.get_service_class(message_type)._response_class()
        else:
            raise ValueError('Unknown kind "%s".' % kind)
    else:
        raise ValueError('message_type is neither a ROS message instance nor type string')

    # Translate API-style keys/values into the ROS layout before filling fields.
    mapped_dict = map_api2ros(dictionary, message_type)

    known_fields = dict(_get_message_fields(message))
    unset_fields = copy.deepcopy(known_fields)

    for name, raw_value in mapped_dict.items():
        if name not in known_fields:
            error_message = 'ROS message type "{0}" has no field named "{1}"'.format(message_type, name)
            if strict_mode:
                raise ValueError(error_message)
            rospy.logerr('{}! It will be ignored.'.format(error_message))
            continue

        converted = _convert_to_ros_type(name, known_fields[name], raw_value, strict_mode,
                                         check_missing_fields, check_types)
        setattr(message, name, converted)
        del unset_fields[name]

    if check_missing_fields and unset_fields:
        raise ValueError('Missing fields "{0}"'.format(unset_fields))

    return message
def _convert_to_ros_type(field_name, field_type, field_value, strict_mode=True, check_missing_fields=False,
                         check_types=True):
    """
    Convert one dictionary value into the value stored in a ROS field of ``field_type``.

    The checks run in a fixed order: binary arrays (uint8[]/char[]), time types,
    primitives, arrays of primitives, arrays of messages, and finally nested messages.

    :raises TypeError: if ``check_types`` is set and a primitive value's exact type
        is not listed in ``ros_to_python_type_map`` for ``field_type``.
    """
    if _is_ros_binary_type(field_type):
        return _convert_to_ros_binary(field_type, field_value)

    if field_type in ros_time_types:
        return _convert_to_ros_time(field_type, field_value)

    if field_type in ros_primitive_types:
        if check_types:
            # Exact type match on purpose: subclasses are not accepted.
            accepted_types = ros_to_python_type_map[field_type]
            if type(field_value) not in accepted_types:
                raise TypeError("Field '{0}' has wrong type {1} (valid types: {2})".format(
                    field_name, type(field_value), accepted_types))
        return _convert_to_ros_primitive(field_type, field_value)

    if _is_field_type_a_primitive_array(field_type):
        # Arrays of primitives are stored unchanged.
        return field_value

    if _is_field_type_an_array(field_type):
        return _convert_to_ros_array(field_name, field_type, field_value, strict_mode, check_missing_fields,
                                     check_types)

    # Anything else is treated as a nested message type.
    return convert_dictionary_to_ros_message(field_type, field_value, strict_mode=strict_mode,
                                             check_missing_fields=check_missing_fields,
                                             check_types=check_types)
def _convert_to_ros_binary(field_type, field_value):
    """
    Convert a value destined for a uint8[]/char[] field into a byte string.

    String inputs are treated as base64 text and decoded; any other input is
    packed via ``bytes(bytearray(field_value))``.
    """
    if type(field_value) not in python_string_types:
        return bytes(bytearray(field_value))

    if python3:
        # validate=True makes Python 3 reject characters outside the base64 alphabet.
        return base64.b64decode(field_value, validate=True)

    # Python 2's b64decode has no `validate` argument.
    return base64.b64decode(field_value)
def _convert_to_ros_time(field_type, field_value):
    """
    Build a rospy Time/Duration from a dictionary with optional 'sec'/'nsec' keys.

    The special value 'now' for a 'time' field yields ``rospy.get_rostime()``.
    """
    if field_type == 'time' and field_value == 'now':
        return rospy.get_rostime()

    stamp = None
    if field_type == 'time':
        stamp = rospy.rostime.Time()
    elif field_type == 'duration':
        stamp = rospy.rostime.Duration()

    # Dictionary keys are singular, the rospy attributes plural.
    for key, attribute in (('sec', 'secs'), ('nsec', 'nsecs')):
        if key in field_value:
            setattr(stamp, attribute, field_value[key])

    return stamp
def _convert_to_ros_primitive(field_type, field_value):
    """
    Return the value to store in a primitive ROS field.

    Only strings on Python 2 are touched: they are encoded to UTF-8. Every other
    value is returned unchanged.
    """
    needs_encoding = (not python3) and field_type == "string"
    if needs_encoding:
        return field_value.encode('utf-8')
    return field_value
def _convert_to_ros_array(field_name, field_type, list_value, strict_mode=True, check_missing_fields=False,
                          check_types=True):
    """
    Convert every element of ``list_value`` to the element type of the array field.

    :raises ValueError: if ``field_type`` contains no '['.
    """
    # str.index (rather than find) so a type without '[' raises ValueError.
    element_type = field_type[:field_type.index('[')]

    converted = []
    for value in list_value:
        converted.append(_convert_to_ros_type(field_name, element_type, value, strict_mode,
                                              check_missing_fields, check_types))
    return converted
def convert_ros_message_to_dictionary(message, binary_array_as_bytes=True):
    """
    Takes in a ROS message and returns a Python dictionary.

    Every field of the message is converted by type, then the resulting dictionary
    is passed through ``map_ros2api`` together with the message's ``_type``.

    :param binary_array_as_bytes: if true, uint8[]/char[] fields become base64 text;
        otherwise they become lists.

    Example (assuming ``map_ros2api`` leaves this type unchanged):
        >>> import std_msgs.msg
        >>> ros_message = std_msgs.msg.UInt32(data=42)
        >>> convert_ros_message_to_dictionary(ros_message)
        {'data': 42}
    """
    dictionary = dict(
        (field_name, _convert_from_ros_type(field_type, getattr(message, field_name), binary_array_as_bytes))
        for field_name, field_type in _get_message_fields(message)
    )

    # Translate the ROS layout into the API-facing layout.
    return map_ros2api(dictionary, message._type)
def _convert_from_ros_type(field_type, field_value, binary_array_as_bytes=True):
    """
    Convert the value of one ROS field into its plain-Python representation.

    The checks run in order: primitives, time types, binary arrays, arrays of
    primitives, arrays of messages, and finally nested messages.
    """
    if field_type in ros_primitive_types:
        return _convert_from_ros_primitive(field_type, field_value)

    if field_type in ros_time_types:
        return _convert_from_ros_time(field_type, field_value)

    if _is_ros_binary_type(field_type):
        if binary_array_as_bytes:
            return _convert_from_ros_binary(field_type, field_value)
        if isinstance(field_value, str):
            # A str payload is turned into a list of character codes.
            return [ord(character) for character in field_value]
        return list(field_value)

    if _is_field_type_a_primitive_array(field_type):
        return list(field_value)

    if _is_field_type_an_array(field_type):
        return _convert_from_ros_array(field_type, field_value, binary_array_as_bytes)

    # Anything else is treated as a nested message.
    return convert_ros_message_to_dictionary(field_value, binary_array_as_bytes)
def _is_ros_binary_type(field_type):
    """ Checks if the field is a binary array one, fixed size or not

    >>> _is_ros_binary_type("uint8")
    False
    >>> _is_ros_binary_type("uint8[]")
    True
    >>> _is_ros_binary_type("uint8[3]")
    True
    >>> _is_ros_binary_type("char")
    False
    >>> _is_ros_binary_type("char[]")
    True
    >>> _is_ros_binary_type("char[3]")
    True
    """
    binary_array_prefixes = ('uint8[', 'char[')
    return field_type.startswith(binary_array_prefixes)
def _convert_from_ros_binary(field_type, field_value):
    """Return the binary payload of a uint8[]/char[] field as base64 text."""
    encoded = base64.b64encode(field_value)
    return encoded.decode('utf-8')
def _convert_from_ros_time(field_type, field_value):
    """Return a ``{'sec': ..., 'nsec': ...}`` dictionary from the value's ``secs``/``nsecs``."""
    return dict(sec=field_value.secs, nsec=field_value.nsecs)
def _convert_from_ros_primitive(field_type, field_value):
    """
    Return the plain-Python value of a primitive ROS field.

    Only strings on Python 2 are touched: they are decoded from UTF-8. Every other
    value is returned unchanged.
    """
    needs_decoding = (not python3) and field_type == "string"
    if needs_decoding:
        return field_value.decode('utf-8')
    return field_value
def _convert_from_ros_array(field_type, field_value, binary_array_as_bytes=True):
    """
    Convert every element of an array field using the array's element type.

    :raises ValueError: if ``field_type`` contains no '['.
    """
    # str.index (rather than find) so a type without '[' raises ValueError.
    element_type = field_type[:field_type.index('[')]

    converted = []
    for value in field_value:
        converted.append(_convert_from_ros_type(element_type, value, binary_array_as_bytes))
    return converted
def _get_message_fields(message):
    """Pair each name in ``message.__slots__`` with the matching entry of ``message._slot_types``."""
    field_names = message.__slots__
    field_types = message._slot_types
    return zip(field_names, field_types)
def _is_field_type_an_array(field_type):
    """Return True if the type string contains '[' (e.g. "int32[]" or "geometry_msgs/Point[3]")."""
    return '[' in field_type
def _is_field_type_a_primitive_array(field_type):
    """Return True if the type is an array whose element type is in ``ros_primitive_types``."""
    element_type, bracket, _ = field_type.partition('[')
    if not bracket:
        # No '[' at all: not an array.
        return False
    return element_type in ros_primitive_types