Package rosmaster :: Module master_api

Source Code for Module rosmaster.master_api

  1  # Software License Agreement (BSD License)
 
  2  #
 
  3  # Copyright (c) 2008, Willow Garage, Inc.
 
  4  # All rights reserved.
 
  5  #
 
  6  # Redistribution and use in source and binary forms, with or without
 
  7  # modification, are permitted provided that the following conditions
 
  8  # are met:
 
  9  #
 
 10  #  * Redistributions of source code must retain the above copyright
 
 11  #    notice, this list of conditions and the following disclaimer.
 
 12  #  * Redistributions in binary form must reproduce the above
 
 13  #    copyright notice, this list of conditions and the following
 
 14  #    disclaimer in the documentation and/or other materials provided
 
 15  #    with the distribution.
 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its
 
 17  #    contributors may be used to endorse or promote products derived
 
 18  #    from this software without specific prior written permission.
 
 19  #
 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
 31  # POSSIBILITY OF SUCH DAMAGE.
 
 32  #
 
 33  # Revision $Id$
 
 34  """
 
 35  ROS Master API. 
 
 36  
 
 37  L{ROSMasterHandler} provides the API implementation of the
 
 38  Master. Python allows an API to be introspected from a Python class,
 
 39  so the handler has a 1-to-1 mapping with the actual XMLRPC API.
 
 40  
 
 41  API return convention: (statusCode, statusMessage, returnValue)
 
 42  
 
 43   - statusCode: an integer indicating the completion condition of the method. 
 
 44   - statusMessage: a human-readable string message for debugging
 
 45   - returnValue: the return value of the method; method-specific.
 
 46  
 
 47  Current status codes: 
 
 48  
 
 49   - -1: ERROR: Error on the part of the caller, e.g. an invalid parameter
 
 50   - 0: FAILURE: Method was attempted but failed to complete correctly.
 
 51   - 1: SUCCESS: Method completed successfully.
 
 52  
 
 53  Individual methods may assign additional meaning/semantics to statusCode.
 
 54  """ 
 55  
 
 56  from __future__ import print_function 
 57  
 
 58  import os 
 59  import sys 
 60  import logging 
 61  import threading 
 62  import time 
 63  import traceback 
 64  
 
 65  from rosgraph.xmlrpc import XmlRpcHandler 
 66  
 
 67  import rosgraph.names 
 68  from rosgraph.names import resolve_name 
 69  import rosmaster.paramserver 
 70  import rosmaster.threadpool 
 71  
 
 72  from rosmaster.util import xmlrpcapi 
 73  from rosmaster.registrations import RegistrationManager 
 74  from rosmaster.validators import non_empty, non_empty_str, not_none, is_api, is_topic, is_service, valid_type_name, valid_name, empty_or_valid_name, ParameterInvalid 
 75  
 
# Default size of the thread pool used to send publisher_update
# notifications (default value of ROSMasterHandler's num_workers argument).
NUM_WORKERS = 3 #number of threads we use to send publisher_update notifications

# Return code slots: indices into the (statusCode, statusMessage, returnValue)
# triple that every API method returns.
STATUS = 0
MSG = 1
VAL = 2

# Module-wide logger used by mloginfo/mlogwarn and the API decorator.
_logger = logging.getLogger("rosmaster.master")

# When True, the apivalidate wrapper logs every API call and its return
# value at debug level.
LOG_API = False
def mloginfo(msg, *args):
    """
    Log a master statement at info level. The text may end up on
    screen, so callers should keep it user-readable.

    @param msg: Message string (may be a %-style format string)
    @type  msg: str
    @param args: arguments for msg if msg is a format string
    """
    # lives in this module so that it is accessible to master and masterdata
    log_info = _logger.info
    log_info(msg, *args)
