hub_manager.py
Go to the documentation of this file.
00001 #!/usr/bin/env pythonupdate
00002 #
00003 # License: BSD
00004 #   https://raw.github.com/robotics-in-concert/rocon_multimaster/license/LICENSE
00005 #
00006 ###############################################################################
00007 # Imports
00008 ###############################################################################
00009 
00010 import threading
00011 
00012 import rospy
00013 import gateway_msgs.msg as gateway_msgs
00014 import rocon_hub_client
00015 
00016 from .exceptions import GatewayUnavailableError
00017 from . import gateway_hub
00018 from . import utils
00019 
00020 ##############################################################################
00021 # Hub Manager
00022 ##############################################################################
00023 
00024 
00025 class HubManager(object):
00026 
00027     ##########################################################################
00028     # Init & Shutdown
00029     ##########################################################################
00030 
00031     def __init__(self, hub_whitelist, hub_blacklist):
00032         self._param = {}
00033         self._param['hub_whitelist'] = hub_whitelist
00034         self._param['hub_blacklist'] = hub_blacklist
00035         self.hubs = []
00036         self._hub_lock = threading.Lock()
00037 
00038     def shutdown(self):
00039         for hub in self.hubs:
00040             hub.unregister_gateway()
00041 
00042     def is_connected(self):
00043         return True if self.hubs else False
00044 
00045     ##########################################################################
00046     # Introspection
00047     ##########################################################################
00048 
00049     def list_remote_gateway_names(self):
00050         '''
00051           Parse all the hubs and retrieve the list of remote gateway names.
00052 
00053           Note: not sure where is most convenient, here or in gateway class.
00054 
00055           @return list of remote gateway names (with hashes), e.g. gateway345ae2c...
00056           @rtype list of str
00057         '''
00058         remote_gateway_names = []
00059         self._hub_lock.acquire()
00060         for hub in self.hubs:
00061             remote_gateway_names.extend(hub.list_remote_gateway_names())
00062         self._hub_lock.release()
00063         # return the list without duplicates
00064         return list(set(remote_gateway_names))
00065 
00066     def create_remote_gateway_hub_index(self):
00067         '''
00068           Utility function to parse all hubs for the remote gateways and
00069           create a dictionary of the type:
00070 
00071             dic['remote_gateway_name'] = ['hub1', 'hub2']
00072 
00073           where the hub list is a list of actual hub object references.
00074         '''
00075         dic = {}
00076         self._hub_lock.acquire()
00077         for hub in self.hubs:
00078             for remote_gateway in hub.list_remote_gateway_names():
00079                 if remote_gateway in dic:
00080                     dic[remote_gateway].append(hub)
00081                 else:
00082                     dic[remote_gateway] = [hub]
00083         self._hub_lock.release()
00084         return dic
00085 
00086     def get_flip_requests(self):
00087         '''
00088           Returns all unblocked flip requests received by this hub
00089 
00090           @return list of flip registration requests
00091           @rtype list of utils.Registration
00092         '''
00093         registrations = []
00094         self._hub_lock.acquire()
00095         for hub in self.hubs:
00096             registrations.extend(hub.get_unblocked_flipped_in_connections())
00097         self._hub_lock.release()
00098         return registrations
00099 
00100     def remote_gateway_info(self, remote_gateway_name):
00101         '''
00102           Return information that a remote gateway has posted on the hub(s).
00103 
00104           @param remote_gateway_name : the hash name for the remote gateway
00105           @type str
00106 
00107           @return remote gateway information
00108           @rtype gateway_msgs.RemotGateway or None
00109         '''
00110         remote_gateway_info = None
00111         self._hub_lock.acquire()
00112         for hub in self.hubs:
00113             if remote_gateway_name in hub.list_remote_gateway_names():
00114                 # I don't think we need more than one hub's info....
00115                 remote_gateway_info = hub.remote_gateway_info(remote_gateway_name)
00116                 if remote_gateway_info is not None:
00117                     break
00118         self._hub_lock.release()
00119         return remote_gateway_info
00120 
00121     def get_remote_gateway_firewall_flag(self, remote_gateway_name):
00122         '''
00123           Return information that a remote gateway has posted on the hub(s).
00124 
00125           @param remote_gateway_name : the hash name for the remote gateway
00126           @type string
00127 
00128           @return True, false if the flag is set or not, None if remote
00129                   gateway information cannot found
00130           @rtype Bool
00131         '''
00132         firewall_flag = None
00133         self._hub_lock.acquire()
00134         for hub in self.hubs:
00135             if remote_gateway_name in hub.list_remote_gateway_names():
00136                 # I don't think we need more than one hub's info....
00137                 try:
00138                     firewall_flag = hub.get_remote_gateway_firewall_flag(remote_gateway_name)
00139                     break
00140                 except GatewayUnavailableError:
00141                     pass  # cycle through the other hubs looking as well.
00142         self._hub_lock.release()
00143         return firewall_flag
00144 
00145     def send_unflip_request(self, remote_gateway_name, remote_rule):
00146         '''
00147           Send an unflip request to the specified gateway through all available
00148           hubs.
00149 
00150           Doesn't raise GatewayUnavailableError if nothing got sent as the higher level
00151           doesn't need any logic there yet (only called from gateway.shutdown).
00152 
00153           @param remote_gateway_name : the hash name for the remote gateway
00154           @type string
00155 
00156           @param remote_rule : the remote rule to unflip
00157           @type gateway_msgs.RemoteRule
00158         '''
00159         self._hub_lock.acquire()
00160         for hub in self.hubs:
00161             if remote_gateway_name in hub.list_remote_gateway_names():
00162                 try:
00163                     if hub.send_unflip_request(remote_gateway_name, remote_rule):
00164                         self._hub_lock.release()
00165                         return
00166                 except GatewayUnavailableError:
00167                     pass  # cycle through the other hubs looking as well.
00168         self._hub_lock.release()
00169 
00170     ##########################################################################
00171     # Hub Connections
00172     ##########################################################################
00173 
00174     def connect_to_hub(self,
00175                        ip,
00176                        port,
00177                        firewall_flag,
00178                        gateway_unique_name,
00179                        gateway_disengage_hub,  # hub connection lost hook
00180                        gateway_ip,
00181                        existing_advertisements
00182                        ):
00183         '''
00184           Attempts to make a connection and register the gateway with a hub.
00185           This is called from the gateway node's _register_gateway method.
00186 
00187           @param ip
00188           @param port
00189           @param firewall_flag
00190           @param gateway_unique_name
00191           @param remote_gateway_request_callbacks
00192           @type method : Gateway.remote_gateway_request_callbacks()
00193           @param gateway_disengage_hub : this is the hub connection lost hook
00194           @type method : Gateway.disengage_hub()
00195           @param gateway_ip
00196           @param existing advertisements
00197           @type { utils.ConnectionTypes : utils.Connection[] }
00198 
00199           @return an integer indicating error (important for the service call)
00200           @rtype gateway_msgs.ErrorCodes
00201 
00202           @raise
00203         '''
00204         try:
00205             new_hub = gateway_hub.GatewayHub(ip, port, self._param['hub_whitelist'], self._param['hub_blacklist'])
00206         except rocon_hub_client.HubError as e:
00207             return None, e.id, str(e)
00208         already_exists_error = False
00209         self._hub_lock.acquire()
00210         for hub in self.hubs:
00211             if hub == new_hub:
00212                 already_exists_error = True
00213                 break
00214         self._hub_lock.release()
00215         if not already_exists_error:
00216             self._hub_lock.acquire()
00217             new_hub.register_gateway(firewall_flag,
00218                                      gateway_unique_name,
00219                                      gateway_disengage_hub,  # hub connection lost hook
00220                                      gateway_ip,
00221                                      )
00222             for connection_type in utils.connection_types:
00223                 for advertisement in existing_advertisements[connection_type]:
00224                     new_hub.advertise(advertisement)
00225             self.hubs.append(new_hub)
00226             self._hub_lock.release()
00227             return new_hub, gateway_msgs.ErrorCodes.SUCCESS, "success"
00228         else:
00229             return None, gateway_msgs.ErrorCodes.HUB_CONNECTION_ALREADY_EXISTS, "already connected to this hub"
00230 
00231     def disengage_hub(self, hub_to_be_disengaged):
00232         '''
00233           Disengages a hub. Make sure all necessary connections
00234           are cleaned up before calling this (Gateway.disengage_hub).
00235 
00236           @param hub_to_be_disengaged
00237         '''
00238         #uri = str(ip) + ":" + str(port)
00239         # Could dig in and find the name here, but not worth the bother.
00240         hub_to_be_disengaged.disconnect()  # necessary to kill failing socket receives
00241         self._hub_lock.acquire()
00242         if hub_to_be_disengaged in self.hubs:
00243             rospy.loginfo("Gateway : disengaged connection with the hub [%s][%s]" % (
00244                 hub_to_be_disengaged.name, hub_to_be_disengaged.uri))
00245             self.hubs[:] = [hub for hub in self.hubs if hub != hub_to_be_disengaged]
00246         self._hub_lock.release()
00247 
00248     def advertise(self, connection):
00249         self._hub_lock.acquire()
00250         for hub in self.hubs:
00251             hub.advertise(connection)
00252         self._hub_lock.release()
00253 
00254     def unadvertise(self, connection):
00255         self._hub_lock.acquire()
00256         for hub in self.hubs:
00257             hub.unadvertise(connection)
00258         self._hub_lock.release()
00259 
00260     def match_remote_gateway_name(self, remote_gateway_name):
00261         '''
00262           Parses the hub lists looking for strong (identical) and
00263           weak (matches the name without the uuid hash) matches.
00264         '''
00265         matches = []
00266         weak_matches = []  # doesn't match any hash names, but matches a base name
00267         self._hub_lock.acquire()
00268         for hub in self.hubs:
00269             matches.extend(hub.matches_remote_gateway_name(remote_gateway_name))
00270             weak_matches.extend(hub.matches_remote_gateway_basename(remote_gateway_name))
00271         self._hub_lock.release()
00272         # these are hash name lists, make sure they didn't pick up matches for a single hash name from multiple hubs
00273         matches = list(set(matches))
00274         weak_matches = list(set(weak_matches))
00275         return matches, weak_matches
00276 
00277     def publish_network_statistics(self, statistics):
00278         '''
00279           Publish network statistics to every hub this gateway is connected to.
00280 
00281           @param statistics
00282           @type gateway_msgs.ConnectionStatistics
00283         '''
00284         self._hub_lock.acquire()
00285         for hub in self.hubs:
00286             hub.publish_network_statistics(statistics)
00287         self._hub_lock.release()


rocon_gateway
Author(s): Daniel Stonier , Jihoon Lee , Piyush Khandelwal
autogenerated on Sat Jun 8 2019 18:48:44