Package rosmaster :: Module master_api

Source Code for Module rosmaster.master_api

  1  # Software License Agreement (BSD License)
 
  2  #
 
  3  # Copyright (c) 2008, Willow Garage, Inc.
 
  4  # All rights reserved.
 
  5  #
 
  6  # Redistribution and use in source and binary forms, with or without
 
  7  # modification, are permitted provided that the following conditions
 
  8  # are met:
 
  9  #
 
 10  #  * Redistributions of source code must retain the above copyright
 
 11  #    notice, this list of conditions and the following disclaimer.
 
 12  #  * Redistributions in binary form must reproduce the above
 
 13  #    copyright notice, this list of conditions and the following
 
 14  #    disclaimer in the documentation and/or other materials provided
 
 15  #    with the distribution.
 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its
 
 17  #    contributors may be used to endorse or promote products derived
 
 18  #    from this software without specific prior written permission.
 
 19  #
 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
 31  # POSSIBILITY OF SUCH DAMAGE.
 
 32  #
 
 33  # Revision $Id$
 
 34  """
 
 35  ROS Master API. 
 
 36  
 
 37  L{ROSMasterHandler} provides the API implementation of the
 
 38  Master. Python allows an API to be introspected from a Python class,
 
 39  so the handler has a 1-to-1 mapping with the actual XMLRPC API.
 
 40  
 
 41  API return convention: (statusCode, statusMessage, returnValue)
 
 42  
 
 43   - statusCode: an integer indicating the completion condition of the method. 
 
 44   - statusMessage: a human-readable string message for debugging
 
 45   - returnValue: the return value of the method; method-specific.
 
 46  
 
 47  Current status codes: 
 
 48  
 
 49   - -1: ERROR: Error on the part of the caller, e.g. an invalid parameter
 
 50   - 0: FAILURE: Method was attempted but failed to complete correctly.
 
 51   - 1: SUCCESS: Method completed successfully.
 
 52  
 
 53  Individual methods may assign additional meaning/semantics to statusCode.
 
 54  """ 
 55  
 
 56  from __future__ import print_function 
 57  
 
 58  import os 
 59  import sys 
 60  import logging 
 61  import threading 
 62  import time 
 63  import traceback 
 64  
 
 65  from rosgraph.xmlrpc import XmlRpcHandler 
 66  
 
 67  import rosgraph.names 
 68  from rosgraph.names import resolve_name 
 69  import rosmaster.paramserver 
 70  import rosmaster.threadpool 
 71  
 
 72  from rosmaster.util import xmlrpcapi 
 73  from rosmaster.registrations import RegistrationManager 
 74  from rosmaster.validators import non_empty, non_empty_str, not_none, is_api, is_topic, is_service, valid_type_name, valid_name, empty_or_valid_name, ParameterInvalid 
 75  
 
# Default size of the thread pool used to send update notifications
# (e.g. publisherUpdate) to nodes.
NUM_WORKERS = 3 #number of threads we use to send publisher_update notifications

# Return code slots: indices into the (statusCode, statusMessage, returnValue)
# triple that every API method returns.
STATUS = 0
MSG = 1
VAL = 2

# Logger shared by the master-level logging helpers in this module.
_logger = logging.getLogger("rosmaster.master")

# When True, calls wrapped by apivalidate() log their arguments and return
# values at debug level.
LOG_API = False
def mloginfo(msg, *args):
    """
    Emit an info-level record on the master logger. The text may end up
    on screen, so keep it readable for end users.
    @param msg: message, optionally a %-style format string
    @type  msg: str
    @param args: values substituted into msg when it is a format string
    """
    # kept at module level so that both master and masterdata can reach it
    _logger.log(logging.INFO, msg, *args)