97
def mlogwarn(msg, *args):
    """
    Warn-level master log statements. These statements may be printed
    to screen so they should be user-readable. The message is sent to
    the module logger and additionally echoed to stdout with a
    "WARN: " prefix.

    @param msg: Message string
    @type  msg: str
    @param args: arguments for msg if msg is a format string
    """
    # mlogwarn is in core so that it is accessible to master and masterdata.
    # Logger.warning is the supported spelling; Logger.warn is a deprecated alias.
    _logger.warning(msg, *args)
    if args:
        print("WARN: " + msg % args)
    else:
        print("WARN: " + str(msg))
112
def apivalidate(error_return_value, validators=()):
    """
    ROS master/slave arg-checking decorator. Applies the specified
    validator to the corresponding argument and also remaps each
    argument to be the value returned by the validator.  Thus,
    arguments can be simultaneously validated and canonicalized prior
    to actual function call.

    The wrapped method always yields a (code, msg, value) triple:
      - -1 with error_return_value if caller_id is missing or not a
        string, the call arity is wrong, a validator raises
        ParameterInvalid, the method raises TypeError, or the method
        returns None as its value;
      - 0 with error_return_value if the method raises any other
        Exception;
      - otherwise whatever the method returned.

    @param error_return_value: API value to return if call unexpectedly fails
    @param validators: sequence of validators to apply to each
      arg. None means no validation for the parameter is required. As all
      api methods take caller_id as the first parameter, the validators
      start with the second param.
    @type  validators: sequence
    @return: decorator that wraps an API method with the checks above
    @rtype: fn
    """
    def isstring(s):
        """Small helper version to check an object is a string in
        a way that works for both Python 2 and 3
        """
        try:
            return isinstance(s, basestring)
        except NameError:
            return isinstance(s, str)

    def check_validates(f):
        try:
            func_code = f.__code__
            func_name = f.__name__
        except AttributeError:
            # older function objects only expose the func_* attributes
            func_code = f.func_code
            func_name = f.func_name
        # decoration-time sanity check: one validator per argument after
        # self and caller_id
        assert len(validators) == func_code.co_argcount - 2, "%s failed arg check"%f #ignore self and caller_id

        def validated_f(*args, **kwds):
            if LOG_API:
                _logger.debug("%s%s", func_name, str(args[1:]))
            if len(args) == 1:
                _logger.error("%s invoked without caller_id paramter" % func_name)
                return -1, "missing required caller_id parameter", error_return_value
            elif len(args) != func_code.co_argcount:
                return -1, "Error: bad call arity", error_return_value

            instance = args[0]
            caller_id = args[1]
            if not isstring(caller_id):
                _logger.error("%s: invalid caller_id param type", func_name)
                return -1, "caller_id must be a string", error_return_value

            newArgs = [instance, caller_id] #canonicalized args
            try:
                for (v, a) in zip(validators, args[2:]):
                    if v:
                        try:
                            newArgs.append(v(a, caller_id))
                        except ParameterInvalid as e:
                            _logger.error("%s: invalid parameter: %s", func_name, str(e) or 'error')
                            return -1, str(e) or 'error', error_return_value
                    else:
                        newArgs.append(a)

                retval = f(*newArgs, **kwds)
                if LOG_API:
                    _logger.debug("%s%s returns %s", func_name, args[1:], retval)
                # The triple is unpacked and checked whether or not API
                # logging is on, so enabling LOG_API never changes what
                # the caller receives.
                code, msg, val = retval
                if val is None:
                    return -1, "Internal error (None value returned)", error_return_value
                return code, msg, val
            except TypeError as te: #most likely wrong arg number
                _logger.error(traceback.format_exc())
                return -1, "Error: invalid arguments: %s"%te, error_return_value
            except Exception as e: #internal failure
                _logger.error(traceback.format_exc())
                return 0, "Internal failure: %s"%e, error_return_value

        try:
            validated_f.__name__ = func_name
        except AttributeError:
            validated_f.func_name = func_name
        validated_f.__doc__ = f.__doc__ #preserve doc
        return validated_f
    return check_validates
