Package rospy :: Package impl :: Module masterslave

Source Code for Module rospy.impl.masterslave

  1  # Software License Agreement (BSD License)
 
  2  #
 
  3  # Copyright (c) 2008, Willow Garage, Inc.
 
  4  # All rights reserved.
 
  5  #
 
  6  # Redistribution and use in source and binary forms, with or without
 
  7  # modification, are permitted provided that the following conditions
 
  8  # are met:
 
  9  #
 
 10  #  * Redistributions of source code must retain the above copyright
 
 11  #    notice, this list of conditions and the following disclaimer.
 
 12  #  * Redistributions in binary form must reproduce the above
 
 13  #    copyright notice, this list of conditions and the following
 
 14  #    disclaimer in the documentation and/or other materials provided
 
 15  #    with the distribution.
 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its
 
 17  #    contributors may be used to endorse or promote products derived
 
 18  #    from this software without specific prior written permission.
 
 19  #
 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
 31  # POSSIBILITY OF SUCH DAMAGE.
 
 32  #
 
 33  # Revision $Id: masterslave.py 13295 2011-02-17 22:38:51Z kwc $
 
 34  """
 
 35  Internal use: ROS Node (Slave) API. 
 
 36  
 
 37  The Node API is implemented by the L{ROSHandler}.
 
 38  
 
 39  API return convention: (statusCode, statusMessage, returnValue)
 
 40  
 
 41   - statusCode: an integer indicating the completion condition of the method. 
 
 42   - statusMessage: a human-readable string message for debugging
 
 43   - returnValue: the return value of the method; method-specific.
 
 44  
 
 45  Current status codes: 
 
 46  
 
 47   - -1: ERROR: Error on the part of the caller, e.g. an invalid parameter
 
 48   - 0: FAILURE: Method was attempted but failed to complete correctly.
 
 49   - 1: SUCCESS: Method completed successfully.
 
 50  
 
 51  Individual methods may assign additional meaning/semantics to statusCode.
 
 52  """ 
 53  
 
 54  import os 
 55  import sys 
 56  import itertools 
 57  import logging 
 58  import socket 
 59  import threading 
 60  import traceback 
 61  import time 
 62  import urlparse 
 63  
 
 64  from roslib.xmlrpc import XmlRpcHandler 
 65  
 
 66  import rospy.names 
 67  import rospy.rostime 
 68  
 
 69  import rospy.impl.tcpros 
 70  
 
 71  from rospy.core import * 
 72  from rospy.impl.paramserver import get_param_server_cache 
 73  from rospy.impl.registration import RegManager, get_topic_manager 
 74  from rospy.impl.validators import non_empty, ParameterInvalid 
 75  
 
 76  # Return code slots
 
 77  STATUS = 0 
 78  MSG = 1 
 79  VAL = 2 
