hub_discovery.py
Go to the documentation of this file.
00001 #
00002 # License: BSD
00003 #   https://raw.github.com/robotics-in-concert/rocon_multimaster/license/LICENSE
00004 #
00005 ###############################################################################
00006 # Imports
00007 ###############################################################################
00008 
00009 import threading
00010 from urlparse import urlparse
00011 import rospy
00012 import time
00013 import zeroconf_msgs.srv as zeroconf_srvs
00014 from gateway_msgs.msg import ErrorCodes
00015 
00016 from . import hub_client
00017 
00018 ###############################################################################
00019 # Thread
00020 ###############################################################################
00021 
00022 
00023 class HubDiscovery(threading.Thread):
00024 
00025     gateway_hub_service = "_ros-multimaster-hub._tcp"
00026 
00027     '''
00028       Used to discover hubs via zeroconf.
00029     '''
00030     def __init__(self, external_discovery_update_hook, direct_hub_uri_list=[], disable_zeroconf=False, blacklisted_hubs={}):
00031         '''
00032           :param external_discovery_update: is a callback function that takes action on a discovery
00033           :type external_discovery_update: GatewayNode.register_gateway(ip, port)
00034 
00035           :param str[] direct_hub_uri_list: list of uri's to hubs (e.g. http://localhost:6380)
00036 
00037           :param disallowed_hubs:
00038           :type disallowed_hubs: # 'ip:port' : (error_code, error_code_str) dictionary of hubs that have been blacklisted (maintained by manager of this class)
00039         '''
00040         threading.Thread.__init__(self)
00041         self.discovery_update_hook = external_discovery_update_hook
00042         self._trigger_shutdown = False
00043         self.trigger_update = False
00044         self._direct_hub_uri_list = direct_hub_uri_list
00045         self._direct_discovered_hubs = []
00046         self._zeroconf_services_available = False if disable_zeroconf else _zeroconf_services_available()
00047         self._blacklisted_hubs = blacklisted_hubs
00048         if self._zeroconf_services_available:
00049             self._discovery_request = zeroconf_srvs.ListDiscoveredServicesRequest()
00050             self._discovery_request.service_type = HubDiscovery.gateway_hub_service
00051             _add_listener()
00052             self._list_discovered_services = rospy.ServiceProxy("zeroconf/list_discovered_services", zeroconf_srvs.ListDiscoveredServices, persistent=True)
00053             self._zeroconf_discovered_hubs = []
00054         self._discovered_hubs_modification_mutex = threading.Lock()
00055         # Only run the thread if we need to.
00056         if self._zeroconf_services_available or self._direct_hub_uri_list:
00057             self.start()
00058 
00059     def shutdown(self):
00060         '''
00061           Called from the main program to shutdown this thread.
00062         '''
00063         self._trigger_shutdown = True
00064         self._trigger_update = True  # causes it to interrupt a sleep and drop back to check shutdown condition
00065         if self.is_alive():  # python complains if you join a non-started thread
00066             self.join()  # wait for the thread to finish
00067 
00068     def run(self):
00069         '''
00070           The hub discovery thread worker function. Monitors zeroconf for the presence of new hubs.
00071 
00072           We spin fast initially for convenience, and then wind down once we've detected
00073           a hub.
00074 
00075           Note that the zeroconf service is persistent. Alternatively we could use the zeroconf
00076           subscriber to be a wee bit more efficient.
00077         '''
00078         half_sec = 0.5  # rospy.Duration(0, 500000000)
00079         self._loop_period = half_sec
00080         self._internal_sleep_period = half_sec
00081         self._last_loop_timestamp = time.time()  # rospy.Time.now()
00082         # error codes which inform the client is should stop scanning for this hub
00083         reasons_not_to_keep_scanning = [
00084             ErrorCodes.SUCCESS,
00085             ErrorCodes.HUB_CONNECTION_ALREADY_EXISTS,
00086             ErrorCodes.HUB_CONNECTION_NOT_IN_NONEMPTY_WHITELIST,
00087             # this now has to be permitted as we will often have zeroconf failing for gateways
00088             # that have dropped out of wireless range.
00089             # ErrorCodes.HUB_CONNECTION_UNRESOLVABLE
00090         ]
00091         unresolvable_hub = []
00092         while not rospy.is_shutdown() and not self._trigger_shutdown:
00093             self._discovered_hubs_modification_mutex.acquire()
00094             # Zeroconf scanning
00095             if self._zeroconf_services_available:
00096                 new_services, unused_lost_services = self._zeroconf_scan()
00097                 for service in new_services:
00098                     (ip, port) = _resolve_address(service)
00099                     service_uri = str(ip) + ':' + str(port)
00100                     if service_uri not in self._blacklisted_hubs.keys():
00101                         result, reason = self.discovery_update_hook(ip, port)
00102                         if result == ErrorCodes.HUB_CONNECTION_UNRESOLVABLE:
00103                             if service_uri not in unresolvable_hub:
00104                                 rospy.loginfo("Gateway : unresolvable hub [%s]" % reason)
00105                                 unresolvable_hub.append(service_uri)
00106                         elif result == ErrorCodes.SUCCESS:
00107                             # we're good
00108                             rospy.loginfo("Gateway : discovered hub via zeroconf [%s:%s]" % (str(ip), str(port)))
00109                             if service_uri in unresolvable_hub:
00110                                 unresolvable_hub.remove(service_uri)
00111                         else:  # any of the other reasons not to keep scanning
00112                             rospy.loginfo("Gateway : blacklisting hub [%s]" % reason)
00113                             self._zeroconf_discovered_hubs.append(service)
00114             # Direct scanning
00115             new_hubs, unused_lost_hubs = self._direct_scan()
00116             for hub_uri in new_hubs:
00117                 hostname, port = _resolve_url(hub_uri)
00118                 rospy.loginfo("Gateway : discovered hub directly [%s]" % hub_uri)
00119                 result, _ = self.discovery_update_hook(hostname, port)
00120                 if result in reasons_not_to_keep_scanning:
00121                     self._direct_discovered_hubs.append(hub_uri)
00122             self._discovered_hubs_modification_mutex.release()
00123             if not self._zeroconf_services_available and not self._direct_hub_uri_list:
00124                 rospy.logfatal("Gateway : zeroconf unavailable and no valid direct hub uris. Stopping hub discovery.")
00125                 break  # nothing left to do
00126             self._sleep()
00127         if self._zeroconf_services_available:
00128             self._list_discovered_services.close()
00129 
00130     def disengage_hub(self, hub):
00131         '''
00132           Called when a discovered hub is lost in the upstream application.
00133 
00134           This method should remove the hub from the list of discovered hubs.
00135           When the hub comes back up again, the hub discovery thread will
00136           call the discovery_update_hook again
00137 
00138           @param hub: hub to be disengage
00139           @type Hub
00140         '''
00141         self._discovered_hubs_modification_mutex.acquire()
00142         self._direct_discovered_hubs[:] = [x for x in self._direct_discovered_hubs
00143                                            if not _match_url_to_hub_url(x, hub.uri)]
00144         if self._zeroconf_services_available:
00145             self._zeroconf_discovered_hubs[:] = [x for x in self._zeroconf_discovered_hubs
00146                                                  if not _match_zeroconf_address_to_hub_url(x, hub.uri)]
00147         self._discovered_hubs_modification_mutex.release()
00148 
00149     def _sleep(self):
00150         '''
00151           Internal non-interruptible sleep loop to check for shutdown and update triggers.
00152           This lets us set a really long watch_loop update if we wish.
00153         '''
00154         while not rospy.is_shutdown() and not self.trigger_update and (time.time() - self._last_loop_timestamp < self._loop_period):
00155             rospy.rostime.wallsleep(self._internal_sleep_period)
00156         self.trigger_update = False
00157         self._last_loop_timestamp = time.time()
00158 
00159     #############################
00160     # Private methods
00161     #############################
00162 
00163     def _direct_scan(self):
00164         '''
00165           Ping the list of hubs we are directly looking for to see if they are alive.
00166         '''
00167         discovered_hubs = []
00168         remove_uris = []
00169         for uri in self._direct_hub_uri_list:
00170             (hostname, port) = _resolve_url(uri)
00171             if not hostname:
00172                 rospy.logerr("Gateway : Unable to parse direct hub uri [%s]" % uri)
00173                 remove_uris.append(uri)
00174                 continue
00175             (ping_result, unused_ping_error_message) = hub_client.ping_hub(hostname, port)
00176             if ping_result:
00177                 discovered_hubs.append(uri)
00178         difference = lambda l1, l2: [x for x in l1 if x not in l2]
00179         self._direct_hub_uri_list[:] = difference(self._direct_hub_uri_list, remove_uris)
00180         new_hubs = difference(discovered_hubs, self._direct_discovered_hubs)
00181         lost_hubs = difference(self._direct_discovered_hubs, discovered_hubs)
00182         # self._direct_discovered_hubs = discovered_hubs
00183         # self._direct_discovered_hubs.extend(discovered_hubs)
00184         return new_hubs, lost_hubs
00185 
00186     def _zeroconf_scan(self):
00187         '''
00188           This checks for new services and adds them. I'm not taking any
00189           action when a discovered service disappears yet though. Probably
00190           should take of that at some time.
00191         '''
00192         # rospy.loginfo("Gateway : checking for autodiscovered gateway hubs")
00193         try:
00194             response = self._list_discovered_services(self._discovery_request)
00195         except (rospy.service.ServiceException, rospy.exceptions.ROSInterruptException):
00196             # means we've shut down, just return so it can cleanly shutdown back in run()
00197             return [], []
00198         except (rospy.exceptions.TransportTerminated, AttributeError) as unused_e:
00199             # We should never ever see this - but we're calling the zeroconf node
00200             # when we may be in a shutdown hook. Instead of getting one of the standard
00201             # exceptions above, it gives us anyone of these curious errors.
00202             # Rospy could handle this better...
00203             return [], []
00204         difference = lambda l1, l2: [x for x in l1 if x not in l2]
00205         new_services = difference(response.services, self._zeroconf_discovered_hubs)
00206         lost_services = difference(self._zeroconf_discovered_hubs, response.services)
00207         return new_services, lost_services
00208 
00209 ###############################################################################
00210 # Functions
00211 ###############################################################################
00212 
00213 
00214 def _resolve_url(url):
00215     '''
00216       Resolved a url into ip/port portions using urlparse
00217       @var url : The url to parse (may or may not have a scheme)
00218       @return (string,int) : ip, port pair
00219     '''
00220     o = urlparse(url)
00221     ip = None
00222     port = None
00223     try:
00224         if o.hostname is not None and o.port is not None:
00225             ip, port = str(o.hostname), int(o.port)
00226         else:
00227             # Explicit attempt to parse hostname:port
00228             values = url.split(':')
00229             if len(values) == 2:
00230                 ip, port = str(values[0]), int(values[1])
00231     except ValueError:
00232         ip, port = None, None
00233     return ip, port
00234 
00235 
00236 def _match_url_to_hub_url(url, hub_uri):
00237     '''
00238       @param url: The original url used to specify the hub
00239       @type string
00240 
00241       @param hub_uri: The uri constructed by the hub, devoid of any URL scheme
00242       @type string: of the form ip:port
00243     '''
00244     (ip, port) = _resolve_url(url)
00245     return (hub_uri == str(ip) + ":" + str(port))
00246 
00247 
00248 def _resolve_address(msg):
00249     '''
00250       Resolves a zeroconf address into ip/port portions.
00251       @var msg : zeroconf_msgs.DiscoveredService
00252       @return (string,int) : ip, port pair.
00253     '''
00254     ip = "localhost"
00255     if not msg.is_local:
00256         ip = msg.ipv4_addresses[0]
00257     return (ip, msg.port)
00258 
00259 
00260 def _match_zeroconf_address_to_hub_url(msg, hub_uri):
00261     '''
00262       @param msg: The original zeroconf address used to specify the hub
00263       @type zeroconf_msgs.DiscoveredService
00264 
00265       @param hub_uri: The uri constructed by the hub, devoid of any URL scheme
00266       @type string: of the form ip:port
00267     '''
00268     (ip, port) = _resolve_address(msg)
00269     return (hub_uri == str(ip) + ":" + str(port))
00270 
00271 
00272 def _zeroconf_services_available():
00273     '''
00274       Check for zeroconf services on startup. If none is found within a suitable
00275       timeout, disable this module.
00276     '''
00277     zeroconf_timeout = 15  # Amount of time to wait for the zeroconf services to appear
00278     rospy.loginfo("Gateway : checking if zeroconf services are available...")
00279     try:
00280         rospy.wait_for_service("zeroconf/add_listener", timeout=zeroconf_timeout)
00281     except rospy.ROSException:
00282         rospy.logwarn("Gateway : timed out waiting for zeroconf services to become available.")
00283         return False
00284     return True
00285 
00286 
00287 def _add_listener():
00288     '''
00289       Looks for the zeroconf services and attempts to add a rocon hub listener.
00290       Make sure this is called only after _zeroconf_services_available returns true.
00291     '''
00292     try:
00293         add_listener = rospy.ServiceProxy("zeroconf/add_listener", zeroconf_srvs.AddListener)
00294         if not add_listener(service_type=HubDiscovery.gateway_hub_service):
00295             return False
00296     except rospy.ROSException:
00297         rospy.logwarn("Gateway : timed out waiting for zeroconf services to become available.")
00298         return False
00299     except rospy.ServiceException:
00300         rospy.logwarn("Gateway : unable to connect to zeroconf/add_listener service [timeout||crashed]].")
00301         return False
00302     return True


rocon_hub_client
Author(s): Daniel Stonier , Jihoon Lee
autogenerated on Sat Jun 8 2019 18:48:43