Package rosmaster :: Module registrations

Source Code for Module rosmaster.registrations

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  from rosmaster.util import remove_server_proxy 
 36  from rosmaster.util import xmlrpcapi 
 37  import rosmaster.exceptions 
 38   
 39  """Data structures for representing registration data in the Master""" 
 40   
class NodeRef(object):
    """
    Record of everything a single node has registered. Instances are
    stored in the master's self.nodes data structure and act as a
    reference counter for the node: once the node has no subscriptions,
    publications or services left, its registration can be deleted.
    """

    def __init__(self, id, api):
        """
        ctor
        @param id: identifier stored on the record unchanged
        @param api: node XML-RPC API
        @type  api: str
        """
        self.services = []
        self.topic_publications = []
        self.topic_subscriptions = []
        self.param_subscriptions = []
        self.api = api
        self.id = id

    def clear(self):
        """
        Drop every registration held by this NodeRef; the id and the
        api location are left untouched.
        """
        self.services = []
        self.topic_publications = []
        self.topic_subscriptions = []
        self.param_subscriptions = []

    def is_empty(self):
        """
        @return: True if node has no active registrations
        @rtype: bool
        """
        registration_lists = (
            self.param_subscriptions,
            self.topic_subscriptions,
            self.topic_publications,
            self.services,
        )
        return not any(len(entries) for entries in registration_lists)

    def _entries_for(self, type_):
        """
        Select the list that holds registrations of the given type.
        @param type_: one of the Registrations type constants
        @return: the matching list attribute of this NodeRef
        @raise rosmaster.exceptions.InternalException: if type_ is
        not one of the known registration types
        """
        if type_ == Registrations.TOPIC_SUBSCRIPTIONS:
            return self.topic_subscriptions
        if type_ == Registrations.TOPIC_PUBLICATIONS:
            return self.topic_publications
        if type_ == Registrations.SERVICE:
            return self.services
        if type_ == Registrations.PARAM_SUBSCRIPTIONS:
            return self.param_subscriptions
        raise rosmaster.exceptions.InternalException("internal bug")

    def add(self, type_, key):
        """
        Record key under the given registration type. Adding a key
        that is already present is a no-op.
        @param type_: one of the Registrations type constants
        @param key: registration key to record
        @raise rosmaster.exceptions.InternalException: if type_ is unknown
        """
        entries = self._entries_for(type_)
        if key not in entries:
            entries.append(key)

    def remove(self, type_, key):
        """
        Forget key under the given registration type. Removing a key
        that is not present is a no-op.
        @param type_: one of the Registrations type constants
        @param key: registration key to forget
        @raise rosmaster.exceptions.InternalException: if type_ is unknown
        """
        entries = self._entries_for(type_)
        if key in entries:
            entries.remove(key)

# NOTE: I'm not terribly happy that this task has leaked into the data model. We need
# to refactor to get this back into masterslave.

def shutdown_node_task(api, caller_id, reason):
    """
    Method to shutdown another ROS node. Generally invoked within a
    separate thread as this is used to cleanup hung nodes.

    The shutdown request is best effort: any ordinary error raised
    while contacting the node is ignored.

    @param api: XML-RPC API of node to shutdown
    @type  api: str
    @param caller_id: name of node being shutdown (not used by the body)
    @type  caller_id: str
    @param reason: human-readable reason why node is being shutdown
    @type  reason: str
    """
    try:
        xmlrpcapi(api).shutdown('/master', reason)
    except Exception:
        # Expected in many common cases (e.g. the node is already gone).
        # Only ordinary errors are swallowed, so KeyboardInterrupt and
        # SystemExit still propagate.
        pass
    finally:
        # Always runs, even when the shutdown call fails or is interrupted.
        remove_server_proxy(api)
