Package rocon_hub_client :: Module hub_discovery
[frames] | no frames]

Source Code for Module rocon_hub_client.hub_discovery

  1  #!/usr/bin/env python 
  2  # 
  3  # License: BSD 
  4  #   https://raw.github.com/robotics-in-concert/rocon_multimaster/hydro-devel/rocon_hub_client/LICENSE 
  5  # 
  6   
  7  ############################################################################### 
  8  # Imports 
  9  ############################################################################### 
 10   
 11  import threading 
 12  from urlparse import urlparse 
 13  import rospy 
 14  import zeroconf_msgs.srv as zeroconf_srvs 
 15   
 16  # local imports 
 17  import hub_api 
 18   
 19  ############################################################################### 
 20  # Thread 
 21  ############################################################################### 
 22   
 23   
24 -class HubDiscovery(threading.Thread):
25 26 gateway_hub_service = "_ros-multimaster-hub._tcp" 27 28 ''' 29 Used to discover hubs via zeroconf. 30 '''
31 - def __init__(self, external_discovery_update_hook, direct_hub_uri_list=[], disable_zeroconf=False):
32 ''' 33 @param external_discovery_update is a callback function that takes action on a discovery 34 @type gateway_node.update_discovery_hook(ip, port) 35 36 @param direct_hub_uri_list : list of uri's to hubs (e.g. http://localhost:6380 37 @type list of uri 38 ''' 39 threading.Thread.__init__(self) 40 self.discovery_update_hook = external_discovery_update_hook 41 self._trigger_shutdown = False 42 self.trigger_update = False 43 self._direct_hub_uri_list = direct_hub_uri_list 44 self._direct_discovered_hubs = [] 45 self._zeroconf_services_available = False if disable_zeroconf else _zeroconf_services_available() 46 if self._zeroconf_services_available: 47 self._discovery_request = zeroconf_srvs.ListDiscoveredServicesRequest() 48 self._discovery_request.service_type = HubDiscovery.gateway_hub_service 49 _add_listener() 50 self._list_discovered_services = rospy.ServiceProxy("zeroconf/list_discovered_services", zeroconf_srvs.ListDiscoveredServices, persistent=True) 51 self._zeroconf_discovered_hubs = [] 52 # Only run the thread if we need to. 53 if self._zeroconf_services_available or self._direct_hub_uri_list: 54 self.start()
55
56 - def shutdown(self):
57 ''' 58 Called from the main program to shutdown this thread. 59 ''' 60 self._trigger_shutdown = True 61 self._trigger_update = True # causes it to interrupt a sleep and drop back to check shutdown condition 62 if self.is_alive(): # python complains if you join a non-started thread 63 self.join() # wait for the thread to finish
64
65 - def run(self):
66 ''' 67 The hub discovery thread worker function. Monitors zeroconf for the presence of new hubs. 68 69 We spin fast initially for convenience, and then wind down once we've detected 70 a hub. 71 72 Note that the zeroconf service is persistent. Alternatively we could use the zeroconf 73 subscriber to be a wee bit more efficient. 74 ''' 75 half_sec = rospy.Duration(0, 500000000) 76 self._loop_period = half_sec 77 self._internal_sleep_period = half_sec 78 self._last_loop_timestamp = rospy.Time.now() 79 while not rospy.is_shutdown() and not self._trigger_shutdown: 80 # Zeroconf scanning 81 if self._zeroconf_services_available: 82 new_services, unused_lost_services = self._zeroconf_scan() 83 for service in new_services: 84 (ip, port) = _resolve_address(service) 85 rospy.loginfo("Gateway : discovered hub via zeroconf [%s:%s]" % (str(ip), str(port))) 86 self.discovery_update_hook(ip, port) 87 # Direct scanning 88 new_hubs, unused_lost_hubs = self._direct_scan() 89 for hub_uri in new_hubs: 90 hostname, port = _resolve_url(hub_uri) 91 rospy.loginfo("Gateway : discovered hub directly [%s]" % hub_uri) 92 self.discovery_update_hook(hostname, port) 93 if not self._zeroconf_services_available and not self._direct_hub_uri_list: 94 rospy.logfatal("Gateway : zeroconf unavailable and no valid direct hub uris. Stopping hub discovery.") 95 break # nothing left to do 96 self._sleep() 97 if self._zeroconf_services_available: 98 self._list_discovered_services.close()
99
100 - def _sleep(self):
101 ''' 102 Internal interruptible sleep loop to check for shutdown and update triggers. 103 This lets us set a really long watch_loop update if we wish. 104 ''' 105 while not rospy.is_shutdown() and not self.trigger_update and (rospy.Time.now() - self._last_loop_timestamp < self._loop_period): 106 rospy.sleep(self._internal_sleep_period) 107 self.trigger_update = False 108 self._last_loop_timestamp = rospy.Time.now()
109 110 ############################# 111 # Private methods 112 ############################# 113
114 - def _direct_scan(self):
115 ''' 116 Ping the list of hubs we are directly looking for to see if they are alive. 117 ''' 118 discovered_hubs = [] 119 remove_uris = [] 120 for uri in self._direct_hub_uri_list: 121 (hostname, port) = _resolve_url(uri) 122 if not hostname: 123 rospy.logerr("Gateway : Unable to parse direct hub uri [%s]" % uri) 124 remove_uris.append(uri) 125 continue 126 if hub_api.ping_hub(hostname, port): 127 discovered_hubs.append(uri) 128 self._direct_hub_uri_list[:] = [x for x in self._direct_hub_uri_list 129 if x not in remove_uris] 130 difference = lambda l1, l2: [x for x in l1 if x not in l2] 131 new_hubs = difference(discovered_hubs, self._direct_discovered_hubs) 132 lost_hubs = difference(self._direct_discovered_hubs, discovered_hubs) 133 self._direct_discovered_hubs = discovered_hubs 134 return new_hubs, lost_hubs
135
136 - def _zeroconf_scan(self):
137 ''' 138 This checks for new services and adds them. I'm not taking any 139 action when a discovered service disappears yet though. Probably 140 should take of that at some time. 141 ''' 142 #rospy.loginfo("Gateway : checking for autodiscovered gateway hubs") 143 try: 144 response = self._list_discovered_services(self._discovery_request) 145 except rospy.service.ServiceException: 146 # means we've shut down, just return so it can cleanly shutdown back in run() 147 return [], [] 148 difference = lambda l1, l2: [x for x in l1 if x not in l2] 149 new_services = difference(response.services, self._zeroconf_discovered_hubs) 150 lost_services = difference(self._zeroconf_discovered_hubs, response.services) 151 self._zeroconf_discovered_hubs = response.services 152 #self._zeroconf_discovered_hubs.extend(new_services) 153 return new_services, lost_services
154 155 156 ############################################################################### 157 # Functions 158 ############################################################################### 159
160 -def _resolve_url(url):
161 ''' 162 Resolved a url into ip/port portions using urlparse 163 @var url : The url to parse (may or may not have a scheme) 164 @return (string,int) : ip, port pair 165 ''' 166 o = urlparse(url) 167 ip = None 168 port = None 169 try: 170 if o.hostname is not None and o.port is not None: 171 ip, port = str(o.hostname), int(o.port) 172 else: 173 # Explicit attempt to parse hostname:port 174 values = url.split(':') 175 if len(values) == 2: 176 ip, port = str(values[0]), int(values[1]) 177 except ValueError: 178 ip, port = None, None 179 return ip, port
180
181 -def _resolve_address(msg):
182 ''' 183 Resolves a zeroconf address into ip/port portions. 184 @var msg : zeroconf_msgs.DiscoveredService 185 @return (string,int) : ip, port pair. 186 ''' 187 ip = "localhost" 188 if not msg.is_local: 189 ip = msg.ipv4_addresses[0] 190 return (ip, msg.port)
191 192
193 -def _zeroconf_services_available():
194 ''' 195 Check for zeroconf services on startup. If none is found within a suitable 196 timeout, disable this module. 197 ''' 198 zeroconf_timeout = 5 # Amount of time to wait for the zeroconf services to appear 199 rospy.loginfo("Gateway : checking if zeroconf services are available...") 200 try: 201 rospy.wait_for_service("zeroconf/add_listener", timeout=zeroconf_timeout) 202 except rospy.ROSException: 203 rospy.logwarn("Gateway : timed out waiting for zeroconf services to become available.") 204 return False 205 return True
206 207
208 -def _add_listener():
209 ''' 210 Looks for the zeroconf services and attempts to add a rocon hub listener. 211 Make sure this is called only after _zeroconf_services_available returns true. 212 ''' 213 try: 214 add_listener = rospy.ServiceProxy("zeroconf/add_listener", zeroconf_srvs.AddListener) 215 if not add_listener(service_type=HubDiscovery.gateway_hub_service): 216 return False 217 except rospy.ROSException: 218 rospy.logwarn("Gateway : timed out waiting for zeroconf services to become available.") 219 return False 220 return True
221