message_converter.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2013, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import roslib.message
00035 import rospy
00036 import re
00037 import base64
00038 import sys
00039 python3 = True if sys.hexversion > 0x03000000 else False
00040 python_to_ros_type_map = {
00041     'bool'    : ['bool'],
00042     'int'     : ['int8', 'byte', 'uint8', 'char',
00043                  'int16', 'uint16', 'int32', 'uint32',
00044                  'int64', 'uint64', 'float32', 'float64'],
00045     'float'   : ['float32', 'float64'],
00046     'str'     : ['string'],
00047     'unicode' : ['string'],
00048     'long'    : ['uint64']
00049 }
00050 if python3:
00051     python_string_types = [str]
00052 else:
00053     python_string_types = [str, unicode]
00054 python_list_types = [list, tuple]
00055 
00056 ros_time_types = ['time', 'duration']
00057 ros_primitive_types = ['bool', 'byte', 'char', 'int8', 'uint8', 'int16',
00058                        'uint16', 'int32', 'uint32', 'int64', 'uint64',
00059                        'float32', 'float64', 'string']
00060 ros_header_types = ['Header', 'std_msgs/Header', 'roslib/Header']
00061 ros_binary_types_regexp = re.compile(r'(uint8|char)\[[^\]]*\]')
00062 
00063 list_brackets = re.compile(r'\[[^\]]*\]')
00064 
00065 def convert_dictionary_to_ros_message(message_type, dictionary, kind='message'):
00066     """
00067     Takes in the message type and a Python dictionary and returns a ROS message.
00068 
00069     Example:
00070         message_type = "std_msgs/String"
00071         dict_message = { "data": "Hello, Robot" }
00072         ros_message = convert_dictionary_to_ros_message(message_type, dict_message)
00073 
00074         message_type = "std_srvs/SetBool"
00075         dict_message = { "data": True }
00076         kind = "request"
00077         ros_message = convert_dictionary_to_ros_message(message_type, dict_message, kind)
00078     """
00079     if kind == 'message':
00080         message_class = roslib.message.get_message_class(message_type)
00081         message = message_class()
00082     elif kind == 'request':
00083         service_class = roslib.message.get_service_class(message_type)
00084         message = service_class._request_class()
00085     elif kind == 'response':
00086         service_class = roslib.message.get_service_class(message_type)
00087         message = service_class._response_class()
00088     else:
00089         raise ValueError('Unknown kind "%s".' % kind)
00090     message_fields = dict(_get_message_fields(message))
00091 
00092     for field_name, field_value in dictionary.items():
00093         if field_name in message_fields:
00094             field_type = message_fields[field_name]
00095             field_value = _convert_to_ros_type(field_type, field_value)
00096             setattr(message, field_name, field_value)
00097         else:
00098             error_message = 'ROS message type "{0}" has no field named "{1}"'\
00099                 .format(message_type, field_name)
00100             raise ValueError(error_message)
00101 
00102     return message
00103 
00104 def _convert_to_ros_type(field_type, field_value):
00105     if is_ros_binary_type(field_type, field_value):
00106         field_value = _convert_to_ros_binary(field_type, field_value)
00107     elif field_type in ros_time_types:
00108         field_value = _convert_to_ros_time(field_type, field_value)
00109     elif field_type in ros_primitive_types:
00110         field_value = _convert_to_ros_primitive(field_type, field_value)
00111     elif _is_field_type_an_array(field_type):
00112         field_value = _convert_to_ros_array(field_type, field_value)
00113     else:
00114         field_value = convert_dictionary_to_ros_message(field_type, field_value)
00115 
00116     return field_value
00117 
00118 def _convert_to_ros_binary(field_type, field_value):
00119     if type(field_value) in python_string_types:
00120         binary_value_as_string = base64.standard_b64decode(field_value)
00121     else:
00122         binary_value_as_string = str(bytearray(field_value))
00123 
00124     return binary_value_as_string
00125 
00126 def _convert_to_ros_time(field_type, field_value):
00127     time = None
00128 
00129     if field_type == 'time' and field_value == 'now':
00130         time = rospy.get_rostime()
00131     else:
00132         if field_type == 'time':
00133             time = rospy.rostime.Time()
00134         elif field_type == 'duration':
00135             time = rospy.rostime.Duration()
00136         if 'secs' in field_value:
00137             setattr(time, 'secs', field_value['secs'])
00138         if 'nsecs' in field_value:
00139             setattr(time, 'nsecs', field_value['nsecs'])
00140 
00141     return time
00142 
00143 def _convert_to_ros_primitive(field_type, field_value):
00144     if field_type == "string":
00145         field_value = field_value.encode('utf-8')
00146     return field_value
00147 
00148 def _convert_to_ros_array(field_type, list_value):
00149     list_type = list_brackets.sub('', field_type)
00150     return [_convert_to_ros_type(list_type, value) for value in list_value]
00151 
00152 def convert_ros_message_to_dictionary(message):
00153     """
00154     Takes in a ROS message and returns a Python dictionary.
00155 
00156     Example:
00157         ros_message = std_msgs.msg.String(data="Hello, Robot")
00158         dict_message = convert_ros_message_to_dictionary(ros_message)
00159     """
00160     dictionary = {}
00161     message_fields = _get_message_fields(message)
00162     for field_name, field_type in message_fields:
00163         field_value = getattr(message, field_name)
00164         dictionary[field_name] = _convert_from_ros_type(field_type, field_value)
00165 
00166     return dictionary
00167 
00168 def _convert_from_ros_type(field_type, field_value):
00169     if is_ros_binary_type(field_type, field_value):
00170         field_value = _convert_from_ros_binary(field_type, field_value)
00171     elif field_type in ros_time_types:
00172         field_value = _convert_from_ros_time(field_type, field_value)
00173     elif field_type in ros_primitive_types:
00174         field_value = field_value
00175     elif _is_field_type_an_array(field_type):
00176         field_value = _convert_from_ros_array(field_type, field_value)
00177     else:
00178         field_value = convert_ros_message_to_dictionary(field_value)
00179 
00180     return field_value
00181 
00182 
00183 def is_ros_binary_type(field_type, field_value):
00184     """ Checks if the field is a binary array one, fixed size or not
00185 
00186     is_ros_binary_type("uint8", 42)
00187     >>> False
00188     is_ros_binary_type("uint8[]", [42, 18])
00189     >>> True
00190     is_ros_binary_type("uint8[3]", [42, 18, 21]
00191     >>> True
00192     is_ros_binary_type("char", 42)
00193     >>> False
00194     is_ros_binary_type("char[]", [42, 18])
00195     >>> True
00196     is_ros_binary_type("char[3]", [42, 18, 21]
00197     >>> True
00198     """
00199     return re.search(ros_binary_types_regexp, field_type) is not None
00200 
00201 def _convert_from_ros_binary(field_type, field_value):
00202     field_value = base64.standard_b64encode(field_value)
00203     return field_value
00204 
00205 def _convert_from_ros_time(field_type, field_value):
00206     field_value = {
00207         'secs'  : field_value.secs,
00208         'nsecs' : field_value.nsecs
00209     }
00210     return field_value
00211 
00212 def _convert_from_ros_primitive(field_type, field_value):
00213     return field_value
00214 
00215 def _convert_from_ros_array(field_type, field_value):
00216     list_type = list_brackets.sub('', field_type)
00217     return [_convert_from_ros_type(list_type, value) for value in field_value]
00218 
00219 def _get_message_fields(message):
00220     return zip(message.__slots__, message._slot_types)
00221 
00222 def _is_field_type_an_array(field_type):
00223     return list_brackets.search(field_type) is not None


rospy_message_converter
Author(s): Brandon Alexander
autogenerated on Tue May 7 2019 03:25:56