Package rosmaster :: Module master_api

Source Code for Module rosmaster.master_api

  1  # Software License Agreement (BSD License)
 
  2  #
 
  3  # Copyright (c) 2008, Willow Garage, Inc.
 
  4  # All rights reserved.
 
  5  #
 
  6  # Redistribution and use in source and binary forms, with or without
 
  7  # modification, are permitted provided that the following conditions
 
  8  # are met:
 
  9  #
 
 10  #  * Redistributions of source code must retain the above copyright
 
 11  #    notice, this list of conditions and the following disclaimer.
 
 12  #  * Redistributions in binary form must reproduce the above
 
 13  #    copyright notice, this list of conditions and the following
 
 14  #    disclaimer in the documentation and/or other materials provided
 
 15  #    with the distribution.
 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its
 
 17  #    contributors may be used to endorse or promote products derived
 
 18  #    from this software without specific prior written permission.
 
 19  #
 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
 31  # POSSIBILITY OF SUCH DAMAGE.
 
 32  #
 
 33  # Revision $Id$
 
 34  """
 
 35  ROS Master API. 
 
 36  
 
 37  L{ROSMasterHandler} provides the API implementation of the
 
 38  Master. Python allows an API to be introspected from a Python class,
 
 39  so the handler has a 1-to-1 mapping with the actual XMLRPC API.
 
 40  
 
 41  API return convention: (statusCode, statusMessage, returnValue)
 
 42  
 
 43   - statusCode: an integer indicating the completion condition of the method. 
 
 44   - statusMessage: a human-readable string message for debugging
 
 45   - returnValue: the return value of the method; method-specific.
 
 46  
 
 47  Current status codes: 
 
 48  
 
 49   - -1: ERROR: Error on the part of the caller, e.g. an invalid parameter
 
 50   - 0: FAILURE: Method was attempted but failed to complete correctly.
 
 51   - 1: SUCCESS: Method completed successfully.
 
 52  
 
 53  Individual methods may assign additional meaning/semantics to statusCode.
 
 54  """ 
 55  
 
 56  from __future__ import print_function 
 57  
 
 58  import os 
 59  import sys 
 60  import logging 
 61  import threading 
 62  import time 
 63  import traceback 
 64  
 
 65  from rosgraph.xmlrpc import XmlRpcHandler 
 66  
 
 67  import rosgraph.names 
 68  from rosgraph.names import resolve_name 
 69  import rosmaster.paramserver 
 70  import rosmaster.threadpool 
 71  
 
 72  from rosmaster.util import xmlrpcapi 
 73  from rosmaster.registrations import RegistrationManager 
 74  from rosmaster.validators import non_empty, non_empty_str, not_none, is_api, is_topic, is_service, valid_type_name, valid_name, empty_or_valid_name, ParameterInvalid 
 75  
 
 76  NUM_WORKERS = 3 #default number of thread-pool workers used to send publisher_update notifications 
 77  
 
 78  # Return code slots: indices into the (statusCode, statusMessage, returnValue) triple returned by API methods
 
 79  STATUS = 0 
 80  MSG = 1 
 81  VAL = 2 
 82  
 
 83  _logger = logging.getLogger("rosmaster.master") 
 84  
 
 # When True, the apivalidate wrapper logs each API call and its return value at debug level
 85  LOG_API = False 
def mloginfo(msg, *args):
    """
    Emit an INFO-level record on the master logger. The text may be
    echoed to the screen, so keep it readable for end users.
    @param msg: message text, optionally a %-style format string
    @type  msg: str
    @param args: values substituted into msg when it is a format string
    """
    # mloginfo is defined here so that it is accessible to master and masterdata
    _logger.log(logging.INFO, msg, *args)
