Package rosmaster :: Module master_api

Source Code for Module rosmaster.master_api

  1  # Software License Agreement (BSD License)
 
  2  #
 
  3  # Copyright (c) 2008, Willow Garage, Inc.
 
  4  # All rights reserved.
 
  5  #
 
  6  # Redistribution and use in source and binary forms, with or without
 
  7  # modification, are permitted provided that the following conditions
 
  8  # are met:
 
  9  #
 
 10  #  * Redistributions of source code must retain the above copyright
 
 11  #    notice, this list of conditions and the following disclaimer.
 
 12  #  * Redistributions in binary form must reproduce the above
 
 13  #    copyright notice, this list of conditions and the following
 
 14  #    disclaimer in the documentation and/or other materials provided
 
 15  #    with the distribution.
 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its
 
 17  #    contributors may be used to endorse or promote products derived
 
 18  #    from this software without specific prior written permission.
 
 19  #
 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
 31  # POSSIBILITY OF SUCH DAMAGE.
 
 32  #
 
 33  # Revision $Id: master_api.py 11176 2010-09-20 18:21:37Z kwc $
 
 34  """
 
 35  ROS Master API. 
 
 36  
 
 37  L{ROSMasterHandler} provides the API implementation of the
 
 38  Master. Python allows an API to be introspected from a Python class,
 
 39  so the handler has a 1-to-1 mapping with the actual XMLRPC API.
 
 40  
 
 41  API return convention: (statusCode, statusMessage, returnValue)
 
 42  
 
 43   - statusCode: an integer indicating the completion condition of the method. 
 
 44   - statusMessage: a human-readable string message for debugging
 
 45   - returnValue: the return value of the method; method-specific.
 
 46  
 
 47  Current status codes: 
 
 48  
 
 49   - -1: ERROR: Error on the part of the caller, e.g. an invalid parameter
 
 50   - 0: FAILURE: Method was attempted but failed to complete correctly.
 
 51   - 1: SUCCESS: Method completed successfully.
 
 52  
 
 53  Individual methods may assign additional meaning/semantics to statusCode.
 
 54  """ 
 55  
 
 56  import os 
 57  import sys 
 58  import logging 
 59  import threading 
 60  import time 
 61  import traceback 
 62  
 
 63  from roslib.xmlrpc import XmlRpcHandler 
 64  
 
 65  import roslib.names 
 66  from roslib.names import resolve_name 
 67  import rosmaster.paramserver 
 68  import rosmaster.threadpool 
 69  
 
 70  from rosmaster.util import xmlrpcapi 
 71  from rosmaster.registrations import RegistrationManager 
 72  from rosmaster.validators import non_empty, non_empty_str, not_none, is_api, is_topic, is_service, valid_type_name, valid_name, empty_or_valid_name, ParameterInvalid 
 73  
 
 74  NUM_WORKERS = 3 #number of threads we use to send publisher_update notifications 
 75  
 
 76  # Return code slots
 
 77  STATUS = 0 
 78  MSG = 1 
 79  VAL = 2 
 80  
 
 81  _logger = logging.getLogger("rosmaster.master") 
 82  
 
 83  LOG_API = False 
