00001
00002
00003
00004
00005
00006
00007
00008
00009 import threading
00010 from urlparse import urlparse
00011 import rospy
00012 import time
00013 import zeroconf_msgs.srv as zeroconf_srvs
00014 from gateway_msgs.msg import ErrorCodes
00015
00016 from . import hub_client
00017
00018
00019
00020
00021
00022
00023 class HubDiscovery(threading.Thread):
00024
00025 gateway_hub_service = "_ros-multimaster-hub._tcp"
00026
00027 '''
00028 Used to discover hubs via zeroconf.
00029 '''
00030 def __init__(self, external_discovery_update_hook, direct_hub_uri_list=[], disable_zeroconf=False, blacklisted_hubs={}):
00031 '''
00032 :param external_discovery_update: is a callback function that takes action on a discovery
00033 :type external_discovery_update: GatewayNode.register_gateway(ip, port)
00034
00035 :param str[] direct_hub_uri_list: list of uri's to hubs (e.g. http://localhost:6380)
00036
00037 :param disallowed_hubs:
00038 :type disallowed_hubs: # 'ip:port' : (error_code, error_code_str) dictionary of hubs that have been blacklisted (maintained by manager of this class)
00039 '''
00040 threading.Thread.__init__(self)
00041 self.discovery_update_hook = external_discovery_update_hook
00042 self._trigger_shutdown = False
00043 self.trigger_update = False
00044 self._direct_hub_uri_list = direct_hub_uri_list
00045 self._direct_discovered_hubs = []
00046 self._zeroconf_services_available = False if disable_zeroconf else _zeroconf_services_available()
00047 self._blacklisted_hubs = blacklisted_hubs
00048 if self._zeroconf_services_available:
00049 self._discovery_request = zeroconf_srvs.ListDiscoveredServicesRequest()
00050 self._discovery_request.service_type = HubDiscovery.gateway_hub_service
00051 _add_listener()
00052 self._list_discovered_services = rospy.ServiceProxy("zeroconf/list_discovered_services", zeroconf_srvs.ListDiscoveredServices, persistent=True)
00053 self._zeroconf_discovered_hubs = []
00054 self._discovered_hubs_modification_mutex = threading.Lock()
00055
00056 if self._zeroconf_services_available or self._direct_hub_uri_list:
00057 self.start()
00058
00059 def shutdown(self):
00060 '''
00061 Called from the main program to shutdown this thread.
00062 '''
00063 self._trigger_shutdown = True
00064 self._trigger_update = True
00065 if self.is_alive():
00066 self.join()
00067
00068 def run(self):
00069 '''
00070 The hub discovery thread worker function. Monitors zeroconf for the presence of new hubs.
00071
00072 We spin fast initially for convenience, and then wind down once we've detected
00073 a hub.
00074
00075 Note that the zeroconf service is persistent. Alternatively we could use the zeroconf
00076 subscriber to be a wee bit more efficient.
00077 '''
00078 half_sec = 0.5
00079 self._loop_period = half_sec
00080 self._internal_sleep_period = half_sec
00081 self._last_loop_timestamp = time.time()
00082
00083 reasons_not_to_keep_scanning = [
00084 ErrorCodes.SUCCESS,
00085 ErrorCodes.HUB_CONNECTION_ALREADY_EXISTS,
00086 ErrorCodes.HUB_CONNECTION_NOT_IN_NONEMPTY_WHITELIST,
00087
00088
00089
00090 ]
00091 unresolvable_hub = []
00092 while not rospy.is_shutdown() and not self._trigger_shutdown:
00093 self._discovered_hubs_modification_mutex.acquire()
00094
00095 if self._zeroconf_services_available:
00096 new_services, unused_lost_services = self._zeroconf_scan()
00097 for service in new_services:
00098 (ip, port) = _resolve_address(service)
00099 service_uri = str(ip) + ':' + str(port)
00100 if service_uri not in self._blacklisted_hubs.keys():
00101 result, reason = self.discovery_update_hook(ip, port)
00102 if result == ErrorCodes.HUB_CONNECTION_UNRESOLVABLE:
00103 if service_uri not in unresolvable_hub:
00104 rospy.loginfo("Gateway : unresolvable hub [%s]" % reason)
00105 unresolvable_hub.append(service_uri)
00106 elif result == ErrorCodes.SUCCESS:
00107
00108 rospy.loginfo("Gateway : discovered hub via zeroconf [%s:%s]" % (str(ip), str(port)))
00109 if service_uri in unresolvable_hub:
00110 unresolvable_hub.remove(service_uri)
00111 else:
00112 rospy.loginfo("Gateway : blacklisting hub [%s]" % reason)
00113 self._zeroconf_discovered_hubs.append(service)
00114
00115 new_hubs, unused_lost_hubs = self._direct_scan()
00116 for hub_uri in new_hubs:
00117 hostname, port = _resolve_url(hub_uri)
00118 rospy.loginfo("Gateway : discovered hub directly [%s]" % hub_uri)
00119 result, _ = self.discovery_update_hook(hostname, port)
00120 if result in reasons_not_to_keep_scanning:
00121 self._direct_discovered_hubs.append(hub_uri)
00122 self._discovered_hubs_modification_mutex.release()
00123 if not self._zeroconf_services_available and not self._direct_hub_uri_list:
00124 rospy.logfatal("Gateway : zeroconf unavailable and no valid direct hub uris. Stopping hub discovery.")
00125 break
00126 self._sleep()
00127 if self._zeroconf_services_available:
00128 self._list_discovered_services.close()
00129
00130 def disengage_hub(self, hub):
00131 '''
00132 Called when a discovered hub is lost in the upstream application.
00133
00134 This method should remove the hub from the list of discovered hubs.
00135 When the hub comes back up again, the hub discovery thread will
00136 call the discovery_update_hook again
00137
00138 @param hub: hub to be disengage
00139 @type Hub
00140 '''
00141 self._discovered_hubs_modification_mutex.acquire()
00142 self._direct_discovered_hubs[:] = [x for x in self._direct_discovered_hubs
00143 if not _match_url_to_hub_url(x, hub.uri)]
00144 if self._zeroconf_services_available:
00145 self._zeroconf_discovered_hubs[:] = [x for x in self._zeroconf_discovered_hubs
00146 if not _match_zeroconf_address_to_hub_url(x, hub.uri)]
00147 self._discovered_hubs_modification_mutex.release()
00148
00149 def _sleep(self):
00150 '''
00151 Internal non-interruptible sleep loop to check for shutdown and update triggers.
00152 This lets us set a really long watch_loop update if we wish.
00153 '''
00154 while not rospy.is_shutdown() and not self.trigger_update and (time.time() - self._last_loop_timestamp < self._loop_period):
00155 rospy.rostime.wallsleep(self._internal_sleep_period)
00156 self.trigger_update = False
00157 self._last_loop_timestamp = time.time()
00158
00159
00160
00161
00162
00163 def _direct_scan(self):
00164 '''
00165 Ping the list of hubs we are directly looking for to see if they are alive.
00166 '''
00167 discovered_hubs = []
00168 remove_uris = []
00169 for uri in self._direct_hub_uri_list:
00170 (hostname, port) = _resolve_url(uri)
00171 if not hostname:
00172 rospy.logerr("Gateway : Unable to parse direct hub uri [%s]" % uri)
00173 remove_uris.append(uri)
00174 continue
00175 (ping_result, unused_ping_error_message) = hub_client.ping_hub(hostname, port)
00176 if ping_result:
00177 discovered_hubs.append(uri)
00178 difference = lambda l1, l2: [x for x in l1 if x not in l2]
00179 self._direct_hub_uri_list[:] = difference(self._direct_hub_uri_list, remove_uris)
00180 new_hubs = difference(discovered_hubs, self._direct_discovered_hubs)
00181 lost_hubs = difference(self._direct_discovered_hubs, discovered_hubs)
00182
00183
00184 return new_hubs, lost_hubs
00185
00186 def _zeroconf_scan(self):
00187 '''
00188 This checks for new services and adds them. I'm not taking any
00189 action when a discovered service disappears yet though. Probably
00190 should take of that at some time.
00191 '''
00192
00193 try:
00194 response = self._list_discovered_services(self._discovery_request)
00195 except (rospy.service.ServiceException, rospy.exceptions.ROSInterruptException):
00196
00197 return [], []
00198 except (rospy.exceptions.TransportTerminated, AttributeError) as unused_e:
00199
00200
00201
00202
00203 return [], []
00204 difference = lambda l1, l2: [x for x in l1 if x not in l2]
00205 new_services = difference(response.services, self._zeroconf_discovered_hubs)
00206 lost_services = difference(self._zeroconf_discovered_hubs, response.services)
00207 return new_services, lost_services
00208
00209
00210
00211
00212
00213
00214 def _resolve_url(url):
00215 '''
00216 Resolved a url into ip/port portions using urlparse
00217 @var url : The url to parse (may or may not have a scheme)
00218 @return (string,int) : ip, port pair
00219 '''
00220 o = urlparse(url)
00221 ip = None
00222 port = None
00223 try:
00224 if o.hostname is not None and o.port is not None:
00225 ip, port = str(o.hostname), int(o.port)
00226 else:
00227
00228 values = url.split(':')
00229 if len(values) == 2:
00230 ip, port = str(values[0]), int(values[1])
00231 except ValueError:
00232 ip, port = None, None
00233 return ip, port
00234
00235
00236 def _match_url_to_hub_url(url, hub_uri):
00237 '''
00238 @param url: The original url used to specify the hub
00239 @type string
00240
00241 @param hub_uri: The uri constructed by the hub, devoid of any URL scheme
00242 @type string: of the form ip:port
00243 '''
00244 (ip, port) = _resolve_url(url)
00245 return (hub_uri == str(ip) + ":" + str(port))
00246
00247
00248 def _resolve_address(msg):
00249 '''
00250 Resolves a zeroconf address into ip/port portions.
00251 @var msg : zeroconf_msgs.DiscoveredService
00252 @return (string,int) : ip, port pair.
00253 '''
00254 ip = "localhost"
00255 if not msg.is_local:
00256 ip = msg.ipv4_addresses[0]
00257 return (ip, msg.port)
00258
00259
00260 def _match_zeroconf_address_to_hub_url(msg, hub_uri):
00261 '''
00262 @param msg: The original zeroconf address used to specify the hub
00263 @type zeroconf_msgs.DiscoveredService
00264
00265 @param hub_uri: The uri constructed by the hub, devoid of any URL scheme
00266 @type string: of the form ip:port
00267 '''
00268 (ip, port) = _resolve_address(msg)
00269 return (hub_uri == str(ip) + ":" + str(port))
00270
00271
00272 def _zeroconf_services_available():
00273 '''
00274 Check for zeroconf services on startup. If none is found within a suitable
00275 timeout, disable this module.
00276 '''
00277 zeroconf_timeout = 15
00278 rospy.loginfo("Gateway : checking if zeroconf services are available...")
00279 try:
00280 rospy.wait_for_service("zeroconf/add_listener", timeout=zeroconf_timeout)
00281 except rospy.ROSException:
00282 rospy.logwarn("Gateway : timed out waiting for zeroconf services to become available.")
00283 return False
00284 return True
00285
00286
00287 def _add_listener():
00288 '''
00289 Looks for the zeroconf services and attempts to add a rocon hub listener.
00290 Make sure this is called only after _zeroconf_services_available returns true.
00291 '''
00292 try:
00293 add_listener = rospy.ServiceProxy("zeroconf/add_listener", zeroconf_srvs.AddListener)
00294 if not add_listener(service_type=HubDiscovery.gateway_hub_service):
00295 return False
00296 except rospy.ROSException:
00297 rospy.logwarn("Gateway : timed out waiting for zeroconf services to become available.")
00298 return False
00299 except rospy.ServiceException:
00300 rospy.logwarn("Gateway : unable to connect to zeroconf/add_listener service [timeout||crashed]].")
00301 return False
00302 return True