Package rosmaster :: Module registrations

Source Code for Module rosmaster.registrations

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  from rosmaster.util import remove_server_proxy 
 36  from rosmaster.util import xmlrpcapi 
 37  import rosmaster.exceptions 
 38   
 39  """Data structures for representing registration data in the Master""" 
 40   
41 -class NodeRef(object):
42 """ 43 Container for node registration information. Used in master's 44 self.nodes data structure. This is effectively a reference 45 counter for the node registration information: when the 46 subscriptions and publications are empty the node registration can 47 be deleted. 48 """
49 - def __init__(self, id, api):
50 """ 51 ctor 52 @param api str: node XML-RPC API 53 """ 54 self.id = id 55 self.api = api 56 self.param_subscriptions = [] 57 self.topic_subscriptions = [] 58 self.topic_publications = [] 59 self.services = []
60
61 - def clear(self):
62 """ 63 Delete all state from this NodeRef except for the api location 64 """ 65 self.param_subscriptions = [] 66 self.topic_subscriptions = [] 67 self.topic_publications = [] 68 self.services = []
69
70 - def is_empty(self):
71 """ 72 @return: True if node has no active registrations 73 """ 74 return sum((len(x) for x in 75 [self.param_subscriptions, 76 self.topic_subscriptions, 77 self.topic_publications, 78 self.services,])) == 0
79
80 - def add(self, type_, key):
81 if type_ == Registrations.TOPIC_SUBSCRIPTIONS: 82 if not key in self.topic_subscriptions: 83 self.topic_subscriptions.append(key) 84 elif type_ == Registrations.TOPIC_PUBLICATIONS: 85 if not key in self.topic_publications: 86 self.topic_publications.append(key) 87 elif type_ == Registrations.SERVICE: 88 if not key in self.services: 89 self.services.append(key) 90 elif type_ == Registrations.PARAM_SUBSCRIPTIONS: 91 if not key in self.param_subscriptions: 92 self.param_subscriptions.append(key) 93 else: 94 raise rosmaster.exceptions.InternalException("internal bug")
95
96 - def remove(self, type_, key):
97 if type_ == Registrations.TOPIC_SUBSCRIPTIONS: 98 if key in self.topic_subscriptions: 99 self.topic_subscriptions.remove(key) 100 elif type_ == Registrations.TOPIC_PUBLICATIONS: 101 if key in self.topic_publications: 102 self.topic_publications.remove(key) 103 elif type_ == Registrations.SERVICE: 104 if key in self.services: 105 self.services.remove(key) 106 elif type_ == Registrations.PARAM_SUBSCRIPTIONS: 107 if key in self.param_subscriptions: 108 self.param_subscriptions.remove(key) 109 else: 110 raise rosmaster.exceptions.InternalException("internal bug")
111 112 # NOTE: I'm not terribly happy that this task has leaked into the data model. need 113 # to refactor to get this back into masterslave. 114
115 -def shutdown_node_task(api, caller_id, reason):
116 """ 117 Method to shutdown another ROS node. Generally invoked within a 118 separate thread as this is used to cleanup hung nodes. 119 120 @param api: XML-RPC API of node to shutdown 121 @type api: str 122 @param caller_id: name of node being shutdown 123 @type caller_id: str 124 @param reason: human-readable reason why node is being shutdown 125 @type reason: str 126 """ 127 try: 128 xmlrpcapi(api).shutdown('/master', "[{}] Reason: {}".format(caller_id, reason)) 129 except: 130 pass #expected in many common cases 131 remove_server_proxy(api)
132
133 -class Registrations(object):
134 """ 135 All calls may result in access/modifications to node registrations 136 dictionary, so be careful to guarantee appropriate thread-safeness. 137 138 Data structure for storing a set of registrations (e.g. publications, services). 139 The underlying data storage is the same except for services, which have the 140 constraint that only one registration may be active for a given key. 141 """ 142 143 TOPIC_SUBSCRIPTIONS = 1 144 TOPIC_PUBLICATIONS = 2 145 SERVICE = 3 146 PARAM_SUBSCRIPTIONS = 4 147
148 - def __init__(self, type_):
149 """ 150 ctor. 151 @param type_: one of [ TOPIC_SUBSCRIPTIONS, 152 TOPIC_PUBLICATIONS, SERVICE, PARAM_SUBSCRIPTIONS ] 153 @type type_: int 154 """ 155 if not type_ in [ 156 Registrations.TOPIC_SUBSCRIPTIONS, 157 Registrations.TOPIC_PUBLICATIONS, 158 Registrations.SERVICE, 159 Registrations.PARAM_SUBSCRIPTIONS ]: 160 raise rosmaster.exceptions.InternalException("invalid registration type: %s"%type_) 161 self.type = type_ 162 ## { key: [(caller_id, caller_api)] } 163 self.map = {} 164 self.service_api_map = None
165
166 - def __bool__(self):
167 """ 168 @return: True if there are registrations 169 """ 170 return len(self.map) != 0
171
172 - def __nonzero__(self):
173 """ 174 @return: True if there are registrations 175 """ 176 return len(self.map) != 0
177
178 - def iterkeys(self):
179 """ 180 Iterate over registration keys 181 @return: iterator for registration keys 182 """ 183 return self.map.keys()
184
185 - def get_service_api(self, service):
186 """ 187 Lookup service API URI. NOTE: this should only be valid if type==SERVICE as 188 service Registrations instances are the only ones that track service API URIs. 189 @param service: service name 190 @type service: str 191 @return str: service_api for registered key or None if 192 registration is no longer valid. 193 @type: str 194 """ 195 if self.service_api_map and service in self.service_api_map: 196 caller_id, service_api = self.service_api_map[service] 197 return service_api 198 return None
199
200 - def get_apis(self, key):
201 """ 202 Only valid if self.type != SERVICE. 203 @param key: registration key (e.g. topic/service/param name) 204 @type key: str 205 @return: caller_apis for registered key, empty list if registration is not valid 206 @rtype: [str] 207 """ 208 return [api for _, api in self.map.get(key, [])]
209
210 - def __contains__(self, key):
211 """ 212 Emulate mapping type for has_key() 213 """ 214 return key in self.map
215
216 - def __getitem__(self, key):
217 """ 218 @param key: registration key (e.g. topic/service/param name) 219 @type key: str 220 @return: (caller_id, caller_api) for registered 221 key, empty list if registration is not valid 222 @rtype: [(str, str),] 223 """ 224 # unlike get_apis, returns the caller_id to prevent any race 225 # conditions that can occur if caller_id/caller_apis change 226 # due to a new node. 227 return self.map.get(key, [])
228
229 - def has_key(self, key):
230 """ 231 @param key: registration key (e.g. topic/service/param name) 232 @type key: str 233 @return: True if key is registered 234 @rtype: bool 235 """ 236 return key in self.map
237
238 - def get_state(self):
239 """ 240 @return: state in getSystemState()-friendly format [ [key, [callerId1...callerIdN]] ... ] 241 @rtype: [str, [str]...] 242 """ 243 retval = [] 244 for k in self.map.keys(): 245 retval.append([k, [id for id, _ in self.map[k]]]) 246 return retval
247
248 - def register(self, key, caller_id, caller_api, service_api=None):
249 """ 250 Add caller_id into the map as a provider of the specified 251 service (key). caller_id must not have been previously 252 registered with a different caller_api. 253 254 Subroutine for managing provider map data structure (essentially a multimap). 255 @param key: registration key (e.g. topic/service/param name) 256 @type key: str 257 @param caller_id: caller_id of provider 258 @type caller_id: str 259 @param caller_api: API URI of provider 260 @type caller_api: str 261 @param service_api: (keyword) ROS service API URI if registering a service 262 @type service_api: str 263 """ 264 map = self.map 265 if key in map and not service_api: 266 providers = map[key] 267 if not (caller_id, caller_api) in providers: 268 providers.append((caller_id, caller_api)) 269 else: 270 map[key] = providers = [(caller_id, caller_api)] 271 272 if service_api: 273 if self.service_api_map is None: 274 self.service_api_map = {} 275 self.service_api_map[key] = (caller_id, service_api) 276 elif self.type == Registrations.SERVICE: 277 raise rosmaster.exceptions.InternalException("service_api must be specified for Registrations.SERVICE")
278
279 - def unregister_all(self, caller_id):
280 """ 281 Remove all registrations associated with caller_id 282 @param caller_id: caller_id of provider 283 @type caller_id: str 284 """ 285 map = self.map 286 # fairly expensive 287 dead_keys = [] 288 for key in map: 289 providers = map[key] 290 # find all matching entries 291 to_remove = [(id, api) for id, api in providers if id == caller_id] 292 # purge them 293 for r in to_remove: 294 providers.remove(r) 295 if not providers: 296 dead_keys.append(key) 297 for k in dead_keys: 298 del self.map[k] 299 if self.type == Registrations.SERVICE and self.service_api_map: 300 del dead_keys[:] 301 for key, val in self.service_api_map.items(): 302 if val[0] == caller_id: 303 dead_keys.append(key) 304 for k in dead_keys: 305 del self.service_api_map[k]
306
307 - def unregister(self, key, caller_id, caller_api, service_api=None):
308 """ 309 Remove caller_id from the map as a provider of the specified service (key). 310 Subroutine for managing provider map data structure, essentially a multimap 311 @param key: registration key (e.g. topic/service/param name) 312 @type key: str 313 @param caller_id: caller_id of provider 314 @type caller_id: str 315 @param caller_api: API URI of provider 316 @type caller_api: str 317 @param service_api: (keyword) ROS service API URI if registering a service 318 @type service_api: str 319 @return: for ease of master integration, directly returns unregister value for 320 higher-level XMLRPC API. val is the number of APIs unregistered (0 or 1) 321 @rtype: code, msg, val 322 """ 323 # if we are unregistering a topic, validate against the caller_api 324 if service_api: 325 # validate against the service_api 326 if self.service_api_map is None: 327 return 1, "[%s] is not a provider of [%s]"%(caller_id, key), 0 328 if self.service_api_map.get(key, None) != (caller_id, service_api): 329 return 1, "[%s] is no longer the current service api handle for [%s]"%(service_api, key), 0 330 else: 331 del self.service_api_map[key] 332 del self.map[key] 333 # caller_api is None for unregister service, so we can't validate as well 334 return 1, "Unregistered [%s] as provider of [%s]"%(caller_id, key), 1 335 elif self.type == Registrations.SERVICE: 336 raise rosmaster.exceptions.InternalException("service_api must be specified for Registrations.SERVICE") 337 else: 338 providers = self.map.get(key, []) 339 if (caller_id, caller_api) in providers: 340 providers.remove((caller_id, caller_api)) 341 if not providers: 342 del self.map[key] 343 return 1, "Unregistered [%s] as provider of [%s]"%(caller_id, key), 1 344 else: 345 return 1, "[%s] is not a known provider of [%s]"%(caller_id, key), 0
346
347 -class RegistrationManager(object):
348 """ 349 Stores registrations for Master. 350 351 RegistrationManager is not threadsafe, so access must be externally locked as appropriate 352 """ 353
354 - def __init__(self, thread_pool):
355 """ 356 ctor. 357 @param thread_pool: thread pool for queueing tasks 358 @type thread_pool: ThreadPool 359 """ 360 self.nodes = {} 361 self.thread_pool = thread_pool 362 363 self.publishers = Registrations(Registrations.TOPIC_PUBLICATIONS) 364 self.subscribers = Registrations(Registrations.TOPIC_SUBSCRIPTIONS) 365 self.services = Registrations(Registrations.SERVICE) 366 self.param_subscribers = Registrations(Registrations.PARAM_SUBSCRIPTIONS)
367 368
369 - def reverse_lookup(self, caller_api):
370 """ 371 Get a NodeRef by caller_api 372 @param caller_api: caller XML RPC URI 373 @type caller_api: str 374 @return: nodes that declare caller_api as their 375 API. 99.9% of the time this should only be one node, but we 376 allow for multiple matches as the master API does not restrict 377 this. 378 @rtype: [NodeRef] 379 """ 380 matches = [n for n in self.nodes.items() if n.api == caller_api] 381 if matches: 382 return matches
383
384 - def get_node(self, caller_id):
385 return self.nodes.get(caller_id, None)
386
387 - def _register(self, r, key, caller_id, caller_api, service_api=None):
388 # update node information 389 node_ref, changed = self._register_node_api(caller_id, caller_api) 390 node_ref.add(r.type, key) 391 # update pub/sub/service indicies 392 if changed: 393 self.publishers.unregister_all(caller_id) 394 self.subscribers.unregister_all(caller_id) 395 self.services.unregister_all(caller_id) 396 self.param_subscribers.unregister_all(caller_id) 397 r.register(key, caller_id, caller_api, service_api)
398
399 - def _unregister(self, r, key, caller_id, caller_api, service_api=None):
400 node_ref = self.nodes.get(caller_id, None) 401 if node_ref != None: 402 retval = r.unregister(key, caller_id, caller_api, service_api) 403 # check num removed field, if 1, unregister is valid 404 if retval[2] == 1: 405 node_ref.remove(r.type, key) 406 if node_ref.is_empty(): 407 del self.nodes[caller_id] 408 else: 409 retval = 1, "[%s] is not a registered node"%caller_id, 0 410 return retval
411
412 - def register_service(self, service, caller_id, caller_api, service_api):
413 """ 414 Register service provider 415 @return: None 416 """ 417 self._register(self.services, service, caller_id, caller_api, service_api)
418 - def register_publisher(self, topic, caller_id, caller_api):
419 """ 420 Register topic publisher 421 @return: None 422 """ 423 self._register(self.publishers, topic, caller_id, caller_api)
424 - def register_subscriber(self, topic, caller_id, caller_api):
425 """ 426 Register topic subscriber 427 @return: None 428 """ 429 self._register(self.subscribers, topic, caller_id, caller_api)
430 - def register_param_subscriber(self, param, caller_id, caller_api):
431 """ 432 Register param subscriber 433 @return: None 434 """ 435 self._register(self.param_subscribers, param, caller_id, caller_api)
436
437 - def unregister_service(self, service, caller_id, service_api):
438 caller_api = None 439 return self._unregister(self.services, service, caller_id, caller_api, service_api)
440
441 - def unregister_subscriber(self, topic, caller_id, caller_api):
442 return self._unregister(self.subscribers, topic, caller_id, caller_api)
443 - def unregister_publisher(self, topic, caller_id, caller_api):
444 return self._unregister(self.publishers, topic, caller_id, caller_api)
445 - def unregister_param_subscriber(self, param, caller_id, caller_api):
446 return self._unregister(self.param_subscribers, param, caller_id, caller_api)
447
448 - def _register_node_api(self, caller_id, caller_api):
449 """ 450 @param caller_id: caller_id of provider 451 @type caller_id: str 452 @param caller_api: caller_api of provider 453 @type caller_api: str 454 @return: (registration_information, changed_registration). changed_registration is true if 455 caller_api is differet than the one registered with caller_id 456 @rtype: (NodeRef, bool) 457 """ 458 node_ref = self.nodes.get(caller_id, None) 459 460 bumped_api = None 461 if node_ref is not None: 462 if node_ref.api == caller_api: 463 return node_ref, False 464 else: 465 bumped_api = node_ref.api 466 self.thread_pool.queue_task(bumped_api, shutdown_node_task, 467 (bumped_api, caller_id, "new node registered with same name")) 468 469 node_ref = NodeRef(caller_id, caller_api) 470 self.nodes[caller_id] = node_ref 471 return (node_ref, bumped_api != None)
472