def publisher_update_task(api, topic, pub_uris):
    """
    Contact api.publisherUpdate with specified parameters. The call,
    its duration and its outcome are written to the master info log;
    any exception raised by the remote call is re-raised to the caller.

    @param api: XML-RPC URI of node to contact
    @type  api: str
    @param topic: Topic name to send to node
    @type  topic: str
    @param pub_uris: list of publisher APIs to send to node
    @type  pub_uris: [str]
    """
    msg = "publisherUpdate[%s] -> %s %s" % (topic, api, pub_uris)
    mloginfo(msg)
    start_sec = time.time()
    # Pre-set so the finally clause can always log, even if something
    # that is not an Exception subclass escapes the call below.
    msg_suffix = "aborted"
    try:
        #TODO: check return value for errors so we can unsubscribe if stale
        ret = xmlrpcapi(api).publisherUpdate('/master', topic, pub_uris)
        msg_suffix = "result=%s" % ret
    except Exception as ex:
        msg_suffix = "exception=%s" % ex
        raise
    finally:
        delta_sec = time.time() - start_sec
        mloginfo("%s: sec=%0.2f, %s", msg, delta_sec, msg_suffix)
218
def service_update_task(api, service, uri):
    """
    Log the update and then invoke serviceUpdate on the node at the
    given XML-RPC URI, identifying the caller as '/master'.

    @param api: XML-RPC URI of node to contact
    @type  api: str
    @param service: Service name to send to node
    @type  service: str
    @param uri: URI to send to node
    @type  uri: str
    """
    mloginfo("serviceUpdate[%s, %s] -> %s",service, uri, api)
    node_proxy = xmlrpcapi(api)
    node_proxy.serviceUpdate('/master', service, uri)
232
###################################################
# Master Implementation

