Package rosmaster :: Module master_api

Source Code for Module rosmaster.master_api

  1  # Software License Agreement (BSD License)
 
  2  #
 
  3  # Copyright (c) 2008, Willow Garage, Inc.
 
  4  # All rights reserved.
 
  5  #
 
  6  # Redistribution and use in source and binary forms, with or without
 
  7  # modification, are permitted provided that the following conditions
 
  8  # are met:
 
  9  #
 
 10  #  * Redistributions of source code must retain the above copyright
 
 11  #    notice, this list of conditions and the following disclaimer.
 
 12  #  * Redistributions in binary form must reproduce the above
 
 13  #    copyright notice, this list of conditions and the following
 
 14  #    disclaimer in the documentation and/or other materials provided
 
 15  #    with the distribution.
 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its
 
 17  #    contributors may be used to endorse or promote products derived
 
 18  #    from this software without specific prior written permission.
 
 19  #
 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
 31  # POSSIBILITY OF SUCH DAMAGE.
 
 32  #
 
 33  # Revision $Id$
 
 34  """
 
 35  ROS Master API. 
 
 36  
 
 37  L{ROSMasterHandler} provides the API implementation of the
 
 38  Master. Python allows an API to be introspected from a Python class,
 
 39  so the handler has a 1-to-1 mapping with the actual XMLRPC API.
 
 40  
 
 41  API return convention: (statusCode, statusMessage, returnValue)
 
 42  
 
 43   - statusCode: an integer indicating the completion condition of the method. 
 
 44   - statusMessage: a human-readable string message for debugging
 
 45   - returnValue: the return value of the method; method-specific.
 
 46  
 
 47  Current status codes: 
 
 48  
 
 49   - -1: ERROR: Error on the part of the caller, e.g. an invalid parameter
 
 50   - 0: FAILURE: Method was attempted but failed to complete correctly.
 
 51   - 1: SUCCESS: Method completed successfully.
 
 52  
 
 53  Individual methods may assign additional meaning/semantics to statusCode.
 
 54  """ 
 55  
 
 56  import os 
 57  import sys 
 58  import logging 
 59  import threading 
 60  import time 
 61  import traceback 
 62  
 
 63  from rosgraph.xmlrpc import XmlRpcHandler 
 64  
 
 65  import rosgraph.names 
 66  from rosgraph.names import resolve_name 
 67  import rosmaster.paramserver 
 68  import rosmaster.threadpool 
 69  
 
 70  from rosmaster.util import xmlrpcapi 
 71  from rosmaster.registrations import RegistrationManager 
 72  from rosmaster.validators import non_empty, non_empty_str, not_none, is_api, is_topic, is_service, valid_type_name, valid_name, empty_or_valid_name, ParameterInvalid 
 73  
 
# Number of worker threads handed to the master's MarkedThreadPool; the pool
# is used to send publisher_update (and other) notifications to nodes.
NUM_WORKERS = 3

# Return code slots: indexes into the (statusCode, statusMessage, returnValue)
# triple that every API method returns.
STATUS = 0
MSG = 1
VAL = 2

# Logger shared by all master log helpers in this module.
_logger = logging.getLogger("rosmaster.master")

# When True, the apivalidate wrapper logs every API call and its return value
# at debug level.
LOG_API = False
def mloginfo(msg, *args):
    """
    Emit an info-level statement on the master logger. The output may
    be shown on screen, so messages should be readable by end users.
    @param msg: Message string (may be a %-style format string)
    @type  msg: str
    @param args: arguments for msg if msg is a format string
    """
    # kept at module level so that both master and masterdata can reach it
    _logger.info(msg, *args)
95
def mlogwarn(msg, *args):
    """
    Warn-level master log statements. The message is sent to the master
    logger and is also echoed to stdout with a "WARN: " prefix, so it
    should be user-readable.
    @param msg: Message string
    @type  msg: str
    @param args: arguments for msg if msg is a format string
    """
    # mlogwarn is in core so that it is accessible to master and masterdata
    _logger.warning(msg, *args)
    # Only apply %-formatting when arguments were supplied, so that a plain
    # message containing a literal '%' is echoed unchanged.
    if args:
        text = msg % args
    else:
        text = str(msg)
    # Single-argument call form prints identically on Python 2 and Python 3.
    print("WARN: " + text)
110
try:
    _STRING_TYPES = basestring  # Python 2: accepts both str and unicode
