protocol.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2012, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 from __future__ import unicode_literals
34 from builtins import str
35 
36 import rospy
37 import time
38 
39 from rosbridge_library.internal.exceptions import InvalidArgumentException
40 from rosbridge_library.internal.exceptions import MissingArgumentException
41 
42 from rosbridge_library.capabilities.fragmentation import Fragmentation
43 from rosbridge_library.util import json, bson
44 
45 
def is_number(s):
    """ Returns True if s can be converted to a float, False otherwise

    Keyword arguments:
    s -- the value to check; any object is accepted

    """
    try:
        float(s)
    except (TypeError, ValueError, OverflowError):
        # float() raises ValueError for strings it cannot parse, TypeError
        # for objects it cannot convert at all (None, lists, dicts, ...) and
        # OverflowError for integers too large to be represented as a float
        return False
    return True
52 
53 
def has_binary(obj):
    """ Returns True if obj is a BSON binary, or is a list/dict that
    (recursively) contains one
    """
    if isinstance(obj, dict):
        # only the values of a dict are inspected, never its keys
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        # leaf value: it either is a binary or it is not
        return isinstance(obj, bson.binary.Binary)

    return any(has_binary(child) for child in children)
65 
66 
class Protocol:
    """ The interface for a single client to interact with ROS.

    See rosbridge_protocol for the default protocol used by rosbridge

    The lifecycle for a Protocol instance is as follows:
    - Pass incoming messages from the client to incoming
    - Propagate outgoing messages to the client by overriding outgoing
    - Call finish to clean up resources when the client is finished

    """

    # fragment_size can be set per client (each client has its own instance of protocol)
    # ..same for other parameters
    fragment_size = None
    png = None
    # buffer used to gather partial JSON-objects (could be caused by small tcp-buffers or similar..)
    buffer = ""
    # copy of the buffer as it was before the last re-run of incoming; used to
    # stop re-processing when the leftover data did not change
    old_buffer = ""
    busy = False
    # if this is too low, ("simple") clients' network stacks will get flooded (when sending fragments of a huge message..)
    # .. depends on message_size/bandwidth/performance/client_limits/...
    # !! this might be related to (or even be avoided by using) throttle_rate !!
    delay_between_messages = 0
    # global list of non-ros advertised services
    # (class-level dict, i.e. shared by all Protocol instances)
    external_service_list = {}
    # Use only BSON for the whole communication if the server has been started with bson_only_mode:=True
    bson_only_mode = False

    # optional dict of settings read by __init__ ("max_message_size",
    # "delay_between_messages" and, optionally, "bson_only_mode")
    parameters = None

    def __init__(self, client_id):
        """ Keyword arguments:
        client_id -- a unique ID for this client to take. Uniqueness is
        important otherwise there will be conflicts between multiple clients
        with shared resources

        """
        self.client_id = client_id
        self.capabilities = []
        self.operations = {}

        if self.parameters:
            self.fragment_size = self.parameters["max_message_size"]
            self.delay_between_messages = self.parameters["delay_between_messages"]
            self.bson_only_mode = self.parameters.get('bson_only_mode', False)

        if self.bson_only_mode:
            # BSON is binary, so the receive buffers hold bytes instead of text
            self.buffer = bytearray()
            self.old_buffer = bytearray()

    # added default message_string="" to allow recalling incoming until buffer is empty without giving a parameter
    # --> allows to get rid of (..or minimize) delay between client-side sends
    def incoming(self, message_string=""):
        """ Process an incoming message from the client

        The data is appended to the receive buffer. One message is decoded
        from the buffer and passed to the handler registered for its "op";
        if data is left in the buffer afterwards, incoming re-runs itself on
        the remainder.

        Keyword arguments:
        message_string -- the wire-level message sent by the client

        """
        if self.bson_only_mode:
            self.buffer.extend(message_string)
        else:
            self.buffer = self.buffer + str(message_string)

        msg = None

        # take care of having multiple JSON-objects in receiving buffer
        # ..first, try to load the whole buffer as one object
        try:
            msg = self.deserialize(self.buffer)
        except Exception:
            if self.bson_only_mode:
                # BSON should be used in conjunction with a network handler
                # that receives exactly one full BSON message, so a failure
                # here means the message is broken or invalid. Drop it;
                # keeping it in the buffer would corrupt every message that
                # is appended afterwards.
                self.buffer = bytearray()
                self.log("error", "Exception in deserialization of BSON")
            else:
                # if loading the whole buffer fails, try to load part of it
                msg = self._extract_json_message()
        else:
            # the whole buffer was consumed
            self.buffer = bytearray() if self.bson_only_mode else ''

        # if decoding of buffer failed .. simply return
        if msg is None:
            return

        # valid JSON that is not an object (e.g. a bare number or a list)
        # cannot carry an "op" and would break the field lookups below
        if not isinstance(msg, dict):
            self.log("error", "Received a message that is not a JSON object. Original message was: %s" % message_string)
            return

        # process fields of the message object that "control" rosbridge
        mid = msg.get("id")
        if "op" not in msg:
            if "receiver" in msg:
                self.log("error", "Received a rosbridge v1.0 message. Please refer to rosbridge.org for the correct format of rosbridge v2.0 messages. Original message was: %s" % message_string)
            else:
                self.log("error", "Received a message without an op. All messages require 'op' field with value one of: %s. Original message was: %s" % (list(self.operations.keys()), message_string), mid)
            return

        op = msg["op"]
        try:
            handler = self.operations[op]
        except (KeyError, TypeError):
            # KeyError: nothing registered for this op
            # TypeError: op is unhashable (e.g. a list), so it cannot be a registered opcode
            self.log("error", "Unknown operation: %s. Allowed operations: %s" % (op, list(self.operations.keys())), mid)
            return

        # this way a client can change/overwrite its active values anytime by just including parameter field in any message sent to rosbridge
        # maybe need to be improved to bind parameter values to specific operation..
        if "fragment_size" in msg:
            self.fragment_size = msg["fragment_size"]
        if "message_intervall" in msg and is_number(msg["message_intervall"]):
            # store a real number: a numeric string such as "0.5" passes
            # is_number but would make time.sleep() in send() fail
            self.delay_between_messages = float(msg["message_intervall"])
        if "png" in msg:
            self.png = msg["png"]

        # now try to pass message to according operation
        try:
            handler(msg)
        except Exception as exc:
            self.log("error", "%s: %s" % (op, str(exc)), mid)

        # if anything left in buffer .. re-call self.incoming
        # TODO: check what happens if we have "garbage" on tcp-stack --> infinite loop might be triggered! .. might get out of it when next valid JSON arrives since only data after last 'valid' closing bracket is kept
        # try to avoid infinite loop: only re-run when the leftover data changed
        if len(self.buffer) > 0 and self.old_buffer != self.buffer:
            self.old_buffer = self.buffer
            self.incoming()

    def _extract_json_message(self):
        """ Search the receive buffer for the first complete JSON object that has an "op" field

        Every span from an opening bracket "{" to a later closing bracket "}"
        is tried. On success the buffer is cut down to the data behind the
        decoded object (anything in front of it is thrown away).

        Returns the decoded dictionary, or None if no such object was found
        (the buffer is left untouched in that case)

        """
        # TODO: handling of partial/multiple/broken json data in incoming buffer
        # this way is problematic when json contains nested json-objects ( e.g. { ... { "config": [0,1,2,3] } ... } )
        # .. if outer json is not fully received, stepping through opening brackets will find { "config" : ... } as a valid json object
        # solution for now:
        # .. only accept objects that have an "op"-field. a nested message that is complete and has an "op"-field itself
        # .. ( e.g. a complete service_response inside the data field of a fragment ) would still cause trouble,
        # .. but if a response fits as a whole into a fragment, simply do not pack it into a fragment.
        #
        # --> from that follows current limitation:
        # fragment data must NOT (!) contain a complete json-object that has an "op-field"
        #
        # an alternative solution would be to only check from first opening bracket and have a time out on data in input buffer.. (to handle broken data)
        opening_brackets = [i for i, letter in enumerate(self.buffer) if letter == '{']
        closing_brackets = [i for i, letter in enumerate(self.buffer) if letter == '}']

        for start in opening_brackets:
            for end in closing_brackets:
                if end < start:
                    # empty span, can never be an object
                    continue
                try:
                    candidate = self.deserialize(self.buffer[start:end + 1])
                except Exception:
                    # expected for most spans (e.g. "inner" closing brackets);
                    # not logged to keep the logs clean
                    continue
                # objects without "op" are skipped; accepting them would stop
                # the scan on an inner object of a partially received message
                if isinstance(candidate, dict) and candidate.get("op") is not None:
                    # TODO: check if throwing away leading data like this is okay..
                    self.buffer = self.buffer[end + 1:]
                    return candidate
        return None

    def outgoing(self, message, compression="none"):
        """ Pass an outgoing message to the client. This method should be
        overridden.

        Keyword arguments:
        message -- the wire-level message to send to the client
        compression -- (optional) the compression that send was called with;
        send always passes it as a keyword argument

        """
        pass

    def send(self, message, cid=None, compression="none"):
        """ Called internally in preparation for sending messages to the client

        This method pre-processes the message then passes it to the overridden
        outgoing method.

        Keyword arguments:
        message -- a dict of message values to be marshalled and sent
        cid -- (optional) an associated id
        compression -- (optional) handed on to outgoing; for "cbor" and
        "cbor-raw" the message is passed on as it is, without serializing it

        """
        serialized = message if compression in ["cbor", "cbor-raw"] else self.serialize(message, cid)
        if serialized is not None:
            if self.png == "png":
                # TODO: png compression on outgoing messages
                # encode message
                pass

            fragment_list = None
            # only dict messages are fragmented: messages that are passed
            # through unserialized (cbor payloads, bytearrays) have no "id"
            # field to read and are sent in one piece
            if (self.fragment_size is not None and isinstance(message, dict)
                    and len(serialized) > self.fragment_size):
                mid = message.get("id", None)

                # TODO: think about splitting into fragments that have specified size including header-fields!
                # --> estimate header size --> split content into fragments that have the requested overall size, rather than requested content size
                fragment_list = Fragmentation(self).fragment(message, self.fragment_size, mid)

            # fragment list not empty -> send fragments
            if fragment_list is not None:
                for fragment in fragment_list:
                    if self.bson_only_mode:
                        self.outgoing(bson.BSON.encode(fragment), compression=compression)
                    else:
                        self.outgoing(json.dumps(fragment), compression=compression)
                    # okay to use delay here (sender's send()-function) because rosbridge is sending next request only to service provider when last one had finished)
                    # --> if this was not the case this delay needed to be implemented in service-provider's (meaning message receiver's) send_message()-function in rosbridge_tcp.py)
                    time.sleep(self.delay_between_messages)
            # else send message as it is
            else:
                self.outgoing(serialized, compression=compression)
        time.sleep(self.delay_between_messages)

    def finish(self):
        """ Indicate that the client is finished and clean up resources.

        All clients should call this method after disconnecting.

        """
        for capability in self.capabilities:
            capability.finish()

    def serialize(self, msg, cid=None):
        """ Turns a dictionary of values into the appropriate wire-level
        representation.

        Default behaviour uses JSON. Override to use a different container.

        Keyword arguments:
        msg -- the dictionary of values to serialize
        cid -- (optional) an ID associated with this. Will be logged on err.

        Returns msg itself if it is a bytearray, BSON-encoded data if msg
        contains a binary or bson_only_mode is set, otherwise a JSON string.
        Returns None if serialization fails.
        """
        try:
            if isinstance(msg, bytearray):
                return msg
            if has_binary(msg) or self.bson_only_mode:
                return bson.BSON.encode(msg)
            return json.dumps(msg)
        except Exception:
            if cid is not None:
                # Only bother sending the log message if there's an id
                # (look up "op" defensively so the error report itself cannot raise)
                op = msg.get("op") if isinstance(msg, dict) else None
                self.log("error", "Unable to serialize %s message to client"
                         % op, cid)
            return None

    def deserialize(self, msg, cid=None):
        """ Turns the wire-level representation into a dictionary of values

        Default behaviour assumes JSON. Override to use a different container.

        Keyword arguments:
        msg -- the wire-level message to deserialize
        cid -- (optional) an ID associated with this. Currently unused

        Returns a dictionary of values

        Raises whatever exception the decoder raises; nothing is logged here

        """
        # Decoding errors are deliberately neither logged nor turned into a
        # None result: incoming calls this for many partial spans of its
        # buffer, so logging would spam the console for every failed try, and
        # raising lets the caller handle the failure itself.
        # TODO: implement a way to have a final Exception when nothing works out to decode (multiple/broken/partial JSON..)
        if self.bson_only_mode:
            return bson.BSON(msg).decode()
        return json.loads(msg)

    def register_operation(self, opcode, handler):
        """ Register a handler for an opcode

        Keyword arguments:
        opcode -- the opcode to register this handler for
        handler -- a callback function to call for messages with this opcode

        """
        self.operations[opcode] = handler

    def unregister_operation(self, opcode):
        """ Unregister a handler for an opcode

        Keyword arguments:
        opcode -- the opcode to unregister the handler for

        """
        if opcode in self.operations:
            del self.operations[opcode]

    def add_capability(self, capability_class):
        """ Add a capability to the protocol.

        This method is for convenience; assumes the default capability
        constructor

        Keyword arguments:
        capability_class -- the class of the capability to add

        """
        self.capabilities.append(capability_class(self))

    def log(self, level, message, lid=None):
        """ Log a message through the rospy logger that matches level

        Keyword arguments:
        level -- the logger level of this message ("error"/"err",
        "warning"/"warn", "info"/"information"; anything else is logged as debug)
        message -- the string message to send to the user
        lid -- (optional) an associated id for this log message

        """
        if lid is not None:
            stdout_formatted_msg = "[Client %s] [id: %s] %s" % (self.client_id, lid, message)
        else:
            stdout_formatted_msg = "[Client %s] %s" % (self.client_id, message)

        if level == "error" or level == "err":
            rospy.logerr(stdout_formatted_msg)
        elif level == "warning" or level == "warn":
            rospy.logwarn(stdout_formatted_msg)
        elif level == "info" or level == "information":
            rospy.loginfo(stdout_formatted_msg)
        else:
            rospy.logdebug(stdout_formatted_msg)
def register_operation(self, opcode, handler)
Definition: protocol.py:351
def unregister_operation(self, opcode)
Definition: protocol.py:361
def __init__(self, client_id)
Definition: protocol.py:98
def add_capability(self, capability_class)
Definition: protocol.py:371
def deserialize(self, msg, cid=None)
Definition: protocol.py:315
def log(self, level, message, lid=None)
Definition: protocol.py:383
def send(self, message, cid=None, compression="none")
Definition: protocol.py:239
def incoming(self, message_string="")
Definition: protocol.py:120
def serialize(self, msg, cid=None)
Definition: protocol.py:289


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18