Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 import rospy
00034 import time
00035 from rosbridge_library.internal.exceptions import InvalidArgumentException
00036 from rosbridge_library.internal.exceptions import MissingArgumentException
00037
00038
00039 from rosbridge_library.capabilities.fragmentation import Fragmentation
00040 from rosbridge_library.util import json, bson
00041
00042
def is_number(s):
    """ Report whether a value can be converted to a float

    Keyword arguments:
    s -- the value to test; may be of any type

    Returns True if float(s) succeeds, False otherwise
    """
    try:
        float(s)
    except (TypeError, ValueError, OverflowError):
        # ValueError: unparsable string; TypeError: None, list, dict, ...;
        # OverflowError: an int too large to be represented as a float
        return False
    return True
00049
def has_binary(d):
    """ Report whether a value is, or contains, a bson.Binary

    Dicts (their values), lists and tuples are searched recursively; any
    other value is tested directly.

    Keyword arguments:
    d -- the value to inspect

    Returns True if a bson.Binary instance was found, False otherwise
    """
    if isinstance(d, dict):
        return any(has_binary(value) for value in d.values())
    if isinstance(d, (list, tuple)):
        # binaries nested inside sequences must be found too, otherwise
        # such a message would be handed to the JSON encoder
        return any(has_binary(item) for item in d)
    return isinstance(d, bson.Binary)
