hub_discovery.py
Go to the documentation of this file.
1 #
2 # License: BSD
3 # https://raw.github.com/robotics-in-concert/rocon_multimaster/license/LICENSE
4 #
5 ###############################################################################
6 # Imports
7 ###############################################################################
8 
9 import threading
10 from urlparse import urlparse
11 import rospy
12 import time
13 import zeroconf_msgs.srv as zeroconf_srvs
14 from gateway_msgs.msg import ErrorCodes
15 
16 from . import hub_client
17 
18 ###############################################################################
19 # Thread
20 ###############################################################################
21 
22 
23 class HubDiscovery(threading.Thread):
24 
25  gateway_hub_service = "_ros-multimaster-hub._tcp"
26 
27  '''
28  Used to discover hubs via zeroconf.
29  '''
30  def __init__(self, verify_connection_hook, direct_hub_uri_list=[], disable_zeroconf=False, blacklisted_hubs={}):
31  '''
32  :param external_discovery_update: is a callback function that takes action on a discovery
33  :type external_discovery_update: GatewayNode.register_gateway(ip, port)
34 
35  :param str[] direct_hub_uri_list: list of uri's to hubs (e.g. http://localhost:6380)
36 
37  :param disallowed_hubs:
38  :type disallowed_hubs: # 'ip:port' : (error_code, error_code_str) dictionary of hubs that have been blacklisted (maintained by manager of this class)
39  '''
40  threading.Thread.__init__(self)
41  self.verify_connection_hook = verify_connection_hook
42  self._trigger_shutdown = False
43  self.trigger_update = False
44  self._direct_hub_uri_list = direct_hub_uri_list
46  self._zeroconf_services_available = False if disable_zeroconf else _zeroconf_services_available()
47  self._blacklisted_hubs = blacklisted_hubs
49  self._discovery_request = zeroconf_srvs.ListDiscoveredServicesRequest()
50  self._discovery_request.service_type = HubDiscovery.gateway_hub_service
52  self._list_discovered_services = rospy.ServiceProxy("zeroconf/list_discovered_services", zeroconf_srvs.ListDiscoveredServices, persistent=True)
54  self._discovered_hubs_modification_mutex = threading.Lock()
55  # Only run the thread if we need to.
57  self.start()
58 
59  def shutdown(self):
60  '''
61  Called from the main program to shutdown this thread.
62  '''
63 
64  self._trigger_shutdown = True
65  self.trigger_update = True # causes it to interrupt a sleep and drop back to check shutdown condition
66 
67  # Avoiding local zombies
68  if self.is_alive(): # python complains if you join a non-started thread
69  self.join() # wait for the thread to finish
70 
71  def run(self):
72  '''
73  The hub discovery thread worker function. Monitors zeroconf for the presence of new hubs.
74 
75  Note that the zeroconf service is persistent. Alternatively we could use the zeroconf
76  subscriber to be a wee bit more efficient.
77  '''
78  half_sec = 0.5 # rospy.Duration(0, 500000000)
79  self._loop_period = half_sec
80  self._internal_sleep_period = half_sec
81  self._last_loop_timestamp = time.time() # rospy.Time.now()
82  # error codes which inform the client is should stop scanning for this hub
83  reasons_not_to_keep_scanning = [
84  # ErrorCodes.SUCCESS, # success now might still be broken by the otherside => we always need to check
85  # ErrorCodes.HUB_CONNECTION_ALREADY_EXISTS, # might exist now but not a bit later => always need to check
86  ErrorCodes.HUB_CONNECTION_NOT_IN_NONEMPTY_WHITELIST,
87  # this now has to be permitted as we will often have zeroconf failing for gateways
88  # that have dropped out of wireless range.
89  # ErrorCodes.HUB_CONNECTION_UNRESOLVABLE
90  ]
91  unresolvable_hub = []
92  while not rospy.is_shutdown() and not self._trigger_shutdown:
93  self._discovered_hubs_modification_mutex.acquire()
94  # Zeroconf scanning
96  new_services, unused_lost_services = self._zeroconf_scan()
97  for service in new_services:
98  (ip, port) = _resolve_address(service)
99  service_uri = str(ip) + ':' + str(port)
100  if service_uri not in self._blacklisted_hubs.keys():
101  result, reason = self.verify_connection_hook(ip, port)
102  if result == ErrorCodes.HUB_CONNECTION_UNRESOLVABLE:
103  if service_uri not in unresolvable_hub:
104  rospy.loginfo("Gateway : unresolvable hub [%s]" % reason)
105  unresolvable_hub.append(service_uri)
106  elif result == ErrorCodes.HUB_CONNECTION_FAILED:
107  rospy.logwarn("Gateway : hub connection failed. [%s][%s]" %(service_uri, reason))
108  elif result == ErrorCodes.SUCCESS:
109  # we're good
110  if service_uri in unresolvable_hub:
111  unresolvable_hub.remove(service_uri)
112  else: # any of the other reasons not to keep scanning
113  rospy.loginfo("Gateway : removing hub from the list to be resolved via zeroconf [%s]" % reason)
114  self._zeroconf_discovered_hubs.append(service)
115  # Direct scanning
116  new_hubs, unused_lost_hubs = self._direct_scan()
117  for hub_uri in new_hubs:
118  hostname, port = _resolve_url(hub_uri)
119  result, _ = self.verify_connection_hook(hostname, port)
120  if result in reasons_not_to_keep_scanning:
121  rospy.loginfo("Gateway : ignoring discovered hub [%s]" % hub_uri)
122  self._direct_discovered_hubs.append(hub_uri)
123  self._discovered_hubs_modification_mutex.release()
124  if not self._zeroconf_services_available and not self._direct_hub_uri_list:
125  rospy.logfatal("Gateway : zeroconf unavailable and no valid direct hub uris. Stopping hub discovery.")
126  break # nothing left to do
127  self._sleep()
129  self._list_discovered_services.close()
130 
131  def disengage_hub(self, hub):
132  '''
133  Called when a discovered hub is lost in the upstream application.
134 
135  This method should remove the hub from the list of discovered hubs.
136  When the hub comes back up again, the hub discovery thread will
137  call the discovery_update_hook again
138 
139  @param hub: hub to be disengage
140  @type Hub
141  '''
142  self._discovered_hubs_modification_mutex.acquire()
143  self._direct_discovered_hubs[:] = [x for x in self._direct_discovered_hubs
144  if not _match_url_to_hub_url(x, hub.uri)]
146  self._zeroconf_discovered_hubs[:] = [x for x in self._zeroconf_discovered_hubs
147  if not _match_zeroconf_address_to_hub_url(x, hub.uri)]
148  self._discovered_hubs_modification_mutex.release()
149 
150  def _sleep(self):
151  '''
152  Internal non-interruptible sleep loop to check for shutdown and update triggers.
153  This lets us set a really long watch_loop update if we wish.
154  '''
155  while not rospy.is_shutdown() and not self.trigger_update and (time.time() - self._last_loop_timestamp < self._loop_period):
156  rospy.rostime.wallsleep(self._internal_sleep_period)
157  self.trigger_update = False
158  self._last_loop_timestamp = time.time()
159 
160  #############################
161  # Private methods
162  #############################
163 
164  def _direct_scan(self):
165  '''
166  Ping the list of hubs we are directly looking for to see if they are alive.
167  Also check if the gateway there is listed to determine if the connection should be refreshed
168  '''
169  discovered_hubs = []
170  already_used_hubs = []
171  remove_uris = []
172  for uri in self._direct_hub_uri_list:
173  (hostname, port) = _resolve_url(uri)
174  if not hostname:
175  rospy.logerr("Gateway : Unable to parse direct hub uri [%s]" % uri)
176  remove_uris.append(uri)
177  continue
178  (ping_result, unused_ping_error_message) = hub_client.ping_hub(hostname, port)
179  if ping_result:
180  discovered_hubs.append(uri)
181  difference = lambda l1, l2: [x for x in l1 if x not in l2]
182  self._direct_hub_uri_list[:] = difference(self._direct_hub_uri_list, remove_uris)
183  new_hubs = difference(discovered_hubs, self._direct_discovered_hubs)
184  lost_hubs = difference(self._direct_discovered_hubs, discovered_hubs)
185  # self._direct_discovered_hubs = discovered_hubs
186  # self._direct_discovered_hubs.extend(discovered_hubs)
187  return new_hubs, lost_hubs
188 
189  def _zeroconf_scan(self):
190  '''
191  This checks for new services and adds them. I'm not taking any
192  action when a discovered service disappears yet though. Probably
193  should take of that at some time.
194  '''
195  # rospy.loginfo("Gateway : checking for autodiscovered gateway hubs")
196  try:
197  response = self._list_discovered_services(self._discovery_request)
198  except (rospy.service.ServiceException, rospy.exceptions.ROSInterruptException):
199  # means we've shut down, just return so it can cleanly shutdown back in run()
200  return [], []
201  except (rospy.exceptions.TransportTerminated, AttributeError) as unused_e:
202  # We should never ever see this - but we're calling the zeroconf node
203  # when we may be in a shutdown hook. Instead of getting one of the standard
204  # exceptions above, it gives us anyone of these curious errors.
205  # Rospy could handle this better...
206  return [], []
207  difference = lambda l1, l2: [x for x in l1 if x not in l2]
208  new_services = difference(response.services, self._zeroconf_discovered_hubs)
209  lost_services = difference(self._zeroconf_discovered_hubs, response.services)
210  return new_services, lost_services
211 
212 ###############################################################################
213 # Functions
214 ###############################################################################
215 
216 
217 def _resolve_url(url):
218  '''
219  Resolved a url into ip/port portions using urlparse
220  @var url : The url to parse (may or may not have a scheme)
221  @return (string,int) : ip, port pair
222  '''
223  o = urlparse(url)
224  ip = None
225  port = None
226  try:
227  if o.hostname is not None and o.port is not None:
228  ip, port = str(o.hostname), int(o.port)
229  else:
230  # Explicit attempt to parse hostname:port
231  values = url.split(':')
232  if len(values) == 2:
233  ip, port = str(values[0]), int(values[1])
234  except ValueError:
235  ip, port = None, None
236  return ip, port
237 
238 
239 def _match_url_to_hub_url(url, hub_uri):
240  '''
241  @param url: The original url used to specify the hub
242  @type string
243 
244  @param hub_uri: The uri constructed by the hub, devoid of any URL scheme
245  @type string: of the form ip:port
246  '''
247  (ip, port) = _resolve_url(url)
248  return (hub_uri == str(ip) + ":" + str(port))
249 
250 
252  '''
253  Resolves a zeroconf address into ip/port portions.
254  @var msg : zeroconf_msgs.DiscoveredService
255  @return (string,int) : ip, port pair.
256  '''
257  ip = "localhost"
258  if not msg.is_local:
259  ip = msg.ipv4_addresses[0]
260  return (ip, msg.port)
261 
262 
264  '''
265  @param msg: The original zeroconf address used to specify the hub
266  @type zeroconf_msgs.DiscoveredService
267 
268  @param hub_uri: The uri constructed by the hub, devoid of any URL scheme
269  @type string: of the form ip:port
270  '''
271  (ip, port) = _resolve_address(msg)
272  return (hub_uri == str(ip) + ":" + str(port))
273 
274 
276  '''
277  Check for zeroconf services on startup. If none is found within a suitable
278  timeout, disable this module.
279  '''
280  zeroconf_timeout = 15 # Amount of time to wait for the zeroconf services to appear
281  rospy.loginfo("Gateway : checking if zeroconf services are available...")
282  try:
283  rospy.wait_for_service("zeroconf/add_listener", timeout=zeroconf_timeout)
284  except rospy.ROSException:
285  rospy.logwarn("Gateway : timed out waiting for zeroconf services to become available.")
286  return False
287  return True
288 
289 
291  '''
292  Looks for the zeroconf services and attempts to add a rocon hub listener.
293  Make sure this is called only after _zeroconf_services_available returns true.
294  '''
295  try:
296  add_listener = rospy.ServiceProxy("zeroconf/add_listener", zeroconf_srvs.AddListener)
297  if not add_listener(service_type=HubDiscovery.gateway_hub_service):
298  return False
299  except rospy.ROSException:
300  rospy.logwarn("Gateway : timed out waiting for zeroconf services to become available.")
301  return False
302  except rospy.ServiceException:
303  rospy.logwarn("Gateway : unable to connect to zeroconf/add_listener service [timeout||crashed]].")
304  return False
305  return True
def _direct_scan(self)
Private methods.
def __init__(self, verify_connection_hook, direct_hub_uri_list=[], disable_zeroconf=False, blacklisted_hubs={})
def _match_zeroconf_address_to_hub_url(msg, hub_uri)
def _match_url_to_hub_url(url, hub_uri)


rocon_hub_client
Author(s): Daniel Stonier , Jihoon Lee
autogenerated on Mon Jun 10 2019 14:40:08