132
class Registrations(object):
    """
    All calls may result in access/modifications to node registrations
    dictionary, so be careful to guarantee appropriate thread-safeness.

    Data structure for storing a set of registrations (e.g. publications, services).
    The underlying data storage is the same except for services, which have the
    constraint that only one registration may be active for a given key.
    """

    TOPIC_SUBSCRIPTIONS = 1
    TOPIC_PUBLICATIONS = 2
    SERVICE = 3
    PARAM_SUBSCRIPTIONS = 4

    def __init__(self, type_):
        """
        ctor.
        @param type_: one of [ TOPIC_SUBSCRIPTIONS,
        TOPIC_PUBLICATIONS, SERVICE, PARAM_SUBSCRIPTIONS ]
        @type  type_: int
        @raise rosmaster.exceptions.InternalException: if type_ is not
        one of the four registration types
        """
        if type_ not in [
                Registrations.TOPIC_SUBSCRIPTIONS,
                Registrations.TOPIC_PUBLICATIONS,
                Registrations.SERVICE,
                Registrations.PARAM_SUBSCRIPTIONS]:
            raise rosmaster.exceptions.InternalException("invalid registration type: %s"%type_)
        self.type = type_
        ## { key: [(caller_id, caller_api)] }
        self.map = {}
        ## { key: (caller_id, service_api) }, created lazily by register()
        self.service_api_map = None

    def __nonzero__(self):
        """
        @return: True if there is at least one registration
        @rtype: bool
        """
        return len(self.map) != 0

    # Python 3 uses __bool__ for truth testing; without this alias an
    # empty Registrations instance would evaluate as True there.
    __bool__ = __nonzero__

    def iterkeys(self):
        """
        Iterate over registration keys
        @return: iterator for registration keys
        """
        # iter(dict) yields keys on both Python 2 and Python 3
        return iter(self.map)

    def get_service_api(self, service):
        """
        Lookup service API URI. NOTE: this should only be valid if type==SERVICE as
        service Registrations instances are the only ones that track service API URIs.
        @param service: service name
        @type  service: str
        @return: service_api for registered key or None if
        registration is no longer valid.
        @rtype: str
        """
        if self.service_api_map and service in self.service_api_map:
            _, service_api = self.service_api_map[service]
            return service_api
        return None

    def get_apis(self, key):
        """
        Only valid if self.type != SERVICE.
        @param key: registration key (e.g. topic/service/param name)
        @type  key: str
        @return: caller_apis for registered key, empty list if registration is not valid
        @rtype: [str]
        """
        return [api for _, api in self.map.get(key, [])]

    def __contains__(self, key):
        """
        Emulate mapping type for has_key()
        """
        return key in self.map

    def __getitem__(self, key):
        """
        @param key: registration key (e.g. topic/service/param name)
        @type  key: str
        @return: (caller_id, caller_api) for registered
        key, empty list if registration is not valid
        @rtype: [(str, str),]
        """
        # unlike get_apis, returns the caller_id to prevent any race
        # conditions that can occur if caller_id/caller_apis change
        # due to a new node.
        return self.map.get(key, [])

    def has_key(self, key):
        """
        @param key: registration key (e.g. topic/service/param name)
        @type  key: str
        @return: True if key is registered
        @rtype: bool
        """
        return key in self.map

    def get_state(self):
        """
        @return: state in getSystemState()-friendly format [ [key, [callerId1...callerIdN]] ... ]
        @rtype: [str, [str]...]
        """
        return [[key, [caller_id for caller_id, _ in providers]]
                for key, providers in self.map.items()]

    def register(self, key, caller_id, caller_api, service_api=None):
        """
        Add caller_id into the map as a provider of the specified
        service (key).  caller_id must not have been previously
        registered with a different caller_api.

        Subroutine for managing provider map data structure (essentially a multimap).
        @param key: registration key (e.g. topic/service/param name)
        @type  key: str
        @param caller_id: caller_id of provider
        @type  caller_id: str
        @param caller_api: API URI of provider
        @type  caller_api: str
        @param service_api: (keyword) ROS service API URI if registering a service
        @type  service_api: str
        @raise rosmaster.exceptions.InternalException: if this instance
        stores services and no service_api was given
        """
        # Validate before touching any state so that a rejected call
        # cannot leave a half-applied registration behind.
        if not service_api and self.type == Registrations.SERVICE:
            raise rosmaster.exceptions.InternalException("service_api must be specified for Registrations.SERVICE")

        registrations = self.map
        if key in registrations and not service_api:
            providers = registrations[key]
            if (caller_id, caller_api) not in providers:
                providers.append((caller_id, caller_api))
        else:
            # first provider for this key, or a service registration,
            # which replaces whatever was registered before
            registrations[key] = [(caller_id, caller_api)]

        if service_api:
            if self.service_api_map is None:
                self.service_api_map = {}
            self.service_api_map[key] = (caller_id, service_api)

    def unregister_all(self, caller_id):
        """
        Remove all registrations associated with caller_id
        @param caller_id: caller_id of provider
        @type  caller_id: str
        """
        registrations = self.map
        # fairly expensive: every key has to be visited
        dead_keys = []
        for key, providers in registrations.items():
            # purge matching entries in place so the list object stored
            # in the map stays the same
            providers[:] = [p for p in providers if p[0] != caller_id]
            if not providers:
                dead_keys.append(key)
        # deletion is deferred so the map is not resized while iterating
        for key in dead_keys:
            del registrations[key]
        if self.type == Registrations.SERVICE and self.service_api_map:
            stale = [key for key, val in self.service_api_map.items()
                     if val[0] == caller_id]
            for key in stale:
                del self.service_api_map[key]

    def unregister(self, key, caller_id, caller_api, service_api=None):
        """
        Remove caller_id from the map as a provider of the specified service (key).
        Subroutine for managing provider map data structure, essentially a multimap
        @param key: registration key (e.g. topic/service/param name)
        @type  key: str
        @param caller_id: caller_id of provider
        @type  caller_id: str
        @param caller_api: API URI of provider
        @type  caller_api: str
        @param service_api: (keyword) ROS service API URI if registering a service
        @type  service_api: str
        @return: for ease of master integration, directly returns unregister value for
        higher-level XMLRPC API. val is the number of APIs unregistered (0 or 1)
        @rtype: code, msg, val
        @raise rosmaster.exceptions.InternalException: if this instance
        stores services and no service_api was given
        """
        if service_api:
            # unregistering a service: validate against the service_api
            if self.service_api_map is None:
                return 1, "[%s] is not a provider of [%s]"%(caller_id, key), 0
            if self.service_api_map.get(key, None) != (caller_id, service_api):
                return 1, "[%s] is no longer the current service api handle for [%s]"%(service_api, key), 0
            else:
                del self.service_api_map[key]
                del self.map[key]
            # caller_api is None for unregister service, so we can't validate as well
            return 1, "Unregistered [%s] as provider of [%s]"%(caller_id, key), 1
        elif self.type == Registrations.SERVICE:
            raise rosmaster.exceptions.InternalException("service_api must be specified for Registrations.SERVICE")
        else:
            # unregistering a topic/param: validate against the caller_api
            providers = self.map.get(key, [])
            if (caller_id, caller_api) in providers:
                providers.remove((caller_id, caller_api))
                if not providers:
                    del self.map[key]
                return 1, "Unregistered [%s] as provider of [%s]"%(caller_id, key), 1
            else:
                return 1, "[%s] is not a known provider of [%s]"%(caller_id, key), 0
