Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 import threading
00011
00012 import rospy
00013 import gateway_msgs.msg as gateway_msgs
00014 import rocon_hub_client
00015
00016 from .exceptions import GatewayUnavailableError
00017 from . import gateway_hub
00018 from . import utils
00019
00020
00021
00022
00023
00024
00025 class HubManager(object):
00026
00027
00028
00029
00030
00031 def __init__(self, hub_whitelist, hub_blacklist):
00032 self._param = {}
00033 self._param['hub_whitelist'] = hub_whitelist
00034 self._param['hub_blacklist'] = hub_blacklist
00035 self.hubs = []
00036 self._hub_lock = threading.Lock()
00037
00038 def shutdown(self):
00039 for hub in self.hubs:
00040 hub.unregister_gateway()
00041
00042 def is_connected(self):
00043 return True if self.hubs else False
00044
00045
00046
00047
00048
00049 def list_remote_gateway_names(self):
00050 '''
00051 Parse all the hubs and retrieve the list of remote gateway names.
00052
00053 Note: not sure where is most convenient, here or in gateway class.
00054
00055 @return list of remote gateway names (with hashes), e.g. gateway345ae2c...
00056 @rtype list of str
00057 '''
00058 remote_gateway_names = []
00059 self._hub_lock.acquire()
00060 for hub in self.hubs:
00061 remote_gateway_names.extend(hub.list_remote_gateway_names())
00062 self._hub_lock.release()
00063
00064 return list(set(remote_gateway_names))
00065
00066 def create_remote_gateway_hub_index(self):
00067 '''
00068 Utility function to parse all hubs for the remote gateways and
00069 create a dictionary of the type:
00070
00071 dic['remote_gateway_name'] = ['hub1', 'hub2']
00072
00073 where the hub list is a list of actual hub object references.
00074 '''
00075 dic = {}
00076 self._hub_lock.acquire()
00077 for hub in self.hubs:
00078 for remote_gateway in hub.list_remote_gateway_names():
00079 if remote_gateway in dic:
00080 dic[remote_gateway].append(hub)
00081 else:
00082 dic[remote_gateway] = [hub]
00083 self._hub_lock.release()
00084 return dic
00085
00086 def get_flip_requests(self):
00087 '''
00088 Returns all unblocked flip requests received by this hub
00089
00090 @return list of flip registration requests
00091 @rtype list of utils.Registration
00092 '''
00093 registrations = []
00094 self._hub_lock.acquire()
00095 for hub in self.hubs:
00096 registrations.extend(hub.get_unblocked_flipped_in_connections())
00097 self._hub_lock.release()
00098 return registrations
00099
00100 def remote_gateway_info(self, remote_gateway_name):
00101 '''
00102 Return information that a remote gateway has posted on the hub(s).
00103
00104 @param remote_gateway_name : the hash name for the remote gateway
00105 @type str
00106
00107 @return remote gateway information
00108 @rtype gateway_msgs.RemotGateway or None
00109 '''
00110 remote_gateway_info = None
00111 self._hub_lock.acquire()
00112 for hub in self.hubs:
00113 if remote_gateway_name in hub.list_remote_gateway_names():
00114
00115 remote_gateway_info = hub.remote_gateway_info(remote_gateway_name)
00116 if remote_gateway_info is not None:
00117 break
00118 self._hub_lock.release()
00119 return remote_gateway_info
00120
00121 def get_remote_gateway_firewall_flag(self, remote_gateway_name):
00122 '''
00123 Return information that a remote gateway has posted on the hub(s).
00124
00125 @param remote_gateway_name : the hash name for the remote gateway
00126 @type string
00127
00128 @return True, false if the flag is set or not, None if remote
00129 gateway information cannot found
00130 @rtype Bool
00131 '''
00132 firewall_flag = None
00133 self._hub_lock.acquire()
00134 for hub in self.hubs:
00135 if remote_gateway_name in hub.list_remote_gateway_names():
00136
00137 try:
00138 firewall_flag = hub.get_remote_gateway_firewall_flag(remote_gateway_name)
00139 break
00140 except GatewayUnavailableError:
00141 pass
00142 self._hub_lock.release()
00143 return firewall_flag
00144
00145 def send_unflip_request(self, remote_gateway_name, remote_rule):
00146 '''
00147 Send an unflip request to the specified gateway through all available
00148 hubs.
00149
00150 Doesn't raise GatewayUnavailableError if nothing got sent as the higher level
00151 doesn't need any logic there yet (only called from gateway.shutdown).
00152
00153 @param remote_gateway_name : the hash name for the remote gateway
00154 @type string
00155
00156 @param remote_rule : the remote rule to unflip
00157 @type gateway_msgs.RemoteRule
00158 '''
00159 self._hub_lock.acquire()
00160 for hub in self.hubs:
00161 if remote_gateway_name in hub.list_remote_gateway_names():
00162 try:
00163 if hub.send_unflip_request(remote_gateway_name, remote_rule):
00164 self._hub_lock.release()
00165 return
00166 except GatewayUnavailableError:
00167 pass
00168 self._hub_lock.release()
00169
00170
00171
00172
00173
00174 def connect_to_hub(self,
00175 ip,
00176 port,
00177 firewall_flag,
00178 gateway_unique_name,
00179 gateway_disengage_hub,
00180 gateway_ip,
00181 existing_advertisements
00182 ):
00183 '''
00184 Attempts to make a connection and register the gateway with a hub.
00185 This is called from the gateway node's _register_gateway method.
00186
00187 @param ip
00188 @param port
00189 @param firewall_flag
00190 @param gateway_unique_name
00191 @param remote_gateway_request_callbacks
00192 @type method : Gateway.remote_gateway_request_callbacks()
00193 @param gateway_disengage_hub : this is the hub connection lost hook
00194 @type method : Gateway.disengage_hub()
00195 @param gateway_ip
00196 @param existing advertisements
00197 @type { utils.ConnectionTypes : utils.Connection[] }
00198
00199 @return an integer indicating error (important for the service call)
00200 @rtype gateway_msgs.ErrorCodes
00201
00202 @raise
00203 '''
00204 try:
00205 new_hub = gateway_hub.GatewayHub(ip, port, self._param['hub_whitelist'], self._param['hub_blacklist'])
00206 except rocon_hub_client.HubError as e:
00207 return None, e.id, str(e)
00208 already_exists_error = False
00209 self._hub_lock.acquire()
00210 for hub in self.hubs:
00211 if hub == new_hub:
00212 already_exists_error = True
00213 break
00214 self._hub_lock.release()
00215 if not already_exists_error:
00216 self._hub_lock.acquire()
00217 new_hub.register_gateway(firewall_flag,
00218 gateway_unique_name,
00219 gateway_disengage_hub,
00220 gateway_ip,
00221 )
00222 for connection_type in utils.connection_types:
00223 for advertisement in existing_advertisements[connection_type]:
00224 new_hub.advertise(advertisement)
00225 self.hubs.append(new_hub)
00226 self._hub_lock.release()
00227 return new_hub, gateway_msgs.ErrorCodes.SUCCESS, "success"
00228 else:
00229 return None, gateway_msgs.ErrorCodes.HUB_CONNECTION_ALREADY_EXISTS, "already connected to this hub"
00230
00231 def disengage_hub(self, hub_to_be_disengaged):
00232 '''
00233 Disengages a hub. Make sure all necessary connections
00234 are cleaned up before calling this (Gateway.disengage_hub).
00235
00236 @param hub_to_be_disengaged
00237 '''
00238
00239
00240 hub_to_be_disengaged.disconnect()
00241 self._hub_lock.acquire()
00242 if hub_to_be_disengaged in self.hubs:
00243 rospy.loginfo("Gateway : disengaged connection with the hub [%s][%s]" % (
00244 hub_to_be_disengaged.name, hub_to_be_disengaged.uri))
00245 self.hubs[:] = [hub for hub in self.hubs if hub != hub_to_be_disengaged]
00246 self._hub_lock.release()
00247
00248 def advertise(self, connection):
00249 self._hub_lock.acquire()
00250 for hub in self.hubs:
00251 hub.advertise(connection)
00252 self._hub_lock.release()
00253
00254 def unadvertise(self, connection):
00255 self._hub_lock.acquire()
00256 for hub in self.hubs:
00257 hub.unadvertise(connection)
00258 self._hub_lock.release()
00259
00260 def match_remote_gateway_name(self, remote_gateway_name):
00261 '''
00262 Parses the hub lists looking for strong (identical) and
00263 weak (matches the name without the uuid hash) matches.
00264 '''
00265 matches = []
00266 weak_matches = []
00267 self._hub_lock.acquire()
00268 for hub in self.hubs:
00269 matches.extend(hub.matches_remote_gateway_name(remote_gateway_name))
00270 weak_matches.extend(hub.matches_remote_gateway_basename(remote_gateway_name))
00271 self._hub_lock.release()
00272
00273 matches = list(set(matches))
00274 weak_matches = list(set(weak_matches))
00275 return matches, weak_matches
00276
00277 def publish_network_statistics(self, statistics):
00278 '''
00279 Publish network statistics to every hub this gateway is connected to.
00280
00281 @param statistics
00282 @type gateway_msgs.ConnectionStatistics
00283 '''
00284 self._hub_lock.acquire()
00285 for hub in self.hubs:
00286 hub.publish_network_statistics(statistics)
00287 self._hub_lock.release()