97
def mlogwarn(msg, *args):
    """
    Warn-level master log statements. These statements may be printed
    to screen so they should be user-readable. The message is both
    logged and echoed to stdout with a "WARN: " prefix.
    @param msg: Message string
    @type  msg: str
    @param args: arguments for msg if msg is a format string
    """
    #mlogwarn is in core so that it is accessible to master and masterdata
    # Logger.warn is a deprecated alias; warning() is the supported spelling
    # on both Python 2 and Python 3.
    _logger.warning(msg, *args)
    if args:
        print("WARN: " + msg % args)
    else:
        # no arguments: print msg verbatim so stray '%' characters are harmless
        print("WARN: " + str(msg))
112
def apivalidate(error_return_value, validators=()):
    """
    ROS master/slave arg-checking decorator. Applies the specified
    validator to the corresponding argument and also remaps each
    argument to be the value returned by the validator.  Thus,
    arguments can be simultaneously validated and canonicalized prior
    to actual function call.

    The wrapped method always answers with a
    (statusCode, statusMessage, returnValue) triple: bad calls and
    validator failures yield code -1, unexpected exceptions yield code 0,
    and in both cases returnValue is error_return_value.

    @param error_return_value: API value to return if call unexpectedly fails
    @param validators: sequence of validators to apply to each
      arg. None means no validation for the parameter is required. As all
      api methods take caller_id as the first parameter, the validators
      start with the second param.
    @type  validators: sequence
    @return: decorator to apply to an API method
    """
    def isstring(s):
        """Small helper version to check an object is a string in
        a way that works for both Python 2 and 3
        """
        try:
            return isinstance(s, basestring)
        except NameError:
            return isinstance(s, str)

    def check_validates(f):
        try:
            func_code = f.__code__
            func_name = f.__name__
        except AttributeError:
            # very old interpreters only expose the func_* spellings
            func_code = f.func_code
            func_name = f.func_name
        assert len(validators) == func_code.co_argcount - 2, "%s failed arg check"%f #ignore self and caller_id

        def validated_f(*args, **kwds):
            if LOG_API:
                _logger.debug("%s%s", func_name, str(args[1:]))
            if len(args) == 1:
                _logger.error("%s invoked without caller_id paramter" % func_name)
                return -1, "missing required caller_id parameter", error_return_value
            elif len(args) != func_code.co_argcount:
                return -1, "Error: bad call arity", error_return_value

            instance = args[0]
            caller_id = args[1]
            if not isstring(caller_id):
                _logger.error("%s: invalid caller_id param type", func_name)
                return -1, "caller_id must be a string", error_return_value

            newArgs = [instance, caller_id] #canonicalized args
            try:
                for (v, a) in zip(validators, args[2:]):
                    if v:
                        try:
                            newArgs.append(v(a, caller_id))
                        except ParameterInvalid as e:
                            _logger.error("%s: invalid parameter: %s", func_name, str(e) or 'error')
                            return -1, str(e) or 'error', error_return_value
                    else:
                        newArgs.append(a)

                code, msg, val = f(*newArgs, **kwds)
                if LOG_API:
                    _logger.debug("%s%s returns %s", func_name, args[1:], (code, msg, val))
                # The None guard applies whether or not LOG_API is set, so
                # that enabling debug logging cannot change what callers see.
                if val is None:
                    return -1, "Internal error (None value returned)", error_return_value
                return code, msg, val
            except TypeError as te: #most likely wrong arg number
                _logger.error(traceback.format_exc())
                return -1, "Error: invalid arguments: %s"%te, error_return_value
            except Exception as e: #internal failure
                _logger.error(traceback.format_exc())
                return 0, "Internal failure: %s"%e, error_return_value

        try:
            validated_f.__name__ = func_name
        except AttributeError:
            validated_f.func_name = func_name
        validated_f.__doc__ = f.__doc__ #preserve doc
        return validated_f
    return check_validates
