message_converter.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2013, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import roslib.message
35 import rospy
36 import re
37 import base64
38 import sys
39 python3 = True if sys.hexversion > 0x03000000 else False
40 python_to_ros_type_map = {
41  'bool' : ['bool'],
42  'int' : ['int8', 'byte', 'uint8', 'char',
43  'int16', 'uint16', 'int32', 'uint32',
44  'int64', 'uint64', 'float32', 'float64'],
45  'float' : ['float32', 'float64'],
46  'str' : ['string'],
47  'unicode' : ['string'],
48  'long' : ['uint64']
49 }
50 if python3:
51  python_string_types = [str]
52 else:
53  python_string_types = [str, unicode]
54 python_list_types = [list, tuple]
55 
56 ros_time_types = ['time', 'duration']
57 ros_primitive_types = ['bool', 'byte', 'char', 'int8', 'uint8', 'int16',
58  'uint16', 'int32', 'uint32', 'int64', 'uint64',
59  'float32', 'float64', 'string']
60 ros_header_types = ['Header', 'std_msgs/Header', 'roslib/Header']
61 ros_binary_types_regexp = re.compile(r'(uint8|char)\[[^\]]*\]')
62 
63 list_brackets = re.compile(r'\[[^\]]*\]')
64 
65 def convert_dictionary_to_ros_message(message_type, dictionary, kind='message', strict_mode=True):
66  """
67  Takes in the message type and a Python dictionary and returns a ROS message.
68 
69  Example:
70  message_type = "std_msgs/String"
71  dict_message = { "data": "Hello, Robot" }
72  ros_message = convert_dictionary_to_ros_message(message_type, dict_message)
73 
74  message_type = "std_srvs/SetBool"
75  dict_message = { "data": True }
76  kind = "request"
77  ros_message = convert_dictionary_to_ros_message(message_type, dict_message, kind)
78  """
79  if kind == 'message':
80  message_class = roslib.message.get_message_class(message_type)
81  message = message_class()
82  elif kind == 'request':
83  service_class = roslib.message.get_service_class(message_type)
84  message = service_class._request_class()
85  elif kind == 'response':
86  service_class = roslib.message.get_service_class(message_type)
87  message = service_class._response_class()
88  else:
89  raise ValueError('Unknown kind "%s".' % kind)
90  message_fields = dict(_get_message_fields(message))
91 
92  for field_name, field_value in dictionary.items():
93  if field_name in message_fields:
94  field_type = message_fields[field_name]
95  field_value = _convert_to_ros_type(field_type, field_value)
96  setattr(message, field_name, field_value)
97  else:
98  error_message = 'ROS message type "{0}" has no field named "{1}"'\
99  .format(message_type, field_name)
100  if strict_mode:
101  raise ValueError(error_message)
102  else:
103  rospy.logerr('{}! It will be ignored.'.format(error_message))
104 
105  return message
106 
107 def _convert_to_ros_type(field_type, field_value):
108  if is_ros_binary_type(field_type, field_value):
109  field_value = _convert_to_ros_binary(field_type, field_value)
110  elif field_type in ros_time_types:
111  field_value = _convert_to_ros_time(field_type, field_value)
112  elif field_type in ros_primitive_types:
113  field_value = _convert_to_ros_primitive(field_type, field_value)
114  elif _is_field_type_a_primitive_array(field_type):
115  field_value = field_value
116  elif _is_field_type_an_array(field_type):
117  field_value = _convert_to_ros_array(field_type, field_value)
118  else:
119  field_value = convert_dictionary_to_ros_message(field_type, field_value)
120 
121  return field_value
122 
123 def _convert_to_ros_binary(field_type, field_value):
124  if type(field_value) in python_string_types:
125  binary_value_as_string = base64.standard_b64decode(field_value)
126  else:
127  binary_value_as_string = str(bytearray(field_value))
128 
129  return binary_value_as_string
130 
131 def _convert_to_ros_time(field_type, field_value):
132  time = None
133 
134  if field_type == 'time' and field_value == 'now':
135  time = rospy.get_rostime()
136  else:
137  if field_type == 'time':
138  time = rospy.rostime.Time()
139  elif field_type == 'duration':
140  time = rospy.rostime.Duration()
141  if 'secs' in field_value:
142  setattr(time, 'secs', field_value['secs'])
143  if 'nsecs' in field_value:
144  setattr(time, 'nsecs', field_value['nsecs'])
145 
146  return time
147 
148 def _convert_to_ros_primitive(field_type, field_value):
149  if field_type == "string":
150  field_value = field_value.encode('utf-8')
151  return field_value
152 
153 def _convert_to_ros_array(field_type, list_value):
154  list_type = list_brackets.sub('', field_type)
155  return [_convert_to_ros_type(list_type, value) for value in list_value]
156 
158  """
159  Takes in a ROS message and returns a Python dictionary.
160 
161  Example:
162  ros_message = std_msgs.msg.String(data="Hello, Robot")
163  dict_message = convert_ros_message_to_dictionary(ros_message)
164  """
165  dictionary = {}
166  message_fields = _get_message_fields(message)
167  for field_name, field_type in message_fields:
168  field_value = getattr(message, field_name)
169  dictionary[field_name] = _convert_from_ros_type(field_type, field_value)
170 
171  return dictionary
172 
173 def _convert_from_ros_type(field_type, field_value):
174  if is_ros_binary_type(field_type, field_value):
175  field_value = _convert_from_ros_binary(field_type, field_value)
176  elif field_type in ros_time_types:
177  field_value = _convert_from_ros_time(field_type, field_value)
178  elif field_type in ros_primitive_types:
179  field_value = field_value
180  elif _is_field_type_a_primitive_array(field_type):
181  field_value = list(field_value)
182  elif _is_field_type_an_array(field_type):
183  field_value = _convert_from_ros_array(field_type, field_value)
184  else:
185  field_value = convert_ros_message_to_dictionary(field_value)
186 
187  return field_value
188 
189 
190 def is_ros_binary_type(field_type, field_value):
191  """ Checks if the field is a binary array one, fixed size or not
192 
193  is_ros_binary_type("uint8", 42)
194  >>> False
195  is_ros_binary_type("uint8[]", [42, 18])
196  >>> True
197  is_ros_binary_type("uint8[3]", [42, 18, 21]
198  >>> True
199  is_ros_binary_type("char", 42)
200  >>> False
201  is_ros_binary_type("char[]", [42, 18])
202  >>> True
203  is_ros_binary_type("char[3]", [42, 18, 21]
204  >>> True
205  """
206  return re.search(ros_binary_types_regexp, field_type) is not None
207 
208 def _convert_from_ros_binary(field_type, field_value):
209  field_value = base64.standard_b64encode(field_value)
210  return field_value
211 
212 def _convert_from_ros_time(field_type, field_value):
213  field_value = {
214  'secs' : field_value.secs,
215  'nsecs' : field_value.nsecs
216  }
217  return field_value
218 
219 def _convert_from_ros_array(field_type, field_value):
220  list_type = list_brackets.sub('', field_type)
221  return [_convert_from_ros_type(list_type, value) for value in field_value]
222 
223 def _get_message_fields(message):
224  return zip(message.__slots__, message._slot_types)
225 
226 def _is_field_type_an_array(field_type):
227  return list_brackets.search(field_type) is not None
228 
230  list_type = list_brackets.sub('', field_type)
231  return list_type in ros_primitive_types
def _convert_to_ros_array(field_type, list_value)
def _convert_from_ros_array(field_type, field_value)
def _convert_to_ros_type(field_type, field_value)
def _convert_from_ros_type(field_type, field_value)
def _convert_from_ros_time(field_type, field_value)
def _convert_to_ros_time(field_type, field_value)
def convert_dictionary_to_ros_message(message_type, dictionary, kind='message', strict_mode=True)
def _convert_from_ros_binary(field_type, field_value)
def _convert_to_ros_binary(field_type, field_value)
def _convert_to_ros_primitive(field_type, field_value)
def is_ros_binary_type(field_type, field_value)


rospy_message_converter
Author(s): Brandon Alexander
autogenerated on Thu Mar 5 2020 03:12:49