Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 import rospy
00034 import time
00035 import bson
00036 from rosbridge_library.internal.exceptions import InvalidArgumentException
00037 from rosbridge_library.internal.exceptions import MissingArgumentException
00038
00039
00040 from rosbridge_library.capabilities.fragmentation import Fragmentation
00041 from rosbridge_library.util import json
00042
00043
def is_number(s):
    """ Report whether a value can be converted to a float.

    Keyword arguments:
    s -- the value to test, e.g. a field taken from a client message

    Returns True if float(s) succeeds, False otherwise. Values that float()
    rejects because of their type (None, lists, dicts, ...) or their size
    (integers too large for a float) count as "not a number" instead of
    letting the exception escape to the caller.

    """
    try:
        float(s)
    except (TypeError, ValueError, OverflowError):
        return False
    return True
00050
def has_binary(d):
    """ Report whether a bson.Binary value occurs anywhere inside d.

    Keyword arguments:
    d -- the value to inspect. Dicts are searched through their values,
    lists and tuples through their items, both recursively. Any other
    value is itself tested against bson.Binary.

    Returns True if a binary value was found, False otherwise

    """
    # Containers are handled first so that only leaf values are compared
    # against bson.Binary.
    if isinstance(d, dict):
        return any(has_binary(value) for value in d.values())
    if isinstance(d, (list, tuple)):
        return any(has_binary(item) for item in d)
    return isinstance(d, bson.Binary)