except NameError:
    _STRING_TYPES = str  # Python 3


def apivalidate(error_return_value, validators=()):
    """
    ROS master/slave arg-checking decorator. Applies the specified
    validator to the corresponding argument and also remaps each
    argument to be the value returned by the validator.  Thus,
    arguments can be simultaneously validated and canonicalized prior
    to actual function call.

    The wrapped method always yields a (code, msg, value) triple:
     - -1 if the caller_id is missing or not a string, the call arity is
       wrong, a validator raises ParameterInvalid, the call raises
       TypeError, or the method returns None as its value;
     - 0 if the method raises any other exception;
     - otherwise whatever the method itself returned.

    @param error_return_value: API value to return if call unexpectedly fails
    @param validators: sequence of validators to apply to each
      arg. None means no validation for the parameter is required. As all
      api methods take caller_id as the first parameter, the validators
      start with the second param.
    @type  validators: sequence
    """
    def check_validates(f):
        func_name = f.__name__
        arg_count = f.__code__.co_argcount
        # ignore self and caller_id: one validator slot per remaining argument
        assert len(validators) == arg_count - 2, "%s failed arg check"%f

        def validated_f(*args, **kwds):
            if LOG_API:
                _logger.debug("%s%s", func_name, str(args[1:]))
            if len(args) == 1:
                # only the instance was passed: the remote caller sent no arguments
                _logger.error("%s invoked without caller_id parameter", func_name)
                return -1, "missing required caller_id parameter", error_return_value
            elif len(args) != arg_count:
                return -1, "Error: bad call arity", error_return_value

            instance = args[0]
            caller_id = args[1]
            if not isinstance(caller_id, _STRING_TYPES):
                _logger.error("%s: invalid caller_id param type", func_name)
                return -1, "caller_id must be a string", error_return_value

            new_args = [instance, caller_id] #canonicalized args
            try:
                for (v, a) in zip(validators, args[2:]):
                    if v:
                        try:
                            new_args.append(v(a, caller_id))
                        except ParameterInvalid as e:
                            reason = str(e) or 'error'
                            _logger.error("%s: invalid parameter: %s", func_name, reason)
                            return -1, reason, error_return_value
                    else:
                        new_args.append(a)

                retval = f(*new_args, **kwds)
                if LOG_API:
                    _logger.debug("%s%s returns %s", func_name, args[1:], retval)
                # Same post-processing whether or not API logging is enabled,
                # so that turning on LOG_API cannot change what callers receive.
                code, msg, val = retval
                if val is None:
                    return -1, "Internal error (None value returned)", error_return_value
                return code, msg, val
            except TypeError as te: #most likely wrong arg number
                _logger.error(traceback.format_exc())
                return -1, "Error: invalid arguments: %s"%te, error_return_value
            except Exception as e: #internal failure
                _logger.error(traceback.format_exc())
                return 0, "Internal failure: %s"%e, error_return_value

        # preserve name and doc so the API can still be introspected
        validated_f.__name__ = func_name
        validated_f.__doc__ = f.__doc__
        return validated_f
    return check_validates
def publisher_update_task(api, topic, pub_uris):
    """
    Call publisherUpdate on the node at the given XML-RPC URI, passing
    it the topic name and the publisher list.
    @param api: XML-RPC URI of node to contact
    @type  api: str
    @param topic: Topic name to send to node
    @type  topic: str
    @param pub_uris: list of publisher APIs to send to node
    @type  pub_uris: [str]
    """
    mloginfo("publisherUpdate[%s] -> %s", topic, api)
    node_proxy = xmlrpcapi(api)
    #TODO: check return value for errors so we can unsubscribe if stale
    node_proxy.publisherUpdate('/master', topic, pub_uris)
190
def service_update_task(api, service, uri):
    """
    Call serviceUpdate on the node at the given XML-RPC URI, passing it
    the service name and the service URI.
    @param api: XML-RPC URI of node to contact
    @type  api: str
    @param service: Service name to send to node
    @type  service: str
    @param uri: URI to send to node
    @type  uri: str
    """
    mloginfo("serviceUpdate[%s, %s] -> %s",service, uri, api)
    node_proxy = xmlrpcapi(api)
    node_proxy.serviceUpdate('/master', service, uri)
203
###################################################
# Master Implementation