class ROSMasterHandler(object):
    """
    XML-RPC handler for ROS master APIs.
    API routines for the ROS Master Node. The Master Node is a
    superset of the Slave Node and contains additional API methods for
    creating and monitoring a graph of slave nodes.

    By convention, ROS nodes take in caller_id as the first parameter
    of any API call.  The setting of this parameter is rarely done by
    client code as ros::msproxy::MasterProxy automatically inserts
    this parameter (see ros::client::getMaster()).
    """

    def __init__(self, num_workers=NUM_WORKERS):
        """
        ctor.

        @param num_workers: number of threads in the notification thread pool
        @type  num_workers: int
        """
        self.uri = None
        self.done = False

        self.thread_pool = rosmaster.threadpool.MarkedThreadPool(num_workers)
        # pub/sub/providers: dict { topicName : [publishers/subscribers names] }
        self.ps_lock = threading.Condition(threading.Lock())

        self.reg_manager = RegistrationManager(self.thread_pool)

        # maintain refs to reg_manager fields
        self.publishers = self.reg_manager.publishers
        self.subscribers = self.reg_manager.subscribers
        self.services = self.reg_manager.services
        self.param_subscribers = self.reg_manager.param_subscribers

        self.topics_types = {} #dict { topicName : type }

        # parameter server dictionary
        self.param_server = rosmaster.paramserver.ParamDictionary(self.reg_manager)

    def _shutdown(self, reason=''):
        """
        Release the notification thread pool (without waiting for queued
        tasks or threads) and mark the handler as done.

        @param reason: human-readable reason for the shutdown; not used
          by this implementation
        @type  reason: str
        """
        if self.thread_pool is not None:
            self.thread_pool.join_all(wait_for_tasks=False, wait_for_threads=False)
            self.thread_pool = None
        self.done = True

    def _ready(self, uri):
        """
        Initialize the handler with the XMLRPC URI. This is a standard callback from the XmlRpcNode API.

        @param uri: XML-RPC URI
        @type  uri: str
        """
        self.uri = uri

    def _ok(self):
        """
        @return: True until _shutdown() has been called
        @rtype: bool
        """
        return not self.done

    ###############################################################################
    # EXTERNAL API

    @apivalidate(0, (None, ))
    def shutdown(self, caller_id, msg=''):
        """
        Stop this server
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param msg: a message describing why the node is being shutdown.
        @type  msg: str
        @return: [code, msg, 0]
        @rtype: [int, str, int]
        """
        if msg:
            print("shutdown request: %s" % msg, file=sys.stdout)
        else:
            print("shutdown requst", file=sys.stdout)
        self._shutdown('external shutdown request from [%s]: %s'%(caller_id, msg))
        return 1, "shutdown", 0

    @apivalidate('')
    def getUri(self, caller_id):
        """
        Get the XML-RPC URI of this server.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @return: [1, "", xmlRpcUri]
        @rtype: [int, str, str]
        """
        return 1, "", self.uri

    @apivalidate(-1)
    def getPid(self, caller_id):
        """
        Get the PID of this server
        @param caller_id: ROS caller id
        @type  caller_id: str
        @return: [1, "", serverProcessPID]
        @rtype: [int, str, int]
        """
        return 1, "", os.getpid()

    ################################################################
    # PARAMETER SERVER ROUTINES

    @apivalidate(0, (non_empty_str('key'),))
    def deleteParam(self, caller_id, key):
        """
        Parameter Server: delete parameter
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter name
        @type  key: str
        @return: [code, msg, 0]. code is -1 if the parameter is not set.
        @rtype: [int, str, int]
        """
        try:
            key = resolve_name(key, caller_id)
            self.param_server.delete_param(key, self._notify_param_subscribers)
            mloginfo("-PARAM [%s] by %s",key, caller_id)
            return 1, "parameter %s deleted"%key, 0
        except KeyError:
            return -1, "parameter [%s] is not set"%key, 0

    @apivalidate(0, (non_empty_str('key'), not_none('value')))
    def setParam(self, caller_id, key, value):
        """
        Parameter Server: set parameter.  NOTE: if value is a
        dictionary it will be treated as a parameter tree, where key
        is the parameter namespace. For example:::
          {'x':1,'y':2,'sub':{'z':3}}

        will set key/x=1, key/y=2, and key/sub/z=3. Furthermore, it
        will replace all existing parameters in the key parameter
        namespace with the parameters in value. You must set
        parameters individually if you wish to perform a union update.

        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter name
        @type  key: str
        @param value: parameter value.
        @type  value: XMLRPCLegalValue
        @return: [code, msg, 0]
        @rtype: [int, str, int]
        """
        key = resolve_name(key, caller_id)
        self.param_server.set_param(key, value, self._notify_param_subscribers)
        mloginfo("+PARAM [%s] by %s",key, caller_id)
        return 1, "parameter %s set"%key, 0

    @apivalidate(0, (non_empty_str('key'),))
    def getParam(self, caller_id, key):
        """
        Retrieve parameter value from server.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter to lookup. If key is a namespace,
        getParam() will return a parameter tree.
        @type  key: str

        @return: [code, statusMessage, parameterValue]. If code is not
            1, parameterValue should be ignored. If key is a namespace,
            the return value will be a dictionary, where each key is a
            parameter in that namespace. Sub-namespaces are also
            represented as dictionaries.
        @rtype: [int, str, XMLRPCLegalValue]
        """
        try:
            key = resolve_name(key, caller_id)
            return 1, "Parameter [%s]"%key, self.param_server.get_param(key)
        except KeyError:
            return -1, "Parameter [%s] is not set"%key, 0

    @apivalidate(0, (non_empty_str('key'),))
    def searchParam(self, caller_id, key):
        """
        Search for parameter key on parameter server. Search starts in caller's namespace and proceeds
        upwards through parent namespaces until Parameter Server finds a matching key.

        searchParam's behavior is to search for the first partial match.
        For example, imagine that there are two 'robot_description' parameters::

           /robot_description
             /robot_description/arm
             /robot_description/base
           /pr2/robot_description
             /pr2/robot_description/base

        If I start in the namespace /pr2/foo and search for
        'robot_description', searchParam will match
        /pr2/robot_description. If I search for 'robot_description/arm'
        it will return /pr2/robot_description/arm, even though that
        parameter does not exist (yet).

        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter key to search for.
        @type  key: str
        @return: [code, statusMessage, foundKey]. If code is not 1, foundKey should be
            ignored.
        @rtype: [int, str, str]
        """
        search_key = self.param_server.search_param(caller_id, key)
        if search_key:
            return 1, "Found [%s]"%search_key, search_key
        else:
            return -1, "Cannot find parameter [%s] in an upwards search"%key, ''

    @apivalidate(0, (is_api('caller_api'), non_empty_str('key'),))
    def subscribeParam(self, caller_id, caller_api, key):
        """
        Retrieve parameter value from server and subscribe to updates to that param. See
        paramUpdate() in the Node API.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param caller_api: API URI for paramUpdate callbacks.
        @type  caller_api: str
        @param key: parameter to lookup.
        @type  key: str
        @return: [code, statusMessage, parameterValue]. If code is not
            1, parameterValue should be ignored. parameterValue is an empty dictionary if the parameter
            has not been set yet.
        @rtype: [int, str, XMLRPCLegalValue]
        """
        key = resolve_name(key, caller_id)
        # ps_lock has precedence and is required due to
        # potential self.reg_manager modification
        with self.ps_lock:
            val = self.param_server.subscribe_param(key, (caller_id, caller_api))
        return 1, "Subscribed to parameter [%s]"%key, val

    @apivalidate(0, (is_api('caller_api'), non_empty_str('key'),))
    def unsubscribeParam(self, caller_id, caller_api, key):
        """
        Unsubscribe the caller from updates to a parameter. See
        paramUpdate() in the Node API.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param caller_api: API URI for paramUpdate callbacks.
        @type  caller_api: str
        @param key: parameter to stop receiving updates for.
        @type  key: str
        @return: [code, statusMessage, numUnsubscribed].
        @rtype: [int, str, int]
        """
        key = resolve_name(key, caller_id)
        # ps_lock is required due to potential self.reg_manager modification
        with self.ps_lock:
            self.param_server.unsubscribe_param(key, (caller_id, caller_api))
        # NOTE(review): the result of unsubscribe_param is discarded and the
        # value slot is always 1, so callers cannot detect "was not
        # subscribed" -- confirm the helper's return shape before changing.
        return 1, "Unsubscribe to parameter [%s]"%key, 1

    @apivalidate(False, (non_empty_str('key'),))
    def hasParam(self, caller_id, key):
        """
        Check if parameter is stored on server.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param key: parameter to check
        @type  key: str
        @return: [code, statusMessage, hasParam]
        @rtype: [int, str, bool]
        """
        key = resolve_name(key, caller_id)
        if self.param_server.has_param(key):
            return 1, key, True
        else:
            return 1, key, False

    @apivalidate([])
    def getParamNames(self, caller_id):
        """
        Get list of all parameter names stored on this server.
        This does not adjust parameter names for caller's scope.

        @param caller_id: ROS caller id
        @type  caller_id: str
        @return: [code, statusMessage, parameterNameList]
        @rtype: [int, str, [str]]
        """
        return 1, "Parameter names", self.param_server.get_param_names()

    ##################################################################################
    # NOTIFICATION ROUTINES

    def _notify(self, registrations, task, key, value, node_apis):
        """
        Generic implementation of callback notification: queue one task
        per node API on the thread pool. Does nothing once the thread
        pool has been released by _shutdown().

        @param registrations: Registrations (not used by this implementation)
        @type  registrations: L{Registrations}
        @param task: task to queue; invoked as task(node_api, key, value)
        @type  task: fn
        @param key: registration key
        @type  key: str
        @param value: value to pass to task
        @type  value: Any
        @param node_apis: XML-RPC URIs of the nodes to notify
        @type  node_apis: [str]
        """
        # cache thread_pool for thread safety
        thread_pool = self.thread_pool
        if not thread_pool:
            return

        # remember the API being processed so the handler below can report it
        node_api = None
        try:
            for node_api in node_apis:
                # use the api as a marker so that we limit one thread per subscriber
                thread_pool.queue_task(node_api, task, (node_api, key, value))
        except KeyError:
            _logger.warning('subscriber data stale (key [%s], listener [%s]): node API unknown', key, node_api)

    def _notify_param_subscribers(self, updates):
        """
        Notify parameter subscribers of new parameter value. Does
        nothing once the thread pool has been released by _shutdown().

        @param updates: [(subscribers, param_key, param_value)*], where
          subscribers is a sequence of (caller_id, caller_api) pairs
        @type  updates: [([(str, str)], str, any)]
        """
        # cache thread_pool for thread safety
        thread_pool = self.thread_pool
        if not thread_pool:
            return

        for subscribers, key, value in updates:
            # use the api as a marker so that we limit one thread per subscriber
            for caller_id, caller_api in subscribers:
                # queue on the cached pool: self.thread_pool may be reset
                # to None by a concurrent _shutdown()
                thread_pool.queue_task(caller_api, self.param_update_task, (caller_id, caller_api, key, value))

    def param_update_task(self, caller_id, caller_api, param_key, param_value):
        """
        Contact api.paramUpdate with specified parameters. If the node
        answers with status code -1, every registration found for
        caller_api is unsubscribed from param_key.

        @param caller_id: caller ID
        @type  caller_id: str
        @param caller_api: XML-RPC URI of node to contact
        @type  caller_api: str
        @param param_key: parameter key to pass to node
        @type  param_key: str
        @param param_value: parameter value to pass to node
        @type  param_value: str
        """
        mloginfo("paramUpdate[%s]", param_key)
        code, _, _ = xmlrpcapi(caller_api).paramUpdate('/master', param_key, param_value)
        if code == -1:
            # ps_lock is required due to potential self.reg_manager modification
            with self.ps_lock:
                # reverse lookup to figure out who we just called
                matches = self.reg_manager.reverse_lookup(caller_api)
                for m in matches:
                    self.param_server.unsubscribe_param(param_key, (m.id, caller_api))

    def _notify_topic_subscribers(self, topic, pub_uris, sub_uris):
        """
        Notify subscribers with new publisher list
        @param topic: name of topic
        @type  topic: str
        @param pub_uris: list of URIs of publishers.
        @type  pub_uris: [str]
        @param sub_uris: list of URIs of the subscribers to notify.
        @type  sub_uris: [str]
        """
        self._notify(self.subscribers, publisher_update_task, topic, pub_uris, sub_uris)

    ##################################################################################
    # SERVICE PROVIDER

    @apivalidate(0, ( is_service('service'), is_api('service_api'), is_api('caller_api')))
    def registerService(self, caller_id, service, service_api, caller_api):
        """
        Register the caller as a provider of the specified service.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param service: Fully-qualified name of service
        @type  service: str
        @param service_api: Service URI
        @type  service_api: str
        @param caller_api: XML-RPC URI of caller node
        @type  caller_api: str
        @return: (code, message, ignore)
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            self.reg_manager.register_service(service, caller_id, caller_api, service_api)
            mloginfo("+SERVICE [%s] %s %s", service, caller_id, caller_api)
        return 1, "Registered [%s] as provider of [%s]"%(caller_id, service), 1

    @apivalidate(0, (is_service('service'),))
    def lookupService(self, caller_id, service):
        """
        Lookup all provider of a particular service.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param service: fully-qualified name of service to lookup.
        @type  service: str
        @return: (code, message, serviceUrl). service URL is provider's
           ROSRPC URI with address and port.  Fails if there is no provider.
        @rtype: (int, str, str)
        """
        with self.ps_lock:
            service_url = self.services.get_service_api(service)
        if service_url:
            return 1, "rosrpc URI: [%s]"%service_url, service_url
        else:
            return -1, "no provider", ''

    @apivalidate(0, ( is_service('service'), is_api('service_api')))
    def unregisterService(self, caller_id, service, service_api):
        """
        Unregister the caller as a provider of the specified service.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param service: Fully-qualified name of service
        @type  service: str
        @param service_api: API URI of service to unregister. Unregistration will only occur if current
           registration matches.
        @type  service_api: str
        @return: (code, message, numUnregistered). Number of unregistrations (either 0 or 1).
           If this is zero it means that the caller was not registered as a service provider.
           The call still succeeds as the intended final state is reached.
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            retval = self.reg_manager.unregister_service(service, caller_id, service_api)
            mloginfo("-SERVICE [%s] %s %s", service, caller_id, service_api)
            return retval

    ##################################################################################
    # PUBLISH/SUBSCRIBE

    @apivalidate(0, ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
    def registerSubscriber(self, caller_id, topic, topic_type, caller_api):
        """
        Subscribe the caller to the specified topic. In addition to receiving
        a list of current publishers, the subscriber will also receive notifications
        of new publishers via the publisherUpdate API.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to subscribe to.
        @type  topic: str
        @param topic_type: Datatype for topic. Must be a package-resource name, i.e. the .msg name.
        @type  topic_type: str
        @param caller_api: XML-RPC URI of caller node for new publisher notifications
        @type  caller_api: str
        @return: (code, message, publishers). Publishers is a list of XMLRPC API URIs
           for nodes currently publishing the specified topic.
        @rtype: (int, str, [str])
        """
        with self.ps_lock:
            self.reg_manager.register_subscriber(topic, caller_id, caller_api)

            # ROS 1.1: subscriber can now set type if it is not already set
            #  - don't let '*' type squash valid typing
            if topic not in self.topics_types and topic_type != rosgraph.names.ANYTYPE:
                self.topics_types[topic] = topic_type

            mloginfo("+SUB [%s] %s %s",topic, caller_id, caller_api)
            pub_uris = self.publishers.get_apis(topic)
        return 1, "Subscribed to [%s]"%topic, pub_uris

    @apivalidate(0, (is_topic('topic'), is_api('caller_api')))
    def unregisterSubscriber(self, caller_id, topic, caller_api):
        """
        Unregister the caller as a subscriber of the topic.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to unregister.
        @type  topic: str
        @param caller_api: API URI of service to unregister. Unregistration will only occur if current
           registration matches.
        @type  caller_api: str
        @return: (code, statusMessage, numUnsubscribed).
          If numUnsubscribed is zero it means that the caller was not registered as a subscriber.
          The call still succeeds as the intended final state is reached.
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            retval = self.reg_manager.unregister_subscriber(topic, caller_id, caller_api)
            mloginfo("-SUB [%s] %s %s",topic, caller_id, caller_api)
            return retval

    @apivalidate(0, ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
    def registerPublisher(self, caller_id, topic, topic_type, caller_api):
        """
        Register the caller as a publisher the topic. Current
        subscribers of the topic are notified of the new publisher list.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to register.
        @type  topic: str
        @param topic_type: Datatype for topic. Must be a
        package-resource name, i.e. the .msg name.
        @type  topic_type: str
        @param caller_api: ROS caller XML-RPC API URI
        @type  caller_api: str
        @return: (code, statusMessage, subscriberApis).
        List of current subscribers of topic in the form of XMLRPC URIs.
        @rtype: (int, str, [str])
        """
        #NOTE: we need topic_type for getPublishedTopics.
        with self.ps_lock:
            self.reg_manager.register_publisher(topic, caller_id, caller_api)
            # don't let '*' type squash valid typing
            if topic_type != rosgraph.names.ANYTYPE or topic not in self.topics_types:
                self.topics_types[topic] = topic_type
            pub_uris = self.publishers.get_apis(topic)
            # looked up once: the lock is held, so the subscriber list is
            # the same for the notification and for the return value
            sub_uris = self.subscribers.get_apis(topic)
            self._notify_topic_subscribers(topic, pub_uris, sub_uris)
            mloginfo("+PUB [%s] %s %s",topic, caller_id, caller_api)
        return 1, "Registered [%s] as publisher of [%s]"%(caller_id, topic), sub_uris

    @apivalidate(0, (is_topic('topic'), is_api('caller_api')))
    def unregisterPublisher(self, caller_id, topic, caller_api):
        """
        Unregister the caller as a publisher of the topic. If a
        registration was actually removed, subscribers of the topic are
        notified of the new publisher list.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param topic: Fully-qualified name of topic to unregister.
        @type  topic: str
        @param caller_api: API URI of service to
           unregister. Unregistration will only occur if current
           registration matches.
        @type  caller_api: str
        @return: (code, statusMessage, numUnregistered).
           If numUnregistered is zero it means that the caller was not registered as a publisher.
           The call still succeeds as the intended final state is reached.
        @rtype: (int, str, int)
        """
        with self.ps_lock:
            retval = self.reg_manager.unregister_publisher(topic, caller_id, caller_api)
            if retval[VAL]:
                self._notify_topic_subscribers(topic, self.publishers.get_apis(topic), self.subscribers.get_apis(topic))
            mloginfo("-PUB [%s] %s %s",topic, caller_id, caller_api)
        return retval

    ##################################################################################
    # GRAPH STATE APIS

    @apivalidate('', (valid_name('node'),))
    def lookupNode(self, caller_id, node_name):
        """
        Get the XML-RPC URI of the node with the associated
        name/caller_id.  This API is for looking information about
        publishers and subscribers. Use lookupService instead to lookup
        ROS-RPC URIs.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param node_name: name of node to lookup
        @type  node_name: str
        @return: (code, msg, URI). code is -1 and URI is '' if the node is unknown.
        @rtype: (int, str, str)
        """
        with self.ps_lock:
            node = self.reg_manager.get_node(node_name)
            if node is not None:
                retval = 1, "node api", node.api
            else:
                retval = -1, "unknown node [%s]"%node_name, ''
        return retval

    @apivalidate(0, (empty_or_valid_name('subgraph'),))
    def getPublishedTopics(self, caller_id, subgraph):
        """
        Get list of topics that can be subscribed to. This does not return topics that have no publishers.
        See L{getSystemState()} to get more comprehensive list.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @param subgraph: Restrict topic names to match within the specified subgraph. Subgraph namespace
           is resolved relative to the caller's namespace. Use '' to specify all names.
        @type  subgraph: str
        @return: (code, msg, [[topic1, type1]...[topicN, typeN]])
        @rtype: (int, str, [[str, str],])
        """
        with self.ps_lock:
            # force subgraph to be a namespace with trailing slash
            if subgraph and subgraph[-1] != rosgraph.names.SEP:
                subgraph = subgraph + rosgraph.names.SEP
            #we don't bother with subscribers as subscribers don't report topic types. also, the intended
            #use case is for subscribe-by-topic-type
            # NOTE(review): relies on the publishers registry providing
            # iterkeys() -- confirm against rosmaster.registrations.
            retval = [[t, self.topics_types[t]] for t in self.publishers.iterkeys() if t.startswith(subgraph)]
        return 1, "current topics", retval

    @apivalidate([])
    def getTopicTypes(self, caller_id):
        """
        Retrieve list topic names and their types.
        @param caller_id: ROS caller id
        @type  caller_id: str
        @rtype: (int, str, [[str,str]] )
        @return: (code, statusMessage, topicTypes). topicTypes is a list of [topicName, topicType] pairs.
        """
        with self.ps_lock:
            retval = list(self.topics_types.items())
        return 1, "current system state", retval

    @apivalidate([[],[], []])
    def getSystemState(self, caller_id):
        """
        Retrieve list representation of system state (i.e. publishers, subscribers, and services).
        @param caller_id: ROS caller id
        @type  caller_id: str
        @rtype: (int, str, [[str,[str]], [str,[str]], [str,[str]]])
        @return: (code, statusMessage, systemState).

           System state is in list representation::
             [publishers, subscribers, services].

           publishers is of the form::
             [ [topic1, [topic1Publisher1...topic1PublisherN]] ... ]

           subscribers is of the form::
             [ [topic1, [topic1Subscriber1...topic1SubscriberN]] ... ]

           services is of the form::
             [ [service1, [service1Provider1...service1ProviderN]] ... ]
        """
        with self.ps_lock:
            retval = [r.get_state() for r in (self.publishers, self.subscribers, self.services)]
        return 1, "current system state", retval