Package rospy :: Package impl :: Module masterslave

Source Code for Module rospy.impl.masterslave

  1  # Software License Agreement (BSD License)
 
  2  #
 
  3  # Copyright (c) 2008, Willow Garage, Inc.
 
  4  # All rights reserved.
 
  5  #
 
  6  # Redistribution and use in source and binary forms, with or without
 
  7  # modification, are permitted provided that the following conditions
 
  8  # are met:
 
  9  #
 
 10  #  * Redistributions of source code must retain the above copyright
 
 11  #    notice, this list of conditions and the following disclaimer.
 
 12  #  * Redistributions in binary form must reproduce the above
 
 13  #    copyright notice, this list of conditions and the following
 
 14  #    disclaimer in the documentation and/or other materials provided
 
 15  #    with the distribution.
 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its
 
 17  #    contributors may be used to endorse or promote products derived
 
 18  #    from this software without specific prior written permission.
 
 19  #
 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
 31  # POSSIBILITY OF SUCH DAMAGE.
 
 32  #
 
 33  # Revision $Id$
 
 34  """
 
 35  Internal use: ROS Node (Slave) API. 
 
 36  
 
 37  The Node API is implemented by the L{ROSHandler}.
 
 38  
 
 39  API return convention: (statusCode, statusMessage, returnValue)
 
 40  
 
 41   - statusCode: an integer indicating the completion condition of the method. 
 
 42   - statusMessage: a human-readable string message for debugging
 
 43   - returnValue: the return value of the method; method-specific.
 
 44  
 
 45  Current status codes: 
 
 46  
 
 47   - -1: ERROR: Error on the part of the caller, e.g. an invalid parameter
 
 48   - 0: FAILURE: Method was attempted but failed to complete correctly.
 
 49   - 1: SUCCESS: Method completed successfully.
 
 50  
 
 51  Individual methods may assign additional meaning/semantics to statusCode.
 
 52  """ 
 53  
 
 54  import os 
 55  import sys 
 56  import itertools 
 57  import logging 
 58  import socket 
 59  import threading 
 60  import traceback 
 61  import time 
 62  
 
 63  try: 
 64      #py3k
 
 65      import urllib.parse as urlparse 
 66  except ImportError: 
 67      import urlparse 
 68  
 
 69  from rosgraph.xmlrpc import XmlRpcHandler 
 70  
 
 71  import rospy.names 
 72  import rospy.rostime 
 73  
 
 74  import rospy.impl.tcpros 
 75  
 
 76  from rospy.core import * 
 77  from rospy.impl.paramserver import get_param_server_cache 
 78  from rospy.impl.registration import RegManager, get_topic_manager 
 79  from rospy.impl.validators import non_empty, ParameterInvalid 
 80  
 
 81  # Return code slots
 
 82  STATUS = 0 
 83  MSG = 1 
 84  VAL = 2 
