1
2
3
4
5
6
7
8
9
10
11 import threading
12 from urlparse import urlparse
13 import rospy
14 import zeroconf_msgs.srv as zeroconf_srvs
15
16
17 import hub_api
18
19
20
21
22
23
25
class HubDiscovery(threading.Thread):
    '''
      Used to discover hubs via zeroconf and/or by pinging a list of direct hub uris.

      Runs as a thread that scans periodically and reports every newly found
      hub through the callback handed to the constructor.
    '''

    # zeroconf service type used when asking zeroconf for hubs
    gateway_hub_service = "_ros-multimaster-hub._tcp"

    def __init__(self, external_discovery_update_hook, direct_hub_uri_list=None, disable_zeroconf=False):
        '''
          Set up the discovery state and start the thread if there is anything to scan for.

          @param external_discovery_update_hook is a callback function that takes action on a discovery
          @type gateway_node.update_discovery_hook(ip, port)

          @param direct_hub_uri_list : list of uri's to hubs (e.g. http://localhost:6380),
                 defaults to an empty list
          @type list of uri

          @param disable_zeroconf : skip the zeroconf availability check and rely on direct uris only
          @type bool
        '''
        threading.Thread.__init__(self)
        self.discovery_update_hook = external_discovery_update_hook
        self._trigger_shutdown = False
        self.trigger_update = False
        # _direct_scan() prunes this list in place, so it must never be a
        # default-argument object shared between instances.
        self._direct_hub_uri_list = [] if direct_hub_uri_list is None else direct_hub_uri_list
        self._direct_discovered_hubs = []
        self._zeroconf_discovered_hubs = []
        self._zeroconf_services_available = False if disable_zeroconf else _zeroconf_services_available()
        if self._zeroconf_services_available:
            self._discovery_request = zeroconf_srvs.ListDiscoveredServicesRequest()
            self._discovery_request.service_type = HubDiscovery.gateway_hub_service
            # best effort - the result of adding the listener is not acted upon here
            _add_listener()
            self._list_discovered_services = rospy.ServiceProxy("zeroconf/list_discovered_services", zeroconf_srvs.ListDiscoveredServices, persistent=True)

        # Only run the thread if there is something to look for.
        if self._zeroconf_services_available or self._direct_hub_uri_list:
            self.start()

    def shutdown(self):
        '''
          Called from the main program to shutdown this thread.

          Raises the shutdown flag, wakes the worker out of its interruptible
          sleep and, if the thread is running, waits for it to finish.
        '''
        self._trigger_shutdown = True
        # _sleep() polls 'trigger_update' (no leading underscore); setting any
        # other attribute would leave the worker sleeping out its full period.
        self.trigger_update = True
        if self.is_alive():
            self.join()

    def run(self):
        '''
          The hub discovery thread worker function. Monitors zeroconf and the direct
          hub uri list for the presence of new hubs.

          The loop period is fixed at half a second. Every newly discovered hub is
          passed to the discovery update hook as (ip/hostname, port).

          Note that the zeroconf service proxy is persistent, it gets closed when the
          loop exits. Alternatively we could use the zeroconf subscriber to be a wee
          bit more efficient.
        '''
        half_sec = rospy.Duration(0, 500000000)
        self._loop_period = half_sec
        self._internal_sleep_period = half_sec
        self._last_loop_timestamp = rospy.Time.now()
        while not rospy.is_shutdown() and not self._trigger_shutdown:
            # zeroconf scanning
            if self._zeroconf_services_available:
                new_services, unused_lost_services = self._zeroconf_scan()
                for service in new_services:
                    (ip, port) = _resolve_address(service)
                    rospy.loginfo("Gateway : discovered hub via zeroconf [%s:%s]" % (str(ip), str(port)))
                    self.discovery_update_hook(ip, port)
            # direct scanning
            new_hubs, unused_lost_hubs = self._direct_scan()
            for hub_uri in new_hubs:
                hostname, port = _resolve_url(hub_uri)
                rospy.loginfo("Gateway : discovered hub directly [%s]" % hub_uri)
                self.discovery_update_hook(hostname, port)
            # the direct list can shrink to nothing once unparseable uris are pruned
            if not self._zeroconf_services_available and not self._direct_hub_uri_list:
                rospy.logfatal("Gateway : zeroconf unavailable and no valid direct hub uris. Stopping hub discovery.")
                break
            self._sleep()
        if self._zeroconf_services_available:
            self._list_discovered_services.close()

    def _sleep(self):
        '''
          Internal interruptible sleep loop to check for shutdown and update triggers.
          This lets us set a really long watch_loop update if we wish.

          Returns as soon as rospy shuts down, this thread is asked to shut down, an
          update is triggered or the loop period has elapsed. The update trigger is
          cleared and the loop timestamp refreshed on the way out.
        '''
        while (not rospy.is_shutdown()
               and not self._trigger_shutdown
               and not self.trigger_update
               and (rospy.Time.now() - self._last_loop_timestamp < self._loop_period)):
            rospy.sleep(self._internal_sleep_period)
        self.trigger_update = False
        self._last_loop_timestamp = rospy.Time.now()

    @staticmethod
    def _difference(first, second):
        '''
          @return list : the elements of first that are not in second (order preserved).
        '''
        return [x for x in first if x not in second]

    def _direct_scan(self):
        '''
          Ping the list of hubs we are directly looking for to see if they are alive.

          Uris that cannot be parsed are logged and dropped (in place) from the
          direct hub uri list so that they are not retried.

          @return (list, list) : uris that newly answered, uris that stopped answering
        '''
        discovered_hubs = []
        remove_uris = []
        for uri in self._direct_hub_uri_list:
            (hostname, port) = _resolve_url(uri)
            if not hostname:
                rospy.logerr("Gateway : Unable to parse direct hub uri [%s]" % uri)
                remove_uris.append(uri)
                continue
            if hub_api.ping_hub(hostname, port):
                discovered_hubs.append(uri)
        # slice assignment keeps the same list object while pruning the bad uris
        self._direct_hub_uri_list[:] = [x for x in self._direct_hub_uri_list
                                        if x not in remove_uris]
        new_hubs = self._difference(discovered_hubs, self._direct_discovered_hubs)
        lost_hubs = self._difference(self._direct_discovered_hubs, discovered_hubs)
        self._direct_discovered_hubs = discovered_hubs
        return new_hubs, lost_hubs

    def _zeroconf_scan(self):
        '''
          Ask zeroconf for the currently discovered hub services and compare
          them with the result of the previous scan.

          Lost services are computed and returned, but nothing acts on them yet.
          Probably should take care of that at some time.

          @return (list, list) : new services, lost services. Both are empty if
                  the service call fails.
        '''
        try:
            response = self._list_discovered_services(self._discovery_request)
        except rospy.service.ServiceException:
            # nothing to report from a failed call, the stored list is left as it was
            return [], []
        new_services = self._difference(response.services, self._zeroconf_discovered_hubs)
        lost_services = self._difference(self._zeroconf_discovered_hubs, response.services)
        self._zeroconf_discovered_hubs = response.services
        return new_services, lost_services
