Package rocon_gateway :: Module gateway
[frames] | no frames]

Source Code for Module rocon_gateway.gateway

  1  #!/usr/bin/env python 
  2  # 
  3  # License: BSD 
  4  #   https://raw.github.com/robotics-in-concert/rocon_multimaster/hydro-devel/rocon_gateway/LICENSE 
  5  # 
  6   
  7  ############################################################################### 
  8  # Imports 
  9  ############################################################################### 
 10   
 11  import rospy 
 12  import gateway_msgs.msg as gateway_msgs 
 13  import gateway_msgs.srv as gateway_srvs 
 14   
 15  # local imports 
 16  import utils 
 17  import ros_parameters 
 18  from .watcher_thread import WatcherThread 
 19  from .flipped_interface import FlippedInterface 
 20  from .public_interface import PublicInterface 
 21  from .pulled_interface import PulledInterface 
 22  from .master_api import LocalMaster 
 23   
 24  ############################################################################### 
 25  # Thread 
 26  ############################################################################### 
 27   
 28   
29 -class Gateway(object):
30 ''' 31 Used to synchronise with hubs. 32 '''
33 - def __init__(self, hub_manager, param, unique_name, publish_gateway_info_callback):
34 ''' 35 @param hub_manager : container for all the hubs this gateway connects to 36 @type hub_api.HubManmager 37 38 @param param : parameters set by ros_parameters.py 39 @type : dictionary of parameter key-value pairs 40 41 @param unique_name : gateway name (param['name']) with unique uuid hash appended 42 43 @param publish_gateway_info_callback : callback for publishing gateway info 44 ''' 45 self.hub_manager = hub_manager 46 self.master = LocalMaster() 47 self.ip = self.master.get_ros_ip() 48 self._param = param 49 self._unique_name = unique_name 50 self._publish_gateway_info = publish_gateway_info_callback 51 default_rule_blacklist = ros_parameters.generate_rules(self._param["default_blacklist"]) 52 default_rules, all_targets = ros_parameters.generate_remote_rules(self._param["default_flips"]) 53 self.flipped_interface = FlippedInterface( 54 firewall=self._param['firewall'], 55 default_rule_blacklist=default_rule_blacklist, 56 default_rules=default_rules, 57 all_targets=all_targets) 58 default_rules, all_targets = ros_parameters.generate_remote_rules(self._param["default_pulls"]) 59 self.pulled_interface = PulledInterface(default_rule_blacklist=default_rule_blacklist, 60 default_rules=default_rules, 61 all_targets=all_targets) 62 self.public_interface = PublicInterface(default_rule_blacklist=default_rule_blacklist, 63 default_rules=ros_parameters.generate_rules(self._param['default_advertisements']) 64 ) 65 if self._param['advertise_all']: 66 self.public_interface.advertise_all([]) # no extra blacklist beyond the default (keeping it simple in yaml for now) 67 self.remote_gateway_request_callbacks = {} 68 self.remote_gateway_request_callbacks['flip'] = self.process_remote_gateway_flip_request 69 self.remote_gateway_request_callbacks['unflip'] = self.process_remote_gateway_unflip_request 70 71 self.watcher_thread = WatcherThread(self, self._param['watch_loop_period'])
72
73 - def spin(self):
74 self.watcher_thread.start()
75
76 - def shutdown(self):
77 for connection_type in utils.connection_types: 78 for flip in self.flipped_interface.flipped[connection_type]: 79 self.hub_manager.send_unflip_request(flip.gateway, flip.rule) 80 for registration in self.flipped_interface.registrations[connection_type]: 81 self.master.unregister(registration) 82 for registration in self.pulled_interface.registrations[connection_type]: 83 self.master.unregister(registration)
84
85 - def is_connected(self):
86 ''' 87 We often check if we're connected to any hubs often just to ensure we 88 don't waste time processing if there is no-one listening. 89 90 @return True if at least one hub is connected, False otherwise 91 @rtype Bool 92 ''' 93 return self.hub_manager.is_connected()
94
95 - def disengage_hub(self, hub):
96 ''' 97 Disengage from the specified hub. Don't actually need to clean up connections 98 here like we do in shutdown - that can be handled from the watcher thread itself. 99 100 @param hub : the hub that will be deleted. 101 ''' 102 self.hub_manager.disengage_hub(hub)
103 104 ########################################################################## 105 # Update interface states (jobs assigned from watcher thread) 106 ########################################################################## 107
108 - def update_flipped_interface(self, local_connection_index, remote_gateway_hub_index):
109 ''' 110 Process the list of local connections and check against 111 the current flip rules and patterns for changes. If a rule 112 has become (un)available take appropriate action. 113 114 @param local_connection_index : list of current local connections parsed from the master 115 @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections 116 117 @param gateways : list of remote gateway string id's 118 @type string 119 ''' 120 state_changed = False 121 new_flips, lost_flips = self.flipped_interface.update(local_connection_index, remote_gateway_hub_index, self._unique_name) 122 for connection_type in utils.connection_types: 123 for flip in new_flips[connection_type]: 124 firewall_flag = self.hub_manager.get_remote_gateway_firewall_flag(flip.gateway) 125 if firewall_flag == True: 126 continue 127 state_changed = True 128 # for actions, need to post flip details here 129 connections = self.master.generate_connection_details(flip.rule.type, flip.rule.name, flip.rule.node) 130 if connection_type == utils.ConnectionType.ACTION_CLIENT or connection_type == utils.ConnectionType.ACTION_SERVER: 131 rospy.loginfo("Gateway : sending flip request [%s]%s" % (flip.gateway, utils.format_rule(flip.rule))) 132 hub = remote_gateway_hub_index[flip.gateway][0] 133 hub.post_flip_details(flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node) 134 for connection in connections: 135 hub.send_flip_request(flip.gateway, connection) # flip the individual pubs/subs 136 else: 137 for connection in connections: 138 rospy.loginfo("Gateway : sending flip request [%s]%s" % (flip.gateway, utils.format_rule(connection.rule))) 139 hub = remote_gateway_hub_index[flip.gateway][0] 140 hub.send_flip_request(flip.gateway, connection) 141 hub.post_flip_details(flip.gateway, connection.rule.name, connection.rule.type, connection.rule.node) 142 for flip in lost_flips[connection_type]: 143 state_changed = True 144 rospy.loginfo("Gateway : sending unflip request [%s]%s" % (flip.gateway, utils.format_rule(flip.rule))) 145 hub = remote_gateway_hub_index[flip.gateway][0] # first one should be enough 146 hub.send_unflip_request(flip.gateway, flip.rule) 147 hub.remove_flip_details(flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node) 148 if state_changed: 149 self._publish_gateway_info()
150
151 - def update_pulled_interface(self, unused_connections, remote_gateway_hub_index):
152 ''' 153 Process the list of local connections and check against 154 the current pull rules and patterns for changes. If a rule 155 has become (un)available take appropriate action. 156 157 This is called by the watcher thread. The remote_gateway_hub_index 158 is always a full dictionary of all remote gateway and hub key-value 159 pairs - it is only included as an argument here to save 160 processing doubly in the watcher thread. 161 162 @param connections : list of current local connections parsed from the master 163 @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections 164 165 @param remote_gateway_hub_index : key-value remote gateway name-hub list pairs 166 @type dictionary of remote_gateway_name-list of hub_api.Hub objects key-value pairs 167 ''' 168 state_changed = False 169 remote_connections = {} 170 for remote_gateway in remote_gateway_hub_index.keys() + self.pulled_interface.list_remote_gateway_names(): 171 # this should probably be better....we *should* only need one hub's info, but things could 172 # go very wrong here - keep an eye on it. 173 try: 174 hub = remote_gateway_hub_index[remote_gateway][0] 175 remote_connections[remote_gateway] = hub.get_remote_connection_state(remote_gateway) 176 except KeyError: 177 pass # remote gateway no longer exists on the hub network 178 new_pulls, lost_pulls = self.pulled_interface.update(remote_connections, self._unique_name) 179 for connection_type in utils.connection_types: 180 for pull in new_pulls[connection_type]: 181 # dig out the corresponding connection (bit inefficient plouging back into this again 182 connection = None 183 for remote_gateway in remote_connections.keys(): 184 for c in remote_connections[remote_gateway][pull.rule.type]: 185 if c.rule.name == pull.rule.name and \ 186 c.rule.node == pull.rule.node: 187 connection = c 188 break 189 if connection: 190 break 191 # Register this pull 192 existing_registration = self.pulled_interface.find_registration_match(pull.gateway, pull.rule.name, pull.rule.node, pull.rule.type) 193 if not existing_registration: 194 rospy.loginfo("Gateway : pulling in connection %s[%s]" % (utils.format_rule(pull.rule), remote_gateway)) 195 registration = utils.Registration(connection, pull.gateway) 196 new_registration = self.master.register(registration) 197 self.pulled_interface.registrations[registration.connection.rule.type].append(new_registration) 198 hub = remote_gateway_hub_index[pull.gateway][0] 199 hub.post_pull_details(pull.gateway, pull.rule.name, pull.rule.type, pull.rule.node) 200 state_changed = True 201 for pull in lost_pulls[connection_type]: 202 # Unregister this pull 203 existing_registration = self.pulled_interface.find_registration_match(pull.gateway, pull.rule.name, pull.rule.node, pull.rule.type) 204 if existing_registration: 205 rospy.loginfo("Gateway : abandoning pulled connection %s[%s]" % (utils.format_rule(pull.rule), pull.gateway)) 206 self.master.unregister(existing_registration) 207 if hub: 208 hub.remove_pull_details(pull.gateway, pull.rule.name, pull.rule.type, pull.rule.node) 209 self.pulled_interface.registrations[existing_registration.connection.rule.type].remove(existing_registration) 210 state_changed = True 211 if state_changed: 212 self._publish_gateway_info()
213
214 - def update_public_interface(self, local_connection_index):
215 ''' 216 Process the list of local connections and check against 217 the current rules and patterns for changes. If a rule 218 has become (un)available take appropriate action. 219 220 @param local_connection_index : list of current local connections parsed from the master 221 @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections 222 ''' 223 new_conns, lost_conns = self.public_interface.update(local_connection_index) 224 public_interface = self.public_interface.getInterface() 225 for connection_type in utils.connection_types: 226 for new_connection in new_conns[connection_type]: 227 connection = self.master.generate_advertisement_connection_details(new_connection.rule.type, new_connection.rule.name, new_connection.rule.node) 228 rospy.loginfo("Gateway : adding connection to public interface %s" % utils.format_rule(connection.rule)) 229 self.hub_manager.advertise(connection) 230 for lost_connection in lost_conns[connection_type]: 231 rospy.loginfo("Gateway : removing connection from public interface %s" % utils.format_rule(lost_connection.rule)) 232 self.hub_manager.unadvertise(lost_connection) 233 if new_conns or lost_conns: 234 self._publish_gateway_info() 235 return public_interface
236 237 ########################################################################## 238 # Incoming commands from remote gateways 239 ########################################################################## 240
241 - def process_remote_gateway_flip_request(self, registration):
242 ''' 243 Used as a callback for incoming requests on redis pubsub channels. 244 It gets assigned to RedisManager.callback. 245 246 @param registration : fully detailed registration to be processed 247 @type utils.Registration 248 ''' 249 pass 250 if self.flipped_interface.firewall: 251 rospy.logwarn("Gateway : firewalling a flip request %s" % registration) 252 else: 253 rospy.loginfo("Gateway : received a flip request %s" % registration) 254 # probably not necessary as the flipping gateway will already check this 255 existing_registration = self.flipped_interface.find_registration_match(registration.remote_gateway, registration.connection.rule.name, registration.connection.rule.node, registration.connection.rule.type) 256 if not existing_registration: 257 new_registration = self.master.register(registration) 258 if new_registration: 259 self.flipped_interface.registrations[registration.connection.rule.type].append(new_registration) 260 self._publish_gateway_info()
261
262 - def process_remote_gateway_unflip_request(self, rule, remote_gateway):
263 rospy.loginfo("Gateway : received an unflip request from gateway %s: %s" % (remote_gateway, utils.format_rule(rule))) 264 existing_registration = self.flipped_interface.find_registration_match(remote_gateway, rule.name, rule.node, rule.type) 265 if existing_registration: 266 self.master.unregister(existing_registration) 267 self.flipped_interface.registrations[existing_registration.connection.rule.type].remove(existing_registration) 268 self._publish_gateway_info()
269 270 ########################################################################## 271 # Incoming commands from local system (ros service callbacks) 272 ########################################################################## 273
274 - def ros_service_advertise(self, request):
275 ''' 276 Puts/Removes a number of rules on the public interface watchlist. 277 As local rules matching these rules become available/go away, 278 the public interface is modified accordingly. A manual update is done 279 at the end of the advertise call to quickly capture existing 280 rules 281 282 @param request 283 @type gateway_srvs.AdvertiseRequest 284 @return service response 285 @rtgateway_srvs.srv.AdvertiseReponse 286 ''' 287 response = gateway_srvs.AdvertiseResponse() 288 try: 289 if not request.cancel: 290 for rule in request.rules: 291 if not self.public_interface.add_rule(rule): 292 response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_EXISTS 293 response.error_message = "advertisment rule already exists [%s:(%s,%s)]" % (rule.name, rule.type, rule.node) 294 else: 295 for rule in request.rules: 296 if not self.public_interface.remove_rule(rule): 297 response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_NOT_FOUND 298 response.error_message = "advertisment not found [%s:(%s,%s)]" % (rule.name, rule.type, rule.node) 299 except Exception as e: 300 rospy.logerr("Gateway : unknown advertise error [%s]." % str(e)) 301 response.result = gateway_msgs.ErrorCodes.UNKNOWN_ADVERTISEMENT_ERROR 302 303 # Let the watcher get on with the update asap 304 if response.result == gateway_msgs.ErrorCodes.SUCCESS: 305 self.watcher_thread.trigger_update = True 306 self._publish_gateway_info() 307 else: 308 rospy.logerr("Gateway : %s." % response.error_message) 309 response.watchlist = self.public_interface.getWatchlist() 310 return response
311
312 - def ros_service_advertise_all(self, request):
313 ''' 314 Toggles the advertise all mode. If advertising all, an additional 315 blacklist parameter can be supplied which includes all the topics that 316 will not be advertised/watched for. This blacklist is added to the 317 default blacklist of the public interface 318 319 @param request 320 @type gateway_srvs.AdvertiseAllRequest 321 @return service response 322 @rtype gateway_srvs.AdvertiseAllReponse 323 ''' 324 response = gateway_srvs.AdvertiseAllResponse() 325 try: 326 if not request.cancel: 327 if not self.public_interface.advertise_all(request.blacklist): 328 response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_EXISTS 329 response.error_message = "already advertising all." 330 else: 331 self.public_interface.unadvertise_all() 332 except Exception as e: 333 response.result = gateway_msgs.ErrorCodes.UNKNOWN_ADVERTISEMENT_ERROR 334 response.error_message = "unknown advertise all error [%s]" % (str(e)) 335 336 # Let the watcher get on with the update asap 337 if response.result == gateway_msgs.ErrorCodes.SUCCESS: 338 self.watcher_thread.trigger_update = True 339 self._publish_gateway_info() 340 else: 341 rospy.logerr("Gateway : %s." % response.error_message) 342 response.blacklist = self.public_interface.getBlacklist() 343 return response
344
345 - def ros_service_flip(self, request):
346 ''' 347 Puts flip rules on a watchlist which (un)flips them when they 348 become (un)available. 349 350 @param request 351 @type gateway_srvs.RemoteRequest 352 @return service response 353 @rtype gateway_srvs.RemoteResponse 354 ''' 355 response = gateway_srvs.RemoteResponse() 356 # could move this below and if any are fails, just abort adding the rules. 357 for remote in request.remotes: 358 remote.gateway, response.result, response.error_message = self._ros_service_remote_checks(remote.gateway) 359 if response.result != gateway_msgs.ErrorCodes.SUCCESS: 360 rospy.logerr("Gateway : %s." % response.error_message) 361 return response 362 # result is currently SUCCESS 363 added_rules = [] 364 for remote in request.remotes: 365 if not request.cancel: 366 flip_rule = self.flipped_interface.add_rule(remote) 367 if flip_rule: 368 added_rules.append(flip_rule) 369 rospy.loginfo("Gateway : added flip rule [%s:(%s,%s)]" % (flip_rule.gateway, flip_rule.rule.name, flip_rule.rule.type)) 370 else: 371 response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS 372 response.error_message = "flip rule already exists [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type) 373 break 374 else: # request.cancel 375 removed_flip_rules = self.flipped_interface.remove_rule(remote) 376 if removed_flip_rules: 377 rospy.loginfo("Gateway : removed flip rule [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type)) 378 379 if response.result == gateway_msgs.ErrorCodes.SUCCESS: 380 self._publish_gateway_info() 381 self.watcher_thread.trigger_update = True 382 else: 383 if added_rules: # completely abort any added rules 384 for added_rule in added_rules: 385 self.flipped_interface.remove_rule(added_rule) 386 rospy.logerr("Gateway : %s." % response.error_message) 387 return response
388
389 - def ros_service_flip_all(self, request):
390 ''' 391 Flips everything except a specified blacklist to a particular gateway, 392 or if the cancel flag is set, clears all flips to that gateway. 393 394 @param request 395 @type gateway_srvs.RemoteAllRequest 396 @return service response 397 @rtype gateway_srvs.RemoteAllResponse 398 ''' 399 response = gateway_srvs.RemoteAllResponse() 400 remote_gateway_target_hash_name, response.result, response.error_message = self._ros_service_remote_checks(request.gateway) 401 if response.result == gateway_msgs.ErrorCodes.SUCCESS: 402 if not request.cancel: 403 if self.flipped_interface.flip_all(remote_gateway_target_hash_name, request.blacklist): 404 rospy.loginfo("Gateway : flipping all to gateway '%s'" % (remote_gateway_target_hash_name)) 405 else: 406 response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS 407 response.error_message = "already flipping all to gateway '%s' " + remote_gateway_target_hash_name 408 else: # request.cancel 409 self.flipped_interface.unflip_all(remote_gateway_target_hash_name) 410 rospy.loginfo("Gateway : cancelling a previous flip all request [%s]" % (request.gateway)) 411 if response.result == gateway_msgs.ErrorCodes.SUCCESS: 412 self._publish_gateway_info() 413 self.watcher_thread.trigger_update = True 414 else: 415 rospy.logerr("Gateway : %s." % response.error_message) 416 return response
417
418 - def ros_service_pull(self, request):
419 ''' 420 Puts a single rule on a watchlist and pulls it from a particular 421 gateway when it becomes (un)available. 422 423 @param request 424 @type gateway_srvs.RemoteRequest 425 @return service response 426 @rtype gateway_srvs.RemoteResponse 427 ''' 428 response = gateway_srvs.RemoteResponse() 429 # could move this below and if any are fails, just abort adding the rules. 430 for remote in request.remotes: 431 remote.gateway, response.result, response.error_message = self._ros_service_remote_checks(remote.gateway) 432 if response.result != gateway_msgs.ErrorCodes.SUCCESS: 433 rospy.logerr("Gateway : %s." % response.error_message) 434 return response 435 436 # result is currently SUCCESS 437 added_rules = [] 438 for remote in request.remotes: 439 if not request.cancel: 440 pull_rule = self.pulled_interface.add_rule(remote) 441 if pull_rule: 442 added_rules.append(pull_rule) 443 rospy.loginfo("Gateway : added pull rule [%s:(%s,%s)]" % (pull_rule.gateway, pull_rule.rule.name, pull_rule.rule.type)) 444 else: 445 response.result = gateway_msgs.ErrorCodes.PULL_RULE_ALREADY_EXISTS 446 response.error_message = "pull rule already exists [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type) 447 break 448 else: # request.cancel 449 for remote in request.remotes: 450 removed_pull_rules = self.pulled_interface.remove_rule(remote) 451 if removed_pull_rules: 452 rospy.loginfo("Gateway : removed pull rule [%s:%s]" % (remote.gateway, remote.rule.name)) 453 if response.result == gateway_msgs.ErrorCodes.SUCCESS: 454 self._publish_gateway_info() 455 self.watcher_thread.trigger_update = True 456 else: 457 if added_rules: # completely abort any added rules 458 for added_rule in added_rules: 459 self.pulled_interface.remove_rule(added_rule) 460 rospy.logerr("Gateway : %s." % response.error_message) 461 return response
462
463 - def ros_service_pull_all(self, request):
464 ''' 465 Pull everything except a specified blacklist from a particular gateway, 466 or if the cancel flag is set, clears all pulls from that gateway. 467 468 @param request 469 @type gateway_srvs.RemoteAllRequest 470 @return service response 471 @rtype gateway_srvs.RemoteAllResponse 472 ''' 473 response = gateway_srvs.RemoteAllResponse() 474 remote_gateway_target_hash_name, response.result, response.error_message = self._ros_service_remote_checks(request.gateway) 475 if response.result == gateway_msgs.ErrorCodes.SUCCESS: 476 if not request.cancel: 477 if self.pulled_interface.pull_all(remote_gateway_target_hash_name, request.blacklist): 478 rospy.loginfo("Gateway : pulling all from gateway '%s'" % (request.gateway)) 479 else: 480 response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS 481 response.error_message = "already pulling all from gateway '%s' " + request.gateway 482 else: # request.cancel 483 self.pulled_interface.unpull_all(remote_gateway_target_hash_name) 484 rospy.loginfo("Gateway : cancelling a previous pull all request [%s]" % (request.gateway)) 485 if response.result == gateway_msgs.ErrorCodes.SUCCESS: 486 self._publish_gateway_info() 487 self.watcher_thread.trigger_update = True 488 else: 489 rospy.logerr("Gateway : %s." % response.error_message) 490 return response
491
492 - def _ros_service_remote_checks(self, gateway):
493 ''' 494 Some simple checks when pulling or flipping to make sure that the remote gateway is visible. It 495 does a strict check on the hash names first, then falls back to looking for weak matches on the 496 human friendly name. 497 498 @param gateway : remote gateway target name (can be hash name, basename or regex pattern) 499 @type string 500 @return pair of result type and message 501 @rtype gateway_msgs.ErrorCodes.xxx, string 502 ''' 503 if not self.is_connected(): 504 return None, gateway_msgs.ErrorCodes.NO_HUB_CONNECTION, "not connected to hub, aborting" 505 if gateway == self._unique_name: 506 return None, gateway_msgs.ErrorCodes.REMOTE_GATEWAY_SELF_IS_NOT, "gateway cannot flip/pull to itself" 507 return gateway, gateway_msgs.ErrorCodes.SUCCESS, ""
508 # matches, weak_matches = self.hub_manager.match_remote_gateway_name(gateway) 509 # if len(matches) > 1: 510 # return None, gateway_msgs.ErrorCodes.REMOTE_GATEWAY_TARGET_HAS_MULTIPLE_MATCHES, "remote gateway target has multiple matches, invalid [%s][%s]" % (gateway, matches) 511 # elif len(matches) == 1: 512 # return matches[0], gateway_msgs.ErrorCodes.SUCCESS, "" 513 # # Fallback to checking for weak matches 514 # if len(weak_matches) > 1: 515 # return None, gateway_msgs.ErrorCodes.REMOTE_GATEWAY_TARGET_HAS_MULTIPLE_MATCHES, "remote gateway target has multiple matches against hashed names, invalid [%s]" % weak_matches 516 # elif len(weak_matches) == 1: 517 # return weak_matches[0], gateway_msgs.ErrorCodes.SUCCESS, "" 518 # # Not visible 519 # return None, gateway_msgs.ErrorCodes.REMOTE_GATEWAY_NOT_VISIBLE, "remote gateway is currently not visible on the hubs [%s]" % gateway 520