def publisher_update_task(api, topic, pub_uris):
    """
    Contact api.publisherUpdate with specified parameters. The outcome
    and the elapsed time are logged at info level; any exception raised
    by the call is logged and then re-raised to the caller.
    @param api: XML-RPC URI of node to contact
    @type  api: str
    @param topic: Topic name to send to node
    @type  topic: str
    @param pub_uris: list of publisher APIs to send to node
    @type  pub_uris: [str]
    """
    msg = "publisherUpdate[%s] -> %s %s" % (topic, api, pub_uris)
    mloginfo(msg)
    start_sec = time.time()
    # Pre-bind the suffix: if something that is not an Exception subclass
    # (e.g. KeyboardInterrupt) escapes the call, the finally clause must
    # still be able to log without raising UnboundLocalError and masking
    # the original error.
    msg_suffix = "exception=<unknown>"
    try:
        #TODO: check return value for errors so we can unsubscribe if stale
        ret = xmlrpcapi(api).publisherUpdate('/master', topic, pub_uris)
        msg_suffix = "result=%s" % ret
    except Exception as ex:
        msg_suffix = "exception=%s" % ex
        raise
    finally:
        delta_sec = time.time() - start_sec
        mloginfo("%s: sec=%0.2f, %s", msg, delta_sec, msg_suffix)
218
def service_update_task(api, service, uri):
    """
    Log the update and invoke serviceUpdate on the node at the given API.
    @param api: XML-RPC URI of the node to call
    @type  api: str
    @param service: name of the service being reported
    @type  service: str
    @param uri: URI passed along to the node
    @type  uri: str
    """
    mloginfo("serviceUpdate[%s, %s] -> %s", service, uri, api)
    node_proxy = xmlrpcapi(api)
    node_proxy.serviceUpdate('/master', service, uri)
232
###################################################
# Master Implementation

