gateway_node.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # License: BSD
00004 #   https://raw.github.com/robotics-in-concert/rocon_multimaster/license/LICENSE
00005 #
00006 ##############################################################################
00007 # Imports
00008 ##############################################################################
00009 
00010 import rospy
00011 import rocon_gateway
00012 import uuid
00013 import gateway_msgs.msg as gateway_msgs
00014 import gateway_msgs.srv as gateway_srvs
00015 import std_msgs.msg as std_msgs
00016 import std_srvs.srv as std_srvs
00017 from urlparse import urlparse
00018 import rocon_hub_client
00019 
00020 from . import gateway
00021 from . import hub_manager
00022 
00023 ##############################################################################
00024 # Gateway Configuration and Main Loop Class
00025 ##############################################################################
00026 
00027 
00028 class GatewayNode():
00029 
00030     '''
00031       Currently this just provides getup and go for the gateway.
00032     '''
00033     ##########################################################################
00034     # Init & Shutdown
00035     ##########################################################################
00036 
00037     def __init__(self):
00038         self._param = rocon_gateway.setup_ros_parameters()
00039         if self._param['disable_uuids']:
00040             self._unique_name = self._param['name']
00041             rospy.logwarn("Gateway : uuid's disabled, using possibly non-unique name [%s]" % self._unique_name)
00042         else:  # append a unique hex string
00043             # random 16 byte string, alternatively uuid.getnode() returns a hash based
00044             # on the mac address, uuid.uid1() based on localhost and time
00045             key = uuid.uuid4()
00046             self._unique_name = self._param['name'] + key.hex
00047             rospy.loginfo("Gateway : generated unique hash name [%s]" % self._unique_name)
00048         self._hub_manager = hub_manager.HubManager(
00049             hub_whitelist=self._param['hub_whitelist'],
00050             hub_blacklist=self._param['hub_blacklist']
00051         )
00052         # Be careful of the construction sequence here, parts depend on others.
00053         self._gateway_publishers = self._setup_ros_publishers()
00054         # self._publish_gateway_info needs self._gateway_publishers
00055         self._gateway = gateway.Gateway(self._hub_manager, self._param, self._unique_name, self._publish_gateway_info)
00056         self._gateway_services = self._setup_ros_services()  # Needs self._gateway
00057         self._gateway_subscribers = self._setup_ros_subscribers()  # Needs self._gateway
00058         # 'ip:port' : (error_code, error_code_str) dictionary of hubs that this gateway has tried to register,
00059         # but not been permitted (hub is not in whitelist, or is blacklisted)
00060         self._disallowed_hubs = {}
00061         self._disallowed_hubs_error_codes = [gateway_msgs.ErrorCodes.HUB_CONNECTION_NOT_IN_NONEMPTY_WHITELIST,
00062                                              gateway_msgs.ErrorCodes.HUB_CONNECTION_BLACKLISTED,
00063                                              gateway_msgs.ErrorCodes.HUB_NAME_NOT_FOUND,
00064                                              # this now has to be permitted as we will often have zeroconf failing for gateways
00065                                              # that have dropped out of wireless range.
00066                                              # gateway_msgs.ErrorCodes.HUB_CONNECTION_UNRESOLVABLE
00067                                              ]
00068         direct_hub_uri_list = [self._param['hub_uri']] if self._param['hub_uri'] != '' else []
00069         self._hub_discovery_thread = rocon_hub_client.HubDiscovery(
00070             self._register_gateway, direct_hub_uri_list, self._param['disable_zeroconf'], self._disallowed_hubs)
00071 
00072         # Shutdown hooks - allowing external triggers for shutting down
00073         if self._param['external_shutdown']:
00074             rospy.on_shutdown(self._wait_for_shutdown)
00075             unused_shutdown_service = rospy.Service('~shutdown', std_srvs.Empty, self.ros_service_shutdown)
00076 
00077         # Make local gateway information immediately available
00078         self._publish_gateway_info()
00079 
00080     def spin(self):
00081         self._gateway.spin()
00082         if not self._param['external_shutdown']:
00083             self._shutdown()
00084         # else the shutdown hook handles it.
00085 
00086     def _wait_for_shutdown(self):
00087         '''
00088           Shutdown hook - we wait here for an external shutdown via ros service
00089           timing out after a reasonable time if we need to.
00090         '''
00091         if self._param['external_shutdown']:
00092             timeout = self._param['external_shutdown_timeout']
00093             count = 0.0
00094             while count < timeout:
00095                 if self._gateway is None:
00096                     return
00097                 else:
00098                     count += 0.5
00099                     rospy.rostime.wallsleep(0.5)  # human time
00100             rospy.logwarn("Gateway : timed out waiting for external shutdown by ros service, forcing shutdown now.")
00101             self._shutdown()
00102 
00103     def _shutdown(self):
00104         '''
00105           Clears this gateway's information from the redis server.
00106         '''
00107         rospy.loginfo("Gateway : shutting down.")
00108         try:
00109             self._hub_discovery_thread.shutdown()
00110             self._gateway.shutdown()
00111             self._hub_manager.shutdown()
00112             self._gateway = None
00113         except Exception as e:
00114             rospy.logerr("Gateway : unknown error on shutdown [%s][%s]" % (str(e), type(e)))
00115             raise
00116 
00117     ##########################################################################
00118     # Hub Discovery & Connection
00119     ##########################################################################
00120 
00121     def _register_gateway(self, ip, port):
00122         '''
00123           Called when either the hub discovery module finds a hub
00124           or a request to connect via ros service is made.
00125 
00126           It starts the actual redis connection with the hub and also
00127           registers the appropriate information about the gateway on
00128           the hub.
00129 
00130           Note, the return type is only really used by the service callback
00131           (ros_service_connect_hub).
00132 
00133           @return error code and message
00134           @rtype gateway_msgs.ErrorCodes, string
00135 
00136           @sa hub_discovery.HubDiscovery
00137         '''
00138         uri = ip + ':' + str(port)
00139         if uri in self._disallowed_hubs.keys():
00140             # we already tried this one before, quietly return from here.
00141             return self._disallowed_hubs[uri]
00142 
00143         existing_advertisements = self._gateway.public_interface.getConnections()
00144         hub, error_code, error_code_str = \
00145             self._hub_manager.connect_to_hub(
00146                 ip,  # Hub Details
00147                 port,
00148                 self._param['firewall'],  # Gateway Details
00149                 self._unique_name,
00150                 self._disengage_hub,
00151                 self._gateway.ip,
00152                 existing_advertisements
00153             )
00154         if hub:
00155             rospy.loginfo("Gateway : registering on the hub [%s]" % hub.name)
00156             self._publish_gateway_info()
00157         else:
00158             if error_code == gateway_msgs.ErrorCodes.HUB_CONNECTION_ALREADY_EXISTS:
00159                 pass  # be quiet - usually happens if we connect directly, then zeroconf tries.
00160             elif error_code in self._disallowed_hubs_error_codes:
00161                 self._disallowed_hubs[uri] = (error_code, error_code_str)
00162                 rospy.logwarn(
00163                     "Gateway : failed to register gateway with the hub [%s][%s][%s]" % (uri, error_code, error_code_str))
00164             elif error_code == gateway_msgs.ErrorCodes.HUB_CONNECTION_UNRESOLVABLE:
00165                 # be less noisy about this one - it's a normal error when a gateway has moved out of wireless range
00166                 # but we still discover an 'unresolvable' hub on avahi before avahi eventually removes it.
00167                 rospy.logdebug(
00168                     "Gateway : failed to register gateway with the hub [%s][%s][%s]" % (uri, error_code, error_code_str))
00169             else:
00170                 rospy.logwarn(
00171                     "Gateway : caught an unknown error trying register gateway with the hub [%s][%s][%s]" %
00172                     (uri, error_code, error_code_str))
00173         return error_code, error_code_str
00174 
00175     def _disengage_hub(self, hub):
00176         '''
00177           Called whenever gateway_hub detects the connection to the hub has been
00178           lost.
00179 
00180           This function informs the hub discovery thread that the hub was lost,
00181           which allows the hub_discovery thread to start looking for the hub.
00182 
00183           @param hub: hub to be disengaged
00184           @type GatewayHub
00185         '''
00186 
00187         self._hub_discovery_thread.disengage_hub(hub)
00188         try:
00189             self._gateway.disengage_hub(hub)
00190         except AttributeError:
00191             # caused when self_gateway is NoneType object (can happen as we pass this
00192             # callback in without guaranteeing we have a gateway object yet)
00193             pass
00194 
00195     ##########################################################################
00196     # Ros Pubs, Subs and Services
00197     ##########################################################################
00198 
00199     def _setup_ros_services(self):
00200         gateway_services = {}
00201         gateway_services['connect_hub'] = rospy.Service(
00202             '~connect_hub', gateway_srvs.ConnectHub, self.ros_service_connect_hub)  # @IgnorePep8
00203         gateway_services['remote_gateway_info'] = rospy.Service(
00204             '~remote_gateway_info', gateway_srvs.RemoteGatewayInfo, self.ros_service_remote_gateway_info)  # @IgnorePep8
00205         gateway_services['advertise'] = rospy.Service(
00206             '~advertise', gateway_srvs.Advertise, self._gateway.ros_service_advertise)  # @IgnorePep8
00207         gateway_services['advertise_all'] = rospy.Service(
00208             '~advertise_all', gateway_srvs.AdvertiseAll, self._gateway.ros_service_advertise_all)  # @IgnorePep8
00209         gateway_services['flip'] = rospy.Service(
00210             '~flip', gateway_srvs.Remote, self._gateway.ros_service_flip)  # @IgnorePep8
00211         gateway_services['flip_all'] = rospy.Service(
00212             '~flip_all', gateway_srvs.RemoteAll, self._gateway.ros_service_flip_all)  # @IgnorePep8
00213         gateway_services['pull'] = rospy.Service(
00214             '~pull', gateway_srvs.Remote, self._gateway.ros_service_pull)  # @IgnorePep8
00215         gateway_services['pull_all'] = rospy.Service(
00216             '~pull_all', gateway_srvs.RemoteAll, self._gateway.ros_service_pull_all)  # @IgnorePep8
00217         gateway_services['set_watcher_period'] = rospy.Service(
00218             '~set_watcher_period',
00219             gateway_srvs.SetWatcherPeriod,
00220             self._gateway.ros_service_set_watcher_period)  # @IgnorePep8
00221         return gateway_services
00222 
00223     def _setup_ros_publishers(self):
00224         gateway_publishers = {}
00225         gateway_publishers['gateway_info'] = rospy.Publisher('~gateway_info', gateway_msgs.GatewayInfo, latch=True, queue_size=5)
00226         return gateway_publishers
00227 
00228     def _setup_ros_subscribers(self):
00229         gateway_subscribers = {}
00230         gateway_subscribers['force_update'] = rospy.Subscriber(
00231             '~force_update', std_msgs.Empty, self._gateway.ros_subscriber_force_update)
00232         return gateway_subscribers
00233 
00234     ##########################################################################
00235     # Ros Service Callbacks
00236     ##########################################################################
00237 
00238     def ros_service_shutdown(self, unused_request):
00239         self._shutdown()
00240         return std_srvs.EmptyResponse()
00241 
00242     def ros_service_connect_hub(self, request):
00243         '''
00244           Handle incoming requests to connect directly to a gateway hub.
00245 
00246           Requests are of the form of a uri (hostname:port pair) pointing to
00247           the gateway hub.
00248         '''
00249         response = gateway_srvs.ConnectHubResponse()
00250         o = urlparse(request.uri)
00251         response.result, response.error_message = self._register_gateway(o.hostname, o.port)
00252         # Some ros logging
00253         if response.result == gateway_msgs.ErrorCodes.SUCCESS:
00254             rospy.loginfo("Gateway : made direct connection to hub [%s]" % request.uri)
00255         return response
00256 
00257     def _publish_gateway_info(self):
00258         if self._gateway is None:
00259             return  # not everything initialised yet
00260         try:
00261             gateway_info = gateway_msgs.GatewayInfo()
00262             gateway_info.name = self._unique_name
00263             gateway_info.ip = self._gateway.ip
00264             gateway_info.connected = self._gateway.is_connected()
00265             gateway_info.hub_names = []
00266             gateway_info.hub_uris = []
00267             for hub in self._hub_manager.hubs:
00268                 gateway_info.hub_names.append(hub.name)
00269                 gateway_info.hub_uris.append(hub.uri)
00270             gateway_info.firewall = self._param['firewall']
00271             gateway_info.flipped_connections = self._gateway.flipped_interface.get_flipped_connections()
00272             gateway_info.flipped_in_connections = self._gateway.flipped_interface.getLocalRegistrations()
00273             gateway_info.flip_watchlist = self._gateway.flipped_interface.getWatchlist()
00274             gateway_info.pulled_connections = self._gateway.pulled_interface.getLocalRegistrations()
00275             gateway_info.pull_watchlist = self._gateway.pulled_interface.getWatchlist()
00276             gateway_info.public_watchlist = self._gateway.public_interface.getWatchlist()
00277             gateway_info.public_interface = self._gateway.public_interface.getInterface()
00278             self._gateway_publishers['gateway_info'].publish(gateway_info)
00279         except AttributeError:
00280             pass  # occurs if self._gateway is reset to None in the middle of all this.
00281 
00282     def ros_service_remote_gateway_info(self, request):
00283         '''
00284           Sends out to the hubs to get the remote gateway information for either the specified,
00285           or the known list of remote gateways.
00286 
00287           :todo: can we optimise this so that hub requests go as a group?
00288         '''
00289         response = gateway_srvs.RemoteGatewayInfoResponse()
00290         requested_gateways = request.gateways if request.gateways else self._hub_manager.list_remote_gateway_names()
00291         for gateway in list(set(requested_gateways)):
00292             remote_gateway_info = self._hub_manager.remote_gateway_info(gateway)
00293             if remote_gateway_info:
00294                 response.gateways.append(remote_gateway_info)
00295             else:
00296                 rospy.logwarn("Gateway : requested gateway info for unavailable gateway [%s]" % gateway)
00297         return response


rocon_gateway
Author(s): Daniel Stonier , Jihoon Lee , Piyush Khandelwal
autogenerated on Sat Jun 8 2019 18:48:44