97
def mlogwarn(msg, *args):
    """
    Warn-level master log statements. These statements may be printed
    to screen so they should be user-readable. In addition to being
    logged, the message is always echoed to stdout with a "WARN: " prefix.
    @param msg: Message string
    @type  msg: str
    @param args: arguments for msg if msg is a format string
    """
    # mlogwarn is defined here so that it is accessible to master and masterdata.
    # Logger.warn() is a deprecated alias (removed in Python 3.13); warning()
    # is available on every Python version this module supports.
    _logger.warning(msg, *args)
    if args:
        print("WARN: " + msg % args)
    else:
        # no format arguments: msg may not be a str, so coerce before concatenating
        print("WARN: " + str(msg))
112
def apivalidate(error_return_value, validators=()):
    """
    ROS master/slave arg-checking decorator. Applies the specified
    validator to the corresponding argument and also remaps each
    argument to be the value returned by the validator.  Thus,
    arguments can be simultaneously validated and canonicalized prior
    to actual function call.

    The wrapped method always answers with a (statusCode, statusMessage,
    value) triple:
     - -1 when caller_id is missing or not a string, the call arity is
       wrong, a validator raises ParameterInvalid, the wrapped method
       raises TypeError, or the wrapped method returns None as its value
     - 0 when the wrapped method raises any other exception
     - otherwise whatever the wrapped method returned

    @param error_return_value: API value to return if call unexpectedly fails
    @param validators: sequence of validators to apply to each
      arg. None means no validation for the parameter is required. As all
      api methods take caller_id as the first parameter, the validators
      start with the second param. Each validator is called as
      validator(arg_value, caller_id).
    @type  validators: sequence
    @return: decorator for an API method
    """
    def isstring(s):
        """Small helper version to check an object is a string in
        a way that works for both Python 2 and 3
        """
        try:
            return isinstance(s, basestring)
        except NameError:
            # Python 3: basestring no longer exists
            return isinstance(s, str)

    def check_validates(f):
        try:
            func_code = f.__code__
            func_name = f.__name__
        except AttributeError:
            # very old Python 2 function objects
            func_code = f.func_code
            func_name = f.func_name
        # decoration-time sanity check: one validator per argument after self and caller_id
        assert len(validators) == func_code.co_argcount - 2, "%s failed arg check"%f #ignore self and caller_id

        def validated_f(*args, **kwds):
            if LOG_API:
                _logger.debug("%s%s", func_name, str(args[1:]))
            if len(args) == 1:
                _logger.error("%s invoked without caller_id parameter", func_name)
                return -1, "missing required caller_id parameter", error_return_value
            elif len(args) != func_code.co_argcount:
                return -1, "Error: bad call arity", error_return_value

            instance = args[0]
            caller_id = args[1]
            if not isstring(caller_id):
                _logger.error("%s: invalid caller_id param type", func_name)
                return -1, "caller_id must be a string", error_return_value

            newArgs = [instance, caller_id] #canonicalized args
            try:
                for (v, a) in zip(validators, args[2:]):
                    if v:
                        try:
                            newArgs.append(v(a, caller_id))
                        except ParameterInvalid as e:
                            _logger.error("%s: invalid parameter: %s", func_name, str(e) or 'error')
                            return -1, str(e) or 'error', error_return_value
                    else:
                        newArgs.append(a)

                retval = f(*newArgs, **kwds)
                if LOG_API:
                    _logger.debug("%s%s returns %s", func_name, args[1:], retval)
                # Apply the same None-value guard whether or not API logging
                # is enabled, so that debug runs behave like production runs.
                code, msg, val = retval
                if val is None:
                    return -1, "Internal error (None value returned)", error_return_value
                return code, msg, val
            except TypeError as te: #most likely wrong arg number
                _logger.error(traceback.format_exc())
                return -1, "Error: invalid arguments: %s"%te, error_return_value
            except Exception as e: #internal failure
                _logger.error(traceback.format_exc())
                return 0, "Internal failure: %s"%e, error_return_value
        try:
            validated_f.__name__ = func_name
        except AttributeError:
            validated_f.func_name = func_name
        validated_f.__doc__ = f.__doc__ #preserve doc
        return validated_f
    return check_validates