85 86 # pseudo-validators ############################### 87 # these validators actually return tuples instead of a function and it is up to a custom 88 # validator on the class itself to perform the validation 89 -def is_publishers_list(paramName):
90 return ('is_publishers_list', paramName)
91 92 _logger = logging.getLogger("rospy.impl.masterslave") 93 94 LOG_API = True
95 96 -def apivalidate(error_return_value, validators=()):
97 """ 98 ROS master/slave arg-checking decorator. Applies the specified 99 validator to the corresponding argument and also remaps each 100 argument to be the value returned by the validator. Thus, 101 arguments can be simultaneously validated and canonicalized prior 102 to actual function call. 103 @param error_return_value: API value to return if call unexpectedly fails 104 @param validators: sequence of validators to apply to each 105 arg. None means no validation for the parameter is required. As all 106 api methods take caller_id as the first parameter, the validators 107 start with the second param. 108 @type validators: sequence 109 """ 110 def check_validates(f): 111 assert len(validators) == f.__code__.co_argcount - 2, "%s failed arg check"%f #ignore self and caller_id 112 def validated_f(*args, **kwds): 113 if LOG_API: 114 _logger.debug("%s%s", f.__name__, str(args[1:])) 115 #print "%s%s"%(f.func_name, str(args[1:])) 116 if len(args) == 1: 117 _logger.error("%s invoked without caller_id paramter"%f.__name__) 118 return -1, "missing required caller_id parameter", error_return_value 119 elif len(args) != f.__code__.co_argcount: 120 return -1, "Error: bad call arity", error_return_value 121 122 instance = args[0] 123 caller_id = args[1] 124 if not isinstance(caller_id, str): 125 _logger.error("%s: invalid caller_id param type", f.__name__) 126 return -1, "caller_id must be a string", error_return_value 127 128 newArgs = [instance, caller_id] #canonicalized args 129 try: 130 for (v, a) in zip(validators, args[2:]): 131 if v: 132 try: 133 #simultaneously validate + canonicalized args 134 if type(v) == list or type(v) == tuple: 135 newArgs.append(instance._custom_validate(v[0], v[1], a, caller_id)) 136 else: 137 newArgs.append(v(a, caller_id)) 138 except ParameterInvalid as e: 139 _logger.error("%s: invalid parameter: %s", f.__name__, str(e) or 'error') 140 return -1, str(e) or 'error', error_return_value 141 else: 142 newArgs.append(a) 143 144 if LOG_API: 145 retval = f(*newArgs, **kwds) 146 _logger.debug("%s%s returns %s", f.__name__, args[1:], retval) 147 return retval 148 else: 149 code, msg, val = f(*newArgs, **kwds) 150 if val is None: 151 return -1, "Internal error (None value returned)", error_return_value 152 return code, msg, val 153 except TypeError as te: #most likely wrong arg number 154 _logger.error(traceback.format_exc()) 155 return -1, "Error: invalid arguments: %s"%te, error_return_value 156 except Exception as e: #internal failure 157 _logger.error(traceback.format_exc()) 158 return 0, "Internal failure: %s"%e, error_return_value
159 validated_f.__name__ = f.__name__ 160 validated_f.__doc__ = f.__doc__ #preserve doc 161 return validated_f 162 return check_validates 163
164 165 -class ROSHandler(XmlRpcHandler):
166 """ 167 Base handler for both slave and master nodes. API methods 168 generally provide the capability for establishing point-to-point 169 connections with other nodes. 170 171 Instance methods are XML-RPC API methods, so care must be taken as 172 to what is added here. 173 """ 174
175 - def __init__(self, name, master_uri):
176 """ 177 Base constructor for ROS nodes/masters 178 @param name: ROS name of this node 179 @type name: str 180 @param master_uri: URI of master node, or None if this node is the master 181 @type master_uri: str 182 """ 183 super(ROSHandler, self).__init__() 184 self.masterUri = master_uri 185 self.name = name 186 self.uri = None 187 self.done = False 188 189 # initialize protocol handlers. The master will not have any. 190 self.protocol_handlers = [] 191 handler = rospy.impl.tcpros.get_tcpros_handler() 192 if handler is not None: 193 self.protocol_handlers.append(handler) 194 195 self.reg_man = RegManager(self)
196 197 ############################################################################### 198 # INTERNAL 199
200 - def _is_registered(self):
201 """ 202 @return: True if slave API is registered with master. 203 @rtype: bool 204 """ 205 if self.reg_man is None: 206 return False 207 else: 208 return self.reg_man.is_registered()
209 210
211 - def _ready(self, uri):
212 """ 213 @param uri: XML-RPC URI 214 @type uri: str 215 callback from ROSNode to inform handler of correct i/o information 216 """ 217 _logger.info("_ready: %s", uri) 218 self.uri = uri 219 #connect up topics in separate thread 220 if self.reg_man: 221 t = threading.Thread(target=self.reg_man.start, args=(uri, self.masterUri)) 222 rospy.core._add_shutdown_thread(t) 223 t.start()
224
225 - def _custom_validate(self, validation, param_name, param_value, caller_id):
226 """ 227 Implements validation rules that require access to internal ROSHandler state. 228 @param validation: name of validation rule to use 229 @type validation: str 230 @param param_name: name of parameter being validated 231 @type param_name: str 232 @param param_value str: value of parameter 233 @type param_value: str 234 @param caller_id: value of caller_id parameter to API method 235 @type caller_id: str 236 @raise ParameterInvalid: if the parameter does not meet validation 237 @return: new value for parameter, after validation 238 """ 239 if validation == 'is_publishers_list': 240 if not type(param_value) == list: 241 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name) 242 for v in param_value: 243 if not isinstance(v, str): 244 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name) 245 parsed = urlparse.urlparse(v) 246 if not parsed[0] or not parsed[1]: #protocol and host 247 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v)) 248 return param_value 249 else: 250 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
251 252 ## static map for tracking which arguments to a function should be remapped 253 # { methodName : [ arg indices ] 254 _remap_table = { } 255 256 @classmethod
257 - def remappings(cls, methodName):
258 """ 259 @internal 260 @param cls: class to register remappings on 261 @type cls: Class: class to register remappings on 262 @return: parameters (by pos) that should be remapped because they are names 263 @rtype: list 264 """ 265 if methodName in cls._remap_table: 266 return cls._remap_table[methodName] 267 else: 268 return []
269 270 ############################################################################### 271 # UNOFFICIAL/PYTHON-ONLY API 272 273 @apivalidate('') 274 ## (Python-Only API) Get the XML-RPC URI of this server 275 ## @param self 276 ## @param caller_id str: ROS caller id 277 ## @return [int, str, str]: [1, "", xmlRpcUri]
278 - def getUri(self, caller_id):
279 return 1, "", self.uri
280 281 @apivalidate('') 282 ## (Python-Only API) Get the ROS node name of this server 283 ## @param self 284 ## @param caller_id str: ROS caller id 285 ## @return [int, str, str]: [1, "", ROS node name]
286 - def getName(self, caller_id):
287 return 1, "", self.name
288 289 290 ############################################################################### 291 # EXTERNAL API 292 293 @apivalidate([])
294 - def getBusStats(self, caller_id):
295 """ 296 Retrieve transport/topic statistics 297 @param caller_id: ROS caller id 298 @type caller_id: str 299 @return: [publishStats, subscribeStats, serviceStats]:: 300 publishStats: [[topicName, messageDataSent, pubConnectionData]...[topicNameN, messageDataSentN, pubConnectionDataN]] 301 pubConnectionData: [connectionId, bytesSent, numSent, connected]* . 302 subscribeStats: [[topicName, subConnectionData]...[topicNameN, subConnectionDataN]] 303 subConnectionData: [connectionId, bytesReceived, dropEstimate, connected]* . dropEstimate is -1 if no estimate. 304 serviceStats: not sure yet, probably akin to [numRequests, bytesReceived, bytesSent] 305 """ 306 pub_stats, sub_stats = get_topic_manager().get_pub_sub_stats() 307 #TODO: serviceStats 308 return 1, '', [pub_stats, sub_stats, []]
309 310 @apivalidate([])
311 - def getBusInfo(self, caller_id):
312 """ 313 Retrieve transport/topic connection information 314 @param caller_id: ROS caller id 315 @type caller_id: str 316 """ 317 return 1, "bus info", get_topic_manager().get_pub_sub_info()
318 319 @apivalidate('')
320 - def getMasterUri(self, caller_id):
321 """ 322 Get the URI of the master node. 323 @param caller_id: ROS caller id 324 @type caller_id: str 325 @return: [code, msg, masterUri] 326 @rtype: [int, str, str] 327 """ 328 if self.masterUri: 329 return 1, self.masterUri, self.masterUri 330 else: 331 return 0, "master URI not set", ""
332
333 - def _shutdown(self, reason=''):
334 """ 335 @param reason: human-readable debug string 336 @type reason: str 337 """ 338 if not self.done: 339 self.done = True 340 if reason: 341 _logger.info(reason) 342 if self.protocol_handlers: 343 for handler in self.protocol_handlers: 344 handler.shutdown() 345 del self.protocol_handlers[:] 346 self.protocol_handlers = None 347 return True
348 349 @apivalidate(0, (None, ))
350 - def shutdown(self, caller_id, msg=''):
351 """ 352 Stop this server 353 @param caller_id: ROS caller id 354 @type caller_id: str 355 @param msg: a message describing why the node is being shutdown. 356 @type msg: str 357 @return: [code, msg, 0] 358 @rtype: [int, str, int] 359 """ 360 if msg: 361 print("shutdown request: %s"%msg) 362 else: 363 print("shutdown requst") 364 if self._shutdown('external shutdown request from [%s]: %s'%(caller_id, msg)): 365 signal_shutdown('external shutdown request from [%s]: [%s]'%(caller_id, msg)) 366 return 1, "shutdown", 0
367 368 @apivalidate(-1)
369 - def getPid(self, caller_id):
370 """ 371 Get the PID of this server 372 @param caller_id: ROS caller id 373 @type caller_id: str 374 @return: [1, "", serverProcessPID] 375 @rtype: [int, str, int] 376 """ 377 return 1, "", os.getpid()
378 379 ############################################################################### 380 # PUB/SUB APIS 381 382 @apivalidate([])
383 - def getSubscriptions(self, caller_id):
384 """ 385 Retrieve a list of topics that this node subscribes to. 386 @param caller_id: ROS caller id 387 @type caller_id: str 388 @return: list of topics this node subscribes to. 389 @rtype: [int, str, [ [topic1, topicType1]...[topicN, topicTypeN]]] 390 """ 391 return 1, "subscriptions", get_topic_manager().get_subscriptions()
392 393 @apivalidate([])
394 - def getPublications(self, caller_id):
395 """ 396 Retrieve a list of topics that this node publishes. 397 @param caller_id: ROS caller id 398 @type caller_id: str 399 @return: list of topics published by this node. 400 @rtype: [int, str, [ [topic1, topicType1]...[topicN, topicTypeN]]] 401 """ 402 return 1, "publications", get_topic_manager().get_publications()
403
404 - def _connect_topic(self, topic, pub_uri):
405 """ 406 Connect subscriber to topic. 407 @param topic: Topic name to connect. 408 @type topic: str 409 @param pub_uri: API URI of topic publisher. 410 @type pub_uri: str 411 @return: [code, msg, numConnects]. numConnects is the number 412 of subscribers connected to the topic. 413 @rtype: [int, str, int] 414 """ 415 caller_id = rospy.names.get_caller_id() 416 sub = get_topic_manager().get_subscriber_impl(topic) 417 if not sub: 418 return -1, "No subscriber for topic [%s]"%topic, 0 419 elif sub.has_connection(pub_uri): 420 return 1, "_connect_topic[%s]: subscriber already connected to publisher [%s]"%(topic, pub_uri), 0 421 422 #Negotiate with source for connection 423 # - collect supported protocols 424 protocols = [] 425 for h in self.protocol_handlers: #currently only TCPROS 426 protocols.extend(h.get_supported()) 427 if not protocols: 428 return 0, "ERROR: no available protocol handlers", 0 429 430 _logger.debug("connect[%s]: calling requestTopic(%s, %s, %s)", topic, caller_id, topic, str(protocols)) 431 # 1) have to preserve original (unresolved) params as this may 432 # go outside our graph 433 # 2) xmlrpclib doesn't give us any way of affecting the 434 # timeout other than affecting the global timeout. We need 435 # to set a timeout to prevent infinite hangs. 60 seconds is 436 # a *very* long time. All of the rospy code right now sets 437 # individual socket timeouts, but this could potentially 438 # affect user code. 439 socket.setdefaulttimeout(60.) 440 tries = 0 441 max_num_tries = 3 442 success = False 443 while not success: 444 tries += 1 445 try: 446 code, msg, result = \ 447 xmlrpcapi(pub_uri).requestTopic(caller_id, topic, protocols) 448 success = True 449 except Exception as e: 450 if tries >= max_num_tries: 451 return 0, "unable to requestTopic: %s"%str(e), 0 452 else: 453 _logger.debug("Retrying for %s" % topic) 454 455 #Create the connection (if possible) 456 if code <= 0: 457 _logger.debug("connect[%s]: requestTopic did not succeed %s, %s", pub_uri, code, msg) 458 return code, msg, 0 459 elif not result or type(protocols) != list: 460 return 0, "ERROR: publisher returned invalid protocol choice: %s"%(str(result)), 0 461 _logger.debug("connect[%s]: requestTopic returned protocol list %s", topic, result) 462 protocol = result[0] 463 for h in self.protocol_handlers: 464 if h.supports(protocol): 465 return h.create_transport(topic, pub_uri, result) 466 return 0, "ERROR: publisher returned unsupported protocol choice: %s"%result, 0
467 468 @apivalidate(-1, (global_name('parameter_key'), None))
469 - def paramUpdate(self, caller_id, parameter_key, parameter_value):
470 """ 471 Callback from master of current publisher list for specified topic. 472 @param caller_id: ROS caller id 473 @type caller_id: str 474 @param parameter_key str: parameter name, globally resolved 475 @type parameter_key: str 476 @param parameter_value New parameter value 477 @type parameter_value: XMLRPC-legal value 478 @return: [code, status, ignore]. If code is -1 ERROR, the node 479 is not subscribed to parameter_key 480 @rtype: [int, str, int] 481 """ 482 try: 483 get_param_server_cache().update(parameter_key, parameter_value) 484 return 1, '', 0 485 except KeyError: 486 return -1, 'not subscribed', 0
487 488 @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
489 - def publisherUpdate(self, caller_id, topic, publishers):
490 """ 491 Callback from master of current publisher list for specified topic. 492 @param caller_id: ROS caller id 493 @type caller_id: str 494 @param topic str: topic name 495 @type topic: str 496 @param publishers: list of current publishers for topic in the form of XMLRPC URIs 497 @type publishers: [str] 498 @return: [code, status, ignore] 499 @rtype: [int, str, int] 500 """ 501 if self.reg_man: 502 for uri in publishers: 503 self.reg_man.publisher_update(topic, publishers) 504 return 1, "", 0
505 506 _remap_table['requestTopic'] = [0] # remap topic 507 @apivalidate([], (is_topic('topic'), non_empty('protocols')))
508 - def requestTopic(self, caller_id, topic, protocols):
509 """ 510 Publisher node API method called by a subscriber node. 511 512 Request that source allocate a channel for communication. Subscriber provides 513 a list of desired protocols for communication. Publisher returns the 514 selected protocol along with any additional params required for 515 establishing connection. For example, for a TCP/IP-based connection, 516 the source node may return a port number of TCP/IP server. 517 @param caller_id str: ROS caller id 518 @type caller_id: str 519 @param topic: topic name 520 @type topic: str 521 @param protocols: list of desired 522 protocols for communication in order of preference. Each 523 protocol is a list of the form [ProtocolName, 524 ProtocolParam1, ProtocolParam2...N] 525 @type protocols: [[str, XmlRpcLegalValue*]] 526 @return: [code, msg, protocolParams]. protocolParams may be an 527 empty list if there are no compatible protocols. 528 @rtype: [int, str, [str, XmlRpcLegalValue*]] 529 """ 530 if not get_topic_manager().has_publication(topic): 531 return -1, "Not a publisher of [%s]"%topic, [] 532 for protocol in protocols: #simple for now: select first implementation 533 protocol_id = protocol[0] 534 for h in self.protocol_handlers: 535 if h.supports(protocol_id): 536 _logger.debug("requestTopic[%s]: choosing protocol %s", topic, protocol_id) 537 return h.init_publisher(topic, protocol) 538 return 0, "no supported protocol implementations", []
539