src/rocon_gateway/gateway.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # License: BSD
4 # https://raw.github.com/robotics-in-concert/rocon_multimaster/license/LICENSE
5 #
6 
7 ###############################################################################
8 # Imports
9 ###############################################################################
10 
11 import copy
12 import os
13 import threading
14 
15 import rospy
16 import gateway_msgs.msg as gateway_msgs
17 import gateway_msgs.srv as gateway_srvs
18 import rocon_python_comms
19 
20 from gateway_msgs.msg import RemoteRuleWithStatus as FlipStatus
21 
22 from . import utils
23 from . import ros_parameters
24 #from .watcher_thread import WatcherThread
25 from .flipped_interface import FlippedInterface
26 from .public_interface import PublicInterface
27 from .pulled_interface import PulledInterface
28 from .master_api import LocalMaster
29 from .network_interface_manager import NetworkInterfaceManager
30 
31 ###############################################################################
32 # Thread
33 ###############################################################################
34 
35 
36 class Gateway(object):
37 
38  '''
39  Used to synchronise with hubs.
40  '''
41 
42  def __init__(self, hub_manager, param, unique_name, publish_gateway_info_callback):
43  '''
44  @param hub_manager : container for all the hubs this gateway connects to
45  @type hub_api.HubManmager
46 
47  @param param : parameters set by ros_parameters.py
48  @type : dictionary of parameter key-value pairs
49 
50  @param unique_name : gateway name (param['name']) with unique uuid hash appended
51 
52  @param publish_gateway_info_callback : callback for publishing gateway info
53  '''
54  self.hub_manager = hub_manager
55  self.master = None
56  # handling slow startup timeout
57  while self.master is None:
58  try:
59  self.master = LocalMaster()
60  except rocon_python_comms.NotFoundException as exc:
61  rospy.logwarn(str(exc))
62  rospy.logwarn("Cannot create Gateway's LocalMaster. Retrying...")
63  self.master = None
64  rospy.rostime.wallsleep(1)
65 
66  self.ip = self.master.get_ros_ip() # gateway is always assumed to sit on the same ip as the master
67  self._param = param
68  self._unique_name = unique_name
69  self._publish_gateway_info = publish_gateway_info_callback
70  default_rule_blacklist = ros_parameters.generate_rules(self._param["default_blacklist"])
71  default_rules, all_targets = ros_parameters.generate_remote_rules(self._param["default_flips"])
72  self.flipped_interface = FlippedInterface(
73  firewall=self._param['firewall'],
74  default_rule_blacklist=default_rule_blacklist,
75  default_rules=default_rules,
76  all_targets=all_targets)
77  default_rules, all_targets = ros_parameters.generate_remote_rules(self._param["default_pulls"])
78  self.pulled_interface = PulledInterface(default_rule_blacklist=default_rule_blacklist,
79  default_rules=default_rules,
80  all_targets=all_targets)
81  self.public_interface = PublicInterface(
82  default_rule_blacklist=default_rule_blacklist,
83  default_rules=ros_parameters.generate_rules(self._param['default_advertisements']))
84  if self._param['advertise_all']:
85  # no extra blacklist beyond the default (keeping it simple in yaml for now)
86  self.public_interface.advertise_all([])
87 
88  self.network_interface_manager = NetworkInterfaceManager(self._param['network_interface'])
89  # TODO : Use self._param['watch_loop_period'] to set the connection_cache spin freq ( OR directly in connection_cache node ) ?
90 
91  def spin(self):
92  if not rospy.core.is_initialized():
93  raise rospy.exceptions.ROSInitException("client code must call rospy.init_node() first")
94  rospy.logdebug("node[%s, %s] entering spin(), pid[%s]", rospy.core.get_caller_id(), rospy.core.get_node_uri(), os.getpid())
95  try:
96  while not rospy.core.is_shutdown():
98  remote_gateway_hub_index = self.hub_manager.create_remote_gateway_hub_index()
99 
100  with self.master.get_connection_state() as connections:
101  self.update_flipped_interface(connections, remote_gateway_hub_index)
102  self.update_public_interface(connections)
103  self.update_pulled_interface(connections, remote_gateway_hub_index)
104 
105  registrations = self.hub_manager.get_flip_requests()
106  self.update_flipped_in_interface(registrations, remote_gateway_hub_index)
107  rospy.rostime.wallsleep(1)
108  except KeyboardInterrupt:
109  rospy.logdebug("keyboard interrupt, shutting down")
110  rospy.core.signal_shutdown('keyboard interrupt')
111 
112  def is_connected(self):
113  '''
114  We often check if we're connected to any hubs often just to ensure we
115  don't waste time processing if there is no-one listening.
116 
117  @return True if at least one hub is connected, False otherwise
118  @rtype Bool
119  '''
120  return self.hub_manager.is_connected()
121 
122  def disengage_hub(self, hub):
123  '''
124  Disengage from the specified hub. Don't actually need to clean up connections
125  here like we do in shutdown - that can be handled from the watcher thread itself.
126 
127  @param hub : the hub that will be deleted.
128  '''
129  self.hub_manager.disengage_hub(hub)
130  self._publish_gateway_info()
131 
132  ###############################################################################
133  # Update interface states (jobs assigned from connection_cache callback thread)
134  ###############################################################################
135 
136  def update_flipped_interface(self, local_connection_index, remote_gateway_hub_index):
137  """
138  Process the list of local connections and check against
139  the current flip rules and patterns for changes. If a rule
140  has become (un)available take appropriate action.
141 
142  @param local_connection_index : list of current local connections
143  @type : dictionary of ConnectionType.xxx keyed sets of utils.Connections
144 
145  @param gateways : list of remote gateway string id's
146  @type string
147  """
148  state_changed = False
149 
150  # Get flip status of existing requests, and remove those requests that need to be resent
151  flipped_connections = self.flipped_interface.get_flipped_connections()
152  for flip in flipped_connections:
153  if flip.remote_rule.gateway in remote_gateway_hub_index:
154  for hub in remote_gateway_hub_index[flip.remote_rule.gateway]:
155  status = hub.get_flip_request_status(flip.remote_rule)
156  if status == FlipStatus.RESEND:
157  rospy.loginfo("Gateway : resend requested for flip request [%s]%s" %
158  (flip.remote_rule.gateway, utils.format_rule(flip.remote_rule.rule)))
159  # Remove the flip, so that it will be resent as part of new_flips
160  self.flipped_interface.remove_flip(flip.remote_rule)
161  hub.send_unflip_request(flip.remote_rule.gateway, flip.remote_rule.rule)
162  hub.remove_flip_details(flip.remote_rule.gateway,
163  flip.remote_rule.rule.name,
164  flip.remote_rule.rule.type,
165  flip.remote_rule.rule.node)
166  break
167 
168  new_flips, lost_flips = self.flipped_interface.update(
169  local_connection_index, remote_gateway_hub_index, self._unique_name, self.master)
170  for connection_type in utils.connection_types:
171  for flip in new_flips[connection_type]:
172  firewall_flag = self.hub_manager.get_remote_gateway_firewall_flag(flip.gateway)
173  if firewall_flag:
174  continue
175  state_changed = True
176  # for actions, need to post flip details here
177  connections = self.master.generate_connection_details(flip.rule.type, flip.rule.name, flip.rule.node)
178  if (connection_type == gateway_msgs.ConnectionType.ACTION_CLIENT or
179  connection_type == gateway_msgs.ConnectionType.ACTION_SERVER):
180  rospy.loginfo("Gateway : sending flip request [%s]%s" %
181  (flip.gateway, utils.format_rule(flip.rule)))
182  hub = remote_gateway_hub_index[flip.gateway][0]
183  hub.post_flip_details(flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node)
184  for connection in connections:
185  hub.send_flip_request(flip.gateway, connection) # flip the individual pubs/subs
186  else:
187  for connection in connections:
188  rospy.loginfo("Gateway : sending flip request [%s]%s" %
189  (flip.gateway, utils.format_rule(flip.rule)))
190  hub = remote_gateway_hub_index[flip.gateway][0]
191  hub.send_flip_request(flip.gateway, connection)
192  hub.post_flip_details(
193  flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node)
194  for flip in lost_flips[connection_type]:
195  state_changed = True
196  rospy.loginfo("Gateway : sending unflip request [%s]%s" % (flip.gateway, utils.format_rule(flip.rule)))
197  for hub in remote_gateway_hub_index[flip.gateway]:
198  rule = copy.deepcopy(flip.rule)
199  if hub.send_unflip_request(flip.gateway, rule):
200  # This hub was used to send the original flip request
201  hub.remove_flip_details(flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node)
202  break
203 
204  # Update flip status
205  flipped_connections = self.flipped_interface.get_flipped_connections()
206  # rospy.loginfo("flipped_connections = {}".format(flipped_connections))
207  for flip in flipped_connections:
208  for hub in remote_gateway_hub_index[flip.remote_rule.gateway]:
209  remote_rule = copy.deepcopy(flip.remote_rule)
210  remote_rule.rule.node = remote_rule.rule.node.split(",")[0] # get only node name, the xmlrpc_uri is not serialised
211  status = hub.get_flip_request_status(remote_rule)
212  if status is not None:
213  flip_state_changed = self.flipped_interface.update_flip_status(flip.remote_rule, status)
214  state_changed = state_changed or flip_state_changed
215  break
216 
217  if state_changed:
218  self._publish_gateway_info()
219 
220  def update_pulled_interface(self, unused_connections, remote_gateway_hub_index):
221  """
222  Process the list of local connections and check against
223  the current pull rules and patterns for changes. If a rule
224  has become (un)available take appropriate action.
225 
226  This is called by the watcher thread. The remote_gateway_hub_index
227  is always a full dictionary of all remote gateway and hub key-value
228  pairs - it is only included as an argument here to save
229  processing doubly in the watcher thread.
230 
231  @param connections : list of current local connections parsed from the master
232  @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections
233 
234  @param remote_gateway_hub_index : key-value remote gateway name-hub list pairs
235  @type dictionary of remote_gateway_name-list of hub_api.Hub objects key-value pairs
236  """
237  state_changed = False
238  remote_connections = {}
239  for remote_gateway in remote_gateway_hub_index.keys() + self.pulled_interface.list_remote_gateway_names():
240  remote_connections[remote_gateway] = {}
241  try:
242  for hub in remote_gateway_hub_index[remote_gateway]:
243  remote_connections[remote_gateway].update(hub.get_remote_connection_state(remote_gateway))
244  except KeyError:
245  pass # remote gateway no longer exists on the hub network
246  new_pulls, lost_pulls = self.pulled_interface.update(remote_connections, self._unique_name)
247  for connection_type in utils.connection_types:
248  for pull in new_pulls[connection_type]:
249  # dig out the corresponding connection (bit inefficient plouging back into this again
250  connection = None
251  for remote_gateway in remote_connections.keys():
252  for c in remote_connections[remote_gateway][pull.rule.type]:
253  if c.rule.name == pull.rule.name and \
254  c.rule.node == pull.rule.node:
255  connection = c
256  break
257  if connection:
258  break
259  # Register this pull
260  existing_registration = self.pulled_interface.find_registration_match(
261  pull.gateway, pull.rule.name, pull.rule.node, pull.rule.type)
262  if not existing_registration:
263  rospy.loginfo("Gateway : pulling in connection %s[%s]" %
264  (utils.format_rule(pull.rule), remote_gateway))
265  registration = utils.Registration(connection, pull.gateway)
266  new_registration = self.master.register(registration)
267  if new_registration is not None:
268  self.pulled_interface.registrations[registration.connection.rule.type].append(new_registration)
269  hub = remote_gateway_hub_index[pull.gateway][0]
270  hub.post_pull_details(pull.gateway, pull.rule.name, pull.rule.type, pull.rule.node)
271  state_changed = True
272  for pull in lost_pulls[connection_type]:
273  # Unregister this pull
274  existing_registration = self.pulled_interface.find_registration_match(
275  pull.gateway, pull.rule.name, pull.rule.node, pull.rule.type)
276  if existing_registration:
277  rospy.loginfo("Gateway : abandoning pulled connection %s[%s]" % (
278  utils.format_rule(pull.rule), pull.gateway))
279  self.master.unregister(existing_registration)
280  # This code was here, but causing bugs...actually it should never remove details from the hub,
281  # that is the responsibility of the advertising gateway. TODO confirm this.
282  #hub = remote_gateway_hub_index[pull.gateway][0]
283  # if hub:
284  # hub.remove_pull_details(pull.gateway, pull.rule.name, pull.rule.type, pull.rule.node)
285  self.pulled_interface.registrations[
286  existing_registration.connection.rule.type].remove(existing_registration)
287  state_changed = True
288  if state_changed:
289  self._publish_gateway_info()
290 
291  def update_public_interface(self, local_connection_index):
292  """
293  Process the list of local connections and check against
294  the current rules and patterns for changes. If a rule
295  has become (un)available take appropriate action.
296 
297  @param local_connection_index : list of current local connections parsed from the master
298  @type : { utils.ConnectionType.xxx : utils.Connection[] } dictionaries
299  """
300  state_changed = False
301  # new_conns, lost_conns are of type { gateway_msgs.ConnectionType.xxx : utils.Connection[] }
302  new_conns, lost_conns = self.public_interface.update(
303  local_connection_index, self.master.generate_advertisement_connection_details)
304  # public_interface is of type gateway_msgs.Rule[]
305  public_interface = self.public_interface.getInterface()
306  for connection_type in utils.connection_types:
307  for new_connection in new_conns[connection_type]:
308  rospy.loginfo("Gateway : adding connection to public interface %s" %
309  utils.format_rule(new_connection.rule))
310  self.hub_manager.advertise(new_connection)
311  state_changed = True
312  for lost_connection in lost_conns[connection_type]:
313  rospy.loginfo("Gateway : removing connection from public interface %s" %
314  utils.format_rule(lost_connection.rule))
315  self.hub_manager.unadvertise(lost_connection)
316  state_changed = True
317  if state_changed:
318  self._publish_gateway_info()
319  return public_interface
320 
321  def update_flipped_in_interface(self, registrations, remote_gateway_hub_index):
322  """
323  Match the flipped in connections to supplied registrations using
324  supplied registrations, flipping and unflipping as necessary.
325 
326  @param registrations : registrations (with status) to be processed
327  @type list of (utils.Registration, str) where the str contains the status
328  """
329 
330  hubs = {}
331  for gateway in remote_gateway_hub_index:
332  for hub in remote_gateway_hub_index[gateway]:
333  hubs[hub.uri] = hub
334 
335  update_flip_status = {}
336  if self.flipped_interface.firewall:
337  if len(registrations) != 0:
338  rospy.logwarn("Gateway : firewalled, but received flip requests...")
339  for (registration, status) in registrations:
340  for hub in remote_gateway_hub_index[registration.remote_gateway]:
341  if hub.uri not in update_flip_status:
342  update_flip_status[hub.uri] = []
343  update_flip_status[hub.uri].append((registration, FlipStatus.BLOCKED))
344 
345  # Mark all these registrations as blocked
346  for hub_uri, hub in hubs.iteritems():
347  if hub_uri in update_flip_status:
348  hub.update_multiple_flip_request_status(update_flip_status[hub_uri])
349  return
350 
351  state_changed = False
352 
353  # Add new registrations
354  for (registration, status) in registrations:
355  # probably not necessary as the flipping gateway will already check this
356  existing_registration = self.flipped_interface.find_registration_match(
357  registration.remote_gateway,
358  registration.connection.rule.name,
359  registration.connection.rule.node,
360  registration.connection.rule.type)
361  if not existing_registration:
362  rospy.loginfo("Gateway : received a flip request %s" % str(registration))
363  state_changed = True
364  new_registration = self.master.register(registration)
365  if new_registration is not None:
366  self.flipped_interface.registrations[registration.connection.rule.type].append(new_registration)
367  # Update this flip's status
368  if status != FlipStatus.ACCEPTED:
369  for hub in remote_gateway_hub_index[registration.remote_gateway]:
370  if hub.uri not in update_flip_status:
371  update_flip_status[hub.uri] = []
372  update_flip_status[hub.uri].append((registration, FlipStatus.ACCEPTED))
373 
374  # Update the flip status for newly added registrations
375  for hub_uri, hub in hubs.iteritems():
376  if hub_uri in update_flip_status:
377  hub.update_multiple_flip_request_status(update_flip_status[hub_uri])
378 
379  # Remove local registrations that are no longer flipped to this gateway
380  local_registrations = copy.deepcopy(self.flipped_interface.registrations)
381  for connection_type in utils.connection_types:
382  for local_registration in local_registrations[connection_type]:
383  matched_registration = None
384  for (registration, status) in registrations:
385  if registration.connection == local_registration.connection and \
386  registration.remote_gateway == local_registration.remote_gateway:
387  matched_registration = registration
388  break
389  else:
390  continue
391  if matched_registration is None:
392  state_changed = True
393  rospy.loginfo("Gateway : unflipping received flip %s" % str(local_registration))
394  self.master.unregister(local_registration)
395  self.flipped_interface.registrations[connection_type].remove(local_registration)
396 
397  if state_changed:
398  self._publish_gateway_info()
399 
401  """
402  If we are running over a wired connection, then do nothing.
403  If over the wireless, updated data transfer rate and signal strength
404  for this gateway on the hub
405  """
406  statistics = self.network_interface_manager.get_statistics()
407  self.hub_manager.publish_network_statistics(statistics)
408 
409  ##########################################################################
410  # Incoming commands from local system (ros service callbacks)
411  ##########################################################################
412 
413  # def ros_service_set_watcher_period(self, request):
414  # '''
415  # Configures the watcher period. This is useful to slow/speed up the
416  # watcher loop. Quite often you want it polling quickly early while
417  # configuring connections, but on long loops later when it does not have
418  # to do very much except look for shutdown.
419  #
420  # @param request
421  # @type gateway_srvs.SetWatcherPeriodRequest
422  # @return service response
423  # @rtgateway_srvs.srv.SetWatcherPeriodResponse
424  # '''
425  # self.watcher_thread.set_watch_loop_period(request.period)
426  # return gateway_srvs.SetWatcherPeriodResponse(self.watcher_thread.get_watch_loop_period())
427  #
428  # def ros_subscriber_force_update(self, data):
429  # '''
430  # Trigger a watcher loop update
431  # '''
432  # self.watcher_thread.trigger_update = True
433 
434  def ros_service_advertise(self, request):
435  '''
436  Puts/Removes a number of rules on the public interface watchlist.
437  As local rules matching these rules become available/go away,
438  the public interface is modified accordingly. A manual update is done
439  at the end of the advertise call to quickly capture existing
440  rules
441 
442  @param request
443  @type gateway_srvs.AdvertiseRequest
444  @return service response
445  @rtgateway_srvs.srv.AdvertiseReponse
446  '''
447  response = gateway_srvs.AdvertiseResponse()
448  try:
449  if not request.cancel:
450  for rule in request.rules:
451  if not self.public_interface.add_rule(rule):
452  response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_EXISTS
453  response.error_message = "advertisment rule already exists [%s:(%s,%s)]" % (
454  rule.name, rule.type, rule.node)
455  else:
456  for rule in request.rules:
457  if not self.public_interface.remove_rule(rule):
458  response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_NOT_FOUND
459  response.error_message = "advertisment not found [%s:(%s,%s)]" % (
460  rule.name, rule.type, rule.node)
461  except Exception as e:
462  rospy.logerr("Gateway : unknown advertise error [%s]." % str(e))
463  response.result = gateway_msgs.ErrorCodes.UNKNOWN_ADVERTISEMENT_ERROR
464 
465  # Let the watcher get on with the update asap
466  if response.result == gateway_msgs.ErrorCodes.SUCCESS:
467  #self.watcher_thread.trigger_update = True
468  self._publish_gateway_info()
469  else:
470  rospy.logerr("Gateway : %s." % response.error_message)
471  response.watchlist = self.public_interface.getWatchlist()
472  return response
473 
474  def ros_service_advertise_all(self, request):
475  """
476  Toggles the advertise all mode. If advertising all, an additional
477  blacklist parameter can be supplied which includes all the topics that
478  will not be advertised/watched for. This blacklist is added to the
479  default blacklist of the public interface
480 
481  @param request
482  @type gateway_srvs.AdvertiseAllRequest
483  @return service response
484  @rtype gateway_srvs.AdvertiseAllReponse
485  """
486  response = gateway_srvs.AdvertiseAllResponse()
487  try:
488  if not request.cancel:
489  if not self.public_interface.advertise_all(request.blacklist):
490  response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_EXISTS
491  response.error_message = "already advertising all."
492  else:
493  self.public_interface.unadvertise_all()
494  except Exception as e:
495  response.result = gateway_msgs.ErrorCodes.UNKNOWN_ADVERTISEMENT_ERROR
496  response.error_message = "unknown advertise all error [%s]" % (str(e))
497 
498  # Let the watcher get on with the update asap
499  if response.result == gateway_msgs.ErrorCodes.SUCCESS:
500  #self.watcher_thread.trigger_update = True
501  self._publish_gateway_info()
502  else:
503  rospy.logerr("Gateway : %s." % response.error_message)
504  response.blacklist = self.public_interface.getBlacklist()
505  return response
506 
507  def ros_service_flip(self, request):
508  """
509  Puts flip rules on a watchlist which (un)flips them when they
510  become (un)available.
511 
512  @param request
513  @type gateway_srvs.RemoteRequest
514  @return service response
515  @rtype gateway_srvs.RemoteResponse
516  """
517  # could move this below and if any are fails, just abort adding the rules.
518  # Check if the target remote gateway is valid.
519 # response = self._check_remote_gateways(request.remotes)
520 # if response:
521 # return response
522 
523  response = gateway_srvs.RemoteResponse(gateway_msgs.ErrorCodes.SUCCESS, "")
524 
525  # result is currently SUCCESS
526  # Process all add/remove flip requests
527  if not request.cancel: # Rule add request
528  response = self._add_flip_rules(request.remotes)
529  else: # Rule remove request
530  response = self._remove_flip_rules(request.remotes)
531 
532  # Post processing
533  if response.result == gateway_msgs.ErrorCodes.SUCCESS:
534  self._publish_gateway_info()
535  #self.watcher_thread.trigger_update = True
536  else:
537  rospy.logerr("Gateway : %s." % response.error_message)
538  return response
539 
540  def ros_service_flip_all(self, request):
541  """
542  Flips everything except a specified blacklist to a particular gateway,
543  or if the cancel flag is set, clears all flips to that gateway.
544 
545  @param request
546  @type gateway_srvs.RemoteAllRequest
547  @return service response
548  @rtype gateway_srvs.RemoteAllResponse
549  """
550  response = gateway_srvs.RemoteAllResponse()
551  remote_gateway_target_hash_name, response.result, response.error_message = self._ros_service_remote_checks(
552  request.gateway)
553  if response.result == gateway_msgs.ErrorCodes.SUCCESS:
554  if not request.cancel:
555  if self.flipped_interface.flip_all(remote_gateway_target_hash_name, request.blacklist):
556  rospy.loginfo("Gateway : flipping all to gateway '%s'" % (remote_gateway_target_hash_name))
557  else:
558  response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS
559  response.error_message = "already flipping all to gateway '%s' " + remote_gateway_target_hash_name
560  else: # request.cancel
561  self.flipped_interface.unflip_all(remote_gateway_target_hash_name)
562  rospy.loginfo("Gateway : cancelling a previous flip all request [%s]" % (request.gateway))
563  if response.result == gateway_msgs.ErrorCodes.SUCCESS:
564  self._publish_gateway_info()
565  #self.watcher_thread.trigger_update = True
566  else:
567  rospy.logerr("Gateway : %s." % response.error_message)
568  return response
569 
570  def ros_service_pull(self, request):
571  """
572  Puts a single rule on a watchlist and pulls it from a particular
573  gateway when it becomes (un)available.
574 
575  @param request
576  @type gateway_srvs.RemoteRequest
577  @return service response
578  @rtype gateway_srvs.RemoteResponse
579  """
580  # could move this below and if any are fails, just abort adding the rules.
581  # Check if the target remote gateway is valid.
582  response = self._check_remote_gateways(request.remotes)
583  if response:
584  return response
585 
586  response = gateway_srvs.RemoteResponse(gateway_msgs.ErrorCodes.SUCCESS, "")
587 
588  # result is currently SUCCESS
589  added_rules = []
590  for remote in request.remotes:
591  if not request.cancel:
592  pull_rule = self.pulled_interface.add_rule(remote)
593  if pull_rule:
594  added_rules.append(pull_rule)
595  rospy.loginfo("Gateway : added pull rule [%s:(%s,%s)]" %
596  (pull_rule.gateway, pull_rule.rule.name, pull_rule.rule.type))
597  else:
598  response.result = gateway_msgs.ErrorCodes.PULL_RULE_ALREADY_EXISTS
599  response.error_message = "pull rule already exists [%s:(%s,%s)]" % (
600  remote.gateway, remote.rule.name, remote.rule.type)
601  break
602  else: # request.cancel
603  for remote in request.remotes:
604  removed_pull_rules = self.pulled_interface.remove_rule(remote)
605  if removed_pull_rules:
606  rospy.loginfo("Gateway : removed pull rule [%s:%s]" % (remote.gateway, remote.rule.name))
607  if response.result == gateway_msgs.ErrorCodes.SUCCESS:
608  self._publish_gateway_info()
609  #self.watcher_thread.trigger_update = True
610  else:
611  if added_rules: # completely abort any added rules
612  for added_rule in added_rules:
613  self.pulled_interface.remove_rule(added_rule)
614  rospy.logerr("Gateway : %s." % response.error_message)
615  return response
616 
617  def ros_service_pull_all(self, request):
618  """
619  Pull everything except a specified blacklist from a particular gateway,
620  or if the cancel flag is set, clears all pulls from that gateway.
621 
622  @param request
623  @type gateway_srvs.RemoteAllRequest
624  @return service response
625  @rtype gateway_srvs.RemoteAllResponse
626  """
627  response = gateway_srvs.RemoteAllResponse()
628  remote_gateway_target_hash_name, response.result, response.error_message = self._ros_service_remote_checks(
629  request.gateway)
630  if response.result == gateway_msgs.ErrorCodes.SUCCESS:
631  if not request.cancel:
632  if self.pulled_interface.pull_all(remote_gateway_target_hash_name, request.blacklist):
633  rospy.loginfo("Gateway : pulling all from gateway [%s]" % (request.gateway))
634  else:
635  response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS
636  response.error_message = "already pulling all from gateway [%s] " % (request.gateway)
637  else: # request.cancel
638  self.pulled_interface.unpull_all(remote_gateway_target_hash_name)
639  rospy.loginfo("Gateway : cancelling a previous pull all request [%s]" % (request.gateway))
640  if response.result == gateway_msgs.ErrorCodes.SUCCESS:
641  self._publish_gateway_info()
642  #self.watcher_thread.trigger_update = True
643  else:
644  rospy.logerr("Gateway : %s." % response.error_message)
645  return response
646 
647  def _ros_service_remote_checks(self, gateway):
648  """
649  Some simple checks when pulling or flipping to make sure that the remote gateway is visible. It
650  does a strict check on the hash names first, then falls back to looking for weak matches on the
651  human friendly name.
652 
653  @param gateway : remote gateway target name (can be hash name, basename or regex pattern)
654  @type string
655  @return pair of result type and message
656  @rtype gateway_msgs.ErrorCodes.xxx, string
657  """
658  if not self.is_connected():
659  return None, gateway_msgs.ErrorCodes.NO_HUB_CONNECTION, "not connected to hub, aborting"
660  if gateway == self._unique_name:
661  return None, gateway_msgs.ErrorCodes.REMOTE_GATEWAY_SELF_IS_NOT, "gateway cannot flip/pull to itself"
662  return gateway, gateway_msgs.ErrorCodes.SUCCESS, ""
663 # matches, weak_matches = self.hub_manager.match_remote_gateway_name(gateway)
664 # if len(matches) > 1:
665 # return None, gateway_msgs.ErrorCodes.REMOTE_GATEWAY_TARGET_HAS_MULTIPLE_MATCHES, \
666 # "remote gateway target has multiple matches, invalid [%s][%s]" % (gateway, matches)
667 # elif len(matches) == 1:
668 # return matches[0], gateway_msgs.ErrorCodes.SUCCESS, ""
669 # Fallback to checking for weak matches
670 # if len(weak_matches) > 1:
671 # return None, gateway_msgs.ErrorCodes.REMOTE_GATEWAY_TARGET_HAS_MULTIPLE_MATCHES, \
672 # "remote gateway target has multiple matches against hashed names, invalid [%s]" % weak_matches
673 # elif len(weak_matches) == 1:
674 # return weak_matches[0], gateway_msgs.ErrorCodes.SUCCESS, ""
675 # Not visible
676 # return None, gateway_msgs.ErrorCodes.REMOTE_GATEWAY_NOT_VISIBLE, "remote
677 # gateway is currently not visible on the hubs [%s]" % gateway
678 
679  def _check_remote_gateways(self, remotes):
680  """
681  Check given gateways in remote rules are valid
682 
683  :param remotes: remote rules
684  :type remotes: gateway_msgs.RemoteRule[]
685 
686  :return: whether it is valid, error message if it fails
687  :rtypes: None or gateway_srvs.RemoteResponse
688  """
689  response = gateway_srvs.RemoteResponse()
690  for remote in remotes:
691  remote.gateway, response.result, response.error_message = self._ros_service_remote_checks(remote.gateway)
692  if response.result != gateway_msgs.ErrorCodes.SUCCESS:
693  rospy.logerr("Gateway : %s." % response.error_message)
694  return response
695  return None
696 
697  def _add_flip_rules(self, remotes):
698  """
699  Add given rules into watcher list
700 
701  :param remotes: remote rules
702  :type remotes: gateway_msgs.RemoteRule[]
703  :return: whether it is successful
704  :rtypes: gateway_srvs.RemoteResponse
705  """
706  response = gateway_srvs.RemoteResponse()
707  response.result = gateway_msgs.ErrorCodes.SUCCESS
708 
709  added_rules = []
710  for remote in remotes:
711  flip_rule = self.flipped_interface.add_rule(remote)
712  if flip_rule:
713  added_rules.append(flip_rule)
714  rospy.loginfo("Gateway : added flip rule [%s:(%s,%s)]" % (flip_rule.gateway, flip_rule.rule.name, flip_rule.rule.type))
715  else:
716  response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS
717  response.error_message = "flip rule already exists [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type)
718 
719  if response.result != gateway_msgs.ErrorCodes.SUCCESS:
720  # completely abort any added rules
721  for added_rule in added_rules:
722  self.flipped_interface.remove_rule(added_rule)
723  return response
724 
725  def _remove_flip_rules(self, remotes):
726  """
727  remove given rules into watcher list
728 
729  :param remotes: remote rules
730  :type remotes: gateway_msgs.RemoteRule[]
731  :return: whether it is successful
732  :rtypes: gateway_srvs.RemoteResponse
733  """
734  response = gateway_srvs.RemoteResponse()
735  response.result = gateway_msgs.ErrorCodes.SUCCESS
736 
737  for remote in remotes:
738  removed_flip_rules = self.flipped_interface.remove_rule(remote)
739  if removed_flip_rules:
740  rospy.loginfo("Gateway : removed flip rule [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type))
741  return response
def update_public_interface(self, local_connection_index)
def update_pulled_interface(self, unused_connections, remote_gateway_hub_index)
def __init__(self, hub_manager, param, unique_name, publish_gateway_info_callback)
def update_flipped_in_interface(self, registrations, remote_gateway_hub_index)
def ros_service_advertise(self, request)
Incoming commands from local system (ros service callbacks)
def update_flipped_interface(self, local_connection_index, remote_gateway_hub_index)
Update interface states (jobs assigned from connection_cache callback thread)


rocon_gateway
Author(s): Daniel Stonier , Jihoon Lee , Piyush Khandelwal
autogenerated on Mon Jun 10 2019 14:40:10