00058
class Protocol:
    """ The interface for a single client to interact with ROS.

    See rosbridge_protocol for the default protocol used by rosbridge

    The lifecycle for a Protocol instance is as follows:
    - Pass incoming messages from the client to incoming
    - Propagate outgoing messages to the client by overriding outgoing
    - Call finish to clean up resources when the client is finished

    """

    # Serialized messages longer than this are split into fragments by
    # send(); None disables fragmentation.
    fragment_size = None
    # Value of the "png" field of the last incoming message that had one.
    png = None

    # Data received from the client that has not been decoded yet.
    buffer = ""
    # Last leftover buffer that incoming() re-processed; used to avoid
    # re-processing an identical leftover again.
    old_buffer = ""
    # Not referenced anywhere in this class.
    busy = False

    # Seconds send() sleeps after every outgoing message or fragment.
    delay_between_messages = 0

    # Class-level dict: every instance sees the same object unless an
    # instance rebinds the attribute.
    external_service_list = {}

    # When True, BSON is used for all serialization and deserialization.
    bson_only_mode = False

    # Optional dict of settings read by __init__; None means "keep defaults".
    parameters = None

    def __init__(self, client_id):
        """ Keyword arguments:
        client_id -- a unique ID for this client to take.  Uniqueness is
        important otherwise there will be conflicts between multiple clients
        with shared resources

        """
        self.client_id = client_id
        self.capabilities = []
        self.operations = {}

        if self.parameters:
            self.fragment_size = self.parameters["max_message_size"]
            self.delay_between_messages = self.parameters["delay_between_messages"]
            self.bson_only_mode = self.parameters.get('bson_only_mode', False)

    def incoming(self, message_string=""):
        """ Process an incoming message from the client

        The data is appended to the receive buffer.  If the whole buffer
        decodes, it is handled as one message.  Otherwise, in JSON mode, the
        buffer is searched for the first complete object that has an "op"
        field; anything left after it stays buffered and is processed by a
        recursive call.

        Keyword arguments:
        message_string -- the wire-level message sent by the client

        """
        self.buffer = self.buffer + message_string
        msg = None

        try:
            msg = self.deserialize(self.buffer)
            self.buffer = ""
        except Exception:
            if self.bson_only_mode:
                # NOTE(review): the buffer is kept as-is here, so later data
                # is appended to the undecodable bytes -- confirm whether
                # it should be cleared instead.
                self.log("error", "Exception in deserialization of BSON")
            else:
                msg = self._extract_json_message()

        # nothing decodable yet: wait for more data
        if msg is None:
            return

        # e.g. the JSON documents `5` or `[1]`; field lookups below need a dict
        if not isinstance(msg, dict):
            self.log("error", "Received a message that is not a JSON object. Original message was: %s" % message_string)
            return

        mid = msg.get("id")
        if "op" not in msg:
            if "receiver" in msg:
                self.log("error", "Received a rosbridge v1.0 message.  Please refer to rosbridge.org for the correct format of rosbridge v2.0 messages.  Original message was: %s" % message_string)
            else:
                self.log("error", "Received a message without an op.  All messages require 'op' field with value one of: %s.  Original message was: %s" % (list(self.operations.keys()), message_string), mid)
            return

        op = msg["op"]
        try:
            known_op = op in self.operations
        except TypeError:
            # an unhashable op (list/dict) can never be a registered opcode
            known_op = False
        if not known_op:
            self.log("error", "Unknown operation: %s.  Allowed operations: %s" % (op, list(self.operations.keys())), mid)
            return

        # fields that reconfigure this protocol instance
        if "fragment_size" in msg:
            self.fragment_size = msg["fragment_size"]
        if "message_intervall" in msg and is_number(msg["message_intervall"]):
            # store a real number: send() passes this value to time.sleep(),
            # which rejects strings, negative values and NaN
            interval = float(msg["message_intervall"])
            if interval >= 0:
                self.delay_between_messages = interval
        if "png" in msg:
            self.png = msg["png"]

        # hand the message to the handler registered for its opcode
        try:
            self.operations[op](msg)
        except Exception as exc:
            self.log("error", "%s: %s" % (op, str(exc)), mid)

        # process what is left in the buffer, unless this exact leftover was
        # already re-processed
        if len(self.buffer) > 0 and self.old_buffer != self.buffer:
            self.old_buffer = self.buffer
            self.incoming()

    def _extract_json_message(self):
        """ Find the first JSON object with an "op" field inside the buffer

        Every span from a '{' to a later '}' is tried in order.  On success
        the buffer is cut down to the data following the object found; data
        in front of it is discarded.

        Returns the decoded dict, or None if the buffer holds no such object
        """
        data = self.buffer
        opening_brackets = [i for i, letter in enumerate(data) if letter == '{']
        closing_brackets = [i for i, letter in enumerate(data) if letter == '}']

        for start in opening_brackets:
            for end in closing_brackets:
                if end < start:
                    continue
                try:
                    candidate = self.deserialize(data[start:end + 1])
                except Exception:
                    # expected for spans ending at an "inner" closing bracket
                    continue
                # objects without an op (e.g. a nested value of a message
                # that is still incomplete) are not messages: keep looking
                if isinstance(candidate, dict) and candidate.get("op") is not None:
                    self.buffer = data[end + 1:]
                    return candidate
        return None

    def outgoing(self, message):
        """ Pass an outgoing message to the client.  This method should be
        overridden.

        Keyword arguments:
        message -- the wire-level message to send to the client

        """
        pass

    def send(self, message, cid=None):
        """ Called internally in preparation for sending messages to the client

        This method pre-processes the message then passes it to the overridden
        outgoing method.  A message whose serialized form is longer than
        fragment_size is split with Fragmentation and sent fragment by
        fragment.  After each call to outgoing the method sleeps for
        delay_between_messages seconds.  Nothing is sent if serialization
        fails.

        Keyword arguments:
        message -- a dict of message values to be marshalled and sent
        cid     -- (optional) an associated id

        """
        serialized = self.serialize(message, cid)
        if serialized is None:
            return

        # note: self.png is not acted upon here; outgoing data is never
        # PNG-compressed by this method

        fragment_list = None
        if self.fragment_size is not None and len(serialized) > self.fragment_size:
            mid = message.get("id", None)
            fragment_list = Fragmentation(self).fragment(message, self.fragment_size, mid)

        if fragment_list is not None:
            for fragment in fragment_list:
                if self.bson_only_mode:
                    self.outgoing(bson.BSON.encode(fragment))
                else:
                    self.outgoing(json.dumps(fragment))
                time.sleep(self.delay_between_messages)
        else:
            self.outgoing(serialized)
            time.sleep(self.delay_between_messages)

    def finish(self):
        """ Indicate that the client is finished and clean up resources.

        All clients should call this method after disconnecting.

        """
        for capability in self.capabilities:
            capability.finish()

    def serialize(self, msg, cid=None):
        """ Turns a dictionary of values into the appropriate wire-level
        representation.

        Default behaviour uses JSON, or BSON when the message contains binary
        data or bson_only_mode is set.  Override to use a different container.

        Keyword arguments:
        msg -- the dictionary of values to serialize
        cid -- (optional) an ID associated with this.  Will be logged on err.

        Returns the serialized message, or None if serialization failed
        """
        try:
            if has_binary(msg) or self.bson_only_mode:
                return bson.BSON.encode(msg)
            return json.dumps(msg)
        except Exception:
            # the failure is only reported when there is an id to attach it to
            if cid is not None:
                # must not raise again while reporting the failure
                op = msg.get("op") if isinstance(msg, dict) else None
                self.log("error", "Unable to serialize %s message to client"
                         % op, cid)
            return None

    def deserialize(self, msg, cid=None):
        """ Turns the wire-level representation into a dictionary of values

        Default behaviour assumes JSON, or BSON when bson_only_mode is set.
        Override to use a different container.

        Keyword arguments:
        msg -- the wire-level message to deserialize
        cid -- (optional) an ID associated with this; currently unused

        Returns a dictionary of values.  Decoding errors are propagated to
        the caller.

        """
        if self.bson_only_mode:
            return bson.BSON(msg).decode()
        return json.loads(msg)

    def register_operation(self, opcode, handler):
        """ Register a handler for an opcode

        Keyword arguments:
        opcode  -- the opcode to register this handler for
        handler -- a callback function to call for messages with this opcode

        """
        self.operations[opcode] = handler

    def unregister_operation(self, opcode):
        """ Unregister a handler for an opcode

        Keyword arguments:
        opcode -- the opcode to unregister the handler for

        """
        if opcode in self.operations:
            del self.operations[opcode]

    def add_capability(self, capability_class):
        """ Add a capability to the protocol.

        This method is for convenience; assumes the default capability
        constructor

        Keyword arguments:
        capability_class -- the class of the capability to add

        """
        self.capabilities.append(capability_class(self))

    def log(self, level, message, lid=None):
        """ Log a message through rospy, prefixed with the client id

        Keyword arguments:
        level   -- the logger level of this message ("error"/"err",
                   "warning"/"warn", "info"/"information"; anything else is
                   logged at debug level)
        message -- the string message to send to the user
        lid     -- (optional) an associated id for this log message

        """
        if lid is not None:
            stdout_formatted_msg = "[Client %s] [id: %s] %s" % (self.client_id, lid, message)
        else:
            stdout_formatted_msg = "[Client %s] %s" % (self.client_id, message)

        if level in ("error", "err"):
            rospy.logerr(stdout_formatted_msg)
        elif level in ("warning", "warn"):
            rospy.logwarn(stdout_formatted_msg)
        elif level in ("info", "information"):
            rospy.loginfo(stdout_formatted_msg)
        else:
            rospy.logdebug(stdout_formatted_msg)