00059
class Protocol:
    """ The interface for a single client to interact with ROS.

    See rosbridge_protocol for the default protocol used by rosbridge

    The lifecycle for a Protocol instance is as follows:
    - Pass incoming messages from the client to incoming
    - Propagate outgoing messages to the client by overriding outgoing
    - Call finish to clean up resources when the client is finished

    """

    # Serialized messages longer than this are split into fragments by
    # send(); None disables fragmentation.
    fragment_size = None
    # Set by incoming() when a message carries a "png" field; send() compares
    # it against "png" but currently takes no action.
    png = None

    # Wire data received so far that has not been turned into a message yet
    buffer = ""
    # Leftover buffer content at the time of the last re-parse; incoming()
    # uses it to avoid re-parsing the same leftover without progress.
    old_buffer = ""
    # Not read or written anywhere in this class
    busy = False

    # Seconds to sleep after each outgoing message or fragment
    delay_between_messages = 0

    # NOTE(review): mutable class-level dict, hence shared by every instance
    # that does not rebind it. Not used in this class; presumably the sharing
    # is intentional - confirm against the code that uses it.
    external_service_list = {}

    # When True, BSON is used on the wire in both directions instead of JSON
    bson_only_mode = False

    # Optional configuration dict, read by __init__ when set (e.g. by a
    # subclass or on the class before instantiation)
    parameters = None

    def __init__(self, client_id):
        """ Keyword arguments:
        client_id -- a unique ID for this client to take.  Uniqueness is
        important otherwise there will be conflicts between multiple clients
        with shared resources

        """
        self.client_id = client_id
        self.capabilities = []
        self.operations = {}

        if self.parameters:
            self.fragment_size = self.parameters["max_message_size"]
            self.delay_between_messages = self.parameters["delay_between_messages"]
            self.bson_only_mode = self.parameters.get('bson_only_mode', False)

    def incoming(self, message_string=""):
        """ Process an incoming message from the client

        The data is appended to the internal buffer. If the buffer as a whole
        does not deserialize, JSON mode falls back to searching the buffer
        for an embedded message, so that fragmented or concatenated messages
        are handled. After a message has been dispatched, any data left in
        the buffer is processed by calling this method again.

        Keyword arguments:
        message_string -- the wire-level message sent by the client

        """
        self.buffer = self.buffer + message_string
        msg = None

        try:
            msg = self.deserialize(self.buffer)
            self.buffer = ""
        except Exception:
            if self.bson_only_mode:
                self.log("error", "Exception in deserialization of BSON")
            else:
                msg = self._extract_message_from_buffer()

        # Nothing usable in the buffer (yet): wait for more data
        if msg is None:
            return

        # A bare JSON value such as 123 or "text" deserializes fine but is
        # not a message; reject it instead of failing on the lookups below.
        if not isinstance(msg, dict):
            self.log("error", "Received a message that is not a dictionary. Original message was: %s" % message_string)
            return

        mid = msg.get("id")
        if "op" not in msg:
            if "receiver" in msg:
                self.log("error", "Received a rosbridge v1.0 message.  Please refer to rosbridge.org for the correct format of rosbridge v2.0 messages.  Original message was: %s" % message_string)
            else:
                self.log("error", "Received a message without an op.  All messages require 'op' field with value one of: %s.  Original message was: %s" % (list(self.operations.keys()), message_string), mid)
            return
        op = msg["op"]
        if op not in self.operations:
            self.log("error", "Unknown operation: %s.  Allowed operations: %s" % (op, list(self.operations.keys())), mid)
            return

        # Fields that control the protocol itself rather than the operation
        if "fragment_size" in msg:
            self.fragment_size = msg["fragment_size"]
        if "message_intervall" in msg and is_number(msg["message_intervall"]):
            # Stored as a float so that numeric strings such as "0.5" do not
            # break the time.sleep() calls in send().
            self.delay_between_messages = float(msg["message_intervall"])
        if "png" in msg:
            # NOTE(review): the value is taken from the "msg" field, not from
            # "png" - confirm which one is intended. A missing "msg" field
            # yields None instead of raising KeyError.
            self.png = msg.get("msg")

        try:
            self.operations[op](msg)
        except Exception as exc:
            self.log("error", "%s: %s" % (op, str(exc)), mid)

        if len(self.buffer) > 0:
            # Re-parse the leftover, but not if it is the very leftover that
            # was already tried last time.
            if self.old_buffer != self.buffer:
                self.old_buffer = self.buffer
                self.incoming()
        else:
            # Buffer fully consumed: forget the snapshot, otherwise a later,
            # identical leftover would wrongly be taken for "no progress"
            # and stay unprocessed until more data arrives.
            self.old_buffer = ""

    def _extract_message_from_buffer(self):
        """ Search the buffer for the first embedded message with an "op".

        Every span from an opening to a closing curly bracket is tried, in
        order. On success everything up to and including the message is
        removed from the buffer, i.e. data in front of it is discarded.
        Spans that deserialize to something without an "op" (for example a
        nested object of a message that has not arrived completely) are
        skipped.

        Returns the deserialized message, or None if none was found; the
        buffer is left untouched in that case.

        """
        data = self.buffer
        opening_brackets = [i for i, letter in enumerate(data) if letter == '{']
        closing_brackets = [i for i, letter in enumerate(data) if letter == '}']

        for start in opening_brackets:
            for end in closing_brackets:
                if end < start:
                    # Empty span, can never deserialize
                    continue
                try:
                    candidate = self.deserialize(data[start:end + 1])
                except Exception:
                    continue
                if isinstance(candidate, dict) and candidate.get("op") is not None:
                    self.buffer = data[end + 1:]
                    return candidate
        return None

    def outgoing(self, message):
        """ Pass an outgoing message to the client.  This method should be
        overridden.

        Keyword arguments:
        message -- the wire-level message to send to the client

        """
        pass

    def send(self, message, cid=None):
        """ Called internally in preparation for sending messages to the client

        This method pre-processes the message then passes it to the overridden
        outgoing method. Messages whose serialized form is longer than
        fragment_size are split into fragments which are sent one by one.
        After every message or fragment the method sleeps for
        delay_between_messages seconds. Nothing is sent if serialization
        fails.

        Keyword arguments:
        message -- a dict of message values to be marshalled and sent
        cid     -- (optional) an associated id

        """
        serialized = self.serialize(message, cid)
        if serialized is None:
            return

        if self.png == "png":
            # Placeholder: nothing is done for png on outgoing messages
            pass

        fragment_list = None
        if self.fragment_size is not None and len(serialized) > self.fragment_size:
            mid = message.get("id", None)
            fragment_list = Fragmentation(self).fragment(message, self.fragment_size, mid)

        if fragment_list is not None:
            for fragment in fragment_list:
                if self.bson_only_mode:
                    self.outgoing(bson.BSON.encode(fragment))
                else:
                    self.outgoing(json.dumps(fragment))
                time.sleep(self.delay_between_messages)
        else:
            self.outgoing(serialized)
            time.sleep(self.delay_between_messages)

    def finish(self):
        """ Indicate that the client is finished and clean up resources.

        All clients should call this method after disconnecting.

        """
        for capability in self.capabilities:
            capability.finish()

    def serialize(self, msg, cid=None):
        """ Turns a dictionary of values into the appropriate wire-level
        representation.

        Default behaviour uses JSON, or BSON if bson_only_mode is set or the
        message contains binary values.  Override to use a different
        container.

        Keyword arguments:
        msg -- the dictionary of values to serialize
        cid -- (optional) an ID associated with this.  Will be logged on err.

        Returns the serialized message, or None if serialization failed

        """
        try:
            if has_binary(msg) or self.bson_only_mode:
                return bson.BSON.encode(msg)
            return json.dumps(msg)
        except Exception:
            # The failure is only reported if there is an id to attach it to
            if cid is not None:
                op = msg.get("op") if isinstance(msg, dict) else None
                self.log("error", "Unable to serialize %s message to client"
                         % op, cid)
            return None

    def deserialize(self, msg, cid=None):
        """ Turns the wire-level representation into a dictionary of values

        Default behaviour assumes JSON, or BSON if bson_only_mode is set.
        Override to use a different container.

        Keyword arguments:
        msg -- the wire-level message to deserialize
        cid -- (optional) an ID associated with this.  Currently unused.

        Returns a dictionary of values. Any exception raised by the
        underlying decoder is passed on to the caller.

        """
        if self.bson_only_mode:
            bson_message = bson.BSON(msg)
            return bson_message.decode()
        return json.loads(msg)

    def register_operation(self, opcode, handler):
        """ Register a handler for an opcode

        Keyword arguments:
        opcode  -- the opcode to register this handler for
        handler -- a callback function to call for messages with this opcode

        """
        self.operations[opcode] = handler

    def unregister_operation(self, opcode):
        """ Unregister a handler for an opcode

        Unknown opcodes are ignored.

        Keyword arguments:
        opcode -- the opcode to unregister the handler for

        """
        if opcode in self.operations:
            del self.operations[opcode]

    def add_capability(self, capability_class):
        """ Add a capability to the protocol.

        This method is for convenience; assumes the default capability
        constructor

        Keyword arguments:
        capability_class -- the class of the capability to add

        """
        self.capabilities.append(capability_class(self))

    def log(self, level, message, lid=None):
        """ Log a message to the client.  By default the message is passed
        to the rospy logger matching the level.

        Keyword arguments:
        level   -- the logger level of this message; "error"/"err",
        "warning"/"warn" and "info"/"information" are recognised, anything
        else is logged at debug level
        message -- the string message to send to the user
        lid     -- an associated id for this log message

        """
        if lid is not None:
            stdout_formatted_msg = "[Client %s] [id: %s] %s" % (self.client_id, lid, message)
        else:
            stdout_formatted_msg = "[Client %s] %s" % (self.client_id, message)

        if level in ("error", "err"):
            rospy.logerr(stdout_formatted_msg)
        elif level in ("warning", "warn"):
            rospy.logwarn(stdout_formatted_msg)
        elif level in ("info", "information"):
            rospy.loginfo(stdout_formatted_msg)
        else:
            rospy.logdebug(stdout_formatted_msg)