80 81 # pseudo-validators ############################### 82 # these validators actually return tuples instead of a function and it is up to a custom 83 # validator on the class itself to perform the validation 84 -def is_publishers_list(paramName):
85 return ('is_publishers_list', paramName)
86 87 _logger = logging.getLogger("rospy.impl.masterslave") 88 89 LOG_API = True
90 91 -def apivalidate(error_return_value, validators=()):
92 """ 93 ROS master/slave arg-checking decorator. Applies the specified 94 validator to the corresponding argument and also remaps each 95 argument to be the value returned by the validator. Thus, 96 arguments can be simultaneously validated and canonicalized prior 97 to actual function call. 98 @param error_return_value: API value to return if call unexpectedly fails 99 @param validators: sequence of validators to apply to each 100 arg. None means no validation for the parameter is required. As all 101 api methods take caller_id as the first parameter, the validators 102 start with the second param. 103 @type validators: sequence 104 """ 105 def check_validates(f): 106 assert len(validators) == f.func_code.co_argcount - 2, "%s failed arg check"%f #ignore self and caller_id 107 def validated_f(*args, **kwds): 108 if LOG_API: 109 _logger.debug("%s%s", f.func_name, str(args[1:])) 110 #print "%s%s"%(f.func_name, str(args[1:])) 111 if len(args) == 1: 112 _logger.error("%s invoked without caller_id paramter"%f.func_name) 113 return -1, "missing required caller_id parameter", error_return_value 114 elif len(args) != f.func_code.co_argcount: 115 return -1, "Error: bad call arity", error_return_value 116 117 instance = args[0] 118 caller_id = args[1] 119 if not isinstance(caller_id, basestring): 120 _logger.error("%s: invalid caller_id param type", f.func_name) 121 return -1, "caller_id must be a string", error_return_value 122 123 newArgs = [instance, caller_id] #canonicalized args 124 try: 125 for (v, a) in itertools.izip(validators, args[2:]): 126 if v: 127 try: 128 #simultaneously validate + canonicalized args 129 if type(v) == list or type(v) == tuple: 130 newArgs.append(instance._custom_validate(v[0], v[1], a, caller_id)) 131 else: 132 newArgs.append(v(a, caller_id)) 133 except ParameterInvalid, e: 134 _logger.error("%s: invalid parameter: %s", f.func_name, str(e) or 'error') 135 return -1, str(e) or 'error', error_return_value 136 else: 137 newArgs.append(a) 138 139 if LOG_API: 140 retval = f(*newArgs, **kwds) 141 _logger.debug("%s%s returns %s", f.func_name, args[1:], retval) 142 return retval 143 else: 144 code, msg, val = f(*newArgs, **kwds) 145 if val is None: 146 return -1, "Internal error (None value returned)", error_return_value 147 return code, msg, val 148 except TypeError, te: #most likely wrong arg number 149 _logger.error(traceback.format_exc()) 150 return -1, "Error: invalid arguments: %s"%te, error_return_value 151 except Exception, e: #internal failure 152 _logger.error(traceback.format_exc()) 153 return 0, "Internal failure: %s"%e, error_return_value
154 validated_f.func_name = f.func_name 155 validated_f.__doc__ = f.__doc__ #preserve doc 156 return validated_f 157 return check_validates 158
159 160 -class ROSHandler(XmlRpcHandler):
161 """ 162 Base handler for both slave and master nodes. API methods 163 generally provide the capability for establishing point-to-point 164 connections with other nodes. 165 166 Instance methods are XML-RPC API methods, so care must be taken as 167 to what is added here. 168 """ 169
170 - def __init__(self, name, master_uri):
171 """ 172 Base constructor for ROS nodes/masters 173 @param name: ROS name of this node 174 @type name: str 175 @param master_uri: URI of master node, or None if this node is the master 176 @type master_uri: str 177 """ 178 super(ROSHandler, self).__init__() 179 self.masterUri = master_uri 180 self.name = name 181 self.uri = None 182 self.done = False 183 184 # initialize protocol handlers. The master will not have any. 185 self.protocol_handlers = [] 186 handler = rospy.impl.tcpros.get_tcpros_handler() 187 if handler is not None: 188 self.protocol_handlers.append(handler) 189 190 self.reg_man = RegManager(self)
191 192 ############################################################################### 193 # INTERNAL 194
195 - def _is_registered(self):
196 """ 197 @return: True if slave API is registered with master. 198 @rtype: bool 199 """ 200 if self.reg_man is None: 201 return False 202 else: 203 return self.reg_man.is_registered()
204 205
206 - def _ready(self, uri):
207 """ 208 @param uri: XML-RPC URI 209 @type uri: str 210 callback from ROSNode to inform handler of correct i/o information 211 """ 212 _logger.info("_ready: %s", uri) 213 self.uri = uri 214 #connect up topics in separate thread 215 if self.reg_man: 216 t = threading.Thread(target=self.reg_man.start, args=(uri, self.masterUri)) 217 rospy.core._add_shutdown_thread(t) 218 t.start()
219
220 - def _custom_validate(self, validation, param_name, param_value, caller_id):
221 """ 222 Implements validation rules that require access to internal ROSHandler state. 223 @param validation: name of validation rule to use 224 @type validation: str 225 @param param_name: name of parameter being validated 226 @type param_name: str 227 @param param_value str: value of parameter 228 @type param_value: str 229 @param caller_id: value of caller_id parameter to API method 230 @type caller_id: str 231 @raise ParameterInvalid: if the parameter does not meet validation 232 @return: new value for parameter, after validation 233 """ 234 if validation == 'is_publishers_list': 235 if not type(param_value) == list: 236 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name) 237 for v in param_value: 238 if not isinstance(v, basestring): 239 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name) 240 parsed = urlparse.urlparse(v) 241 if not parsed[0] or not parsed[1]: #protocol and host 242 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v)) 243 return param_value 244 else: 245 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
246 247 ## static map for tracking which arguments to a function should be remapped 248 # { methodName : [ arg indices ] 249 _remap_table = { } 250 251 @classmethod
252 - def remappings(cls, methodName):
253 """ 254 @internal 255 @param cls: class to register remappings on 256 @type cls: Class: class to register remappings on 257 @return: parameters (by pos) that should be remapped because they are names 258 @rtype: list 259 """ 260 if methodName in cls._remap_table: 261 return cls._remap_table[methodName] 262 else: 263 return []
264 265 ############################################################################### 266 # UNOFFICIAL/PYTHON-ONLY API 267 268 @apivalidate('') 269 ## (Python-Only API) Get the XML-RPC URI of this server 270 ## @param self 271 ## @param caller_id str: ROS caller id 272 ## @return [int, str, str]: [1, "", xmlRpcUri]
273 - def getUri(self, caller_id):
274 return 1, "", self.uri
275 276 @apivalidate('') 277 ## (Python-Only API) Get the ROS node name of this server 278 ## @param self 279 ## @param caller_id str: ROS caller id 280 ## @return [int, str, str]: [1, "", ROS node name]
281 - def getName(self, caller_id):
282 return 1, "", self.name
283 284 285 ############################################################################### 286 # EXTERNAL API 287 288 @apivalidate([])
289 - def getBusStats(self, caller_id):
290 """ 291 Retrieve transport/topic statistics 292 @param caller_id: ROS caller id 293 @type caller_id: str 294 @return: [publishStats, subscribeStats, serviceStats]:: 295 publishStats: [[topicName, messageDataSent, pubConnectionData]...[topicNameN, messageDataSentN, pubConnectionDataN]] 296 pubConnectionData: [connectionId, bytesSent, numSent, connected]* . 297 subscribeStats: [[topicName, subConnectionData]...[topicNameN, subConnectionDataN]] 298 subConnectionData: [connectionId, bytesReceived, dropEstimate, connected]* . dropEstimate is -1 if no estimate. 299 serviceStats: not sure yet, probably akin to [numRequests, bytesReceived, bytesSent] 300 """ 301 pub_stats, sub_stats = get_topic_manager().get_pub_sub_stats() 302 #TODO: serviceStats 303 return 1, '', [pub_stats, sub_stats, []]
304 305 @apivalidate([])
306 - def getBusInfo(self, caller_id):
307 """ 308 Retrieve transport/topic connection information 309 @param caller_id: ROS caller id 310 @type caller_id: str 311 """ 312 return 1, "bus info", get_topic_manager().get_pub_sub_info()
313 314 @apivalidate('')
315 - def getMasterUri(self, caller_id):
316 """ 317 Get the URI of the master node. 318 @param caller_id: ROS caller id 319 @type caller_id: str 320 @return: [code, msg, masterUri] 321 @rtype: [int, str, str] 322 """ 323 if self.masterUri: 324 return 1, self.masterUri, self.masterUri 325 else: 326 return 0, "master URI not set", ""
327
328 - def _shutdown(self, reason=''):
329 """ 330 @param reason: human-readable debug string 331 @type reason: str 332 """ 333 if not self.done: 334 self.done = True 335 if reason: 336 _logger.info(reason) 337 if self.protocol_handlers: 338 for handler in self.protocol_handlers: 339 handler.shutdown() 340 del self.protocol_handlers[:] 341 self.protocol_handlers = None 342 return True
343 344 @apivalidate(0, (None, ))
345 - def shutdown(self, caller_id, msg=''):
346 """ 347 Stop this server 348 @param caller_id: ROS caller id 349 @type caller_id: str 350 @param msg: a message describing why the node is being shutdown. 351 @type msg: str 352 @return: [code, msg, 0] 353 @rtype: [int, str, int] 354 """ 355 if msg: 356 print >> sys.stdout, "shutdown request: %s"%msg 357 else: 358 print >> sys.stdout, "shutdown requst" 359 if self._shutdown('external shutdown request from [%s]: %s'%(caller_id, msg)): 360 signal_shutdown('external shutdown request from [%s]: [%s]'%(caller_id, msg)) 361 return 1, "shutdown", 0
362 363 @apivalidate(-1)
364 - def getPid(self, caller_id):
365 """ 366 Get the PID of this server 367 @param caller_id: ROS caller id 368 @type caller_id: str 369 @return: [1, "", serverProcessPID] 370 @rtype: [int, str, int] 371 """ 372 return 1, "", os.getpid()
373 374 ############################################################################### 375 # PUB/SUB APIS 376 377 @apivalidate([])
378 - def getSubscriptions(self, caller_id):
379 """Retrieve a list of topics that this node subscribes to 380 @param caller_id: ROS caller id 381 @type caller_id: str 382 @return [int, str, [ [topic1, topicType1]...[topicN, topicTypeN]]]: list of topics this node subscribes to 383 """ 384 return 1, "subscriptions", get_topic_manager().get_subscriptions()
385 386 @apivalidate([])
387 - def getPublications(self, caller_id):
388 """ 389 Retrieve a list of topics that this node publishes. 390 @param caller_id: ROS caller id 391 @type caller_id: str 392 @return: list of topics published by this node 393 @rtype: [int, str, [ [topic1, topicType1]...[topicN, topicTypeN]]] 394 """ 395 return 1, "publications", get_topic_manager().get_publications()
396
397 - def _connect_topic(self, topic, pub_uri):
398 """ 399 Connect subscriber to topic 400 @param topic: topic name to connect 401 @type topic: str 402 @param pub_uri: API URI of topic publisher 403 @type pub_uri: str 404 @return: [code, msg, numConnects]. numConnects is the number 405 of subscribers connected to the topic 406 @rtype: [int, str, int] 407 """ 408 caller_id = rospy.names.get_caller_id() 409 sub = get_topic_manager().get_subscriber_impl(topic) 410 if not sub: 411 return -1, "No subscriber for topic [%s]"%topic, 0 412 elif sub.has_connection(pub_uri): 413 return 1, "_connect_topic[%s]: subscriber already connected to publisher [%s]"%(topic, pub_uri), 0 414 415 #Negotiate with source for connection 416 # - collect supported protocols 417 protocols = [] 418 for h in self.protocol_handlers: #currently only TCPROS 419 protocols.extend(h.get_supported()) 420 if not protocols: 421 return 0, "ERROR: no available protocol handlers", 0 422 423 _logger.debug("connect[%s]: calling requestTopic(%s, %s, %s)", topic, caller_id, topic, str(protocols)) 424 # 1) have to preserve original (unresolved) params as this may 425 # go outside our graph 426 # 2) xmlrpclib doesn't give us any way of affecting the 427 # timeout other than affecting the global timeout. We need 428 # to set a timeout to prevent infinite hangs. 60 seconds is 429 # a *very* long time. All of the rospy code right now sets 430 # individual socket timeouts, but this could potentially 431 # affect user code. 432 socket.setdefaulttimeout(60.) 433 tries = 0 434 max_num_tries = 3 435 success = False 436 while not success: 437 tries += 1 438 try: 439 code, msg, result = \ 440 xmlrpcapi(pub_uri).requestTopic(caller_id, topic, protocols) 441 success = True 442 except Exception, e: 443 if tries >= max_num_tries: 444 return 0, "unable to requestTopic: %s"%str(e), 0 445 else: 446 _logger.debug("Retrying for %s" % topic) 447 448 #Create the connection (if possible) 449 if code <= 0: 450 _logger.debug("connect[%s]: requestTopic did not succeed %s, %s", pub_uri, code, msg) 451 return code, msg, 0 452 elif not result or type(protocols) != list: 453 return 0, "ERROR: publisher returned invalid protocol choice: %s"%(str(result)), 0 454 _logger.debug("connect[%s]: requestTopic returned protocol list %s", topic, result) 455 protocol = result[0] 456 for h in self.protocol_handlers: 457 if h.supports(protocol): 458 return h.create_transport(topic, pub_uri, result) 459 return 0, "ERROR: publisher returned unsupported protocol choice: %s"%result, 0
460 461 @apivalidate(-1, (global_name('parameter_key'), None))
462 - def paramUpdate(self, caller_id, parameter_key, parameter_value):
463 """ 464 Callback from master of current publisher list for specified topic. 465 @param caller_id: ROS caller id 466 @type caller_id: str 467 @param parameter_key str: parameter name, globally resolved 468 @type parameter_key: str 469 @param parameter_value New parameter value 470 @type parameter_value: XMLRPC-legal value 471 @return: [code, status, ignore]. If code is -1 ERROR, the node 472 is not subscribed to parameter_key 473 @rtype: [int, str, int] 474 """ 475 try: 476 get_param_server_cache().update(parameter_key, parameter_value) 477 return 1, '', 0 478 except KeyError: 479 return -1, 'not subscribed', 0
480 481 @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
482 - def publisherUpdate(self, caller_id, topic, publishers):
483 """ 484 Callback from master of current publisher list for specified topic. 485 @param caller_id: ROS caller id 486 @type caller_id: str 487 @param topic str: topic name 488 @type topic: str 489 @param publishers: list of current publishers for topic in the form of XMLRPC URIs 490 @type publishers: [str] 491 @return: [code, status, ignore] 492 @rtype: [int, str, int] 493 """ 494 if self.reg_man: 495 for uri in publishers: 496 self.reg_man.publisher_update(topic, publishers) 497 return 1, "", 0
498 499 _remap_table['requestTopic'] = [0] # remap topic 500 @apivalidate([], (is_topic('topic'), non_empty('protocols')))
501 - def requestTopic(self, caller_id, topic, protocols):
502 """ 503 Publisher node API method called by a subscriber node. 504 505 Request that source allocate a channel for communication. Subscriber provides 506 a list of desired protocols for communication. Publisher returns the 507 selected protocol along with any additional params required for 508 establishing connection. For example, for a TCP/IP-based connection, 509 the source node may return a port number of TCP/IP server. 510 @param caller_id str: ROS caller id 511 @type caller_id: str 512 @param topic: topic name 513 @type topic: str 514 @param protocols: list of desired 515 protocols for communication in order of preference. Each 516 protocol is a list of the form [ProtocolName, 517 ProtocolParam1, ProtocolParam2...N] 518 @type protocols: [[str, XmlRpcLegalValue*]] 519 @return: [code, msg, protocolParams]. protocolParams may be an 520 empty list if there are no compatible protocols. 521 @rtype: [int, str, [str, XmlRpcLegalValue*]] 522 """ 523 if not get_topic_manager().has_publication(topic): 524 return -1, "Not a publisher of [%s]"%topic, [] 525 for protocol in protocols: #simple for now: select first implementation 526 protocol_id = protocol[0] 527 for h in self.protocol_handlers: 528 if h.supports(protocol_id): 529 _logger.debug("requestTopic[%s]: choosing protocol %s", topic, protocol_id) 530 return h.init_publisher(topic, protocol) 531 return 0, "no supported protocol implementations", []
532