154
155
156
157
158
159
def _resolve_url(url):
    '''
      Resolves a url into ip/port portions using urlparse.

      @var url : The url to parse (may or may not have a scheme)
      @return (string,int) : ip, port pair, or (None, None) if the url cannot be resolved
    '''
    try:
        # Parsing sits inside the try as urlparse itself raises ValueError on
        # malformed input such as an unbalanced ipv6 bracket.
        parsed = urlparse(url)
        if parsed.hostname is not None and parsed.port is not None:
            return str(parsed.hostname), int(parsed.port)
        # Fall back to a plain "host:port" split for urls that did not yield
        # both a hostname and a port above (e.g. no scheme).
        values = url.split(':')
        if len(values) == 2:
            return str(values[0]), int(values[1])
    except ValueError:
        # unparseable url or non-numeric port
        return None, None
    return None, None
180
def _resolve_address(msg):
    '''
      Picks the ip/port to contact a zeroconf discovered service on.

      Local services are addressed via "localhost", anything else via the
      first ipv4 address listed in the message.

      @var msg : zeroconf_msgs.DiscoveredService
      @return (string,int) : ip, port pair.
    '''
    if msg.is_local:
        return ("localhost", msg.port)
    first_ipv4 = msg.ipv4_addresses[0]
    return (first_ipv4, msg.port)
191
192
def _zeroconf_services_available():
    '''
      Startup probe for zeroconf: waits up to five seconds for the
      "zeroconf/add_listener" service to appear.

      @return bool : True if the service showed up in time, False (after
              logging a warning) if the wait raised a ROSException.
    '''
    rospy.loginfo("Gateway : checking if zeroconf services are available...")
    try:
        rospy.wait_for_service("zeroconf/add_listener", timeout=5)
    except rospy.ROSException:
        rospy.logwarn("Gateway : timed out waiting for zeroconf services to become available.")
        return False
    else:
        return True
206
207
def _add_listener():
    '''
      Looks for the zeroconf services and attempts to add a rocon hub listener.
      Make sure this is called only after _zeroconf_services_available returns true.

      @return bool : True if the listener request went through, False if it was
              refused or the call failed.
    '''
    try:
        add_listener = rospy.ServiceProxy("zeroconf/add_listener", zeroconf_srvs.AddListener)
        # NOTE(review): this tests the truthiness of the service response object
        # itself - confirm whether a result field on the response should be
        # checked instead.
        if not add_listener(service_type=HubDiscovery.gateway_hub_service):
            return False
    except rospy.ROSException:
        rospy.logwarn("Gateway : timed out waiting for zeroconf services to become available.")
        return False
    except rospy.service.ServiceException as e:
        # A failed service call is not a ROSException; without this handler it
        # would propagate to the caller instead of reporting False.
        rospy.logwarn("Gateway : failed to add a zeroconf listener for hubs [%s]" % str(e))
        return False
    return True
221