10 from urlparse
import urlparse
13 import zeroconf_msgs.srv
as zeroconf_srvs
14 from gateway_msgs.msg
import ErrorCodes
16 from .
import hub_client
25 gateway_hub_service =
"_ros-multimaster-hub._tcp" 28 Used to discover hubs via zeroconf. 30 def __init__(self, verify_connection_hook, direct_hub_uri_list=[], disable_zeroconf=False, blacklisted_hubs={}):
32 :param verify_connection_hook: is a callback function that takes action on a discovery 33 :type verify_connection_hook: GatewayNode.register_gateway(ip, port) 35 :param str[] direct_hub_uri_list: list of URIs to hubs (e.g. http://localhost:6380) 37 :param blacklisted_hubs: 38 :type blacklisted_hubs: # 'ip:port' : (error_code, error_code_str) dictionary of hubs that have been blacklisted (maintained by manager of this class) 40 threading.Thread.__init__(self)
50 self._discovery_request.service_type = HubDiscovery.gateway_hub_service
52 self.
_list_discovered_services = rospy.ServiceProxy(
"zeroconf/list_discovered_services", zeroconf_srvs.ListDiscoveredServices, persistent=
True)
61 Called from the main program to shutdown this thread. 73 The hub discovery thread worker function. Monitors zeroconf for the presence of new hubs. 75 Note that the zeroconf service is persistent. Alternatively we could use the zeroconf 76 subscriber to be a wee bit more efficient. 83 reasons_not_to_keep_scanning = [
86 ErrorCodes.HUB_CONNECTION_NOT_IN_NONEMPTY_WHITELIST,
93 self._discovered_hubs_modification_mutex.acquire()
97 for service
in new_services:
99 service_uri = str(ip) +
':' + str(port)
100 if service_uri
not in self._blacklisted_hubs.keys():
102 if result == ErrorCodes.HUB_CONNECTION_UNRESOLVABLE:
103 if service_uri
not in unresolvable_hub:
104 rospy.loginfo(
"Gateway : unresolvable hub [%s]" % reason)
105 unresolvable_hub.append(service_uri)
106 elif result == ErrorCodes.HUB_CONNECTION_FAILED:
107 rospy.logwarn(
"Gateway : hub connection failed. [%s][%s]" %(service_uri, reason))
108 elif result == ErrorCodes.SUCCESS:
110 if service_uri
in unresolvable_hub:
111 unresolvable_hub.remove(service_uri)
113 rospy.loginfo(
"Gateway : removing hub from the list to be resolved via zeroconf [%s]" % reason)
114 self._zeroconf_discovered_hubs.append(service)
117 for hub_uri
in new_hubs:
120 if result
in reasons_not_to_keep_scanning:
121 rospy.loginfo(
"Gateway : ignoring discovered hub [%s]" % hub_uri)
122 self._direct_discovered_hubs.append(hub_uri)
123 self._discovered_hubs_modification_mutex.release()
125 rospy.logfatal(
"Gateway : zeroconf unavailable and no valid direct hub uris. Stopping hub discovery.")
129 self._list_discovered_services.close()
133 Called when a discovered hub is lost in the upstream application. 135 This method should remove the hub from the list of discovered hubs. 136 When the hub comes back up again, the hub discovery thread will 137 call the discovery_update_hook again. 139 @param hub: hub to be disengaged 142 self._discovered_hubs_modification_mutex.acquire()
148 self._discovered_hubs_modification_mutex.release()
152 Internal non-interruptible sleep loop to check for shutdown and update triggers. 153 This lets us set a really long watch_loop update if we wish. 166 Ping the list of hubs we are directly looking for to see if they are alive. 167 Also check if the gateway there is listed to determine if the connection should be refreshed 170 already_used_hubs = []
175 rospy.logerr(
"Gateway : Unable to parse direct hub uri [%s]" % uri)
176 remove_uris.append(uri)
178 (ping_result, unused_ping_error_message) = hub_client.ping_hub(hostname, port)
180 discovered_hubs.append(uri)
181 difference =
lambda l1, l2: [x
for x
in l1
if x
not in l2]
187 return new_hubs, lost_hubs
191 This checks for new services and adds them. I'm not taking any 192 action when a discovered service disappears yet though. Probably 193 should take care of that at some time. 198 except (rospy.service.ServiceException, rospy.exceptions.ROSInterruptException):
201 except (rospy.exceptions.TransportTerminated, AttributeError)
as unused_e:
207 difference =
lambda l1, l2: [x
for x
in l1
if x
not in l2]
210 return new_services, lost_services
219 Resolves a url into ip/port portions using urlparse 220 @var url : The url to parse (may or may not have a scheme) 221 @return (string,int) : ip, port pair 227 if o.hostname
is not None and o.port
is not None:
228 ip, port = str(o.hostname), int(o.port)
231 values = url.split(
':')
233 ip, port = str(values[0]), int(values[1])
235 ip, port =
None,
None 241 @param url: The original url used to specify the hub 244 @param hub_uri: The uri constructed by the hub, devoid of any URL scheme 245 @type string: of the form ip:port 248 return (hub_uri == str(ip) +
":" + str(port))
253 Resolves a zeroconf address into ip/port portions. 254 @var msg : zeroconf_msgs.DiscoveredService 255 @return (string,int) : ip, port pair. 259 ip = msg.ipv4_addresses[0]
260 return (ip, msg.port)
265 @param msg: The original zeroconf address used to specify the hub 266 @type zeroconf_msgs.DiscoveredService 268 @param hub_uri: The uri constructed by the hub, devoid of any URL scheme 269 @type string: of the form ip:port 272 return (hub_uri == str(ip) +
":" + str(port))
277 Check for zeroconf services on startup. If none is found within a suitable 278 timeout, disable this module. 280 zeroconf_timeout = 15
281 rospy.loginfo(
"Gateway : checking if zeroconf services are available...")
283 rospy.wait_for_service(
"zeroconf/add_listener", timeout=zeroconf_timeout)
284 except rospy.ROSException:
285 rospy.logwarn(
"Gateway : timed out waiting for zeroconf services to become available.")
292 Looks for the zeroconf services and attempts to add a rocon hub listener. 293 Make sure this is called only after _zeroconf_services_available returns true. 296 add_listener = rospy.ServiceProxy(
"zeroconf/add_listener", zeroconf_srvs.AddListener)
297 if not add_listener(service_type=HubDiscovery.gateway_hub_service):
299 except rospy.ROSException:
300 rospy.logwarn(
"Gateway : timed out waiting for zeroconf services to become available.")
302 except rospy.ServiceException:
303 rospy.logwarn(
"Gateway : unable to connect to zeroconf/add_listener service [timeout||crashed]].")
def _direct_scan(self)
Private methods.
def disengage_hub(self, hub)
_zeroconf_discovered_hubs
_list_discovered_services
def _resolve_address(msg)
_zeroconf_services_available
def _zeroconf_services_available()
def __init__(self, verify_connection_hook, direct_hub_uri_list=[], disable_zeroconf=False, blacklisted_hubs={})
def _match_zeroconf_address_to_hub_url(msg, hub_uri)
def _match_url_to_hub_url(url, hub_uri)
_discovered_hubs_modification_mutex