class ROSMasterHandler(object):
    """
    XML-RPC handler for ROS master APIs.
    API routines for the ROS Master Node. The Master Node is a
    superset of the Slave Node and contains additional API methods for
    creating and monitoring a graph of slave nodes.

    By convention, ROS nodes take in caller_id as the first parameter
    of any API call.  The setting of this parameter is rarely done by
    client code as ros::msproxy::MasterProxy automatically inserts
    this parameter (see ros::client::getMaster()).
    """

    def __init__(self):
        """ctor."""

        self.uri = None
        self.done = False

        self.thread_pool = rosmaster.threadpool.MarkedThreadPool(NUM_WORKERS)
        # pub/sub/providers: dict { topicName : [publishers/subscribers names] }
        self.ps_lock = threading.Condition(threading.Lock())

        self.reg_manager = RegistrationManager(self.thread_pool)

        # maintain refs to reg_manager fields
        self.publishers = self.reg_manager.publishers
        self.subscribers = self.reg_manager.subscribers
        self.services = self.reg_manager.services
        self.param_subscribers = self.reg_manager.param_subscribers

        self.topics_types = {} #dict { topicName : type }

        # parameter server dictionary
        self.param_server = rosmaster.paramserver.ParamDictionary(self.reg_manager)

    def _shutdown(self, reason=''):
        """
        Stop the notification thread pool (without waiting for queued
        tasks or worker threads) and mark the handler as done.
        @param reason: human-readable reason for the shutdown (currently unused)
        @type  reason: str
        """
        if self.thread_pool is not None:
            self.thread_pool.join_all(wait_for_tasks=False, wait_for_threads=False)
            self.thread_pool = None
        self.done = True

    def _ready(self, uri):
        """
        Initialize the handler with the XMLRPC URI. This is a standard callback from the XmlRpcNode API.

        @param uri: XML-RPC URI
        @type  uri: str
        """
        self.uri = uri

    def _ok(self):
        """
        @return: True until the handler has been shut down
        @rtype: bool
        """
        return not self.done

    ###############################################################################
    # EXTERNAL API

    @apivalidate(0, (None, ))
    def shutdown(self, caller_id, msg=''):
        """
        Stop this server
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param msg: a message describing why the node is being shutdown.
        @type  msg: str
        @return: [code, msg, 0]
        @rtype: [int, str, int]
        """
        # sys.stdout.write instead of the Python-2-only "print >>" form
        if msg:
            sys.stdout.write("shutdown request: %s\n"%msg)
        else:
            sys.stdout.write("shutdown request\n")
        self._shutdown('external shutdown request from [%s]: %s'%(caller_id, msg))
        return 1, "shutdown", 0

    @apivalidate('')
    def getUri(self, caller_id):
        """
        Get the XML-RPC URI of this server.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @return: [1, "", xmlRpcUri]
        @rtype: [int, str, str]
        """
        return 1, "", self.uri

    @apivalidate(-1)
    def getPid(self, caller_id):
        """
        Get the PID of this server
        @param caller_id: ROS caller id
        @type  caller_id: str
        @return: [1, "", serverProcessPID]
        @rtype: [int, str, int]
        """
        return 1, "", os.getpid()

    ################################################################
    # PARAMETER SERVER ROUTINES

    @apivalidate(0, (non_empty_str('key'),))
    def deleteParam(self, caller_id, key):
        """
        Parameter Server: delete parameter
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter name
        @type  key: str
        @return: [code, msg, 0]. code is -1 if the parameter is not set.
        @rtype: [int, str, int]
        """
        try:
            key = resolve_name(key, caller_id)
            self.param_server.delete_param(key, self._notify_param_subscribers)
            mloginfo("-PARAM [%s] by %s",key, caller_id)
            return 1, "parameter %s deleted"%key, 0
        except KeyError:
            return -1, "parameter [%s] is not set"%key, 0

    @apivalidate(0, (non_empty_str('key'), not_none('value')))
    def setParam(self, caller_id, key, value):
        """
        Parameter Server: set parameter.  NOTE: if value is a
        dictionary it will be treated as a parameter tree, where key
        is the parameter namespace. For example:::
          {'x':1,'y':2,'sub':{'z':3}}

        will set key/x=1, key/y=2, and key/sub/z=3. Furthermore, it
        will replace all existing parameters in the key parameter
        namespace with the parameters in value. You must set
        parameters individually if you wish to perform a union update.

        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter name
        @type  key: str
        @param value: parameter value.
        @type  value: XMLRPCLegalValue
        @return: [code, msg, 0]
        @rtype: [int, str, int]
        """
        key = resolve_name(key, caller_id)
        self.param_server.set_param(key, value, self._notify_param_subscribers)
        mloginfo("+PARAM [%s] by %s",key, caller_id)
        return 1, "parameter %s set"%key, 0

    @apivalidate(0, (non_empty_str('key'),))
    def getParam(self, caller_id, key):
        """
        Retrieve parameter value from server.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter to lookup. If key is a namespace,
        getParam() will return a parameter tree.
        @type  key: str

        @return: [code, statusMessage, parameterValue]. If code is not
            1, parameterValue should be ignored. If key is a namespace,
            the return value will be a dictionary, where each key is a
            parameter in that namespace. Sub-namespaces are also
            represented as dictionaries.
        @rtype: [int, str, XMLRPCLegalValue]
        """
        try:
            key = resolve_name(key, caller_id)
            return 1, "Parameter [%s]"%key, self.param_server.get_param(key)
        except KeyError:
            return -1, "Parameter [%s] is not set"%key, 0

    @apivalidate(0, (non_empty_str('key'),))
    def searchParam(self, caller_id, key):
        """
        Search for parameter key on parameter server. Search starts in caller's namespace and proceeds
        upwards through parent namespaces until Parameter Server finds a matching key.

        searchParam's behavior is to search for the first partial match.
        For example, imagine that there are two 'robot_description' parameters::

             /robot_description
               /robot_description/arm
               /robot_description/base
             /pr2/robot_description
               /pr2/robot_description/base

        If I start in the namespace /pr2/foo and search for
        'robot_description', searchParam will match
        /pr2/robot_description. If I search for 'robot_description/arm'
        it will return /pr2/robot_description/arm, even though that
        parameter does not exist (yet).

        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter key to search for.
        @type  key: str
        @return: [code, statusMessage, foundKey]. If code is not 1, foundKey should be
            ignored.
        @rtype: [int, str, str]
        """
        search_key = self.param_server.search_param(caller_id, key)
        if search_key:
            return 1, "Found [%s]"%search_key, search_key
        else:
            return -1, "Cannot find parameter [%s] in an upwards search"%key, ''

    @apivalidate(0, (is_api('caller_api'), non_empty_str('key'),))
    def subscribeParam(self, caller_id, caller_api, key):
        """
        Retrieve parameter value from server and subscribe to updates to that param. See
        paramUpdate() in the Node API.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param caller_api: API URI for paramUpdate callbacks.
        @type  caller_api: str
        @param key: parameter to lookup.
        @type  key: str
        @return: [code, statusMessage, parameterValue]. If code is not
            1, parameterValue should be ignored. parameterValue is an empty dictionary if the parameter
            has not been set yet.
        @rtype: [int, str, XMLRPCLegalValue]
        """
        key = resolve_name(key, caller_id)
        # ps_lock has precedence and is required due to
        # potential self.reg_manager modification
        with self.ps_lock:
            val = self.param_server.subscribe_param(key, (caller_id, caller_api))
        return 1, "Subscribed to parameter [%s]"%key, val

    @apivalidate(0, (is_api('caller_api'), non_empty_str('key'),))
    def unsubscribeParam(self, caller_id, caller_api, key):
        """
        Unsubscribe the caller from updates to a parameter. See
        paramUpdate() in the Node API.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param caller_api: API URI for paramUpdate callbacks.
        @type  caller_api: str
        @param key: parameter to stop receiving updates for.
        @type  key: str
        @return: [code, statusMessage, numUnsubscribed].
        @rtype: [int, str, int]
        """
        key = resolve_name(key, caller_id)
        # ps_lock is required due to potential self.reg_manager modification
        with self.ps_lock:
            self.param_server.unsubscribe_param(key, (caller_id, caller_api))
        # NOTE(review): the value slot is always 1; the result of
        # unsubscribe_param() is not reported back to the caller, so a caller
        # that was never subscribed is indistinguishable from one that was.
        return 1, "Unsubscribe to parameter [%s]"%key, 1

    @apivalidate(False, (non_empty_str('key'),))
    def hasParam(self, caller_id, key):
        """
        Check if parameter is stored on server.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter to check
        @type  key: str
        @return: [code, statusMessage, hasParam]. statusMessage is the resolved key.
        @rtype: [int, str, bool]
        """
        key = resolve_name(key, caller_id)
        if self.param_server.has_param(key):
            return 1, key, True
        else:
            return 1, key, False

    @apivalidate([])
    def getParamNames(self, caller_id):
        """
        Get list of all parameter names stored on this server.
        This does not adjust parameter names for caller's scope.

        @param caller_id: ROS caller id
        @type  caller_id: str
        @return: [code, statusMessage, parameterNameList]
        @rtype: [int, str, [str]]
        """
        return 1, "Parameter names", self.param_server.get_param_names()

    ##################################################################################
    # NOTIFICATION ROUTINES

    def _notify(self, registrations, task, key, value, node_apis):
        """
        Generic implementation of callback notification
        @param registrations: Registrations (currently unused)
        @type  registrations: L{Registrations}
        @param task: task to queue
        @type  task: fn
        @param key: registration key
        @type  key: str
        @param value: value to pass to task
        @type  value: Any
        @param node_apis: XML-RPC URIs of the nodes to notify
        @type  node_apis: [str]
        """
        # cache thread_pool for thread safety
        thread_pool = self.thread_pool
        if not thread_pool:
            return

        # defined up front so the error handler below can always name the listener
        node_api = None
        try:
            for node_api in node_apis:
                # use the api as a marker so that we limit one thread per subscriber
                thread_pool.queue_task(node_api, task, (node_api, key, value))
        except KeyError:
            _logger.warning('subscriber data stale (key [%s], listener [%s]): node API unknown', key, node_api)

    def _notify_param_subscribers(self, updates):
        """
        Notify parameter subscribers of new parameter value
        @param updates: [(subscribers, param_key, param_value)*], where
            subscribers is a sequence of (caller_id, caller_api) pairs
        @type  updates: [([(str, str)], str, any)*]
        """
        # cache thread_pool for thread safety
        thread_pool = self.thread_pool
        if not thread_pool:
            return

        for subscribers, key, value in updates:
            # use the api as a marker so that we limit one thread per subscriber
            for caller_id, caller_api in subscribers:
                # queue on the cached pool: self.thread_pool may be reset to
                # None by _shutdown() while we are iterating
                thread_pool.queue_task(caller_api, self.param_update_task, (caller_id, caller_api, key, value))

    def param_update_task(self, caller_id, caller_api, param_key, param_value):
        """
        Contact api.paramUpdate with specified parameters. If the node
        answers with code -1, every registration found for caller_api is
        unsubscribed from the parameter.
        @param caller_id: caller ID
        @type  caller_id: str
        @param caller_api: XML-RPC URI of node to contact
        @type  caller_api: str
        @param param_key: parameter key to pass to node
        @type  param_key: str
        @param param_value: parameter value to pass to node
        @type  param_value: str
        """
        mloginfo("paramUpdate[%s]", param_key)
        code, _, _ = xmlrpcapi(caller_api).paramUpdate('/master', param_key, param_value)
        if code == -1:
            # ps_lock is required due to potential self.reg_manager modification
            with self.ps_lock:
                # reverse lookup to figure out who we just called
                matches = self.reg_manager.reverse_lookup(caller_api)
                for m in matches:
                    self.param_server.unsubscribe_param(param_key, (m.id, caller_api))

    def _notify_topic_subscribers(self, topic, pub_uris, sub_uris):
        """
        Notify subscribers with new publisher list
        @param topic: name of topic
        @type  topic: str
        @param pub_uris: list of URIs of publishers.
        @type  pub_uris: [str]
        @param sub_uris: list of URIs of the subscribers to notify.
        @type  sub_uris: [str]
        """
        self._notify(self.subscribers, publisher_update_task, topic, pub_uris, sub_uris)

    ##################################################################################
    # SERVICE PROVIDER

    @apivalidate(0, ( is_service('service'), is_api('service_api'), is_api('caller_api')))
    def registerService(self, caller_id, service, service_api, caller_api):
        """
        Register the caller as a provider of the specified service.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param service: Fully-qualified name of service
        @type  service: str
        @param service_api: Service URI
        @type  service_api: str
        @param caller_api: XML-RPC URI of caller node
        @type  caller_api: str
        @return: (code, message, ignore)
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            self.reg_manager.register_service(service, caller_id, caller_api, service_api)
            mloginfo("+SERVICE [%s] %s %s", service, caller_id, caller_api)
        return 1, "Registered [%s] as provider of [%s]"%(caller_id, service), 1

    @apivalidate(0, (is_service('service'),))
    def lookupService(self, caller_id, service):
        """
        Lookup all provider of a particular service.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param service: fully-qualified name of service to lookup.
        @type  service: str
        @return: (code, message, serviceUrl). service URL is provider's
           ROSRPC URI with address and port.  Fails if there is no provider.
        @rtype: (int, str, str)
        """
        with self.ps_lock:
            service_url = self.services.get_service_api(service)
        if service_url:
            return 1, "rosrpc URI: [%s]"%service_url, service_url
        else:
            return -1, "no provider", ''

    @apivalidate(0, ( is_service('service'), is_api('service_api')))
    def unregisterService(self, caller_id, service, service_api):
        """
        Unregister the caller as a provider of the specified service.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param service: Fully-qualified name of service
        @type  service: str
        @param service_api: API URI of service to unregister. Unregistration will only occur if current
           registration matches.
        @type  service_api: str
        @return: (code, message, numUnregistered). Number of unregistrations (either 0 or 1).
           If this is zero it means that the caller was not registered as a service provider.
           The call still succeeds as the intended final state is reached.
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            retval = self.reg_manager.unregister_service(service, caller_id, service_api)
            mloginfo("-SERVICE [%s] %s %s", service, caller_id, service_api)
            return retval

    ##################################################################################
    # PUBLISH/SUBSCRIBE

    @apivalidate(0, ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
    def registerSubscriber(self, caller_id, topic, topic_type, caller_api):
        """
        Subscribe the caller to the specified topic. In addition to receiving
        a list of current publishers, the subscriber will also receive notifications
        of new publishers via the publisherUpdate API.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to subscribe to.
        @type  topic: str
        @param topic_type: Datatype for topic. Must be a package-resource name, i.e. the .msg name.
        @type  topic_type: str
        @param caller_api: XML-RPC URI of caller node for new publisher notifications
        @type  caller_api: str
        @return: (code, message, publishers). Publishers is a list of XMLRPC API URIs
           for nodes currently publishing the specified topic.
        @rtype: (int, str, [str])
        """
        with self.ps_lock:
            self.reg_manager.register_subscriber(topic, caller_id, caller_api)

            # ROS 1.1: subscriber can now set type if it is not already set
            #  - don't let '*' type squash valid typing
            if topic not in self.topics_types and topic_type != rosgraph.names.ANYTYPE:
                self.topics_types[topic] = topic_type

            mloginfo("+SUB [%s] %s %s",topic, caller_id, caller_api)
            pub_uris = self.publishers.get_apis(topic)
        return 1, "Subscribed to [%s]"%topic, pub_uris

    @apivalidate(0, (is_topic('topic'), is_api('caller_api')))
    def unregisterSubscriber(self, caller_id, topic, caller_api):
        """
        Unregister the caller as a subscriber of the topic.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to unregister.
        @type  topic: str
        @param caller_api: API URI of service to unregister. Unregistration will only occur if current
           registration matches.
        @type  caller_api: str
        @return: (code, statusMessage, numUnsubscribed).
          If numUnsubscribed is zero it means that the caller was not registered as a subscriber.
          The call still succeeds as the intended final state is reached.
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            retval = self.reg_manager.unregister_subscriber(topic, caller_id, caller_api)
            mloginfo("-SUB [%s] %s %s",topic, caller_id, caller_api)
            return retval

    @apivalidate(0, ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
    def registerPublisher(self, caller_id, topic, topic_type, caller_api):
        """
        Register the caller as a publisher the topic.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to register.
        @type  topic: str
        @param topic_type: Datatype for topic. Must be a
        package-resource name, i.e. the .msg name.
        @type  topic_type: str
        @param caller_api: ROS caller XML-RPC API URI
        @type  caller_api: str
        @return: (code, statusMessage, subscriberApis).
        List of current subscribers of topic in the form of XMLRPC URIs.
        @rtype: (int, str, [str])
        """
        #NOTE: we need topic_type for getPublishedTopics.
        with self.ps_lock:
            self.reg_manager.register_publisher(topic, caller_id, caller_api)
            # don't let '*' type squash valid typing
            if topic_type != rosgraph.names.ANYTYPE or topic not in self.topics_types:
                self.topics_types[topic] = topic_type
            pub_uris = self.publishers.get_apis(topic)
            # looked up once: the subscriber list cannot change while ps_lock is held
            sub_uris = self.subscribers.get_apis(topic)
            self._notify_topic_subscribers(topic, pub_uris, sub_uris)
            mloginfo("+PUB [%s] %s %s",topic, caller_id, caller_api)
        return 1, "Registered [%s] as publisher of [%s]"%(caller_id, topic), sub_uris

    @apivalidate(0, (is_topic('topic'), is_api('caller_api')))
    def unregisterPublisher(self, caller_id, topic, caller_api):
        """
        Unregister the caller as a publisher of the topic.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to unregister.
        @type  topic: str
        @param caller_api: API URI of service to
           unregister. Unregistration will only occur if current
           registration matches.
        @type  caller_api: str
        @return: (code, statusMessage, numUnregistered).
           If numUnregistered is zero it means that the caller was not registered as a publisher.
           The call still succeeds as the intended final state is reached.
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            retval = self.reg_manager.unregister_publisher(topic, caller_id, caller_api)
            # only notify subscribers if a registration was actually removed
            if retval[VAL]:
                self._notify_topic_subscribers(topic, self.publishers.get_apis(topic), self.subscribers.get_apis(topic))
            mloginfo("-PUB [%s] %s %s",topic, caller_id, caller_api)
        return retval

    ##################################################################################
    # GRAPH STATE APIS

    @apivalidate('', (valid_name('node'),))
    def lookupNode(self, caller_id, node_name):
        """
        Get the XML-RPC URI of the node with the associated
        name/caller_id.  This API is for looking information about
        publishers and subscribers. Use lookupService instead to lookup
        ROS-RPC URIs.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param node_name: name of node to lookup
        @type  node_name: str
        @return: (code, msg, URI). code is -1 if the node is unknown.
        @rtype: (int, str, str)
        """
        with self.ps_lock:
            node = self.reg_manager.get_node(node_name)
            if node is not None:
                retval = 1, "node api", node.api
            else:
                retval = -1, "unknown node [%s]"%node_name, ''
        return retval

    @apivalidate(0, (empty_or_valid_name('subgraph'),))
    def getPublishedTopics(self, caller_id, subgraph):
        """
        Get list of topics that can be subscribed to. This does not return topics that have no publishers.
        See L{getSystemState()} to get more comprehensive list.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param subgraph: Restrict topic names to match within the specified subgraph. Subgraph namespace
           is resolved relative to the caller's namespace. Use '' to specify all names.
        @type  subgraph: str
        @return: (code, msg, [[topic1, type1]...[topicN, typeN]])
        @rtype: (int, str, [[str, str],])
        """
        with self.ps_lock:
            # force subgraph to be a namespace with trailing slash
            if subgraph and subgraph[-1] != rosgraph.names.SEP:
                subgraph = subgraph + rosgraph.names.SEP
            #we don't bother with subscribers as subscribers don't report topic types. also, the intended
            #use case is for subscribe-by-topic-type
            retval = [[t, self.topics_types[t]] for t in self.publishers.iterkeys() if t.startswith(subgraph)]
        return 1, "current topics", retval

    @apivalidate([])
    def getTopicTypes(self, caller_id):
        """
        Retrieve list topic names and their types.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @rtype: (int, str, [[str,str]] )
        @return: (code, statusMessage, topicTypes). topicTypes is a list of [topicName, topicType] pairs.
        """
        with self.ps_lock:
            # materialize a list: a snapshot taken under the lock, and a dict
            # view (Python 3) is not a plain list
            retval = list(self.topics_types.items())
        return 1, "current system state", retval

    @apivalidate([[],[], []])
    def getSystemState(self, caller_id):
        """
        Retrieve list representation of system state (i.e. publishers, subscribers, and services).
        @param caller_id: ROS caller id
        @type  caller_id: str
        @rtype: (int, str, [[str,[str]], [str,[str]], [str,[str]]])
        @return: (code, statusMessage, systemState).

           System state is in list representation::
             [publishers, subscribers, services].

           publishers is of the form::
             [ [topic1, [topic1Publisher1...topic1PublisherN]] ... ]

           subscribers is of the form::
             [ [topic1, [topic1Subscriber1...topic1SubscriberN]] ... ]

           services is of the form::
             [ [service1, [service1Provider1...service1ProviderN]] ... ]
        """
        with self.ps_lock:
            retval = [r.get_state() for r in (self.publishers, self.subscribers, self.services)]
        return 1, "current system state", retval