hub_manager.py
Go to the documentation of this file.
1 #!/usr/bin/env pythonupdate
2 #
3 # License: BSD
4 # https://raw.github.com/robotics-in-concert/rocon_multimaster/license/LICENSE
5 #
6 ###############################################################################
7 # Imports
8 ###############################################################################
9 
10 import threading
11 
12 import rospy
13 import gateway_msgs.msg as gateway_msgs
14 import rocon_hub_client
15 
16 from .exceptions import GatewayUnavailableError
17 from . import gateway_hub
18 from . import utils
19 
20 ##############################################################################
21 # Hub Manager
22 ##############################################################################
23 
24 
25 class HubManager(object):
26  """
27  :ivar hubs: list of gateway hub instances
28  :vartype hubs: [rocon_gateway.GatewayHub]
29  """
30 
31  ##########################################################################
32  # Init & Shutdown
33  ##########################################################################
34 
35  def __init__(self, hub_whitelist, hub_blacklist):
36  self._param = {}
37  self._param['hub_whitelist'] = hub_whitelist
38  self._param['hub_blacklist'] = hub_blacklist
39  self.hubs = []
40  self._hub_lock = threading.Lock()
41 
42  def is_connected(self):
43  return True if self.hubs else False
44 
45  ##########################################################################
46  # Introspection
47  ##########################################################################
48 
50  '''
51  Parse all the hubs and retrieve the list of remote gateway names.
52 
53  Note: not sure where is most convenient, here or in gateway class.
54 
55  @return list of remote gateway names (with hashes), e.g. gateway345ae2c...
56  @rtype list of str
57  '''
58  remote_gateway_names = []
59  self._hub_lock.acquire()
60  for hub in self.hubs:
61  remote_gateway_names.extend(hub.list_remote_gateway_names())
62  self._hub_lock.release()
63  # return the list without duplicates
64  return list(set(remote_gateway_names))
65 
67  '''
68  Utility function to parse all hubs for the remote gateways and
69  create a dictionary of the type:
70 
71  dic['remote_gateway_name'] = ['hub1', 'hub2']
72 
73  where the hub list is a list of actual hub object references.
74  '''
75  dic = {}
76  self._hub_lock.acquire()
77  for hub in self.hubs:
78  for remote_gateway in hub.list_remote_gateway_names():
79  if remote_gateway in dic:
80  dic[remote_gateway].append(hub)
81  else:
82  dic[remote_gateway] = [hub]
83  self._hub_lock.release()
84  return dic
85 
86  def get_flip_requests(self):
87  '''
88  Returns all unblocked flip requests received by this hub
89 
90  @return list of flip registration requests
91  @rtype list of utils.Registration
92  '''
93  registrations = []
94  self._hub_lock.acquire()
95  for hub in self.hubs:
96  registrations.extend(hub.get_unblocked_flipped_in_connections())
97  self._hub_lock.release()
98  return registrations
99 
100  def remote_gateway_info(self, remote_gateway_name):
101  '''
102  Return information that a remote gateway has posted on the hub(s).
103 
104  @param remote_gateway_name : the hash name for the remote gateway
105  @type str
106 
107  @return remote gateway information
108  @rtype gateway_msgs.RemotGateway or None
109  '''
110  remote_gateway_info = None
111  self._hub_lock.acquire()
112  for hub in self.hubs:
113  if remote_gateway_name in hub.list_remote_gateway_names():
114  # I don't think we need more than one hub's info....
115  remote_gateway_info = hub.remote_gateway_info(remote_gateway_name)
116  if remote_gateway_info is not None:
117  break
118  self._hub_lock.release()
119  return remote_gateway_info
120 
121  def get_remote_gateway_firewall_flag(self, remote_gateway_name):
122  '''
123  Return information that a remote gateway has posted on the hub(s).
124 
125  @param remote_gateway_name : the hash name for the remote gateway
126  @type string
127 
128  @return True, false if the flag is set or not, None if remote
129  gateway information cannot found
130  @rtype Bool
131  '''
132  firewall_flag = None
133  self._hub_lock.acquire()
134  for hub in self.hubs:
135  if remote_gateway_name in hub.list_remote_gateway_names():
136  # I don't think we need more than one hub's info....
137  try:
138  firewall_flag = hub.get_remote_gateway_firewall_flag(remote_gateway_name)
139  break
140  except GatewayUnavailableError:
141  pass # cycle through the other hubs looking as well.
142  self._hub_lock.release()
143  return firewall_flag
144 
145  def send_unflip_request(self, remote_gateway_name, remote_rule):
146  '''
147  Send an unflip request to the specified gateway through all available
148  hubs.
149 
150  Doesn't raise GatewayUnavailableError if nothing got sent as the higher level
151  doesn't need any logic there yet (only called from gateway.shutdown).
152 
153  @param remote_gateway_name : the hash name for the remote gateway
154  @type string
155 
156  @param remote_rule : the remote rule to unflip
157  @type gateway_msgs.RemoteRule
158  '''
159  self._hub_lock.acquire()
160  for hub in self.hubs:
161  if remote_gateway_name in hub.list_remote_gateway_names():
162  try:
163  if hub.send_unflip_request(remote_gateway_name, remote_rule):
164  self._hub_lock.release()
165  return
166  except GatewayUnavailableError:
167  pass # cycle through the other hubs looking as well.
168  self._hub_lock.release()
169 
170  ##########################################################################
171  # Hub Connections
172  ##########################################################################
173 
174  def connect_to_hub(self,
175  new_hub,
176  firewall_flag,
177  gateway_unique_name,
178  gateway_disengage_hub, # hub connection lost hook
179  gateway_ip,
180  existing_advertisements
181  ):
182  '''
183  Attempts to make a connection and register the gateway with a hub.
184  This is called from the gateway node's _register_gateway method.
185 
186  @param ip
187  @param port
188  @param firewall_flag
189  @param gateway_unique_name
190  @param remote_gateway_request_callbacks
191  @type method : Gateway.remote_gateway_request_callbacks()
192  @param gateway_disengage_hub : this is the hub connection lost hook
193  @type method : Gateway.disengage_hub()
194  @param gateway_ip
195  @param existing advertisements
196  @type { utils.ConnectionTypes : utils.Connection[] }
197 
198  @return an integer indicating error (important for the service call)
199  @rtype gateway_msgs.ErrorCodes
200 
201  @raise
202  '''
203  self._hub_lock.acquire()
204  try:
205  new_hub.register_gateway(firewall_flag,
206  gateway_unique_name,
207  gateway_disengage_hub, # hub connection lost hook
208  gateway_ip,
209  )
210  for connection_type in utils.connection_types:
211  for advertisement in existing_advertisements[connection_type]:
212  new_hub.advertise(advertisement)
213 
214  # forcefully replace obsolete hub if needed
215  if new_hub in self.hubs:
216  self.hubs.remove(new_hub)
217 
218  self.hubs.append(new_hub)
219  except rocon_hub_client.HubError as e:
220  return None, e.id, str(e)
221  finally:
222  self._hub_lock.release()
223  return new_hub, gateway_msgs.ErrorCodes.SUCCESS, "success"
224 
225  def is_connected_to_hub(self, ip, port):
226  '''
227  Check if the gateway is properly connected to the hub.
228  '''
229  hub = None
230  # Retrieve existing hub from set
231  for h in self.hubs:
232  if h.ip == ip and h.port == port:
233  hub = h
234  break
235 
236  if not hub: # if needed we create a new one
237  try:
238  hub = gateway_hub.GatewayHub(ip, port, self._param['hub_whitelist'], self._param['hub_blacklist'])
239  except rocon_hub_client.HubError as e:
240  return None, e.id, str(e)
241 
242  self._hub_lock.acquire()
243  registered = hub.is_gateway_registered()
244  self._hub_lock.release()
245  if registered:
246  return hub, gateway_msgs.ErrorCodes.HUB_CONNECTION_ALREADY_EXISTS, "already connected to this hub"
247  else:
248  return hub, gateway_msgs.ErrorCodes.NO_HUB_CONNECTION, "not connected to this hub"
249 
250  def disengage_hub(self, hub_to_be_disengaged):
251  '''
252  Disengages a hub. Make sure all necessary connections
253  are cleaned up before calling this (Gateway.disengage_hub).
254 
255  @param hub_to_be_disengaged
256  '''
257  # uri = str(ip) + ":" + str(port)
258  # Could dig in and find the name here, but not worth the bother.
259  hub_to_be_disengaged.disconnect() # necessary to kill failing socket receives
260  self._hub_lock.acquire()
261  if hub_to_be_disengaged in self.hubs:
262  rospy.loginfo("Gateway : disengaged connection with the hub [%s][%s]" % (
263  hub_to_be_disengaged.name, hub_to_be_disengaged.uri))
264  self.hubs[:] = [hub for hub in self.hubs if hub != hub_to_be_disengaged]
265  self._hub_lock.release()
266 
267  def advertise(self, connection):
268  self._hub_lock.acquire()
269  for hub in self.hubs:
270  hub.advertise(connection)
271  self._hub_lock.release()
272 
273  def unadvertise(self, connection):
274  self._hub_lock.acquire()
275  for hub in self.hubs:
276  hub.unadvertise(connection)
277  self._hub_lock.release()
278 
279  def match_remote_gateway_name(self, remote_gateway_name):
280  '''
281  Parses the hub lists looking for strong (identical) and
282  weak (matches the name without the uuid hash) matches.
283  '''
284  matches = []
285  weak_matches = [] # doesn't match any hash names, but matches a base name
286  self._hub_lock.acquire()
287  for hub in self.hubs:
288  matches.extend(hub.matches_remote_gateway_name(remote_gateway_name))
289  weak_matches.extend(hub.matches_remote_gateway_basename(remote_gateway_name))
290  self._hub_lock.release()
291  # these are hash name lists, make sure they didn't pick up matches for a single hash name from multiple hubs
292  matches = list(set(matches))
293  weak_matches = list(set(weak_matches))
294  return matches, weak_matches
295 
296  def publish_network_statistics(self, statistics):
297  '''
298  Publish network statistics to every hub this gateway is connected to.
299 
300  @param statistics
301  @type gateway_msgs.ConnectionStatistics
302  '''
303  self._hub_lock.acquire()
304  for hub in self.hubs:
305  hub.publish_network_statistics(statistics)
306  self._hub_lock.release()
def match_remote_gateway_name(self, remote_gateway_name)
Definition: hub_manager.py:279
def advertise(self, connection)
Definition: hub_manager.py:267
def disengage_hub(self, hub_to_be_disengaged)
Definition: hub_manager.py:250
def send_unflip_request(self, remote_gateway_name, remote_rule)
Definition: hub_manager.py:145
def unadvertise(self, connection)
Definition: hub_manager.py:273
def get_remote_gateway_firewall_flag(self, remote_gateway_name)
Definition: hub_manager.py:121
def connect_to_hub(self, new_hub, firewall_flag, gateway_unique_name, gateway_disengage_hub, gateway_ip, existing_advertisements)
Hub Connections.
Definition: hub_manager.py:181
def is_connected_to_hub(self, ip, port)
Definition: hub_manager.py:225
def publish_network_statistics(self, statistics)
Definition: hub_manager.py:296
def list_remote_gateway_names(self)
Introspection.
Definition: hub_manager.py:49
def __init__(self, hub_whitelist, hub_blacklist)
Init & Shutdown.
Definition: hub_manager.py:35
def remote_gateway_info(self, remote_gateway_name)
Definition: hub_manager.py:100


rocon_gateway
Author(s): Daniel Stonier , Jihoon Lee , Piyush Khandelwal
autogenerated on Mon Jun 10 2019 14:40:10