84 85 -def mloginfo(msg, *args):
86 """ 87 Info-level master log statements. These statements may be printed 88 to screen so they should be user-readable. 89 @param msg: Message string 90 @type msg: str 91 @param args: arguments for msg if msg is a format string 92 """ 93 #mloginfo is in core so that it is accessible to master and masterdata 94 _logger.info(msg, *args)
95
96 -def mlogwarn(msg, *args):
97 """ 98 Warn-level master log statements. These statements may be printed 99 to screen so they should be user-readable. 100 @param msg: Message string 101 @type msg: str 102 @param args: arguments for msg if msg is a format string 103 """ 104 #mloginfo is in core so that it is accessible to master and masterdata 105 _logger.warn(msg, *args) 106 if args: 107 print "WARN: "+msg%args 108 else: 109 print "WARN: "+str(msg)
110
111 112 -def apivalidate(error_return_value, validators=()):
113 """ 114 ROS master/slave arg-checking decorator. Applies the specified 115 validator to the corresponding argument and also remaps each 116 argument to be the value returned by the validator. Thus, 117 arguments can be simultaneously validated and canonicalized prior 118 to actual function call. 119 @param error_return_value: API value to return if call unexpectedly fails 120 @param validators: sequence of validators to apply to each 121 arg. None means no validation for the parameter is required. As all 122 api methods take caller_id as the first parameter, the validators 123 start with the second param. 124 @type validators: sequence 125 """ 126 def check_validates(f): 127 assert len(validators) == f.func_code.co_argcount - 2, "%s failed arg check"%f #ignore self and caller_id 128 def validated_f(*args, **kwds): 129 if LOG_API: 130 _logger.debug("%s%s", f.func_name, str(args[1:])) 131 #print "%s%s"%(f.func_name, str(args[1:])) 132 if len(args) == 1: 133 _logger.error("%s invoked without caller_id paramter"%f.func_name) 134 return -1, "missing required caller_id parameter", error_return_value 135 elif len(args) != f.func_code.co_argcount: 136 return -1, "Error: bad call arity", error_return_value 137 138 instance = args[0] 139 caller_id = args[1] 140 if not isinstance(caller_id, basestring): 141 _logger.error("%s: invalid caller_id param type", f.func_name) 142 return -1, "caller_id must be a string", error_return_value 143 144 newArgs = [instance, caller_id] #canonicalized args 145 try: 146 for (v, a) in zip(validators, args[2:]): 147 if v: 148 try: 149 newArgs.append(v(a, caller_id)) 150 except ParameterInvalid, e: 151 _logger.error("%s: invalid parameter: %s", f.func_name, str(e) or 'error') 152 return -1, str(e) or 'error', error_return_value 153 else: 154 newArgs.append(a) 155 156 if LOG_API: 157 retval = f(*newArgs, **kwds) 158 _logger.debug("%s%s returns %s", f.func_name, args[1:], retval) 159 return retval 160 else: 161 code, msg, val = f(*newArgs, **kwds) 162 if val is None: 163 return -1, "Internal error (None value returned)", error_return_value 164 return code, msg, val 165 except TypeError, te: #most likely wrong arg number 166 _logger.error(traceback.format_exc()) 167 return -1, "Error: invalid arguments: %s"%te, error_return_value 168 except Exception, e: #internal failure 169 _logger.error(traceback.format_exc()) 170 return 0, "Internal failure: %s"%e, error_return_value
171 validated_f.func_name = f.func_name 172 validated_f.__doc__ = f.__doc__ #preserve doc 173 return validated_f 174 return check_validates 175
176 -def publisher_update_task(api, topic, pub_uris):
177 """ 178 Contact api.publisherUpdate with specified parameters 179 @param api: XML-RPC URI of node to contact 180 @type api: str 181 @param topic: Topic name to send to node 182 @type topic: str 183 @param pub_uris: list of publisher APIs to send to node 184 @type pub_uris: [str] 185 """ 186 187 mloginfo("publisherUpdate[%s] -> %s", topic, api) 188 #TODO: check return value for errors so we can unsubscribe if stale 189 xmlrpcapi(api).publisherUpdate('/master', topic, pub_uris)
190
191 -def service_update_task(api, service, uri):
192 """ 193 Contact api.serviceUpdate with specified parameters 194 @param api: XML-RPC URI of node to contact 195 @type api: str 196 @param service: Service name to send to node 197 @type service: str 198 @param uri: URI to send to node 199 @type uri: str 200 """ 201 mloginfo("serviceUpdate[%s, %s] -> %s",service, uri, api) 202 xmlrpcapi(api).serviceUpdate('/master', service, uri)
203
204 ################################################### 205 # Master Implementation 206 207 -class ROSMasterHandler(object):
208 """ 209 XML-RPC handler for ROS master APIs. 210 API routines for the ROS Master Node. The Master Node is a 211 superset of the Slave Node and contains additional API methods for 212 creating and monitoring a graph of slave nodes. 213 214 By convention, ROS nodes take in caller_id as the first parameter 215 of any API call. The setting of this parameter is rarely done by 216 client code as ros::msproxy::MasterProxy automatically inserts 217 this parameter (see ros::client::getMaster()). 218 """ 219
220 - def __init__(self):
221 """ctor.""" 222 223 self.uri = None 224 self.done = False 225 226 self.thread_pool = rosmaster.threadpool.MarkedThreadPool(NUM_WORKERS) 227 # pub/sub/providers: dict { topicName : [publishers/subscribers names] } 228 self.ps_lock = threading.Condition(threading.Lock()) 229 230 self.reg_manager = RegistrationManager(self.thread_pool) 231 232 # maintain refs to reg_manager fields 233 self.publishers = self.reg_manager.publishers 234 self.subscribers = self.reg_manager.subscribers 235 self.services = self.reg_manager.services 236 self.param_subscribers = self.reg_manager.param_subscribers 237 238 self.topics_types = {} #dict { topicName : type } 239 240 # parameter server dictionary 241 self.param_server = rosmaster.paramserver.ParamDictionary(self.reg_manager)
242
243 - def _shutdown(self, reason=''):
244 if self.thread_pool is not None: 245 self.thread_pool.join_all(wait_for_tasks=False, wait_for_threads=False) 246 self.thread_pool = None 247 self.done = True
248
249 - def _ready(self, uri):
250 """ 251 Initialize the handler with the XMLRPC URI. This is a standard callback from the roslib.xmlrpc.XmlRpcNode API. 252 253 @param uri: XML-RPC URI 254 @type uri: str 255 """ 256 self.uri = uri
257
258 - def _ok(self):
259 return not self.done
260 261 ############################################################################### 262 # EXTERNAL API 263 264 @apivalidate(0, (None, ))
265 - def shutdown(self, caller_id, msg=''):
266 """ 267 Stop this server 268 @param caller_id: ROS caller id 269 @type caller_id: str 270 @param msg: a message describing why the node is being shutdown. 271 @type msg: str 272 @return: [code, msg, 0] 273 @rtype: [int, str, int] 274 """ 275 if msg: 276 print >> sys.stdout, "shutdown request: %s"%msg 277 else: 278 print >> sys.stdout, "shutdown requst" 279 self._shutdown('external shutdown request from [%s]: %s'%(caller_id, msg)) 280 return 1, "shutdown", 0
281 282 @apivalidate('')
283 - def getUri(self, caller_id):
284 """ 285 Get the XML-RPC URI of this server. 286 @param caller_id str: ROS caller id 287 @return [int, str, str]: [1, "", xmlRpcUri] 288 """ 289 return 1, "", self.uri
290 291 292 @apivalidate(-1)
293 - def getPid(self, caller_id):
294 """ 295 Get the PID of this server 296 @param caller_id: ROS caller id 297 @type caller_id: str 298 @return: [1, "", serverProcessPID] 299 @rtype: [int, str, int] 300 """ 301 return 1, "", os.getpid()
302 303 304 ################################################################ 305 # PARAMETER SERVER ROUTINES 306 307 @apivalidate(0, (non_empty_str('key'),))
308 - def deleteParam(self, caller_id, key):
309 """ 310 Parameter Server: delete parameter 311 @param caller_id: ROS caller id 312 @type caller_id: str 313 @param key: parameter name 314 @type key: str 315 @return: [code, msg, 0] 316 @rtype: [int, str, int] 317 """ 318 try: 319 key = resolve_name(key, caller_id) 320 self.param_server.delete_param(key, self._notify_param_subscribers) 321 mloginfo("-PARAM [%s] by %s",key, caller_id) 322 return 1, "parameter %s deleted"%key, 0 323 except KeyError, e: 324 return -1, "parameter [%s] is not set"%key, 0
325 326 @apivalidate(0, (non_empty_str('key'), not_none('value')))
327 - def setParam(self, caller_id, key, value):
328 """ 329 Parameter Server: set parameter. NOTE: if value is a 330 dictionary it will be treated as a parameter tree, where key 331 is the parameter namespace. For example::: 332 {'x':1,'y':2,'sub':{'z':3}} 333 334 will set key/x=1, key/y=2, and key/sub/z=3. Furthermore, it 335 will replace all existing parameters in the key parameter 336 namespace with the parameters in value. You must set 337 parameters individually if you wish to perform a union update. 338 339 @param caller_id: ROS caller id 340 @type caller_id: str 341 @param key: parameter name 342 @type key: str 343 @param value: parameter value. 344 @type value: XMLRPCLegalValue 345 @return: [code, msg, 0] 346 @rtype: [int, str, int] 347 """ 348 key = resolve_name(key, caller_id) 349 self.param_server.set_param(key, value, self._notify_param_subscribers) 350 mloginfo("+PARAM [%s] by %s",key, caller_id) 351 return 1, "parameter %s set"%key, 0
352 353 @apivalidate(0, (non_empty_str('key'),))
354 - def getParam(self, caller_id, key):
355 """ 356 Retrieve parameter value from server. 357 @param caller_id: ROS caller id 358 @type caller_id: str 359 @param key: parameter to lookup. If key is a namespace, 360 getParam() will return a parameter tree. 361 @type key: str 362 getParam() will return a parameter tree. 363 364 @return: [code, statusMessage, parameterValue]. If code is not 365 1, parameterValue should be ignored. If key is a namespace, 366 the return value will be a dictionary, where each key is a 367 parameter in that namespace. Sub-namespaces are also 368 represented as dictionaries. 369 @rtype: [int, str, XMLRPCLegalValue] 370 """ 371 try: 372 key = resolve_name(key, caller_id) 373 return 1, "Parameter [%s]"%key, self.param_server.get_param(key) 374 except KeyError, e: 375 return -1, "Parameter [%s] is not set"%key, 0
376 377 @apivalidate(0, (non_empty_str('key'),))
378 - def searchParam(self, caller_id, key):
379 """ 380 Search for parameter key on parameter server. Search starts in caller's namespace and proceeds 381 upwards through parent namespaces until Parameter Server finds a matching key. 382 383 searchParam's behavior is to search for the first partial match. 384 For example, imagine that there are two 'robot_description' parameters:: 385 386 /robot_description 387 /robot_description/arm 388 /robot_description/base 389 /pr2/robot_description 390 /pr2/robot_description/base 391 392 If I start in the namespace /pr2/foo and search for 393 'robot_description', searchParam will match 394 /pr2/robot_description. If I search for 'robot_description/arm' 395 it will return /pr2/robot_description/arm, even though that 396 parameter does not exist (yet). 397 398 @param caller_id str: ROS caller id 399 @type caller_id: str 400 @param key: parameter key to search for. 401 @type key: str 402 @return: [code, statusMessage, foundKey]. If code is not 1, foundKey should be 403 ignored. 404 @rtype: [int, str, str] 405 """ 406 search_key = self.param_server.search_param(caller_id, key) 407 if search_key: 408 return 1, "Found [%s]"%search_key, search_key 409 else: 410 return -1, "Cannot find parameter [%s] in an upwards search"%key, ''
411 412 @apivalidate(0, (is_api('caller_api'), non_empty_str('key'),))
413 - def subscribeParam(self, caller_id, caller_api, key):
414 """ 415 Retrieve parameter value from server and subscribe to updates to that param. See 416 paramUpdate() in the Node API. 417 @param caller_id str: ROS caller id 418 @type caller_id: str 419 @param key: parameter to lookup. 420 @type key: str 421 @param caller_api: API URI for paramUpdate callbacks. 422 @type caller_api: str 423 @return: [code, statusMessage, parameterValue]. If code is not 424 1, parameterValue should be ignored. parameterValue is an empty dictionary if the parameter 425 has not been set yet. 426 @rtype: [int, str, XMLRPCLegalValue] 427 """ 428 key = resolve_name(key, caller_id) 429 try: 430 # ps_lock has precedence and is required due to 431 # potential self.reg_manager modification 432 self.ps_lock.acquire() 433 val = self.param_server.subscribe_param(key, (caller_id, caller_api)) 434 finally: 435 self.ps_lock.release() 436 return 1, "Subscribed to parameter [%s]"%key, val
437 438 @apivalidate(0, (is_api('caller_api'), non_empty_str('key'),))
439 - def unsubscribeParam(self, caller_id, caller_api, key):
440 """ 441 Retrieve parameter value from server and subscribe to updates to that param. See 442 paramUpdate() in the Node API. 443 @param caller_id str: ROS caller id 444 @type caller_id: str 445 @param key: parameter to lookup. 446 @type key: str 447 @param caller_api: API URI for paramUpdate callbacks. 448 @type caller_api: str 449 @return: [code, statusMessage, numUnsubscribed]. 450 If numUnsubscribed is zero it means that the caller was not subscribed to the parameter. 451 @rtype: [int, str, int] 452 """ 453 key = resolve_name(key, caller_id) 454 try: 455 # ps_lock is required due to potential self.reg_manager modification 456 self.ps_lock.acquire() 457 retval = self.param_server.unsubscribe_param(key, (caller_id, caller_api)) 458 finally: 459 self.ps_lock.release() 460 return 1, "Unsubscribe to parameter [%s]"%key, 1
461 462 463 @apivalidate(False, (non_empty_str('key'),))
464 - def hasParam(self, caller_id, key):
465 """ 466 Check if parameter is stored on server. 467 @param caller_id str: ROS caller id 468 @type caller_id: str 469 @param key: parameter to check 470 @type key: str 471 @return: [code, statusMessage, hasParam] 472 @rtype: [int, str, bool] 473 """ 474 key = resolve_name(key, caller_id) 475 if self.param_server.has_param(key): 476 return 1, key, True 477 else: 478 return 1, key, False
479 480 @apivalidate([])
481 - def getParamNames(self, caller_id):
482 """ 483 Get list of all parameter names stored on this server. 484 This does not adjust parameter names for caller's scope. 485 486 @param caller_id: ROS caller id 487 @type caller_id: str 488 @return: [code, statusMessage, parameterNameList] 489 @rtype: [int, str, [str]] 490 """ 491 return 1, "Parameter names", self.param_server.get_param_names()
492 493 ################################################################################## 494 # NOTIFICATION ROUTINES 495
496 - def _notify(self, registrations, task, key, value):
497 """ 498 Generic implementation of callback notification 499 @param registrations: Registrations 500 @type registrations: L{Registrations} 501 @param task: task to queue 502 @type task: fn 503 @param key: registration key 504 @type key: str 505 @param value: value to pass to task 506 @type value: Any 507 """ 508 # cache thread_pool for thread safety 509 thread_pool = self.thread_pool 510 if not thread_pool: 511 return 512 513 if registrations.has_key(key): 514 try: 515 for node_api in registrations.get_apis(key): 516 # use the api as a marker so that we limit one thread per subscriber 517 thread_pool.queue_task(node_api, task, (node_api, key, value)) 518 except KeyError: 519 _logger.warn('subscriber data stale (key [%s], listener [%s]): node API unknown'%(key, s))
520
521 - def _notify_param_subscribers(self, updates):
522 """ 523 Notify parameter subscribers of new parameter value 524 @param updates [([str], str, any)*]: [(subscribers, param_key, param_value)*] 525 @param param_value str: parameter value 526 """ 527 # cache thread_pool for thread safety 528 thread_pool = self.thread_pool 529 if not thread_pool: 530 return 531 532 for subscribers, key, value in updates: 533 # use the api as a marker so that we limit one thread per subscriber 534 for caller_id, caller_api in subscribers: 535 self.thread_pool.queue_task(caller_api, self.param_update_task, (caller_id, caller_api, key, value))
536
537 - def param_update_task(self, caller_id, caller_api, param_key, param_value):
538 """ 539 Contact api.paramUpdate with specified parameters 540 @param caller_id: caller ID 541 @type caller_id: str 542 @param caller_api: XML-RPC URI of node to contact 543 @type caller_api: str 544 @param param_key: parameter key to pass to node 545 @type param_key: str 546 @param param_value: parameter value to pass to node 547 @type param_value: str 548 """ 549 mloginfo("paramUpdate[%s]", param_key) 550 code, _, _ = xmlrpcapi(caller_api).paramUpdate('/master', param_key, param_value) 551 if code == -1: 552 try: 553 # ps_lock is required due to potential self.reg_manager modification 554 self.ps_lock.acquire() 555 # reverse lookup to figure out who we just called 556 matches = self.reg_manager.reverse_lookup(caller_api) 557 for m in matches: 558 retval = self.param_server.unsubscribe_param(param_key, (m.id, caller_api)) 559 finally: 560 self.ps_lock.release()
561 562
563 - def _notify_topic_subscribers(self, topic, pub_uris):
564 """ 565 Notify subscribers with new publisher list 566 @param topic: name of topic 567 @type topic: str 568 @param pub_uris: list of URIs of publishers. 569 @type pub_uris: [str] 570 """ 571 self._notify(self.subscribers, publisher_update_task, topic, pub_uris)
572
573 - def _notify_service_update(self, service, service_api):
574 """ 575 Notify clients of new service provider 576 @param service: name of service 577 @type service: str 578 @param service_api: new service URI 579 @type service_api: str 580 """ 581 ###TODO:XXX:stub code, this callback doesnot exist yet 582 self._notify(self.service_clients, service_update_task, service, service_api)
583 584 ################################################################################## 585 # SERVICE PROVIDER 586 587 @apivalidate(0, ( is_service('service'), is_api('service_api'), is_api('caller_api')))
588 - def registerService(self, caller_id, service, service_api, caller_api):
589 """ 590 Register the caller as a provider of the specified service. 591 @param caller_id str: ROS caller id 592 @type caller_id: str 593 @param service: Fully-qualified name of service 594 @type service: str 595 @param service_api: Service URI 596 @type service_api: str 597 @param caller_api: XML-RPC URI of caller node 598 @type caller_api: str 599 @return: (code, message, ignore) 600 @rtype: (int, str, int) 601 """ 602 try: 603 self.ps_lock.acquire() 604 self.reg_manager.register_service(service, caller_id, caller_api, service_api) 605 mloginfo("+SERVICE [%s] %s %s", service, caller_id, caller_api) 606 if 0: #TODO 607 self._notify_service_update(service, service_api) 608 finally: 609 self.ps_lock.release() 610 return 1, "Registered [%s] as provider of [%s]"%(caller_id, service), 1
611 612 @apivalidate(0, (is_service('service'),))
613 - def lookupService(self, caller_id, service):
614 """ 615 Lookup all provider of a particular service. 616 @param caller_id str: ROS caller id 617 @type caller_id: str 618 @param service: fully-qualified name of service to lookup. 619 @type: service: str 620 @return: (code, message, serviceUrl). service URL is provider's 621 ROSRPC URI with address and port. Fails if there is no provider. 622 @rtype: (int, str, str) 623 """ 624 try: 625 self.ps_lock.acquire() 626 service_url = self.services.get_service_api(service) 627 finally: 628 self.ps_lock.release() 629 if service_url: 630 return 1, "rosrpc URI: [%s]"%service_url, service_url 631 else: 632 return -1, "no provider", ''
633 634 @apivalidate(0, ( is_service('service'), is_api('service_api')))
635 - def unregisterService(self, caller_id, service, service_api):
636 """ 637 Unregister the caller as a provider of the specified service. 638 @param caller_id str: ROS caller id 639 @type caller_id: str 640 @param service: Fully-qualified name of service 641 @type service: str 642 @param service_api: API URI of service to unregister. Unregistration will only occur if current 643 registration matches. 644 @type service_api: str 645 @return: (code, message, numUnregistered). Number of unregistrations (either 0 or 1). 646 If this is zero it means that the caller was not registered as a service provider. 647 The call still succeeds as the intended final state is reached. 648 @rtype: (int, str, int) 649 """ 650 try: 651 self.ps_lock.acquire() 652 retval = self.reg_manager.unregister_service(service, caller_id, service_api) 653 if 0: #TODO 654 self._notify_service_update(service, service_api) 655 mloginfo("-SERVICE [%s] %s %s", service, caller_id, service_api) 656 return retval 657 finally: 658 self.ps_lock.release()
659 660 ################################################################################## 661 # PUBLISH/SUBSCRIBE 662 663 @apivalidate(0, ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
664 - def registerSubscriber(self, caller_id, topic, topic_type, caller_api):
665 """ 666 Subscribe the caller to the specified topic. In addition to receiving 667 a list of current publishers, the subscriber will also receive notifications 668 of new publishers via the publisherUpdate API. 669 @param caller_id: ROS caller id 670 @type caller_id: str 671 @param topic str: Fully-qualified name of topic to subscribe to. 672 @param topic_type: Datatype for topic. Must be a package-resource name, i.e. the .msg name. 673 @type topic_type: str 674 @param caller_api: XML-RPC URI of caller node for new publisher notifications 675 @type caller_api: str 676 @return: (code, message, publishers). Publishers is a list of XMLRPC API URIs 677 for nodes currently publishing the specified topic. 678 @rtype: (int, str, [str]) 679 """ 680 #NOTE: subscribers do not get to set topic type 681 try: 682 self.ps_lock.acquire() 683 self.reg_manager.register_subscriber(topic, caller_id, caller_api) 684 685 # ROS 1.1: subscriber can now set type if it is not already set 686 # - don't let '*' type squash valid typing 687 if not topic in self.topics_types and topic_type != roslib.names.ANYTYPE: 688 self.topics_types[topic] = topic_type 689 690 mloginfo("+SUB [%s] %s %s",topic, caller_id, caller_api) 691 pub_uris = self.publishers.get_apis(topic) 692 finally: 693 self.ps_lock.release() 694 return 1, "Subscribed to [%s]"%topic, pub_uris
695 696 @apivalidate(0, (is_topic('topic'), is_api('caller_api')))
697 - def unregisterSubscriber(self, caller_id, topic, caller_api):
698 """ 699 Unregister the caller as a publisher of the topic. 700 @param caller_id: ROS caller id 701 @type caller_id: str 702 @param topic: Fully-qualified name of topic to unregister. 703 @type topic: str 704 @param caller_api: API URI of service to unregister. Unregistration will only occur if current 705 registration matches. 706 @type caller_api: str 707 @return: (code, statusMessage, numUnsubscribed). 708 If numUnsubscribed is zero it means that the caller was not registered as a subscriber. 709 The call still succeeds as the intended final state is reached. 710 @rtype: (int, str, int) 711 """ 712 try: 713 self.ps_lock.acquire() 714 retval = self.reg_manager.unregister_subscriber(topic, caller_id, caller_api) 715 mloginfo("-SUB [%s] %s %s",topic, caller_id, caller_api) 716 return retval 717 finally: 718 self.ps_lock.release()
719 720 @apivalidate(0, ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
721 - def registerPublisher(self, caller_id, topic, topic_type, caller_api):
722 """ 723 Register the caller as a publisher the topic. 724 @param caller_id: ROS caller id 725 @type caller_id: str 726 @param topic: Fully-qualified name of topic to register. 727 @type topic: str 728 @param topic_type: Datatype for topic. Must be a 729 package-resource name, i.e. the .msg name. 730 @type topic_type: str 731 @param caller_api str: ROS caller XML-RPC API URI 732 @type caller_api: str 733 @return: (code, statusMessage, subscriberApis). 734 List of current subscribers of topic in the form of XMLRPC URIs. 735 @rtype: (int, str, [str]) 736 """ 737 #NOTE: we need topic_type for getPublishedTopics. 738 try: 739 self.ps_lock.acquire() 740 self.reg_manager.register_publisher(topic, caller_id, caller_api) 741 # don't let '*' type squash valid typing 742 if topic_type != roslib.names.ANYTYPE or not topic in self.topics_types: 743 self.topics_types[topic] = topic_type 744 pub_uris = self.publishers.get_apis(topic) 745 self._notify_topic_subscribers(topic, pub_uris) 746 mloginfo("+PUB [%s] %s %s",topic, caller_id, caller_api) 747 sub_uris = self.subscribers.get_apis(topic) 748 finally: 749 self.ps_lock.release() 750 return 1, "Registered [%s] as publisher of [%s]"%(caller_id, topic), sub_uris
751 752 753 @apivalidate(0, (is_topic('topic'), is_api('caller_api')))
754 - def unregisterPublisher(self, caller_id, topic, caller_api):
755 """ 756 Unregister the caller as a publisher of the topic. 757 @param caller_id: ROS caller id 758 @type caller_id: str 759 @param topic: Fully-qualified name of topic to unregister. 760 @type topic: str 761 @param caller_api str: API URI of service to 762 unregister. Unregistration will only occur if current 763 registration matches. 764 @type caller_api: str 765 @return: (code, statusMessage, numUnregistered). 766 If numUnregistered is zero it means that the caller was not registered as a publisher. 767 The call still succeeds as the intended final state is reached. 768 @rtype: (int, str, int) 769 """ 770 try: 771 self.ps_lock.acquire() 772 retval = self.reg_manager.unregister_publisher(topic, caller_id, caller_api) 773 if retval[VAL]: 774 self._notify_topic_subscribers(topic, self.publishers.get_apis(topic)) 775 mloginfo("-PUB [%s] %s %s",topic, caller_id, caller_api) 776 finally: 777 self.ps_lock.release() 778 return retval
779 780 ################################################################################## 781 # GRAPH STATE APIS 782 783 @apivalidate('', (valid_name('node'),))
784 - def lookupNode(self, caller_id, node_name):
785 """ 786 Get the XML-RPC URI of the node with the associated 787 name/caller_id. This API is for looking information about 788 publishers and subscribers. Use lookupService instead to lookup 789 ROS-RPC URIs. 790 @param caller_id: ROS caller id 791 @type caller_id: str 792 @param node: name of node to lookup 793 @type node: str 794 @return: (code, msg, URI) 795 @rtype: (int, str, str) 796 """ 797 try: 798 self.ps_lock.acquire() 799 node = self.reg_manager.get_node(node_name) 800 if node is not None: 801 retval = 1, "node api", node.api 802 else: 803 retval = -1, "unknown node [%s]"%node_name, '' 804 finally: 805 self.ps_lock.release() 806 return retval
807 808 @apivalidate(0, (empty_or_valid_name('subgraph'),))
809 - def getPublishedTopics(self, caller_id, subgraph):
810 """ 811 Get list of topics that can be subscribed to. This does not return topics that have no publishers. 812 See L{getSystemState()} to get more comprehensive list. 813 @param caller_id: ROS caller id 814 @type caller_id: str 815 @param subgraph: Restrict topic names to match within the specified subgraph. Subgraph namespace 816 is resolved relative to the caller's namespace. Use '' to specify all names. 817 @type subgraph: str 818 @return: (code, msg, [[topic1, type1]...[topicN, typeN]]) 819 @rtype: (int, str, [[str, str],]) 820 """ 821 try: 822 self.ps_lock.acquire() 823 # force subgraph to be a namespace with trailing slash 824 if subgraph and subgraph[-1] != roslib.names.SEP: 825 subgraph = subgraph + roslib.names.SEP 826 #we don't bother with subscribers as subscribers don't report topic types. also, the intended 827 #use case is for subscribe-by-topic-type 828 retval = [[t, self.topics_types[t]] for t in self.publishers.iterkeys() if t.startswith(subgraph)] 829 finally: 830 self.ps_lock.release() 831 return 1, "current topics", retval
832 833 @apivalidate([])
834 - def getTopicTypes(self, caller_id):
835 """ 836 Retrieve list topic names and their types. 837 @param caller_id: ROS caller id 838 @type caller_id: str 839 @rtype: (int, str, [[str,str]] ) 840 @return: (code, statusMessage, topicTypes). topicTypes is a list of [topicName, topicType] pairs. 841 """ 842 try: 843 self.ps_lock.acquire() 844 retval = self.topics_types.items() 845 finally: 846 self.ps_lock.release() 847 return 1, "current system state", retval
848 849 @apivalidate([[],[], []])
850 - def getSystemState(self, caller_id):
851 """ 852 Retrieve list representation of system state (i.e. publishers, subscribers, and services). 853 @param caller_id: ROS caller id 854 @type caller_id: str 855 @rtype: (int, str, [[str,[str]], [str,[str]], [str,[str]]]) 856 @return: (code, statusMessage, systemState). 857 858 System state is in list representation:: 859 [publishers, subscribers, services]. 860 861 publishers is of the form:: 862 [ [topic1, [topic1Publisher1...topic1PublisherN]] ... ] 863 864 subscribers is of the form:: 865 [ [topic1, [topic1Subscriber1...topic1SubscriberN]] ... ] 866 867 services is of the form:: 868 [ [service1, [service1Provider1...service1ProviderN]] ... ] 869 """ 870 edges = [] 871 try: 872 self.ps_lock.acquire() 873 retval = [r.get_state() for r in (self.publishers, self.subscribers, self.services)] 874 finally: 875 self.ps_lock.release() 876 return 1, "current system state", retval 877