00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 import rospy
00011 import rocon_gateway
00012 import uuid
00013 import gateway_msgs.msg as gateway_msgs
00014 import gateway_msgs.srv as gateway_srvs
00015 import std_msgs.msg as std_msgs
00016 import std_srvs.srv as std_srvs
00017 from urlparse import urlparse
00018 import rocon_hub_client
00019
00020 from . import gateway
00021 from . import hub_manager
00022
00023
00024
00025
00026
00027
00028 class GatewayNode():
00029
00030 '''
00031 Currently this just provides getup and go for the gateway.
00032 '''
00033
00034
00035
00036
00037 def __init__(self):
00038 self._param = rocon_gateway.setup_ros_parameters()
00039 if self._param['disable_uuids']:
00040 self._unique_name = self._param['name']
00041 rospy.logwarn("Gateway : uuid's disabled, using possibly non-unique name [%s]" % self._unique_name)
00042 else:
00043
00044
00045 key = uuid.uuid4()
00046 self._unique_name = self._param['name'] + key.hex
00047 rospy.loginfo("Gateway : generated unique hash name [%s]" % self._unique_name)
00048 self._hub_manager = hub_manager.HubManager(
00049 hub_whitelist=self._param['hub_whitelist'],
00050 hub_blacklist=self._param['hub_blacklist']
00051 )
00052
00053 self._gateway_publishers = self._setup_ros_publishers()
00054
00055 self._gateway = gateway.Gateway(self._hub_manager, self._param, self._unique_name, self._publish_gateway_info)
00056 self._gateway_services = self._setup_ros_services()
00057 self._gateway_subscribers = self._setup_ros_subscribers()
00058
00059
00060 self._disallowed_hubs = {}
00061 self._disallowed_hubs_error_codes = [gateway_msgs.ErrorCodes.HUB_CONNECTION_NOT_IN_NONEMPTY_WHITELIST,
00062 gateway_msgs.ErrorCodes.HUB_CONNECTION_BLACKLISTED,
00063 gateway_msgs.ErrorCodes.HUB_NAME_NOT_FOUND,
00064
00065
00066
00067 ]
00068 direct_hub_uri_list = [self._param['hub_uri']] if self._param['hub_uri'] != '' else []
00069 self._hub_discovery_thread = rocon_hub_client.HubDiscovery(
00070 self._register_gateway, direct_hub_uri_list, self._param['disable_zeroconf'], self._disallowed_hubs)
00071
00072
00073 if self._param['external_shutdown']:
00074 rospy.on_shutdown(self._wait_for_shutdown)
00075 unused_shutdown_service = rospy.Service('~shutdown', std_srvs.Empty, self.ros_service_shutdown)
00076
00077
00078 self._publish_gateway_info()
00079
00080 def spin(self):
00081 self._gateway.spin()
00082 if not self._param['external_shutdown']:
00083 self._shutdown()
00084
00085
00086 def _wait_for_shutdown(self):
00087 '''
00088 Shutdown hook - we wait here for an external shutdown via ros service
00089 timing out after a reasonable time if we need to.
00090 '''
00091 if self._param['external_shutdown']:
00092 timeout = self._param['external_shutdown_timeout']
00093 count = 0.0
00094 while count < timeout:
00095 if self._gateway is None:
00096 return
00097 else:
00098 count += 0.5
00099 rospy.rostime.wallsleep(0.5)
00100 rospy.logwarn("Gateway : timed out waiting for external shutdown by ros service, forcing shutdown now.")
00101 self._shutdown()
00102
00103 def _shutdown(self):
00104 '''
00105 Clears this gateway's information from the redis server.
00106 '''
00107 rospy.loginfo("Gateway : shutting down.")
00108 try:
00109 self._hub_discovery_thread.shutdown()
00110 self._gateway.shutdown()
00111 self._hub_manager.shutdown()
00112 self._gateway = None
00113 except Exception as e:
00114 rospy.logerr("Gateway : unknown error on shutdown [%s][%s]" % (str(e), type(e)))
00115 raise
00116
00117
00118
00119
00120
00121 def _register_gateway(self, ip, port):
00122 '''
00123 Called when either the hub discovery module finds a hub
00124 or a request to connect via ros service is made.
00125
00126 It starts the actual redis connection with the hub and also
00127 registers the appropriate information about the gateway on
00128 the hub.
00129
00130 Note, the return type is only really used by the service callback
00131 (ros_service_connect_hub).
00132
00133 @return error code and message
00134 @rtype gateway_msgs.ErrorCodes, string
00135
00136 @sa hub_discovery.HubDiscovery
00137 '''
00138 uri = ip + ':' + str(port)
00139 if uri in self._disallowed_hubs.keys():
00140
00141 return self._disallowed_hubs[uri]
00142
00143 existing_advertisements = self._gateway.public_interface.getConnections()
00144 hub, error_code, error_code_str = \
00145 self._hub_manager.connect_to_hub(
00146 ip,
00147 port,
00148 self._param['firewall'],
00149 self._unique_name,
00150 self._disengage_hub,
00151 self._gateway.ip,
00152 existing_advertisements
00153 )
00154 if hub:
00155 rospy.loginfo("Gateway : registering on the hub [%s]" % hub.name)
00156 self._publish_gateway_info()
00157 else:
00158 if error_code == gateway_msgs.ErrorCodes.HUB_CONNECTION_ALREADY_EXISTS:
00159 pass
00160 elif error_code in self._disallowed_hubs_error_codes:
00161 self._disallowed_hubs[uri] = (error_code, error_code_str)
00162 rospy.logwarn(
00163 "Gateway : failed to register gateway with the hub [%s][%s][%s]" % (uri, error_code, error_code_str))
00164 elif error_code == gateway_msgs.ErrorCodes.HUB_CONNECTION_UNRESOLVABLE:
00165
00166
00167 rospy.logdebug(
00168 "Gateway : failed to register gateway with the hub [%s][%s][%s]" % (uri, error_code, error_code_str))
00169 else:
00170 rospy.logwarn(
00171 "Gateway : caught an unknown error trying register gateway with the hub [%s][%s][%s]" %
00172 (uri, error_code, error_code_str))
00173 return error_code, error_code_str
00174
00175 def _disengage_hub(self, hub):
00176 '''
00177 Called whenever gateway_hub detects the connection to the hub has been
00178 lost.
00179
00180 This function informs the hub discovery thread that the hub was lost,
00181 which allows the hub_discovery thread to start looking for the hub.
00182
00183 @param hub: hub to be disengaged
00184 @type GatewayHub
00185 '''
00186
00187 self._hub_discovery_thread.disengage_hub(hub)
00188 try:
00189 self._gateway.disengage_hub(hub)
00190 except AttributeError:
00191
00192
00193 pass
00194
00195
00196
00197
00198
00199 def _setup_ros_services(self):
00200 gateway_services = {}
00201 gateway_services['connect_hub'] = rospy.Service(
00202 '~connect_hub', gateway_srvs.ConnectHub, self.ros_service_connect_hub)
00203 gateway_services['remote_gateway_info'] = rospy.Service(
00204 '~remote_gateway_info', gateway_srvs.RemoteGatewayInfo, self.ros_service_remote_gateway_info)
00205 gateway_services['advertise'] = rospy.Service(
00206 '~advertise', gateway_srvs.Advertise, self._gateway.ros_service_advertise)
00207 gateway_services['advertise_all'] = rospy.Service(
00208 '~advertise_all', gateway_srvs.AdvertiseAll, self._gateway.ros_service_advertise_all)
00209 gateway_services['flip'] = rospy.Service(
00210 '~flip', gateway_srvs.Remote, self._gateway.ros_service_flip)
00211 gateway_services['flip_all'] = rospy.Service(
00212 '~flip_all', gateway_srvs.RemoteAll, self._gateway.ros_service_flip_all)
00213 gateway_services['pull'] = rospy.Service(
00214 '~pull', gateway_srvs.Remote, self._gateway.ros_service_pull)
00215 gateway_services['pull_all'] = rospy.Service(
00216 '~pull_all', gateway_srvs.RemoteAll, self._gateway.ros_service_pull_all)
00217 gateway_services['set_watcher_period'] = rospy.Service(
00218 '~set_watcher_period',
00219 gateway_srvs.SetWatcherPeriod,
00220 self._gateway.ros_service_set_watcher_period)
00221 return gateway_services
00222
00223 def _setup_ros_publishers(self):
00224 gateway_publishers = {}
00225 gateway_publishers['gateway_info'] = rospy.Publisher('~gateway_info', gateway_msgs.GatewayInfo, latch=True, queue_size=5)
00226 return gateway_publishers
00227
00228 def _setup_ros_subscribers(self):
00229 gateway_subscribers = {}
00230 gateway_subscribers['force_update'] = rospy.Subscriber(
00231 '~force_update', std_msgs.Empty, self._gateway.ros_subscriber_force_update)
00232 return gateway_subscribers
00233
00234
00235
00236
00237
00238 def ros_service_shutdown(self, unused_request):
00239 self._shutdown()
00240 return std_srvs.EmptyResponse()
00241
00242 def ros_service_connect_hub(self, request):
00243 '''
00244 Handle incoming requests to connect directly to a gateway hub.
00245
00246 Requests are of the form of a uri (hostname:port pair) pointing to
00247 the gateway hub.
00248 '''
00249 response = gateway_srvs.ConnectHubResponse()
00250 o = urlparse(request.uri)
00251 response.result, response.error_message = self._register_gateway(o.hostname, o.port)
00252
00253 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
00254 rospy.loginfo("Gateway : made direct connection to hub [%s]" % request.uri)
00255 return response
00256
00257 def _publish_gateway_info(self):
00258 if self._gateway is None:
00259 return
00260 try:
00261 gateway_info = gateway_msgs.GatewayInfo()
00262 gateway_info.name = self._unique_name
00263 gateway_info.ip = self._gateway.ip
00264 gateway_info.connected = self._gateway.is_connected()
00265 gateway_info.hub_names = []
00266 gateway_info.hub_uris = []
00267 for hub in self._hub_manager.hubs:
00268 gateway_info.hub_names.append(hub.name)
00269 gateway_info.hub_uris.append(hub.uri)
00270 gateway_info.firewall = self._param['firewall']
00271 gateway_info.flipped_connections = self._gateway.flipped_interface.get_flipped_connections()
00272 gateway_info.flipped_in_connections = self._gateway.flipped_interface.getLocalRegistrations()
00273 gateway_info.flip_watchlist = self._gateway.flipped_interface.getWatchlist()
00274 gateway_info.pulled_connections = self._gateway.pulled_interface.getLocalRegistrations()
00275 gateway_info.pull_watchlist = self._gateway.pulled_interface.getWatchlist()
00276 gateway_info.public_watchlist = self._gateway.public_interface.getWatchlist()
00277 gateway_info.public_interface = self._gateway.public_interface.getInterface()
00278 self._gateway_publishers['gateway_info'].publish(gateway_info)
00279 except AttributeError:
00280 pass
00281
00282 def ros_service_remote_gateway_info(self, request):
00283 '''
00284 Sends out to the hubs to get the remote gateway information for either the specified,
00285 or the known list of remote gateways.
00286
00287 :todo: can we optimise this so that hub requests go as a group?
00288 '''
00289 response = gateway_srvs.RemoteGatewayInfoResponse()
00290 requested_gateways = request.gateways if request.gateways else self._hub_manager.list_remote_gateway_names()
00291 for gateway in list(set(requested_gateways)):
00292 remote_gateway_info = self._hub_manager.remote_gateway_info(gateway)
00293 if remote_gateway_info:
00294 response.gateways.append(remote_gateway_info)
00295 else:
00296 rospy.logwarn("Gateway : requested gateway info for unavailable gateway [%s]" % gateway)
00297 return response