protocol.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2012, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 import rospy
00034 import time
00035 import bson
00036 from rosbridge_library.internal.exceptions import InvalidArgumentException
00037 from rosbridge_library.internal.exceptions import MissingArgumentException
00038 
00039 #from rosbridge_library.internal.pngcompression import encode
00040 from rosbridge_library.capabilities.fragmentation import Fragmentation
00041 from rosbridge_library.util import json
00042 
00043 
00044 def is_number(s):
00045     try:
00046         float(s)
00047         return True
00048     except ValueError:
00049         return False
00050         
00051 def has_binary(d):
00052     if type(d)==bson.Binary:
00053         return True
00054     if type(d)==dict:
00055         for k,v in d.iteritems():
00056             if has_binary(v):
00057                 return True
00058     return False                
00059 
00060 class Protocol:
00061     """ The interface for a single client to interact with ROS.
00062 
00063     See rosbridge_protocol for the default protocol used by rosbridge
00064 
00065     The lifecycle for a Protocol instance is as follows:
00066     - Pass incoming messages from the client to incoming
00067     - Propagate outgoing messages to the client by overriding outgoing
00068     - Call finish to clean up resources when the client is finished
00069 
00070     """
00071 
00072     # fragment_size can be set per client (each client has its own instance of protocol)
00073     # ..same for other parameters
00074     fragment_size = None
00075     png = None
00076     # buffer used to gather partial JSON-objects (could be caused by small tcp-buffers or similar..)
00077     buffer = ""
00078     old_buffer = ""
00079     busy = False
00080     # if this is too low, ("simple")clients network stacks will get flooded (when sending fragments of a huge message..)
00081     # .. depends on message_size/bandwidth/performance/client_limits/...
00082     # !! this might be related to (or even be avoided by using) throttle_rate !!
00083     delay_between_messages = 0
00084     # global list of non-ros advertised services
00085     external_service_list = {}
00086     # Use only BSON for the whole communication if the server has been started with bson_only_mode:=True
00087     bson_only_mode = False
00088 
00089     parameters = None
00090 
00091     def __init__(self, client_id):
00092         """ Keyword arguments:
00093         client_id -- a unique ID for this client to take.  Uniqueness is
00094         important otherwise there will be conflicts between multiple clients
00095         with shared resources
00096 
00097         """
00098         self.client_id = client_id
00099         self.capabilities = []
00100         self.operations = {}
00101 
00102         if self.parameters:
00103             self.fragment_size = self.parameters["max_message_size"]
00104             self.delay_between_messages = self.parameters["delay_between_messages"]
00105             self.bson_only_mode = self.parameters.get('bson_only_mode', False)
00106 
00107     # added default message_string="" to allow recalling incoming until buffer is empty without giving a parameter
00108     # --> allows to get rid of (..or minimize) delay between client-side sends
00109     def incoming(self, message_string=""):
00110         """ Process an incoming message from the client
00111 
00112         Keyword arguments:
00113         message_string -- the wire-level message sent by the client
00114 
00115         """
00116         self.buffer = self.buffer + message_string
00117         msg = None
00118 
00119         # take care of having multiple JSON-objects in receiving buffer
00120         # ..first, try to load the whole buffer as a JSON-object
00121         try:
00122             msg = self.deserialize(self.buffer)
00123             self.buffer = ""
00124 
00125         # if loading whole object fails try to load part of it (from first opening bracket "{" to next closing bracket "}"
00126         # .. this causes Exceptions on "inner" closing brackets --> so I suppressed logging of deserialization errors
00127         except Exception, e:
00128             if self.bson_only_mode:
00129                 # Since BSON should be used in conjunction with a network handler
00130                 # that receives exactly one full BSON message.
00131                 # This will then be passed to self.deserialize and shouldn't cause any
00132                 # exceptions because of fragmented messages (broken or invalid messages might still be sent tough)
00133                 self.log("error", "Exception in deserialization of BSON")
00134 
00135             else:
00136                 # TODO: handling of partial/multiple/broken json data in incoming buffer
00137                 # this way is problematic when json contains nested json-objects ( e.g. { ... { "config": [0,1,2,3] } ...  } )
00138                 # .. if outer json is not fully received, stepping through opening brackets will find { "config" : ... } as a valid json object
00139                 # .. and pass this "inner" object to rosbridge and throw away the leading part of the "outer" object..
00140                 # solution for now:
00141                 # .. check for "op"-field. i can still imagine cases where a nested message ( e.g. complete service_response fits into the data field of a fragment..)
00142                 # .. would cause trouble, but if a response fits as a whole into a fragment, simply do not pack it into a fragment.
00143                 #
00144                 # --> from that follows current limitiation:
00145                 #     fragment data must NOT (!) contain a complete json-object that has an "op-field"
00146                 #
00147                 # an alternative solution would be to only check from first opening bracket and have a time out on data in input buffer.. (to handle broken data)
00148                 opening_brackets = [i for i, letter in enumerate(self.buffer) if letter == '{']
00149                 closing_brackets = [i for i, letter in enumerate(self.buffer) if letter == '}']
00150 
00151                 for start in opening_brackets:
00152                     for end in closing_brackets:
00153                         try:
00154                             msg = self.deserialize(self.buffer[start:end+1])
00155                             if msg.get("op",None) != None:
00156                                 # TODO: check if throwing away leading data like this is okay.. loops look okay..
00157                                 self.buffer = self.buffer[end+1:len(self.buffer)]
00158                                 # jump out of inner loop if json-decode succeeded
00159                                 break
00160                         except Exception,e:
00161                             # debug json-decode errors with this line
00162                             #print e
00163                             pass
00164                     # if load was successfull --> break outer loop, too.. -> no need to check if json begins at a "later" opening bracket..
00165                     if msg != None:
00166                         break
00167 
00168         # if decoding of buffer failed .. simply return
00169         if msg is None:
00170             return
00171 
00172         # process fields JSON-message object that "control" rosbridge
00173         mid = None
00174         if "id" in msg:
00175             mid = msg["id"]
00176         if "op" not in msg:
00177             if "receiver" in msg:
00178                 self.log("error", "Received a rosbridge v1.0 message.  Please refer to rosbridge.org for the correct format of rosbridge v2.0 messages.  Original message was: %s" % message_string)
00179             else:
00180                 self.log("error", "Received a message without an op.  All messages require 'op' field with value one of: %s.  Original message was: %s" % (self.operations.keys(), message_string), mid)
00181             return
00182         op = msg["op"]
00183         if op not in self.operations:
00184             self.log("error", "Unknown operation: %s.  Allowed operations: %s" % (op, self.operations.keys()), mid)
00185             return
00186         # this way a client can change/overwrite it's active values anytime by just including parameter field in any message sent to rosbridge
00187         #  maybe need to be improved to bind parameter values to specific operation..
00188         if "fragment_size" in msg.keys():
00189             self.fragment_size = msg["fragment_size"]
00190             #print "fragment size set to:", self.fragment_size
00191         if "message_intervall" in msg.keys() and is_number(msg["message_intervall"]):
00192             self.delay_between_messages = msg["message_intervall"]
00193         if "png" in msg.keys():
00194             self.png = msg["msg"]
00195 
00196         # now try to pass message to according operation
00197         try:
00198             self.operations[op](msg)
00199         except Exception as exc:
00200             self.log("error", "%s: %s" % (op, str(exc)), mid)
00201 
00202         # if anything left in buffer .. re-call self.incoming
00203         # TODO: check what happens if we have "garbage" on tcp-stack --> infinite loop might be triggered! .. might get out of it when next valid JSON arrives since only data after last 'valid' closing bracket is kept
00204         if len(self.buffer) > 0:
00205             # try to avoid infinite loop..
00206             if self.old_buffer != self.buffer:
00207                 self.old_buffer = self.buffer
00208                 self.incoming()
00209 
00210 
00211 
00212     def outgoing(self, message):
00213         """ Pass an outgoing message to the client.  This method should be
00214         overridden.
00215 
00216         Keyword arguments:
00217         message -- the wire-level message to send to the client
00218 
00219         """
00220         pass
00221 
00222     def send(self, message, cid=None):
00223         """ Called internally in preparation for sending messages to the client
00224 
00225         This method pre-processes the message then passes it to the overridden
00226         outgoing method.
00227 
00228         Keyword arguments:
00229         message -- a dict of message values to be marshalled and sent
00230         cid     -- (optional) an associated id
00231 
00232         """
00233         serialized = self.serialize(message, cid)
00234         if serialized is not None:
00235             if self.png == "png":
00236                 # TODO: png compression on outgoing messages
00237                 # encode message
00238                 pass
00239 
00240             fragment_list = None
00241             if self.fragment_size != None and len(serialized) > self.fragment_size:
00242                 mid = message.get("id", None)
00243 
00244                 # TODO: think about splitting into fragments that have specified size including header-fields!
00245                 # --> estimate header size --> split content into fragments that have the requested overall size, rather than requested content size
00246                 fragment_list = Fragmentation(self).fragment(message, self.fragment_size, mid )
00247 
00248             # fragment list not empty -> send fragments
00249             if fragment_list != None:
00250                 for fragment in fragment_list:
00251                     if self.bson_only_mode:
00252                         self.outgoing(bson.BSON.encode(fragment))
00253                     else:
00254                         self.outgoing(json.dumps(fragment))
00255                     # okay to use delay here (sender's send()-function) because rosbridge is sending next request only to service provider when last one had finished)
00256                     #  --> if this was not the case this delay needed to be implemented in service-provider's (meaning message receiver's) send_message()-function in rosbridge_tcp.py)
00257                     time.sleep(self.delay_between_messages)
00258             # else send message as it is
00259             else:
00260                 self.outgoing(serialized)
00261                 time.sleep(self.delay_between_messages)
00262 
00263     def finish(self):
00264         """ Indicate that the client is finished and clean up resources.
00265 
00266         All clients should call this method after disconnecting.
00267 
00268         """
00269         for capability in self.capabilities:
00270             capability.finish()
00271 
00272     def serialize(self, msg, cid=None):
00273         """ Turns a dictionary of values into the appropriate wire-level
00274         representation.
00275 
00276         Default behaviour uses JSON.  Override to use a different container.
00277 
00278         Keyword arguments:
00279         msg -- the dictionary of values to serialize
00280         cid -- (optional) an ID associated with this.  Will be logged on err.
00281 
00282         Returns a JSON string representing the dictionary
00283         """
00284         try:
00285             if has_binary(msg) or self.bson_only_mode:
00286                 return bson.BSON.encode(msg)
00287             else:    
00288                 return json.dumps(msg)
00289         except:
00290             if cid is not None:
00291                 # Only bother sending the log message if there's an id
00292                 self.log("error", "Unable to serialize %s message to client"
00293                          % msg["op"], cid)
00294             return None
00295 
00296     def deserialize(self, msg, cid=None):
00297 
00298         """ Turns the wire-level representation into a dictionary of values
00299 
00300         Default behaviour assumes JSON. Override to use a different container.
00301 
00302         Keyword arguments:
00303         msg -- the wire-level message to deserialize
00304         cid -- (optional) an ID associated with this.  Is logged on error
00305 
00306         Returns a dictionary of values
00307 
00308         """
00309         try:
00310             if self.bson_only_mode:
00311                 bson_message = bson.BSON(msg)
00312                 return bson_message.decode()
00313             else:
00314                 return json.loads(msg)
00315         except Exception, e:
00316             # if we did try to deserialize whole buffer .. first try to let self.incoming check for multiple/partial json-decodes before logging error
00317             # .. this means, if buffer is not == msg --> we tried to decode part of buffer
00318 
00319             # TODO: implement a way to have a final Exception when nothing works out to decode (multiple/broken/partial JSON..)
00320 
00321             # supressed logging of exception on json-decode to keep rosbridge-logs "clean", otherwise console logs would get spammed for every failed json-decode try
00322 #            if msg != self.buffer:
00323 #                error_msg = "Unable to deserialize message from client: %s"  % msg
00324 #                error_msg += "\nException was: " +str(e)
00325 #
00326 #                self.log("error", error_msg, cid)
00327 
00328             # re-raise Exception to allow handling outside of deserialize function instead of returning None
00329             raise
00330             #return None
00331 
00332     def register_operation(self, opcode, handler):
00333         """ Register a handler for an opcode
00334 
00335         Keyword arguments:
00336         opcode  -- the opcode to register this handler for
00337         handler -- a callback function to call for messages with this opcode
00338 
00339         """
00340         self.operations[opcode] = handler
00341 
00342     def unregister_operation(self, opcode):
00343         """ Unregister a handler for an opcode
00344 
00345         Keyword arguments:
00346         opcode -- the opcode to unregister the handler for
00347 
00348         """
00349         if opcode in self.operations:
00350             del self.operations[opcode]
00351 
00352     def add_capability(self, capability_class):
00353         """ Add a capability to the protocol.
00354 
00355         This method is for convenience; assumes the default capability
00356         constructor
00357 
00358         Keyword arguments:
00359         capability_class -- the class of the capability to add
00360 
00361         """
00362         self.capabilities.append(capability_class(self))
00363 
00364     def log(self, level, message, lid=None):
00365         """ Log a message to the client.  By default just sends to stdout
00366 
00367         Keyword arguments:
00368         level   -- the logger level of this message
00369         message -- the string message to send to the user
00370         lid     -- an associated for this log message
00371 
00372         """
00373         stdout_formatted_msg = None
00374         if lid is not None:
00375             stdout_formatted_msg = "[Client %s] [id: %s] %s" % (self.client_id, lid, message)
00376         else:
00377             stdout_formatted_msg = "[Client %s] %s" % (self.client_id, message)
00378 
00379         if level == "error" or level == "err":
00380             rospy.logerr(stdout_formatted_msg)
00381         elif level == "warning" or level == "warn":
00382             rospy.logwarn(stdout_formatted_msg)
00383         elif level == "info" or level == "information":
00384             rospy.loginfo(stdout_formatted_msg)
00385         else:
00386             rospy.logdebug(stdout_formatted_msg)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Sep 13 2017 03:18:07