Package rospy :: Package impl :: Module masterslave

Source Code for Module rospy.impl.masterslave

  1  # Software License Agreement (BSD License)
 
  2  #
 
  3  # Copyright (c) 2008, Willow Garage, Inc.
 
  4  # All rights reserved.
 
  5  #
 
  6  # Redistribution and use in source and binary forms, with or without
 
  7  # modification, are permitted provided that the following conditions
 
  8  # are met:
 
  9  #
 
 10  #  * Redistributions of source code must retain the above copyright
 
 11  #    notice, this list of conditions and the following disclaimer.
 
 12  #  * Redistributions in binary form must reproduce the above
 
 13  #    copyright notice, this list of conditions and the following
 
 14  #    disclaimer in the documentation and/or other materials provided
 
 15  #    with the distribution.
 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its
 
 17  #    contributors may be used to endorse or promote products derived
 
 18  #    from this software without specific prior written permission.
 
 19  #
 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
 31  # POSSIBILITY OF SUCH DAMAGE.
 
 32  #
 
 33  # Revision $Id$
 
 34  """
 
 35  Internal use: ROS Node (Slave) API. 
 
 36  
 
 37  The Node API is implemented by the L{ROSHandler}.
 
 38  
 
 39  API return convention: (statusCode, statusMessage, returnValue)
 
 40  
 
 41   - statusCode: an integer indicating the completion condition of the method. 
 
 42   - statusMessage: a human-readable string message for debugging
 
 43   - returnValue: the return value of the method; method-specific.
 
 44  
 
 45  Current status codes: 
 
 46  
 
 47   - -1: ERROR: Error on the part of the caller, e.g. an invalid parameter
 
 48   - 0: FAILURE: Method was attempted but failed to complete correctly.
 
 49   - 1: SUCCESS: Method completed successfully.
 
 50  
 
 51  Individual methods may assign additional meaning/semantics to statusCode.
 
 52  """ 
 53  
 
 54  import os 
 55  import sys 
 56  import itertools 
 57  import logging 
 58  import socket 
 59  import threading 
 60  import traceback 
 61  import time 
 62  import errno 
 63  
 
 64  try: 
 65      #py3k
 
 66      import urllib.parse as urlparse 
 67  except ImportError: 
 68      import urlparse 
 69  
 
 70  from rosgraph.xmlrpc import XmlRpcHandler 
 71  
 
 72  import rospy.names 
 73  import rospy.rostime 
 74  
 
 75  import rospy.impl.tcpros 
 76  
 
 77  from rospy.core import * 
 78  from rospy.impl.paramserver import get_param_server_cache 
 79  from rospy.impl.registration import RegManager, get_topic_manager 
 80  from rospy.impl.validators import non_empty, ParameterInvalid 
 81  
 
 82  # Return code slots
 
 83  STATUS = 0 
 84  MSG = 1 
 85  VAL = 2 