class ROSMasterHandler(object):
    """
    XML-RPC handler for ROS master APIs.
    API routines for the ROS Master Node. The Master Node is a
    superset of the Slave Node and contains additional API methods for
    creating and monitoring a graph of slave nodes.

    By convention, ROS nodes take in caller_id as the first parameter
    of any API call.  The setting of this parameter is rarely done by
    client code as ros::msproxy::MasterProxy automatically inserts
    this parameter (see ros::client::getMaster()).
    """

    def __init__(self, num_workers=NUM_WORKERS):
        """
        ctor.
        @param num_workers: number of worker threads in the pool used to
          deliver update notifications to nodes
        @type  num_workers: int
        """
        self.uri = None
        self.done = False

        self.thread_pool = rosmaster.threadpool.MarkedThreadPool(num_workers)
        # lock held by the API methods while they read or modify the
        # registration data (publishers/subscribers/services/params)
        self.ps_lock = threading.Condition(threading.Lock())

        self.reg_manager = RegistrationManager(self.thread_pool)

        # maintain refs to reg_manager fields
        self.publishers = self.reg_manager.publishers
        self.subscribers = self.reg_manager.subscribers
        self.services = self.reg_manager.services
        self.param_subscribers = self.reg_manager.param_subscribers

        self.topics_types = {} #dict { topicName : type }

        # parameter server dictionary
        self.param_server = rosmaster.paramserver.ParamDictionary(self.reg_manager)

    def _shutdown(self, reason=''):
        """
        Stop the notification thread pool, without waiting for queued
        tasks or worker threads, and mark the handler as done.
        @param reason: human-readable reason for the shutdown (currently unused)
        @type  reason: str
        """
        if self.thread_pool is not None:
            self.thread_pool.join_all(wait_for_tasks=False, wait_for_threads=False)
            self.thread_pool = None
        self.done = True

    def _ready(self, uri):
        """
        Initialize the handler with the XMLRPC URI. This is a standard callback from the XmlRpcNode API.

        @param uri: XML-RPC URI
        @type  uri: str
        """
        self.uri = uri

    def _ok(self):
        """
        @return: True until _shutdown() has been called
        @rtype: bool
        """
        return not self.done

    ###############################################################################
    # EXTERNAL API

    @apivalidate(0, (None, ))
    def shutdown(self, caller_id, msg=''):
        """
        Stop this server
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param msg: a message describing why the node is being shutdown.
        @type  msg: str
        @return: [code, msg, 0]
        @rtype: [int, str, int]
        """
        if msg:
            print("shutdown request: %s" % msg, file=sys.stdout)
        else:
            print("shutdown requst", file=sys.stdout)
        self._shutdown('external shutdown request from [%s]: %s'%(caller_id, msg))
        return 1, "shutdown", 0

    @apivalidate('')
    def getUri(self, caller_id):
        """
        Get the XML-RPC URI of this server.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @return: [1, "", xmlRpcUri]
        @rtype: [int, str, str]
        """
        return 1, "", self.uri

    @apivalidate(-1)
    def getPid(self, caller_id):
        """
        Get the PID of this server
        @param caller_id: ROS caller id
        @type  caller_id: str
        @return: [1, "", serverProcessPID]
        @rtype: [int, str, int]
        """
        return 1, "", os.getpid()

    ################################################################
    # PARAMETER SERVER ROUTINES

    @apivalidate(0, (non_empty_str('key'),))
    def deleteParam(self, caller_id, key):
        """
        Parameter Server: delete parameter
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter name
        @type  key: str
        @return: [code, msg, 0]. code is -1 if the parameter is not set.
        @rtype: [int, str, int]
        """
        try:
            key = resolve_name(key, caller_id)
            self.param_server.delete_param(key, self._notify_param_subscribers)
            mloginfo("-PARAM [%s] by %s",key, caller_id)
            return 1, "parameter %s deleted"%key, 0
        except KeyError:
            return -1, "parameter [%s] is not set"%key, 0

    @apivalidate(0, (non_empty_str('key'), not_none('value')))
    def setParam(self, caller_id, key, value):
        """
        Parameter Server: set parameter.  NOTE: if value is a
        dictionary it will be treated as a parameter tree, where key
        is the parameter namespace. For example:::
          {'x':1,'y':2,'sub':{'z':3}}

        will set key/x=1, key/y=2, and key/sub/z=3. Furthermore, it
        will replace all existing parameters in the key parameter
        namespace with the parameters in value. You must set
        parameters individually if you wish to perform a union update.

        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter name
        @type  key: str
        @param value: parameter value.
        @type  value: XMLRPCLegalValue
        @return: [code, msg, 0]
        @rtype: [int, str, int]
        """
        key = resolve_name(key, caller_id)
        self.param_server.set_param(key, value, self._notify_param_subscribers, caller_id)
        mloginfo("+PARAM [%s] by %s",key, caller_id)
        return 1, "parameter %s set"%key, 0

    @apivalidate(0, (non_empty_str('key'),))
    def getParam(self, caller_id, key):
        """
        Retrieve parameter value from server.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter to lookup. If key is a namespace,
          getParam() will return a parameter tree.
        @type  key: str

        @return: [code, statusMessage, parameterValue]. If code is not
          1, parameterValue should be ignored. If key is a namespace,
          the return value will be a dictionary, where each key is a
          parameter in that namespace. Sub-namespaces are also
          represented as dictionaries.
        @rtype: [int, str, XMLRPCLegalValue]
        """
        try:
            key = resolve_name(key, caller_id)
            return 1, "Parameter [%s]"%key, self.param_server.get_param(key)
        except KeyError:
            return -1, "Parameter [%s] is not set"%key, 0

    @apivalidate(0, (non_empty_str('key'),))
    def searchParam(self, caller_id, key):
        """
        Search for parameter key on parameter server. Search starts in caller's namespace and proceeds
        upwards through parent namespaces until Parameter Server finds a matching key.

        searchParam's behavior is to search for the first partial match.
        For example, imagine that there are two 'robot_description' parameters::

           /robot_description
             /robot_description/arm
             /robot_description/base
           /pr2/robot_description
             /pr2/robot_description/base

        If I start in the namespace /pr2/foo and search for
        'robot_description', searchParam will match
        /pr2/robot_description. If I search for 'robot_description/arm'
        it will return /pr2/robot_description/arm, even though that
        parameter does not exist (yet).

        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter key to search for.
        @type  key: str
        @return: [code, statusMessage, foundKey]. If code is not 1, foundKey should be
          ignored.
        @rtype: [int, str, str]
        """
        search_key = self.param_server.search_param(caller_id, key)
        if search_key:
            return 1, "Found [%s]"%search_key, search_key
        else:
            return -1, "Cannot find parameter [%s] in an upwards search"%key, ''

    @apivalidate(0, (is_api('caller_api'), non_empty_str('key'),))
    def subscribeParam(self, caller_id, caller_api, key):
        """
        Retrieve parameter value from server and subscribe to updates to that param. See
        paramUpdate() in the Node API.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param caller_api: API URI for paramUpdate callbacks.
        @type  caller_api: str
        @param key: parameter to lookup.
        @type  key: str
        @return: [code, statusMessage, parameterValue]. If code is not
          1, parameterValue should be ignored. parameterValue is an empty dictionary if the parameter
          has not been set yet.
        @rtype: [int, str, XMLRPCLegalValue]
        """
        key = resolve_name(key, caller_id)
        # ps_lock has precedence and is required due to
        # potential self.reg_manager modification
        with self.ps_lock:
            val = self.param_server.subscribe_param(key, (caller_id, caller_api))
        mloginfo("+CACHEDPARAM [%s] by %s",key, caller_id)
        return 1, "Subscribed to parameter [%s]"%key, val

    @apivalidate(0, (is_api('caller_api'), non_empty_str('key'),))
    def unsubscribeParam(self, caller_id, caller_api, key):
        """
        Unsubscribe the caller from updates to a parameter. See
        paramUpdate() in the Node API.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param caller_api: API URI that was registered for paramUpdate callbacks.
        @type  caller_api: str
        @param key: parameter to stop watching.
        @type  key: str
        @return: [code, statusMessage, 1].
          NOTE(review): the third element is hard-coded to 1; the result of
          the underlying unsubscribe is not propagated, so callers cannot
          tell whether they were actually subscribed.
        @rtype: [int, str, int]
        """
        key = resolve_name(key, caller_id)
        # ps_lock is required due to potential self.reg_manager modification
        with self.ps_lock:
            self.param_server.unsubscribe_param(key, (caller_id, caller_api))
        mloginfo("-CACHEDPARAM [%s] by %s",key, caller_id)
        return 1, "Unsubscribe to parameter [%s]"%key, 1

    @apivalidate(False, (non_empty_str('key'),))
    def hasParam(self, caller_id, key):
        """
        Check if parameter is stored on server.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter to check
        @type  key: str
        @return: [code, statusMessage, hasParam]. statusMessage is the resolved key.
        @rtype: [int, str, bool]
        """
        key = resolve_name(key, caller_id)
        return 1, key, bool(self.param_server.has_param(key))

    @apivalidate([])
    def getParamNames(self, caller_id):
        """
        Get list of all parameter names stored on this server.
        This does not adjust parameter names for caller's scope.

        @param caller_id: ROS caller id
        @type  caller_id: str
        @return: [code, statusMessage, parameterNameList]
        @rtype: [int, str, [str]]
        """
        return 1, "Parameter names", self.param_server.get_param_names()

    ##################################################################################
    # NOTIFICATION ROUTINES

    def _notify(self, registrations, task, key, value, node_apis):
        """
        Generic implementation of callback notification: queues one task
        per node API on the thread pool. Does nothing once the pool has
        been shut down.
        @param registrations: Registrations (currently unused)
        @type  registrations: L{Registrations}
        @param task: task to queue; invoked as task(node_api, key, value)
        @type  task: fn
        @param key: registration key
        @type  key: str
        @param value: value to pass to task
        @type  value: Any
        @param node_apis: XML-RPC URIs of the nodes to notify
        @type  node_apis: [str]
        """
        # cache thread_pool for thread safety
        thread_pool = self.thread_pool
        if not thread_pool:
            return

        # tracked outside the loop so the error path can name the API that failed
        node_api = None
        try:
            for node_api in node_apis:
                # use the api as a marker so that we limit one thread per subscriber
                thread_pool.queue_task(node_api, task, (node_api, key, value))
        except KeyError:
            _logger.warning('subscriber data stale (key [%s], listener [%s]): node API unknown'%(key, node_api))

    def _notify_param_subscribers(self, updates):
        """
        Notify parameter subscribers of new parameter value. Does nothing
        once the thread pool has been shut down.
        @param updates: [(subscribers, param_key, param_value)*], where
          subscribers is a sequence of (caller_id, caller_api) pairs
        @type  updates: [([(str, str)], str, any)]
        """
        # cache thread_pool for thread safety
        thread_pool = self.thread_pool
        if not thread_pool:
            return

        for subscribers, key, value in updates:
            # use the api as a marker so that we limit one thread per subscriber
            for caller_id, caller_api in subscribers:
                # go through the cached reference: self.thread_pool may be
                # reset to None by a concurrent _shutdown()
                thread_pool.queue_task(caller_api, self.param_update_task, (caller_id, caller_api, key, value))

    def param_update_task(self, caller_id, caller_api, param_key, param_value):
        """
        Contact api.paramUpdate with specified parameters. If the node
        answers with code -1, every registration found for caller_api is
        unsubscribed from param_key.
        @param caller_id: caller ID
        @type  caller_id: str
        @param caller_api: XML-RPC URI of node to contact
        @type  caller_api: str
        @param param_key: parameter key to pass to node
        @type  param_key: str
        @param param_value: parameter value to pass to node
        @type  param_value: any
        """
        mloginfo("paramUpdate[%s]", param_key)
        code, _, _ = xmlrpcapi(caller_api).paramUpdate('/master', param_key, param_value)
        if code == -1:
            # ps_lock is required due to potential self.reg_manager modification
            with self.ps_lock:
                # reverse lookup to figure out who we just called
                matches = self.reg_manager.reverse_lookup(caller_api)
                for m in matches:
                    self.param_server.unsubscribe_param(param_key, (m.id, caller_api))

    def _notify_topic_subscribers(self, topic, pub_uris, sub_uris):
        """
        Notify subscribers with new publisher list
        @param topic: name of topic
        @type  topic: str
        @param pub_uris: list of URIs of publishers.
        @type  pub_uris: [str]
        @param sub_uris: list of URIs of the subscribers to notify.
        @type  sub_uris: [str]
        """
        self._notify(self.subscribers, publisher_update_task, topic, pub_uris, sub_uris)

    ##################################################################################
    # SERVICE PROVIDER

    @apivalidate(0, ( is_service('service'), is_api('service_api'), is_api('caller_api')))
    def registerService(self, caller_id, service, service_api, caller_api):
        """
        Register the caller as a provider of the specified service.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param service: Fully-qualified name of service
        @type  service: str
        @param service_api: Service URI
        @type  service_api: str
        @param caller_api: XML-RPC URI of caller node
        @type  caller_api: str
        @return: (code, message, ignore)
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            self.reg_manager.register_service(service, caller_id, caller_api, service_api)
            mloginfo("+SERVICE [%s] %s %s", service, caller_id, caller_api)
        return 1, "Registered [%s] as provider of [%s]"%(caller_id, service), 1

    @apivalidate('', (is_service('service'),))
    def lookupService(self, caller_id, service):
        """
        Lookup all provider of a particular service.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param service: fully-qualified name of service to lookup.
        @type  service: str
        @return: (code, message, serviceUrl). service URL is provider's
           ROSRPC URI with address and port.  Fails if there is no provider.
        @rtype: (int, str, str)
        """
        with self.ps_lock:
            service_url = self.services.get_service_api(service)
        if service_url:
            return 1, "rosrpc URI: [%s]"%service_url, service_url
        else:
            return -1, "no provider", ''

    @apivalidate(0, ( is_service('service'), is_api('service_api')))
    def unregisterService(self, caller_id, service, service_api):
        """
        Unregister the caller as a provider of the specified service.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param service: Fully-qualified name of service
        @type  service: str
        @param service_api: API URI of service to unregister. Unregistration will only occur if current
           registration matches.
        @type  service_api: str
        @return: (code, message, numUnregistered). Number of unregistrations (either 0 or 1).
           If this is zero it means that the caller was not registered as a service provider.
           The call still succeeds as the intended final state is reached.
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            retval = self.reg_manager.unregister_service(service, caller_id, service_api)
            mloginfo("-SERVICE [%s] %s %s", service, caller_id, service_api)
            return retval

    ##################################################################################
    # PUBLISH/SUBSCRIBE

    @apivalidate([], ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
    def registerSubscriber(self, caller_id, topic, topic_type, caller_api):
        """
        Subscribe the caller to the specified topic. In addition to receiving
        a list of current publishers, the subscriber will also receive notifications
        of new publishers via the publisherUpdate API.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to subscribe to.
        @type  topic: str
        @param topic_type: Datatype for topic. Must be a package-resource name, i.e. the .msg name.
        @type  topic_type: str
        @param caller_api: XML-RPC URI of caller node for new publisher notifications
        @type  caller_api: str
        @return: (code, message, publishers). Publishers is a list of XMLRPC API URIs
           for nodes currently publishing the specified topic.
        @rtype: (int, str, [str])
        """
        with self.ps_lock:
            self.reg_manager.register_subscriber(topic, caller_id, caller_api)

            # ROS 1.1: a subscriber only sets the topic type when none has
            # been recorded yet
            # - don't let '*' type squash valid typing
            if topic not in self.topics_types and topic_type != rosgraph.names.ANYTYPE:
                self.topics_types[topic] = topic_type

            mloginfo("+SUB [%s] %s %s",topic, caller_id, caller_api)
            pub_uris = self.publishers.get_apis(topic)
        return 1, "Subscribed to [%s]"%topic, pub_uris

    @apivalidate(0, (is_topic('topic'), is_api('caller_api')))
    def unregisterSubscriber(self, caller_id, topic, caller_api):
        """
        Unregister the caller as a subscriber of the topic.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to unregister.
        @type  topic: str
        @param caller_api: API URI of service to unregister. Unregistration will only occur if current
           registration matches.
        @type  caller_api: str
        @return: (code, statusMessage, numUnsubscribed).
          If numUnsubscribed is zero it means that the caller was not registered as a subscriber.
          The call still succeeds as the intended final state is reached.
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            retval = self.reg_manager.unregister_subscriber(topic, caller_id, caller_api)
            mloginfo("-SUB [%s] %s %s",topic, caller_id, caller_api)
            return retval

    @apivalidate([], ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
    def registerPublisher(self, caller_id, topic, topic_type, caller_api):
        """
        Register the caller as a publisher the topic. Current subscribers
        of the topic are notified of the new publisher list.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to register.
        @type  topic: str
        @param topic_type: Datatype for topic. Must be a
          package-resource name, i.e. the .msg name.
        @type  topic_type: str
        @param caller_api: ROS caller XML-RPC API URI
        @type  caller_api: str
        @return: (code, statusMessage, subscriberApis).
          List of current subscribers of topic in the form of XMLRPC URIs.
        @rtype: (int, str, [str])
        """
        #NOTE: we need topic_type for getPublishedTopics.
        with self.ps_lock:
            self.reg_manager.register_publisher(topic, caller_id, caller_api)
            # don't let '*' type squash valid typing
            if topic_type != rosgraph.names.ANYTYPE or topic not in self.topics_types:
                self.topics_types[topic] = topic_type
            pub_uris = self.publishers.get_apis(topic)
            # fetched once: the same list is used for the notifications
            # and for the return value
            sub_uris = self.subscribers.get_apis(topic)
            self._notify_topic_subscribers(topic, pub_uris, sub_uris)
            mloginfo("+PUB [%s] %s %s",topic, caller_id, caller_api)
        return 1, "Registered [%s] as publisher of [%s]"%(caller_id, topic), sub_uris

    @apivalidate(0, (is_topic('topic'), is_api('caller_api')))
    def unregisterPublisher(self, caller_id, topic, caller_api):
        """
        Unregister the caller as a publisher of the topic. If a
        registration was removed, the remaining subscribers are notified
        of the new publisher list.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to unregister.
        @type  topic: str
        @param caller_api: API URI of service to
           unregister. Unregistration will only occur if current
           registration matches.
        @type  caller_api: str
        @return: (code, statusMessage, numUnregistered).
           If numUnregistered is zero it means that the caller was not registered as a publisher.
           The call still succeeds as the intended final state is reached.
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            retval = self.reg_manager.unregister_publisher(topic, caller_id, caller_api)
            if retval[VAL]:
                self._notify_topic_subscribers(topic, self.publishers.get_apis(topic), self.subscribers.get_apis(topic))
            mloginfo("-PUB [%s] %s %s",topic, caller_id, caller_api)
        return retval

    ##################################################################################
    # GRAPH STATE APIS

    @apivalidate('', (valid_name('node'),))
    def lookupNode(self, caller_id, node_name):
        """
        Get the XML-RPC URI of the node with the associated
        name/caller_id.  This API is for looking information about
        publishers and subscribers. Use lookupService instead to lookup
        ROS-RPC URIs.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param node_name: name of node to lookup
        @type  node_name: str
        @return: (code, msg, URI). code is -1 if the node is unknown.
        @rtype: (int, str, str)
        """
        with self.ps_lock:
            node = self.reg_manager.get_node(node_name)
            if node is not None:
                retval = 1, "node api", node.api
            else:
                retval = -1, "unknown node [%s]"%node_name, ''
        return retval

    @apivalidate(0, (empty_or_valid_name('subgraph'),))
    def getPublishedTopics(self, caller_id, subgraph):
        """
        Get list of topics that can be subscribed to. This does not return topics that have no publishers.
        See L{getSystemState()} to get more comprehensive list.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param subgraph: Restrict topic names to match within the specified subgraph. Subgraph namespace
           is resolved relative to the caller's namespace. Use '' to specify all names.
        @type  subgraph: str
        @return: (code, msg, [[topic1, type1]...[topicN, typeN]])
        @rtype: (int, str, [[str, str],])
        """
        with self.ps_lock:
            # force subgraph to be a namespace with trailing slash
            if subgraph and subgraph[-1] != rosgraph.names.SEP:
                subgraph = subgraph + rosgraph.names.SEP
            #we don't bother with subscribers as subscribers don't report topic types. also, the intended
            #use case is for subscribe-by-topic-type
            retval = [[t, self.topics_types[t]] for t in self.publishers.iterkeys() if t.startswith(subgraph)]
        return 1, "current topics", retval

    @apivalidate([])
    def getTopicTypes(self, caller_id):
        """
        Retrieve list topic names and their types.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @rtype: (int, str, [[str,str]] )
        @return: (code, statusMessage, topicTypes). topicTypes is a list of [topicName, topicType] pairs.
        """
        with self.ps_lock:
            # snapshot under the lock so callers never see a dict being modified
            retval = list(self.topics_types.items())
        return 1, "current system state", retval

    @apivalidate([[],[], []])
    def getSystemState(self, caller_id):
        """
        Retrieve list representation of system state (i.e. publishers, subscribers, and services).
        @param caller_id: ROS caller id
        @type  caller_id: str
        @rtype: (int, str, [[str,[str]], [str,[str]], [str,[str]]])
        @return: (code, statusMessage, systemState).

           System state is in list representation::
             [publishers, subscribers, services].

           publishers is of the form::
             [ [topic1, [topic1Publisher1...topic1PublisherN]] ... ]

           subscribers is of the form::
             [ [topic1, [topic1Subscriber1...topic1SubscriberN]] ... ]

           services is of the form::
             [ [service1, [service1Provider1...service1ProviderN]] ... ]
        """
        with self.ps_lock:
            retval = [r.get_state() for r in (self.publishers, self.subscribers, self.services)]
        return 1, "current system state", retval