def publisher_update_task(api, topic, pub_uris):
    """
    Call publisherUpdate on a node's XML-RPC API, identifying the
    caller as '/master'.
    @param api: XML-RPC URI of node to contact
    @type  api: str
    @param topic: Topic name to send to node
    @type  topic: str
    @param pub_uris: list of publisher APIs to send to node
    @type  pub_uris: [str]
    """
    mloginfo("publisherUpdate[%s] -> %s", topic, api)
    node_proxy = xmlrpcapi(api)
    #TODO: check return value for errors so we can unsubscribe if stale
    node_proxy.publisherUpdate('/master', topic, pub_uris)
209
def service_update_task(api, service, uri):
    """
    Call serviceUpdate on a node's XML-RPC API, identifying the
    caller as '/master'. The remote return value is discarded.
    @param api: XML-RPC URI of node to contact
    @type  api: str
    @param service: Service name to send to node
    @type  service: str
    @param uri: URI to send to node
    @type  uri: str
    """
    mloginfo("serviceUpdate[%s, %s] -> %s",service, uri, api)
    node_proxy = xmlrpcapi(api)
    node_proxy.serviceUpdate('/master', service, uri)
222
###################################################
# Master Implementation

class ROSMasterHandler(object):
    """
    XML-RPC handler for ROS master APIs.
    API routines for the ROS Master Node. The Master Node is a
    superset of the Slave Node and contains additional API methods for
    creating and monitoring a graph of slave nodes.

    By convention, ROS nodes take in caller_id as the first parameter
    of any API call.  The setting of this parameter is rarely done by
    client code as ros::msproxy::MasterProxy automatically inserts
    this parameter (see ros::client::getMaster()).

    Every external API method returns a (statusCode, statusMessage,
    returnValue) triple; argument checking and error wrapping is done
    by the L{apivalidate} decorator.
    """

    def __init__(self, num_workers=NUM_WORKERS):
        """
        ctor.
        @param num_workers: number of worker threads in the notification thread pool
        @type  num_workers: int
        """

        self.uri = None
        self.done = False

        self.thread_pool = rosmaster.threadpool.MarkedThreadPool(num_workers)
        # pub/sub/providers: dict { topicName : [publishers/subscribers names] }
        self.ps_lock = threading.Condition(threading.Lock())

        self.reg_manager = RegistrationManager(self.thread_pool)

        # maintain refs to reg_manager fields
        self.publishers  = self.reg_manager.publishers
        self.subscribers = self.reg_manager.subscribers
        self.services = self.reg_manager.services
        self.param_subscribers = self.reg_manager.param_subscribers

        self.topics_types = {} #dict { topicName : type }

        # parameter server dictionary
        self.param_server = rosmaster.paramserver.ParamDictionary(self.reg_manager)

    def _shutdown(self, reason=''):
        """
        Stop the notification thread pool (without waiting for queued
        tasks or worker threads) and mark the handler as done.
        @param reason: human-readable reason for the shutdown (currently unused)
        @type  reason: str
        """
        if self.thread_pool is not None:
            self.thread_pool.join_all(wait_for_tasks=False, wait_for_threads=False)
            self.thread_pool = None
        self.done = True

    def _ready(self, uri):
        """
        Initialize the handler with the XMLRPC URI. This is a standard callback from the XmlRpcNode API.

        @param uri: XML-RPC URI
        @type  uri: str
        """
        self.uri = uri

    def _ok(self):
        """
        @return: True until L{_shutdown} has been called
        @rtype: bool
        """
        return not self.done

    ###############################################################################
    # EXTERNAL API

    @apivalidate(0, (None, ))
    def shutdown(self, caller_id, msg=''):
        """
        Stop this server
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param msg: a message describing why the node is being shutdown.
        @type  msg: str
        @return: [code, msg, 0]
        @rtype: [int, str, int]
        """
        if msg:
            print("shutdown request: %s" % msg, file=sys.stdout)
        else:
            print("shutdown request", file=sys.stdout)
        self._shutdown('external shutdown request from [%s]: %s'%(caller_id, msg))
        return 1, "shutdown", 0

    @apivalidate('')
    def getUri(self, caller_id):
        """
        Get the XML-RPC URI of this server.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @return: [1, "", xmlRpcUri]
        @rtype: [int, str, str]
        """
        return 1, "", self.uri


    @apivalidate(-1)
    def getPid(self, caller_id):
        """
        Get the PID of this server
        @param caller_id: ROS caller id
        @type  caller_id: str
        @return: [1, "", serverProcessPID]
        @rtype: [int, str, int]
        """
        return 1, "", os.getpid()


    ################################################################
    # PARAMETER SERVER ROUTINES

    @apivalidate(0, (non_empty_str('key'),))
    def deleteParam(self, caller_id, key):
        """
        Parameter Server: delete parameter
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter name
        @type  key: str
        @return: [code, msg, 0]. code is -1 if the parameter is not set.
        @rtype: [int, str, int]
        """
        try:
            key = resolve_name(key, caller_id)
            self.param_server.delete_param(key, self._notify_param_subscribers)
            mloginfo("-PARAM [%s] by %s",key, caller_id)
            return 1, "parameter %s deleted"%key, 0
        except KeyError:
            return -1, "parameter [%s] is not set"%key, 0

    @apivalidate(0, (non_empty_str('key'), not_none('value')))
    def setParam(self, caller_id, key, value):
        """
        Parameter Server: set parameter.  NOTE: if value is a
        dictionary it will be treated as a parameter tree, where key
        is the parameter namespace. For example:::
          {'x':1,'y':2,'sub':{'z':3}}

        will set key/x=1, key/y=2, and key/sub/z=3. Furthermore, it
        will replace all existing parameters in the key parameter
        namespace with the parameters in value. You must set
        parameters individually if you wish to perform a union update.

        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter name
        @type  key: str
        @param value: parameter value.
        @type  value: XMLRPCLegalValue
        @return: [code, msg, 0]
        @rtype: [int, str, int]
        """
        key = resolve_name(key, caller_id)
        self.param_server.set_param(key, value, self._notify_param_subscribers)
        mloginfo("+PARAM [%s] by %s",key, caller_id)
        return 1, "parameter %s set"%key, 0

    @apivalidate(0, (non_empty_str('key'),))
    def getParam(self, caller_id, key):
        """
        Retrieve parameter value from server.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter to lookup. If key is a namespace,
        getParam() will return a parameter tree.
        @type  key: str

        @return: [code, statusMessage, parameterValue]. If code is not
            1, parameterValue should be ignored. If key is a namespace,
            the return value will be a dictionary, where each key is a
            parameter in that namespace. Sub-namespaces are also
            represented as dictionaries.
        @rtype: [int, str, XMLRPCLegalValue]
        """
        try:
            key = resolve_name(key, caller_id)
            return 1, "Parameter [%s]"%key, self.param_server.get_param(key)
        except KeyError:
            return -1, "Parameter [%s] is not set"%key, 0

    @apivalidate(0, (non_empty_str('key'),))
    def searchParam(self, caller_id, key):
        """
        Search for parameter key on parameter server. Search starts in caller's namespace and proceeds
        upwards through parent namespaces until Parameter Server finds a matching key.

        searchParam's behavior is to search for the first partial match.
        For example, imagine that there are two 'robot_description' parameters::

             /robot_description
               /robot_description/arm
               /robot_description/base
             /pr2/robot_description
               /pr2/robot_description/base

        If I start in the namespace /pr2/foo and search for
        'robot_description', searchParam will match
        /pr2/robot_description. If I search for 'robot_description/arm'
        it will return /pr2/robot_description/arm, even though that
        parameter does not exist (yet).

        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter key to search for.
        @type  key: str
        @return: [code, statusMessage, foundKey]. If code is not 1, foundKey should be
            ignored.
        @rtype: [int, str, str]
        """
        search_key = self.param_server.search_param(caller_id, key)
        if search_key:
            return 1, "Found [%s]"%search_key, search_key
        else:
            return -1, "Cannot find parameter [%s] in an upwards search"%key, ''

    @apivalidate(0, (is_api('caller_api'), non_empty_str('key'),))
    def subscribeParam(self, caller_id, caller_api, key):
        """
        Retrieve parameter value from server and subscribe to updates to that param. See
        paramUpdate() in the Node API.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param caller_api: API URI for paramUpdate callbacks.
        @type  caller_api: str
        @param key: parameter to lookup.
        @type  key: str
        @return: [code, statusMessage, parameterValue]. If code is not
            1, parameterValue should be ignored. parameterValue is an empty dictionary if the parameter
            has not been set yet.
        @rtype: [int, str, XMLRPCLegalValue]
        """
        key = resolve_name(key, caller_id)
        # ps_lock has precedence and is required due to
        # potential self.reg_manager modification
        with self.ps_lock:
            val = self.param_server.subscribe_param(key, (caller_id, caller_api))
        return 1, "Subscribed to parameter [%s]"%key, val

    @apivalidate(0, (is_api('caller_api'), non_empty_str('key'),))
    def unsubscribeParam(self, caller_id, caller_api, key):
        """
        Unsubscribe the caller from updates to a parameter. See
        paramUpdate() in the Node API.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param caller_api: API URI that was registered for paramUpdate callbacks.
        @type  caller_api: str
        @param key: parameter to stop watching.
        @type  key: str
        @return: [code, statusMessage, numUnsubscribed].
        @rtype: [int, str, int]
        """
        key = resolve_name(key, caller_id)
        # ps_lock is required due to potential self.reg_manager modification
        with self.ps_lock:
            # NOTE(review): the result of unsubscribe_param is discarded and
            # numUnsubscribed is always reported as 1, even if the caller was
            # not subscribed -- confirm whether the real count should be returned.
            self.param_server.unsubscribe_param(key, (caller_id, caller_api))
        return 1, "Unsubscribe to parameter [%s]"%key, 1


    @apivalidate(False, (non_empty_str('key'),))
    def hasParam(self, caller_id, key):
        """
        Check if parameter is stored on server.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter to check
        @type  key: str
        @return: [code, statusMessage, hasParam]. statusMessage is the resolved key.
        @rtype: [int, str, bool]
        """
        key = resolve_name(key, caller_id)
        return 1, key, bool(self.param_server.has_param(key))

    @apivalidate([])
    def getParamNames(self, caller_id):
        """
        Get list of all parameter names stored on this server.
        This does not adjust parameter names for caller's scope.

        @param caller_id: ROS caller id
        @type  caller_id: str
        @return: [code, statusMessage, parameterNameList]
        @rtype: [int, str, [str]]
        """
        return 1, "Parameter names", self.param_server.get_param_names()

    ##################################################################################
    # NOTIFICATION ROUTINES

    def _notify(self, registrations, task, key, value, node_apis):
        """
        Generic implementation of callback notification: queues one
        task per node API on the thread pool. Does nothing once the
        thread pool has been shut down.
        @param registrations: Registrations (currently unused)
        @type  registrations: L{Registrations}
        @param task: task to queue; invoked as task(node_api, key, value)
        @type  task: fn
        @param key: registration key
        @type  key: str
        @param value: value to pass to task
        @type  value: Any
        @param node_apis: XML-RPC URIs of the nodes to notify
        @type  node_apis: [str]
        """
        # cache thread_pool for thread safety
        thread_pool = self.thread_pool
        if not thread_pool:
            return

        node_api = None  # remembered so the error handler can report the failing listener
        try:
            for node_api in node_apis:
                # use the api as a marker so that we limit one thread per subscriber
                thread_pool.queue_task(node_api, task, (node_api, key, value))
        except KeyError:
            _logger.warning('subscriber data stale (key [%s], listener [%s]): node API unknown', key, node_api)

    def _notify_param_subscribers(self, updates):
        """
        Notify parameter subscribers of new parameter value. Does
        nothing once the thread pool has been shut down.
        @param updates: [(subscribers, param_key, param_value)*], where
            subscribers is a sequence of (caller_id, caller_api) pairs
        @type  updates: [([(str, str)], str, any)]
        """
        # cache thread_pool for thread safety
        thread_pool = self.thread_pool
        if not thread_pool:
            return

        for subscribers, key, value in updates:
            # use the api as a marker so that we limit one thread per subscriber
            for caller_id, caller_api in subscribers:
                # use the cached reference: self.thread_pool may be set to
                # None concurrently by _shutdown()
                thread_pool.queue_task(caller_api, self.param_update_task, (caller_id, caller_api, key, value))

    def param_update_task(self, caller_id, caller_api, param_key, param_value):
        """
        Contact api.paramUpdate with specified parameters. If the node
        answers with status code -1, every registration found for
        caller_api is unsubscribed from the parameter.
        @param caller_id: caller ID
        @type  caller_id: str
        @param caller_api: XML-RPC URI of node to contact
        @type  caller_api: str
        @param param_key: parameter key to pass to node
        @type  param_key: str
        @param param_value: parameter value to pass to node
        @type  param_value: str
        """
        mloginfo("paramUpdate[%s]", param_key)
        code, _, _ = xmlrpcapi(caller_api).paramUpdate('/master', param_key, param_value)
        if code == -1:
            # ps_lock is required due to potential self.reg_manager modification
            with self.ps_lock:
                # reverse lookup to figure out who we just called
                matches = self.reg_manager.reverse_lookup(caller_api)
                for m in matches:
                    self.param_server.unsubscribe_param(param_key, (m.id, caller_api))

    def _notify_topic_subscribers(self, topic, pub_uris, sub_uris):
        """
        Notify subscribers with new publisher list
        @param topic: name of topic
        @type  topic: str
        @param pub_uris: list of URIs of publishers.
        @type  pub_uris: [str]
        @param sub_uris: list of URIs of the subscribers to notify.
        @type  sub_uris: [str]
        """
        self._notify(self.subscribers, publisher_update_task, topic, pub_uris, sub_uris)

    ##################################################################################
    # SERVICE PROVIDER

    @apivalidate(0, ( is_service('service'), is_api('service_api'), is_api('caller_api')))
    def registerService(self, caller_id, service, service_api, caller_api):
        """
        Register the caller as a provider of the specified service.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param service: Fully-qualified name of service
        @type  service: str
        @param service_api: Service URI
        @type  service_api: str
        @param caller_api: XML-RPC URI of caller node
        @type  caller_api: str
        @return: (code, message, ignore)
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            self.reg_manager.register_service(service, caller_id, caller_api, service_api)
            mloginfo("+SERVICE [%s] %s %s", service, caller_id, caller_api)
        return 1, "Registered [%s] as provider of [%s]"%(caller_id, service), 1

    @apivalidate(0, (is_service('service'),))
    def lookupService(self, caller_id, service):
        """
        Lookup all provider of a particular service.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param service: fully-qualified name of service to lookup.
        @type  service: str
        @return: (code, message, serviceUrl). service URL is provider's
           ROSRPC URI with address and port.  Fails if there is no provider.
        @rtype: (int, str, str)
        """
        with self.ps_lock:
            service_url = self.services.get_service_api(service)
        if service_url:
            return 1, "rosrpc URI: [%s]"%service_url, service_url
        else:
            return -1, "no provider", ''

    @apivalidate(0, ( is_service('service'), is_api('service_api')))
    def unregisterService(self, caller_id, service, service_api):
        """
        Unregister the caller as a provider of the specified service.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param service: Fully-qualified name of service
        @type  service: str
        @param service_api: API URI of service to unregister. Unregistration will only occur if current
           registration matches.
        @type  service_api: str
        @return: (code, message, numUnregistered). Number of unregistrations (either 0 or 1).
           If this is zero it means that the caller was not registered as a service provider.
           The call still succeeds as the intended final state is reached.
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            retval = self.reg_manager.unregister_service(service, caller_id, service_api)
            mloginfo("-SERVICE [%s] %s %s", service, caller_id, service_api)
            return retval

    ##################################################################################
    # PUBLISH/SUBSCRIBE

    @apivalidate(0, ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
    def registerSubscriber(self, caller_id, topic, topic_type, caller_api):
        """
        Subscribe the caller to the specified topic. In addition to receiving
        a list of current publishers, the subscriber will also receive notifications
        of new publishers via the publisherUpdate API.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to subscribe to.
        @type  topic: str
        @param topic_type: Datatype for topic. Must be a package-resource name, i.e. the .msg name.
        @type  topic_type: str
        @param caller_api: XML-RPC URI of caller node for new publisher notifications
        @type  caller_api: str
        @return: (code, message, publishers). Publishers is a list of XMLRPC API URIs
           for nodes currently publishing the specified topic.
        @rtype: (int, str, [str])
        """
        #NOTE: subscribers do not get to set topic type
        with self.ps_lock:
            self.reg_manager.register_subscriber(topic, caller_id, caller_api)

            # ROS 1.1: subscriber can now set type if it is not already set
            #  - don't let '*' type squash valid typing
            if topic not in self.topics_types and topic_type != rosgraph.names.ANYTYPE:
                self.topics_types[topic] = topic_type

            mloginfo("+SUB [%s] %s %s",topic, caller_id, caller_api)
            pub_uris = self.publishers.get_apis(topic)
        return 1, "Subscribed to [%s]"%topic, pub_uris

    @apivalidate(0, (is_topic('topic'), is_api('caller_api')))
    def unregisterSubscriber(self, caller_id, topic, caller_api):
        """
        Unregister the caller as a subscriber of the topic.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to unregister.
        @type  topic: str
        @param caller_api: API URI of service to unregister. Unregistration will only occur if current
           registration matches.
        @type  caller_api: str
        @return: (code, statusMessage, numUnsubscribed).
          If numUnsubscribed is zero it means that the caller was not registered as a subscriber.
          The call still succeeds as the intended final state is reached.
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            retval = self.reg_manager.unregister_subscriber(topic, caller_id, caller_api)
            mloginfo("-SUB [%s] %s %s",topic, caller_id, caller_api)
            return retval

    @apivalidate(0, ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
    def registerPublisher(self, caller_id, topic, topic_type, caller_api):
        """
        Register the caller as a publisher the topic. Current
        subscribers of the topic are notified of the new publisher list.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to register.
        @type  topic: str
        @param topic_type: Datatype for topic. Must be a
        package-resource name, i.e. the .msg name.
        @type  topic_type: str
        @param caller_api: ROS caller XML-RPC API URI
        @type  caller_api: str
        @return: (code, statusMessage, subscriberApis).
        List of current subscribers of topic in the form of XMLRPC URIs.
        @rtype: (int, str, [str])
        """
        #NOTE: we need topic_type for getPublishedTopics.
        with self.ps_lock:
            self.reg_manager.register_publisher(topic, caller_id, caller_api)
            # don't let '*' type squash valid typing
            if topic_type != rosgraph.names.ANYTYPE or topic not in self.topics_types:
                self.topics_types[topic] = topic_type
            pub_uris = self.publishers.get_apis(topic)
            # fetched once while the lock is held; used both for the
            # notification and as the return value
            sub_uris = self.subscribers.get_apis(topic)
            self._notify_topic_subscribers(topic, pub_uris, sub_uris)
            mloginfo("+PUB [%s] %s %s",topic, caller_id, caller_api)
        return 1, "Registered [%s] as publisher of [%s]"%(caller_id, topic), sub_uris


    @apivalidate(0, (is_topic('topic'), is_api('caller_api')))
    def unregisterPublisher(self, caller_id, topic, caller_api):
        """
        Unregister the caller as a publisher of the topic. If a
        registration was actually removed, subscribers of the topic
        are notified of the new publisher list.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to unregister.
        @type  topic: str
        @param caller_api: API URI of service to
           unregister. Unregistration will only occur if current
           registration matches.
        @type  caller_api: str
        @return: (code, statusMessage, numUnregistered).
           If numUnregistered is zero it means that the caller was not registered as a publisher.
           The call still succeeds as the intended final state is reached.
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            retval = self.reg_manager.unregister_publisher(topic, caller_id, caller_api)
            if retval[VAL]:
                self._notify_topic_subscribers(topic, self.publishers.get_apis(topic), self.subscribers.get_apis(topic))
            mloginfo("-PUB [%s] %s %s",topic, caller_id, caller_api)
        return retval

    ##################################################################################
    # GRAPH STATE APIS

    @apivalidate('', (valid_name('node'),))
    def lookupNode(self, caller_id, node_name):
        """
        Get the XML-RPC URI of the node with the associated
        name/caller_id.  This API is for looking information about
        publishers and subscribers. Use lookupService instead to lookup
        ROS-RPC URIs.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param node_name: name of node to lookup
        @type  node_name: str
        @return: (code, msg, URI). code is -1 and URI is '' if the node is unknown.
        @rtype: (int, str, str)
        """
        with self.ps_lock:
            node = self.reg_manager.get_node(node_name)
            if node is not None:
                return 1, "node api", node.api
            return -1, "unknown node [%s]"%node_name, ''

    @apivalidate(0, (empty_or_valid_name('subgraph'),))
    def getPublishedTopics(self, caller_id, subgraph):
        """
        Get list of topics that can be subscribed to. This does not return topics that have no publishers.
        See L{getSystemState()} to get more comprehensive list.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param subgraph: Restrict topic names to match within the specified subgraph. Subgraph namespace
           is resolved relative to the caller's namespace. Use '' to specify all names.
        @type  subgraph: str
        @return: (code, msg, [[topic1, type1]...[topicN, typeN]])
        @rtype: (int, str, [[str, str],])
        """
        with self.ps_lock:
            # force subgraph to be a namespace with trailing slash
            if subgraph and subgraph[-1] != rosgraph.names.SEP:
                subgraph = subgraph + rosgraph.names.SEP
            #we don't bother with subscribers as subscribers don't report topic types. also, the intended
            #use case is for subscribe-by-topic-type
            # NOTE(review): iterkeys() is called on the publishers registrations
            # object, presumably its own method rather than dict.iterkeys --
            # confirm it is available under Python 3.
            retval = [[t, self.topics_types[t]] for t in self.publishers.iterkeys() if t.startswith(subgraph)]
        return 1, "current topics", retval

    @apivalidate([])
    def getTopicTypes(self, caller_id):
        """
        Retrieve list topic names and their types.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @rtype: (int, str, [[str,str]] )
        @return: (code, statusMessage, topicTypes). topicTypes is a list of [topicName, topicType] pairs.
        """
        with self.ps_lock:
            # copy while holding the lock so the caller gets a stable snapshot
            retval = list(self.topics_types.items())
        return 1, "current system state", retval

    @apivalidate([[],[], []])
    def getSystemState(self, caller_id):
        """
        Retrieve list representation of system state (i.e. publishers, subscribers, and services).
        @param caller_id: ROS caller id
        @type  caller_id: str
        @rtype: (int, str, [[str,[str]], [str,[str]], [str,[str]]])
        @return: (code, statusMessage, systemState).

           System state is in list representation::
             [publishers, subscribers, services].

           publishers is of the form::
             [ [topic1, [topic1Publisher1...topic1PublisherN]] ... ]

           subscribers is of the form::
             [ [topic1, [topic1Subscriber1...topic1SubscriberN]] ... ]

           services is of the form::
             [ [service1, [service1Provider1...service1ProviderN]] ... ]
        """
        with self.ps_lock:
            retval = [r.get_state() for r in (self.publishers, self.subscribers, self.services)]
        return 1, "current system state", retval