86 87 # pseudo-validators ############################### 88 # these validators actually return tuples instead of a function and it is up to a custom 89 # validator on the class itself to perform the validation 90 -def is_publishers_list(paramName):
91 return ('is_publishers_list', paramName)
92 93 _logger = logging.getLogger("rospy.impl.masterslave") 94 95 LOG_API = True
96 97 -def apivalidate(error_return_value, validators=()):
98 """ 99 ROS master/slave arg-checking decorator. Applies the specified 100 validator to the corresponding argument and also remaps each 101 argument to be the value returned by the validator. Thus, 102 arguments can be simultaneously validated and canonicalized prior 103 to actual function call. 104 @param error_return_value: API value to return if call unexpectedly fails 105 @param validators: sequence of validators to apply to each 106 arg. None means no validation for the parameter is required. As all 107 api methods take caller_id as the first parameter, the validators 108 start with the second param. 109 @type validators: sequence 110 """ 111 def check_validates(f): 112 assert len(validators) == f.__code__.co_argcount - 2, "%s failed arg check"%f #ignore self and caller_id 113 def validated_f(*args, **kwds): 114 if LOG_API: 115 _logger.debug("%s%s", f.__name__, str(args[1:])) 116 #print "%s%s"%(f.func_name, str(args[1:])) 117 if len(args) == 1: 118 _logger.error("%s invoked without caller_id parameter"%f.__name__) 119 return -1, "missing required caller_id parameter", error_return_value 120 elif len(args) != f.__code__.co_argcount: 121 return -1, "Error: bad call arity", error_return_value 122 123 instance = args[0] 124 caller_id = args[1] 125 if not isinstance(caller_id, str): 126 _logger.error("%s: invalid caller_id param type", f.__name__) 127 return -1, "caller_id must be a string", error_return_value 128 129 newArgs = [instance, caller_id] #canonicalized args 130 try: 131 for (v, a) in zip(validators, args[2:]): 132 if v: 133 try: 134 #simultaneously validate + canonicalized args 135 if type(v) == list or type(v) == tuple: 136 newArgs.append(instance._custom_validate(v[0], v[1], a, caller_id)) 137 else: 138 newArgs.append(v(a, caller_id)) 139 except ParameterInvalid as e: 140 _logger.error("%s: invalid parameter: %s", f.__name__, str(e) or 'error') 141 return -1, str(e) or 'error', error_return_value 142 else: 143 newArgs.append(a) 144 145 if LOG_API: 146 retval = f(*newArgs, **kwds) 147 _logger.debug("%s%s returns %s", f.__name__, args[1:], retval) 148 return retval 149 else: 150 code, msg, val = f(*newArgs, **kwds) 151 if val is None: 152 return -1, "Internal error (None value returned)", error_return_value 153 return code, msg, val 154 except TypeError as te: #most likely wrong arg number 155 _logger.error(traceback.format_exc()) 156 return -1, "Error: invalid arguments: %s"%te, error_return_value 157 except Exception as e: #internal failure 158 _logger.error(traceback.format_exc()) 159 return 0, "Internal failure: %s"%e, error_return_value
160 validated_f.__name__ = f.__name__ 161 validated_f.__doc__ = f.__doc__ #preserve doc 162 return validated_f 163 return check_validates 164
165 166 -class ROSHandler(XmlRpcHandler):
167 """ 168 Base handler for both slave and master nodes. API methods 169 generally provide the capability for establishing point-to-point 170 connections with other nodes. 171 172 Instance methods are XML-RPC API methods, so care must be taken as 173 to what is added here. 174 """ 175
176 - def __init__(self, name, master_uri):
177 """ 178 Base constructor for ROS nodes/masters 179 @param name: ROS name of this node 180 @type name: str 181 @param master_uri: URI of master node, or None if this node is the master 182 @type master_uri: str 183 """ 184 super(ROSHandler, self).__init__() 185 self.masterUri = master_uri 186 self.name = name 187 self.uri = None 188 self.done = False 189 190 # initialize protocol handlers. The master will not have any. 191 self.protocol_handlers = [] 192 handler = rospy.impl.tcpros.get_tcpros_handler() 193 if handler is not None: 194 self.protocol_handlers.append(handler) 195 196 self.reg_man = RegManager(self)
197 198 ############################################################################### 199 # INTERNAL 200
201 - def _is_registered(self):
202 """ 203 @return: True if slave API is registered with master. 204 @rtype: bool 205 """ 206 if self.reg_man is None: 207 return False 208 else: 209 return self.reg_man.is_registered()
210 211
212 - def _ready(self, uri):
213 """ 214 @param uri: XML-RPC URI 215 @type uri: str 216 callback from ROSNode to inform handler of correct i/o information 217 """ 218 _logger.info("_ready: %s", uri) 219 self.uri = uri 220 #connect up topics in separate thread 221 if self.reg_man: 222 t = threading.Thread(target=self.reg_man.start, args=(uri, self.masterUri)) 223 rospy.core._add_shutdown_thread(t) 224 t.start()
225
226 - def _custom_validate(self, validation, param_name, param_value, caller_id):
227 """ 228 Implements validation rules that require access to internal ROSHandler state. 229 @param validation: name of validation rule to use 230 @type validation: str 231 @param param_name: name of parameter being validated 232 @type param_name: str 233 @param param_value str: value of parameter 234 @type param_value: str 235 @param caller_id: value of caller_id parameter to API method 236 @type caller_id: str 237 @raise ParameterInvalid: if the parameter does not meet validation 238 @return: new value for parameter, after validation 239 """ 240 if validation == 'is_publishers_list': 241 if not type(param_value) == list: 242 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name) 243 for v in param_value: 244 if not isinstance(v, str): 245 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name) 246 parsed = urlparse.urlparse(v) 247 if not parsed[0] or not parsed[1]: #protocol and host 248 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v)) 249 return param_value 250 else: 251 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
252 253 ## static map for tracking which arguments to a function should be remapped 254 # { methodName : [ arg indices ] 255 _remap_table = { } 256 257 @classmethod
258 - def remappings(cls, methodName):
259 """ 260 @internal 261 @param cls: class to register remappings on 262 @type cls: Class: class to register remappings on 263 @return: parameters (by pos) that should be remapped because they are names 264 @rtype: list 265 """ 266 if methodName in cls._remap_table: 267 return cls._remap_table[methodName] 268 else: 269 return []
270 271 ############################################################################### 272 # UNOFFICIAL/PYTHON-ONLY API 273 274 @apivalidate('') 275 ## (Python-Only API) Get the XML-RPC URI of this server 276 ## @param self 277 ## @param caller_id str: ROS caller id 278 ## @return [int, str, str]: [1, "", xmlRpcUri]
279 - def getUri(self, caller_id):
280 return 1, "", self.uri
281 282 @apivalidate('') 283 ## (Python-Only API) Get the ROS node name of this server 284 ## @param self 285 ## @param caller_id str: ROS caller id 286 ## @return [int, str, str]: [1, "", ROS node name]
287 - def getName(self, caller_id):
288 return 1, "", self.name
289 290 291 ############################################################################### 292 # EXTERNAL API 293 294 @apivalidate([])
295 - def getBusStats(self, caller_id):
296 """ 297 Retrieve transport/topic statistics 298 @param caller_id: ROS caller id 299 @type caller_id: str 300 @return: [publishStats, subscribeStats, serviceStats]:: 301 publishStats: [[topicName, messageDataSent, pubConnectionData]...[topicNameN, messageDataSentN, pubConnectionDataN]] 302 pubConnectionData: [connectionId, bytesSent, numSent, connected]* . 303 subscribeStats: [[topicName, subConnectionData]...[topicNameN, subConnectionDataN]] 304 subConnectionData: [connectionId, bytesReceived, dropEstimate, connected]* . dropEstimate is -1 if no estimate. 305 serviceStats: not sure yet, probably akin to [numRequests, bytesReceived, bytesSent] 306 """ 307 pub_stats, sub_stats = get_topic_manager().get_pub_sub_stats() 308 #TODO: serviceStats 309 return 1, '', [pub_stats, sub_stats, []]
310 311 @apivalidate([])
312 - def getBusInfo(self, caller_id):
313 """ 314 Retrieve transport/topic connection information 315 @param caller_id: ROS caller id 316 @type caller_id: str 317 """ 318 return 1, "bus info", get_topic_manager().get_pub_sub_info()
319 320 @apivalidate('')
321 - def getMasterUri(self, caller_id):
322 """ 323 Get the URI of the master node. 324 @param caller_id: ROS caller id 325 @type caller_id: str 326 @return: [code, msg, masterUri] 327 @rtype: [int, str, str] 328 """ 329 if self.masterUri: 330 return 1, self.masterUri, self.masterUri 331 else: 332 return 0, "master URI not set", ""
333
334 - def _shutdown(self, reason=''):
335 """ 336 @param reason: human-readable debug string 337 @type reason: str 338 """ 339 if not self.done: 340 self.done = True 341 if reason: 342 _logger.info(reason) 343 if self.protocol_handlers: 344 for handler in self.protocol_handlers: 345 handler.shutdown() 346 del self.protocol_handlers[:] 347 self.protocol_handlers = None 348 return True
349 350 @apivalidate(0, (None, ))
351 - def shutdown(self, caller_id, msg=''):
352 """ 353 Stop this server 354 @param caller_id: ROS caller id 355 @type caller_id: str 356 @param msg: a message describing why the node is being shutdown. 357 @type msg: str 358 @return: [code, msg, 0] 359 @rtype: [int, str, int] 360 """ 361 if msg: 362 print("shutdown request: %s"%msg) 363 else: 364 print("shutdown requst") 365 if self._shutdown('external shutdown request from [%s]: %s'%(caller_id, msg)): 366 signal_shutdown('external shutdown request from [%s]: [%s]'%(caller_id, msg)) 367 return 1, "shutdown", 0
368 369 @apivalidate(-1)
370 - def getPid(self, caller_id):
371 """ 372 Get the PID of this server 373 @param caller_id: ROS caller id 374 @type caller_id: str 375 @return: [1, "", serverProcessPID] 376 @rtype: [int, str, int] 377 """ 378 return 1, "", os.getpid()
379 380 ############################################################################### 381 # PUB/SUB APIS 382 383 @apivalidate([])
384 - def getSubscriptions(self, caller_id):
385 """ 386 Retrieve a list of topics that this node subscribes to. 387 @param caller_id: ROS caller id 388 @type caller_id: str 389 @return: list of topics this node subscribes to. 390 @rtype: [int, str, [ [topic1, topicType1]...[topicN, topicTypeN]]] 391 """ 392 return 1, "subscriptions", get_topic_manager().get_subscriptions()
393 394 @apivalidate([])
395 - def getPublications(self, caller_id):
396 """ 397 Retrieve a list of topics that this node publishes. 398 @param caller_id: ROS caller id 399 @type caller_id: str 400 @return: list of topics published by this node. 401 @rtype: [int, str, [ [topic1, topicType1]...[topicN, topicTypeN]]] 402 """ 403 return 1, "publications", get_topic_manager().get_publications()
404
405 - def _connect_topic(self, topic, pub_uri):
406 """ 407 Connect subscriber to topic. 408 @param topic: Topic name to connect. 409 @type topic: str 410 @param pub_uri: API URI of topic publisher. 411 @type pub_uri: str 412 @return: [code, msg, numConnects]. numConnects is the number 413 of subscribers connected to the topic. 414 @rtype: [int, str, int] 415 """ 416 caller_id = rospy.names.get_caller_id() 417 sub = get_topic_manager().get_subscriber_impl(topic) 418 if not sub: 419 return -1, "No subscriber for topic [%s]"%topic, 0 420 elif sub.has_connection(pub_uri): 421 return 1, "_connect_topic[%s]: subscriber already connected to publisher [%s]"%(topic, pub_uri), 0 422 423 #Negotiate with source for connection 424 # - collect supported protocols 425 protocols = [] 426 for h in self.protocol_handlers: #currently only TCPROS 427 protocols.extend(h.get_supported()) 428 if not protocols: 429 return 0, "ERROR: no available protocol handlers", 0 430 431 _logger.debug("connect[%s]: calling requestTopic(%s, %s, %s)", topic, caller_id, topic, str(protocols)) 432 # 1) have to preserve original (unresolved) params as this may 433 # go outside our graph 434 # 2) xmlrpclib doesn't give us any way of affecting the 435 # timeout other than affecting the global timeout. We need 436 # to set a timeout to prevent infinite hangs. 60 seconds is 437 # a *very* long time. All of the rospy code right now sets 438 # individual socket timeouts, but this could potentially 439 # affect user code. 440 socket.setdefaulttimeout(60.) 441 success = False 442 interval = 0.5 # seconds 443 # while the ROS node is not shutdown try to get the topic information 444 # and retry on connections problems after some wait 445 # Abort the retry if the we get a Connection Refused since at that point 446 # we know for sure the URI is invalid 447 while not success and not is_shutdown(): 448 try: 449 code, msg, result = \ 450 xmlrpcapi(pub_uri, cache=False).requestTopic(caller_id, topic, protocols) 451 success = True 452 except Exception as e: 453 if getattr(e, 'errno', None) == errno.ECONNREFUSED: 454 code = -errno.ECONNREFUSED 455 msg = str(e) 456 break 457 elif not is_shutdown(): 458 _logger.debug("Retrying for %s" % topic) 459 if interval < 30.0: 460 # exponential backoff (maximum 32 seconds) 461 interval = interval * 2 462 time.sleep(interval) 463 464 #Create the connection (if possible) 465 if code <= 0: 466 _logger.debug("connect[%s]: requestTopic did not succeed %s, %s", pub_uri, code, msg) 467 return code, msg, 0 468 elif not result or type(protocols) != list: 469 return 0, "ERROR: publisher returned invalid protocol choice: %s"%(str(result)), 0 470 _logger.debug("connect[%s]: requestTopic returned protocol list %s", topic, result) 471 protocol = result[0] 472 for h in self.protocol_handlers: 473 if h.supports(protocol): 474 return h.create_transport(topic, pub_uri, result) 475 return 0, "ERROR: publisher returned unsupported protocol choice: %s"%result, 0
476 477 @apivalidate(-1, (global_name('parameter_key'), None))
478 - def paramUpdate(self, caller_id, parameter_key, parameter_value):
479 """ 480 Callback from master of current publisher list for specified topic. 481 @param caller_id: ROS caller id 482 @type caller_id: str 483 @param parameter_key str: parameter name, globally resolved 484 @type parameter_key: str 485 @param parameter_value New parameter value 486 @type parameter_value: XMLRPC-legal value 487 @return: [code, status, ignore]. If code is -1 ERROR, the node 488 is not subscribed to parameter_key 489 @rtype: [int, str, int] 490 """ 491 try: 492 get_param_server_cache().update(parameter_key, parameter_value) 493 return 1, '', 0 494 except KeyError: 495 return -1, 'not subscribed', 0
496 497 @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
498 - def publisherUpdate(self, caller_id, topic, publishers):
499 """ 500 Callback from master of current publisher list for specified topic. 501 @param caller_id: ROS caller id 502 @type caller_id: str 503 @param topic str: topic name 504 @type topic: str 505 @param publishers: list of current publishers for topic in the form of XMLRPC URIs 506 @type publishers: [str] 507 @return: [code, status, ignore] 508 @rtype: [int, str, int] 509 """ 510 if self.reg_man: 511 for uri in publishers: 512 self.reg_man.publisher_update(topic, publishers) 513 return 1, "", 0
514 515 _remap_table['requestTopic'] = [0] # remap topic 516 @apivalidate([], (is_topic('topic'), non_empty('protocols')))
517 - def requestTopic(self, caller_id, topic, protocols):
518 """ 519 Publisher node API method called by a subscriber node. 520 521 Request that source allocate a channel for communication. Subscriber provides 522 a list of desired protocols for communication. Publisher returns the 523 selected protocol along with any additional params required for 524 establishing connection. For example, for a TCP/IP-based connection, 525 the source node may return a port number of TCP/IP server. 526 @param caller_id str: ROS caller id 527 @type caller_id: str 528 @param topic: topic name 529 @type topic: str 530 @param protocols: list of desired 531 protocols for communication in order of preference. Each 532 protocol is a list of the form [ProtocolName, 533 ProtocolParam1, ProtocolParam2...N] 534 @type protocols: [[str, XmlRpcLegalValue*]] 535 @return: [code, msg, protocolParams]. protocolParams may be an 536 empty list if there are no compatible protocols. 537 @rtype: [int, str, [str, XmlRpcLegalValue*]] 538 """ 539 if not get_topic_manager().has_publication(topic): 540 return -1, "Not a publisher of [%s]"%topic, [] 541 for protocol in protocols: #simple for now: select first implementation 542 protocol_id = protocol[0] 543 for h in self.protocol_handlers: 544 if h.supports(protocol_id): 545 _logger.debug("requestTopic[%s]: choosing protocol %s", topic, protocol_id) 546 return h.init_publisher(topic, protocol) 547 return 0, "no supported protocol implementations", []
548