340
class RegistrationManager(object):
    """
    Stores registrations for Master.

    RegistrationManager is not threadsafe, so access must be externally locked as appropriate
    """

    def __init__(self, thread_pool):
        """
        ctor.
        @param thread_pool: thread pool for queueing tasks
        @type  thread_pool: ThreadPool
        """
        ## { caller_id: NodeRef }
        self.nodes = {}
        self.thread_pool = thread_pool

        self.publishers = Registrations(Registrations.TOPIC_PUBLICATIONS)
        self.subscribers = Registrations(Registrations.TOPIC_SUBSCRIPTIONS)
        self.services = Registrations(Registrations.SERVICE)
        self.param_subscribers = Registrations(Registrations.PARAM_SUBSCRIPTIONS)

    def reverse_lookup(self, caller_api):
        """
        Get a NodeRef by caller_api
        @param caller_api: caller XML RPC URI
        @type  caller_api: str
        @return: nodes that declare caller_api as their
        API. 99.9% of the time this should only be one node, but we
        allow for multiple matches as the master API does not restrict
        this. None is returned when no node matches.
        @rtype: [NodeRef]
        """
        # self.nodes maps caller_id -> NodeRef, so the NodeRefs are the
        # dictionary values; iterating items would yield tuples, which
        # have no 'api' attribute.
        matches = [n for n in self.nodes.values() if n.api == caller_api]
        if matches:
            return matches
        return None

    def get_node(self, caller_id):
        """
        @param caller_id: caller_id of node
        @type  caller_id: str
        @return: registration information for caller_id, or None if
        caller_id is not registered
        @rtype: NodeRef
        """
        return self.nodes.get(caller_id, None)

    def _register(self, r, key, caller_id, caller_api, service_api=None):
        """
        Common registration routine.
        @param r: the Registrations instance to register with
        @type  r: Registrations
        @param key: registration key (e.g. topic/service/param name)
        @type  key: str
        @param caller_id: caller_id of provider
        @type  caller_id: str
        @param caller_api: API URI of provider
        @type  caller_api: str
        @param service_api: (keyword) ROS service API URI if registering a service
        @type  service_api: str
        """
        # update node information
        node_ref, changed = self._register_node_api(caller_id, caller_api)
        node_ref.add(r.type, key)
        # update pub/sub/service indices: a changed API means caller_id
        # was re-registered, so everything filed under it is stale
        if changed:
            self.publishers.unregister_all(caller_id)
            self.subscribers.unregister_all(caller_id)
            self.services.unregister_all(caller_id)
            self.param_subscribers.unregister_all(caller_id)
        r.register(key, caller_id, caller_api, service_api)

    def _unregister(self, r, key, caller_id, caller_api, service_api=None):
        """
        Common unregistration routine.
        @param r: the Registrations instance to unregister from
        @type  r: Registrations
        @param key: registration key (e.g. topic/service/param name)
        @type  key: str
        @param caller_id: caller_id of provider
        @type  caller_id: str
        @param caller_api: API URI of provider
        @type  caller_api: str
        @param service_api: (keyword) ROS service API URI if unregistering a service
        @type  service_api: str
        @return: code, msg, val where val is the number of
        registrations removed (0 or 1)
        @rtype: (int, str, int)
        """
        node_ref = self.nodes.get(caller_id, None)
        if node_ref is not None:
            retval = r.unregister(key, caller_id, caller_api, service_api)
            # check num removed field, if 1, unregister is valid
            if retval[2] == 1:
                node_ref.remove(r.type, key)
            # a node with nothing left registered is forgotten entirely
            if node_ref.is_empty():
                del self.nodes[caller_id]
        else:
            retval = 1, "[%s] is not a registered node"%caller_id, 0
        return retval

    def register_service(self, service, caller_id, caller_api, service_api):
        """
        Register service provider
        @return: None
        """
        self._register(self.services, service, caller_id, caller_api, service_api)

    def register_publisher(self, topic, caller_id, caller_api):
        """
        Register topic publisher
        @return: None
        """
        self._register(self.publishers, topic, caller_id, caller_api)

    def register_subscriber(self, topic, caller_id, caller_api):
        """
        Register topic subscriber
        @return: None
        """
        self._register(self.subscribers, topic, caller_id, caller_api)

    def register_param_subscriber(self, param, caller_id, caller_api):
        """
        Register param subscriber
        @return: None
        """
        self._register(self.param_subscribers, param, caller_id, caller_api)

    def unregister_service(self, service, caller_id, service_api):
        """
        Unregister service provider
        @return: code, msg, val as returned by _unregister
        """
        # no caller_api is passed for services; the registration is
        # matched on service_api instead
        caller_api = None
        return self._unregister(self.services, service, caller_id, caller_api, service_api)

    def unregister_subscriber(self, topic, caller_id, caller_api):
        """
        Unregister topic subscriber
        @return: code, msg, val as returned by _unregister
        """
        return self._unregister(self.subscribers, topic, caller_id, caller_api)

    def unregister_publisher(self, topic, caller_id, caller_api):
        """
        Unregister topic publisher
        @return: code, msg, val as returned by _unregister
        """
        return self._unregister(self.publishers, topic, caller_id, caller_api)

    def unregister_param_subscriber(self, param, caller_id, caller_api):
        """
        Unregister param subscriber
        @return: code, msg, val as returned by _unregister
        """
        return self._unregister(self.param_subscribers, param, caller_id, caller_api)

    def _register_node_api(self, caller_id, caller_api):
        """
        @param caller_id: caller_id of provider
        @type  caller_id: str
        @param caller_api: caller_api of provider
        @type  caller_api: str
        @return: (registration_information, changed_registration). changed_registration is true if
        caller_api is different than the one registered with caller_id
        @rtype: (NodeRef, bool)
        """
        node_ref = self.nodes.get(caller_id, None)

        bumped_api = None
        if node_ref is not None:
            if node_ref.api == caller_api:
                return node_ref, False
            else:
                # same name, different API: queue a shutdown of the
                # previously registered node
                bumped_api = node_ref.api
                self.thread_pool.queue_task(bumped_api, shutdown_node_task,
                                            (bumped_api, caller_id, "new node registered with same name"))

        node_ref = NodeRef(caller_id, caller_api)
        self.nodes[caller_id] = node_ref
        return (node_ref, bumped_api is not None)
466