gateway_node.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # License: BSD
4 # https://raw.github.com/robotics-in-concert/rocon_multimaster/license/LICENSE
5 #
6 ##############################################################################
7 # Imports
8 ##############################################################################
9 
10 import rospy
11 import rocon_gateway
12 import uuid
13 import gateway_msgs.msg as gateway_msgs
14 import gateway_msgs.srv as gateway_srvs
15 import std_msgs.msg as std_msgs
16 import std_srvs.srv as std_srvs
17 from urlparse import urlparse
18 import rocon_hub_client
19 
20 from . import gateway
21 from . import hub_manager
22 
23 ##############################################################################
24 # Gateway Configuration and Main Loop Class
25 ##############################################################################
26 
27 
28 class GatewayNode():
29 
30  """
31  Currently this just provides getup and go for the gateway.
32  """
33  ##########################################################################
34  # Init & Shutdown
35  ##########################################################################
36 
37  def __init__(self):
38  self._param = rocon_gateway.setup_ros_parameters()
39  if self._param['disable_uuids']:
40  self._unique_name = self._param['name']
41  rospy.logwarn("Gateway : uuid's disabled, using possibly non-unique name [%s]" % self._unique_name)
42  else: # append a unique hex string
43  # random 16 byte string, alternatively uuid.getnode() returns a hash based
44  # on the mac address, uuid.uid1() based on localhost and time
45  key = uuid.uuid4()
46  self._unique_name = self._param['name'] + key.hex
47  rospy.loginfo("Gateway : generated unique hash name [%s]" % self._unique_name)
48  self._disallowed_hubs = {}
49  self._disallowed_hubs_error_codes = [gateway_msgs.ErrorCodes.HUB_CONNECTION_NOT_IN_NONEMPTY_WHITELIST,
50  gateway_msgs.ErrorCodes.HUB_CONNECTION_BLACKLISTED,
51  gateway_msgs.ErrorCodes.HUB_NAME_NOT_FOUND,
52  # this now has to be permitted as we will often have zeroconf failing for gateways
53  # that have dropped out of wireless range.
54  # gateway_msgs.ErrorCodes.HUB_CONNECTION_UNRESOLVABLE
55  ]
57  hub_whitelist=self._param['hub_whitelist'],
58  hub_blacklist=self._param['hub_blacklist']
59  )
60  # Be careful of the construction sequence here, parts depend on others.
62  # self._publish_gateway_info needs self._gateway_publishers
64  self._gateway_services = self._setup_ros_services() # Needs self._gateway
65  self._gateway_subscribers = self._setup_ros_subscribers() # Needs self._gateway
66  # 'ip:port' : (error_code, error_code_str) dictionary of hubs that this gateway has tried to register,
67  # but not been permitted (hub is not in whitelist, or is blacklisted)
68  direct_hub_uri_list = [self._param['hub_uri']] if self._param['hub_uri'] != '' else []
69  self._hub_discovery_thread = rocon_hub_client.HubDiscovery(
70  self._hub_ensure_connection, direct_hub_uri_list, self._param['disable_zeroconf'], self._disallowed_hubs)
71 
72  # Make local gateway information immediately available
74 
75  def spin(self):
76  self._gateway.spin()
77  self._shutdown()
78  # else the shutdown hook handles it.
79 
80  def _shutdown(self):
81  '''
82  Clears this gateway's information from the redis server.
83  '''
84  rospy.loginfo("Gateway : shutting down.")
85  try:
86  # We shouldn't do anything network related here.
87  # To be stable against network outages, the shutdown flow and the "crash/network disconnected" flow
88  # should be as close as possible.
89 
90  # we still need this to cleanup threads locally
91  self._hub_discovery_thread.shutdown()
92 
93  self._gateway = None
94  except Exception as e:
95  rospy.logerr("Gateway : unknown error on shutdown [%s][%s]" % (str(e), type(e)))
96  raise
97 
98  ##########################################################################
99  # Hub Discovery & Connection
100  ##########################################################################
101 
102  def _hub_ensure_connection(self, ip, port):
103  '''
104  Called when the hub discovery can ping a hub
105 
106  :param ip:
107  :param port:
108  :return:
109  '''
110 
111  uri = ip + ':' + str(port)
112  if uri in self._disallowed_hubs.keys():
113  # we already tried this one before, quietly return from here.
114  return self._disallowed_hubs[uri]
115 
116  hub, error_code, error_code_str = self._hub_manager.is_connected_to_hub(ip, port)
117  if error_code == gateway_msgs.ErrorCodes.NO_HUB_CONNECTION:
118  error_code, error_code_str = self._register_gateway(hub)
119 
120  if error_code == gateway_msgs.ErrorCodes.SUCCESS: # everything should be fine...
121  pass
122  elif error_code == gateway_msgs.ErrorCodes.HUB_CONNECTION_ALREADY_EXISTS: # everything should also be fine...
123  pass
124  elif error_code in self._disallowed_hubs_error_codes:
125  self._disallowed_hubs[uri] = (error_code, error_code_str)
126  rospy.logwarn(
127  "Gateway : failed to register gateway with the hub [%s][%s][%s]" % (uri, error_code, error_code_str))
128  elif error_code == gateway_msgs.ErrorCodes.HUB_CONNECTION_FAILED:
129  rospy.logwarn(
130  "Gateway : failed to connect to the hub [%s][%s][%s]" % (uri, error_code, error_code_str))
131  elif error_code == gateway_msgs.ErrorCodes.HUB_CONNECTION_UNRESOLVABLE:
132  # be less noisy about this one - it's a normal error when a gateway has moved out of wireless range
133  # but we still discover an 'unresolvable' hub on avahi before avahi eventually removes it.
134  rospy.logdebug(
135  "Gateway : failed to register gateway with the hub [%s][%s][%s]" % (uri, error_code, error_code_str))
136  else:
137  rospy.logwarn(
138  "Gateway : Unknown error code when insuring gateway connection with the hub [%s][%s][%s]" %
139  (uri, error_code, error_code_str))
140 
141  return error_code, error_code_str
142 
143  def _register_gateway(self, hub):
144  '''
145  Called when either the hub discovery module finds a hub
146  or a request to connect via ros service is made.
147 
148  It starts the actual redis connection with the hub and also
149  registers the appropriate information about the gateway on
150  the hub.
151 
152  Note, the return type is only really used by the service callback
153  (ros_service_connect_hub).
154 
155  @return error code and message
156  @rtype gateway_msgs.ErrorCodes, string
157 
158  @sa hub_discovery.HubDiscovery
159  '''
160 
161  existing_advertisements = self._gateway.public_interface.getConnections()
162  hub, error_code, error_code_str = \
163  self._hub_manager.connect_to_hub(
164  hub, # Hub Details
165  self._param['firewall'], # Gateway Details
166  self._unique_name,
167  self._disengage_hub,
168  self._gateway.ip,
169  existing_advertisements
170  )
171  if hub:
172  rospy.loginfo("Gateway : registering on the hub [%s]" % hub.name)
173  self._publish_gateway_info()
174 
175  return error_code, error_code_str
176 
177  def _disengage_hub(self, hub):
178  """
179  Called whenever gateway_hub detects the connection to the hub has been
180  lost.
181 
182  This function informs the hub discovery thread that the hub was lost,
183  which allows the hub_discovery thread to start looking for the hub.
184 
185  @param hub: hub to be disengaged
186  @type GatewayHub
187  """
188 
189  self._hub_discovery_thread.disengage_hub(hub)
190  try:
191  self._gateway.disengage_hub(hub)
192  except AttributeError:
193  # caused when self_gateway is NoneType object (can happen as we pass this
194  # callback in without guaranteeing we have a gateway object yet)
195  pass
196 
197  ##########################################################################
198  # Ros Pubs, Subs and Services
199  ##########################################################################
200 
202  gateway_services = {}
203  gateway_services['connect_hub'] = rospy.Service(
204  '~connect_hub', gateway_srvs.ConnectHub, self.ros_service_connect_hub) # @IgnorePep8
205  gateway_services['remote_gateway_info'] = rospy.Service(
206  '~remote_gateway_info', gateway_srvs.RemoteGatewayInfo, self.ros_service_remote_gateway_info) # @IgnorePep8
207  gateway_services['advertise'] = rospy.Service(
208  '~advertise', gateway_srvs.Advertise, self._gateway.ros_service_advertise) # @IgnorePep8
209  gateway_services['advertise_all'] = rospy.Service(
210  '~advertise_all', gateway_srvs.AdvertiseAll, self._gateway.ros_service_advertise_all) # @IgnorePep8
211  gateway_services['flip'] = rospy.Service(
212  '~flip', gateway_srvs.Remote, self._gateway.ros_service_flip) # @IgnorePep8
213  gateway_services['flip_all'] = rospy.Service(
214  '~flip_all', gateway_srvs.RemoteAll, self._gateway.ros_service_flip_all) # @IgnorePep8
215  gateway_services['pull'] = rospy.Service(
216  '~pull', gateway_srvs.Remote, self._gateway.ros_service_pull) # @IgnorePep8
217  gateway_services['pull_all'] = rospy.Service(
218  '~pull_all', gateway_srvs.RemoteAll, self._gateway.ros_service_pull_all) # @IgnorePep8
219  #gateway_services['set_watcher_period'] = rospy.Service(
220  # '~set_watcher_period',
221  # gateway_srvs.SetWatcherPeriod,
222  # self._gateway.ros_service_set_watcher_period) # @IgnorePep8
223  return gateway_services
224 
226  gateway_publishers = {}
227  gateway_publishers['gateway_info'] = rospy.Publisher('~gateway_info', gateway_msgs.GatewayInfo, latch=True, queue_size=5)
228  return gateway_publishers
229 
231  gateway_subscribers = {}
232  #gateway_subscribers['force_update'] = rospy.Subscriber(
233  # '~force_update', std_msgs.Empty, self._gateway.ros_subscriber_force_update)
234  return gateway_subscribers
235 
236  ##########################################################################
237  # Ros Service Callbacks
238  ##########################################################################
239 
240  def ros_service_connect_hub(self, request):
241  """
242  Handle incoming requests to connect directly to a gateway hub.
243 
244  Requests are of the form of a uri (hostname:port pair) pointing to
245  the gateway hub.
246  """
247  response = gateway_srvs.ConnectHubResponse()
248  o = urlparse(request.uri)
249  response.result, response.error_message = self._hub_ensure_connection(o.hostname, o.port)
250  # Some ros logging
251  if response.result == gateway_msgs.ErrorCodes.SUCCESS:
252  rospy.loginfo("Gateway : made direct connection to hub [%s]" % request.uri)
253  return response
254 
256  if self._gateway is None:
257  return # not everything initialised yet
258  try:
259  gateway_info = gateway_msgs.GatewayInfo()
260  gateway_info.name = self._unique_name
261  gateway_info.ip = self._gateway.ip
262  gateway_info.connected = self._gateway.is_connected()
263  gateway_info.hub_names = []
264  gateway_info.hub_uris = []
265  for hub in self._hub_manager.hubs:
266  gateway_info.hub_names.append(hub.name)
267  gateway_info.hub_uris.append(hub.uri)
268  gateway_info.firewall = self._param['firewall']
269  gateway_info.flipped_connections = self._gateway.flipped_interface.get_flipped_connections()
270  gateway_info.flipped_in_connections = self._gateway.flipped_interface.getLocalRegistrations()
271  gateway_info.flip_watchlist = self._gateway.flipped_interface.getWatchlist()
272  gateway_info.pulled_connections = self._gateway.pulled_interface.getLocalRegistrations()
273  gateway_info.pull_watchlist = self._gateway.pulled_interface.getWatchlist()
274  gateway_info.public_watchlist = self._gateway.public_interface.getWatchlist()
275  gateway_info.public_interface = self._gateway.public_interface.getInterface()
276  self._gateway_publishers['gateway_info'].publish(gateway_info)
277  except AttributeError:
278  pass # occurs if self._gateway is reset to None in the middle of all this.
279 
281  """
282  Sends out to the hubs to get the remote gateway information for either the specified,
283  or the known list of remote gateways.
284 
285  :todo: can we optimise this so that hub requests go as a group?
286  """
287  response = gateway_srvs.RemoteGatewayInfoResponse()
288  requested_gateways = request.gateways if request.gateways else self._hub_manager.list_remote_gateway_names()
289  for gateway in list(set(requested_gateways)):
290  remote_gateway_info = self._hub_manager.remote_gateway_info(gateway)
291  if remote_gateway_info:
292  response.gateways.append(remote_gateway_info)
293  else:
294  rospy.logwarn("Gateway : requested gateway info for unavailable gateway [%s]" % gateway)
295  return response
def _hub_ensure_connection(self, ip, port)
Hub Discovery & Connection.
Gateway Configuration and Main Loop Class.
Definition: gateway_node.py:28
def __init__(self)
Init & Shutdown.
Definition: gateway_node.py:37
def _setup_ros_services(self)
Ros Pubs, Subs and Services.
def ros_service_remote_gateway_info(self, request)
def ros_service_connect_hub(self, request)
Ros Service Callbacks.


rocon_gateway
Author(s): Daniel Stonier , Jihoon Lee , Piyush Khandelwal
autogenerated on Mon Jun 10 2019 14:40:10