message_converter.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2013, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import roslib.message
35 import rospy
36 import re
37 import base64
38 import sys
39 python3 = True if sys.hexversion > 0x03000000 else False
40 python_to_ros_type_map = {
41  'bool' : ['bool'],
42  'int' : ['int8', 'byte', 'uint8', 'char',
43  'int16', 'uint16', 'int32', 'uint32',
44  'int64', 'uint64', 'float32', 'float64'],
45  'float' : ['float32', 'float64'],
46  'str' : ['string'],
47  'unicode' : ['string'],
48  'long' : ['uint64']
49 }
50 if python3:
51  python_string_types = [str]
52 else:
53  python_string_types = [str, unicode]
54 python_list_types = [list, tuple]
55 
56 ros_time_types = ['time', 'duration']
57 ros_primitive_types = ['bool', 'byte', 'char', 'int8', 'uint8', 'int16',
58  'uint16', 'int32', 'uint32', 'int64', 'uint64',
59  'float32', 'float64', 'string']
60 ros_header_types = ['Header', 'std_msgs/Header', 'roslib/Header']
61 ros_binary_types_regexp = re.compile(r'(uint8|char)\[[^\]]*\]')
62 
63 list_brackets = re.compile(r'\[[^\]]*\]')
64 
65 def convert_dictionary_to_ros_message(message_type, dictionary, kind='message'):
66  """
67  Takes in the message type and a Python dictionary and returns a ROS message.
68 
69  Example:
70  message_type = "std_msgs/String"
71  dict_message = { "data": "Hello, Robot" }
72  ros_message = convert_dictionary_to_ros_message(message_type, dict_message)
73 
74  message_type = "std_srvs/SetBool"
75  dict_message = { "data": True }
76  kind = "request"
77  ros_message = convert_dictionary_to_ros_message(message_type, dict_message, kind)
78  """
79  if kind == 'message':
80  message_class = roslib.message.get_message_class(message_type)
81  message = message_class()
82  elif kind == 'request':
83  service_class = roslib.message.get_service_class(message_type)
84  message = service_class._request_class()
85  elif kind == 'response':
86  service_class = roslib.message.get_service_class(message_type)
87  message = service_class._response_class()
88  else:
89  raise ValueError('Unknown kind "%s".' % kind)
90  message_fields = dict(_get_message_fields(message))
91 
92  for field_name, field_value in dictionary.items():
93  if field_name in message_fields:
94  field_type = message_fields[field_name]
95  field_value = _convert_to_ros_type(field_type, field_value)
96  setattr(message, field_name, field_value)
97  else:
98  error_message = 'ROS message type "{0}" has no field named "{1}"'\
99  .format(message_type, field_name)
100  raise ValueError(error_message)
101 
102  return message
103 
104 def _convert_to_ros_type(field_type, field_value):
105  if is_ros_binary_type(field_type, field_value):
106  field_value = _convert_to_ros_binary(field_type, field_value)
107  elif field_type in ros_time_types:
108  field_value = _convert_to_ros_time(field_type, field_value)
109  elif field_type in ros_primitive_types:
110  field_value = _convert_to_ros_primitive(field_type, field_value)
111  elif _is_field_type_an_array(field_type):
112  field_value = _convert_to_ros_array(field_type, field_value)
113  else:
114  field_value = convert_dictionary_to_ros_message(field_type, field_value)
115 
116  return field_value
117 
118 def _convert_to_ros_binary(field_type, field_value):
119  if type(field_value) in python_string_types:
120  binary_value_as_string = base64.standard_b64decode(field_value)
121  else:
122  binary_value_as_string = str(bytearray(field_value))
123 
124  return binary_value_as_string
125 
126 def _convert_to_ros_time(field_type, field_value):
127  time = None
128 
129  if field_type == 'time' and field_value == 'now':
130  time = rospy.get_rostime()
131  else:
132  if field_type == 'time':
133  time = rospy.rostime.Time()
134  elif field_type == 'duration':
135  time = rospy.rostime.Duration()
136  if 'secs' in field_value:
137  setattr(time, 'secs', field_value['secs'])
138  if 'nsecs' in field_value:
139  setattr(time, 'nsecs', field_value['nsecs'])
140 
141  return time
142 
143 def _convert_to_ros_primitive(field_type, field_value):
144  if field_type == "string":
145  field_value = field_value.encode('utf-8')
146  return field_value
147 
148 def _convert_to_ros_array(field_type, list_value):
149  list_type = list_brackets.sub('', field_type)
150  return [_convert_to_ros_type(list_type, value) for value in list_value]
151 
153  """
154  Takes in a ROS message and returns a Python dictionary.
155 
156  Example:
157  ros_message = std_msgs.msg.String(data="Hello, Robot")
158  dict_message = convert_ros_message_to_dictionary(ros_message)
159  """
160  dictionary = {}
161  message_fields = _get_message_fields(message)
162  for field_name, field_type in message_fields:
163  field_value = getattr(message, field_name)
164  dictionary[field_name] = _convert_from_ros_type(field_type, field_value)
165 
166  return dictionary
167 
168 def _convert_from_ros_type(field_type, field_value):
169  if is_ros_binary_type(field_type, field_value):
170  field_value = _convert_from_ros_binary(field_type, field_value)
171  elif field_type in ros_time_types:
172  field_value = _convert_from_ros_time(field_type, field_value)
173  elif field_type in ros_primitive_types:
174  field_value = field_value
175  elif _is_field_type_an_array(field_type):
176  field_value = _convert_from_ros_array(field_type, field_value)
177  else:
178  field_value = convert_ros_message_to_dictionary(field_value)
179 
180  return field_value
181 
182 
183 def is_ros_binary_type(field_type, field_value):
184  """ Checks if the field is a binary array one, fixed size or not
185 
186  is_ros_binary_type("uint8", 42)
187  >>> False
188  is_ros_binary_type("uint8[]", [42, 18])
189  >>> True
190  is_ros_binary_type("uint8[3]", [42, 18, 21]
191  >>> True
192  is_ros_binary_type("char", 42)
193  >>> False
194  is_ros_binary_type("char[]", [42, 18])
195  >>> True
196  is_ros_binary_type("char[3]", [42, 18, 21]
197  >>> True
198  """
199  return re.search(ros_binary_types_regexp, field_type) is not None
200 
201 def _convert_from_ros_binary(field_type, field_value):
202  field_value = base64.standard_b64encode(field_value)
203  return field_value
204 
205 def _convert_from_ros_time(field_type, field_value):
206  field_value = {
207  'secs' : field_value.secs,
208  'nsecs' : field_value.nsecs
209  }
210  return field_value
211 
212 def _convert_from_ros_primitive(field_type, field_value):
213  return field_value
214 
215 def _convert_from_ros_array(field_type, field_value):
216  list_type = list_brackets.sub('', field_type)
217  return [_convert_from_ros_type(list_type, value) for value in field_value]
218 
219 def _get_message_fields(message):
220  return zip(message.__slots__, message._slot_types)
221 
222 def _is_field_type_an_array(field_type):
223  return list_brackets.search(field_type) is not None
def _convert_to_ros_array(field_type, list_value)
def _convert_from_ros_array(field_type, field_value)
def _convert_to_ros_type(field_type, field_value)
def _convert_from_ros_type(field_type, field_value)
def _convert_from_ros_time(field_type, field_value)
def _convert_to_ros_time(field_type, field_value)
def convert_dictionary_to_ros_message(message_type, dictionary, kind='message')
def _convert_from_ros_binary(field_type, field_value)
def _convert_to_ros_binary(field_type, field_value)
def _convert_to_ros_primitive(field_type, field_value)
def is_ros_binary_type(field_type, field_value)
def _convert_from_ros_primitive(field_type, field_value)


rospy_message_converter
Author(s): Brandon Alexander
autogenerated on Tue May 7 2019 03:17:53