gateway_hub.py
Go to the documentation of this file.
1 #
2 # License: BSD
3 #
4 # https://raw.github.com/robotics-in-concert/rocon_multimaster/license/LICENSE
5 #
6 ###############################################################################
7 # Imports
8 ###############################################################################
9 
10 import threading
11 import rospy
12 import re
13 import utils
14 from gateway_msgs.msg import RemoteRuleWithStatus as FlipStatus, RemoteRule
15 import gateway_msgs.msg as gateway_msgs
16 import rocon_python_comms
17 import rocon_python_utils
18 import rocon_gateway_utils
19 import rocon_hub_client
20 import rocon_python_redis as redis
21 import time
22 from rocon_hub_client import hub_api, hub_client
23 from rocon_hub_client.exceptions import HubConnectionLostError, \
24  HubNameNotFoundError, HubNotFoundError, HubConnectionFailedError
25 
26 from .exceptions import GatewayUnavailableError
27 
28 import rocon_console.console as console
29 
30 ###############################################################################
31 # Redis Connection Checker
32 ##############################################################################
33 
34 
35 class HubConnectionCheckerThread(threading.Thread):
36  '''
37  Pings redis periodically to figure out if the redis connection is still alive.
38  '''
39 
40  def __init__(self, ip, port, hub_connection_lost_hook):
41  threading.Thread.__init__(self)
42  self.daemon = True # clean shut down of thread when hub connection is lost
43  self.ping_frequency = 0.2 # Too spammy? # TODO Need to parametrize
44  self._hub_connection_lost_hook = hub_connection_lost_hook
45  self.ip = ip
46  self.port = port
47  self.pinger = rocon_python_utils.network.Pinger(self.ip, self.ping_frequency)
48  self.terminate_requested = False
49 
50  def get_latency(self):
51  return self.pinger.get_latency()
52 
53  def run(self):
54  '''
55  This runs in the background to gather the latest connection statistics
56  Note - it's not used in the keep alive check
57  '''
58  self.pinger.start()
59  rate = rocon_python_comms.WallRate(self.ping_frequency)
60  alive = True
61  timeout = 1 / self.ping_frequency
62  while alive and not self.terminate_requested:
63  alive, message = hub_client.ping_hub(self.ip, self.port, timeout)
64  rate.sleep()
65  if not alive:
66  rospy.logwarn("Gateway : hub connection no longer alive, disengaging [%s]" % message)
68  # else shutting down thread by request
69 
70 ##############################################################################
71 # Hub
72 ##############################################################################
73 
74 
75 class GatewayHub(rocon_hub_client.Hub):
76  """
77  Manages the Hub data.
78  This is used both by HubManager for the gateway node, and by the rocon hub watcher.
79  """
80  def __init__(self, ip, port, whitelist, blacklist):
81  '''
82  @param remote_gateway_request_callbacks : to handle redis responses
83  @type list of function pointers (back to GatewaySync class
84 
85  @param ip : redis server ip
86  @param port : redis server port
87 
88  @raise HubNameNotFoundError, HubNotFoundError
89  '''
90  try:
91  super(GatewayHub, self).__init__(ip, port, whitelist, blacklist) # can just do super() in python3
92  except HubNotFoundError:
93  raise
94  except HubNameNotFoundError:
95  raise
97  self._firewall = 0
98 
99  # Setting up some basic parameters in-case we use this API without registering a gateway
100  self._redis_keys['gatewaylist'] = hub_api.create_rocon_hub_key('gatewaylist')
103 
104  ##########################################################################
105  # Hub Connections
106  ##########################################################################
107 
108  def register_gateway(self, firewall, unique_gateway_name, hub_connection_lost_gateway_hook, gateway_ip):
109  '''
110  Register a gateway with the hub.
111 
112  @param firewall
113  @param unique_gateway_name
114  @param hub_connection_lost_gateway_hook : used to trigger Gateway.disengage_hub(hub)
115  on lost hub connections in redis pubsub listener thread.
116  @gateway_ip
117 
118  @raise HubConnectionLostError if for some reason, the redis server has become unavailable.
119  '''
120  if not self._redis_server:
121  raise HubConnectionLostError()
122  self._unique_gateway_name = unique_gateway_name
123  self.private_key, public_key = utils.generate_private_public_key()
124 
125  serialized_public_key = utils.serialize_key(public_key)
126  ping_key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, ':ping')
127  # rospy.loginfo("=>{0} TTL {1}".format(ping_key, gateway_msgs.ConnectionStatistics.MAX_TTL))
128 
129  self._redis_keys['ip'] = hub_api.create_rocon_gateway_key(unique_gateway_name, 'ip')
130  self._redis_keys['gateway'] = hub_api.create_rocon_key(unique_gateway_name)
131  self._redis_keys['firewall'] = hub_api.create_rocon_gateway_key(unique_gateway_name, 'firewall')
132  self._redis_keys['public_key'] = hub_api.create_rocon_gateway_key(unique_gateway_name, 'public_key')
133 
134  self._firewall = 1 if firewall else 0
135  self._hub_connection_lost_gateway_hook = hub_connection_lost_gateway_hook
136 
137  pipe = self._redis_server.pipeline()
138 
139  try:
140  pipe.sadd(self._redis_keys['gatewaylist'], self._redis_keys['gateway'])
141  pipe.set(self._redis_keys['firewall'], self._firewall)
142 
143  # I think we just used this for debugging, but we might want to hide it in
144  # future (it's the ros master hostname/ip)
145  pipe.set(self._redis_keys['ip'], gateway_ip)
146 
147  pipe.get(self._redis_keys['public_key'])
148  pipe.set(self._redis_keys['public_key'], serialized_public_key)
149  pipe.sadd(self._redis_keys['gatewaylist'], self._redis_keys['gateway'])
150 
151  # Let hub know we are alive
152  pipe.set(ping_key, True)
153  pipe.expire(ping_key, gateway_msgs.ConnectionStatistics.MAX_TTL)
154 
155  ret_pipe = pipe.execute()
156  [r_check_gateway, r_firewall, r_ip, r_oldkey, r_newkey, r_add_gateway, r_ping, r_expire] = ret_pipe
157 
158  except (redis.WatchError, redis.ConnectionError) as e:
159  raise HubConnectionFailedError("Connection Failed while registering hub[%s]" % str(e))
160  finally:
161  pipe.reset()
162 
163  # if not self._redis_server.sadd(self._redis_keys['gatewaylist'], self._redis_keys['gateway']):
164  # should never get here - unique should be unique
165  # pass
166 
167  self.mark_named_gateway_available(self._redis_keys['gateway'])
168 
169  if serialized_public_key != r_oldkey:
170  rospy.loginfo('Gateway : found existing mismatched public key on the hub, ' +
171  'requesting resend for all flip-ins.')
172  self._resend_all_flip_ins()
173 
174  # Mark this gateway as now available
176  self.ip, self.port, self._hub_connection_lost_hook)
177  self.hub_connection_checker_thread.start()
178  self.connection_lost_lock = threading.Lock()
179 
181  '''
182  This gets triggered by the redis connection checker thread when the hub connection is lost.
183  It then passes the trigger to the gateway who needs to remove the hub.
184  '''
185  self.connection_lost_lock.acquire()
186  # should probably have a try: except AttributeError here as the following is not atomic.
187  if self._hub_connection_lost_gateway_hook is not None:
188  rospy.loginfo("Gateway : lost connection with hub, attempting to disconnect...")
191  self.connection_lost_lock.release()
192 
194  '''
195  Checks if gateway info is on the hub.
196 
197  @return: success or failure of the operation
198  @rtype: bool
199  '''
200  try:
201  if self._unique_gateway_name: # this will be set during the first registration
202  return self.is_named_gateway_registered(self._redis_keys['gateway'])
203  else:
204  # we dont have local memory of being registered
205  # so we want to override any existing record on hubs out there
206  return False
207  except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
208  # usually just means the hub has gone down just before us or is in the
209  # middle of doing so forget it for now
210  # rospy.logwarn("Gateway : problem checking gateway on the hub " +
211  # "(likely that hub is temporarily out of network).")
212  pass
213 
214  def publish_network_statistics(self, statistics):
215  '''
216  Publish network interface information to the hub
217 
218  @param statistics
219  @type gateway_msgs.RemoteGateway
220  '''
221  try:
222  # Let hub know that we are alive - even for wired connections. Perhaps something can
223  # go wrong for them too, though no idea what. Anyway, writing one entry is low cost
224  # and it makes the logic easier on the hub side.
225  ping_key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, ':ping')
226  self._redis_server.set(ping_key, True)
227  self._redis_server.expire(ping_key, gateway_msgs.ConnectionStatistics.MAX_TTL)
228  # rospy.loginfo("=>{0} TTL {1}".format(ping_key, gateway_msgs.ConnectionStatistics.MAX_TTL))
229  # this should probably be posted independently of whether the hub is contactable or not
230  # refer to https://github.com/robotics-in-concert/rocon_multimaster/pull/273/files#diff-22b726fec736c73a96fd98c957d9de1aL189
231  if not statistics.network_info_available:
232  rospy.logdebug("Gateway : unable to publish network statistics [network info unavailable]")
233  return
234  network_info_available = hub_api.create_rocon_gateway_key(
235  self._unique_gateway_name, 'network:info_available')
236  self._redis_server.set(network_info_available, statistics.network_info_available)
237  network_type = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'network:type')
238  self._redis_server.set(network_type, statistics.network_type)
239 
240  # Update latency statistics
241  latency = self.hub_connection_checker_thread.get_latency()
243  # If wired, don't worry about wireless statistics.
244  if statistics.network_type == gateway_msgs.RemoteGateway.WIRED:
245  return
246  wireless_bitrate_key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'wireless:bitrate')
247  self._redis_server.set(wireless_bitrate_key, statistics.wireless_bitrate)
248  wireless_link_quality = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'wireless:quality')
249  self._redis_server.set(wireless_link_quality, statistics.wireless_link_quality)
250  wireless_signal_level = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'wireless:signal_level')
251  self._redis_server.set(wireless_signal_level, statistics.wireless_signal_level)
252  wireless_noise_level = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'wireless:noise_level')
253  self._redis_server.set(wireless_noise_level, statistics.wireless_noise_level)
254  except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
255  rospy.logdebug("Gateway : unable to publish network statistics [no connection to the hub]")
256 
257  def unregister_named_gateway(self, gateway_key):
258  '''
259  Remove all gateway info for given gateway key from the hub.
260  '''
261  try:
262  gateway_keys = self._redis_server.keys(gateway_key + ":*")
263  pipe = self._redis_server.pipeline()
264  pipe.delete(*gateway_keys)
265  pipe.srem(self._redis_keys['gatewaylist'], gateway_key)
266  pipe.execute()
267  except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
268  pass
269 
270  def is_named_gateway_registered(self, gateway_key):
271  '''
272  Check if the gateway exists in this hub
273  because sometimes the gateway ping can be there but all info has been wiped by the hub
274  '''
275  try:
276  return self._redis_server.sismember(self._redis_keys['gatewaylist'], gateway_key)
277  except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
278  pass
279 
280  def update_named_gateway_latency_stats(self, gateway_name, latency_stats):
281  '''
282  For a given gateway, update the latency statistics
283 
284  #param gateway_name : gateway name, not the redis key
285  @type str
286  @param latency_stats : ping statistics to the gateway from the hub
287  @type list : 4-tuple of float values [min, avg, max, mean deviation]
288  '''
289  try:
290  min_latency_key = hub_api.create_rocon_gateway_key(gateway_name, 'latency:min')
291  avg_latency_key = hub_api.create_rocon_gateway_key(gateway_name, 'latency:avg')
292  max_latency_key = hub_api.create_rocon_gateway_key(gateway_name, 'latency:max')
293  mdev_latency_key = hub_api.create_rocon_gateway_key(gateway_name, 'latency:mdev')
294  self._redis_server.set(min_latency_key, latency_stats[0])
295  self._redis_server.set(avg_latency_key, latency_stats[1])
296  self._redis_server.set(max_latency_key, latency_stats[2])
297  self._redis_server.set(mdev_latency_key, latency_stats[3])
298  except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
299  rospy.logerr("Gateway: unable to update latency stats for " + gateway_name)
300 
301  def mark_named_gateway_available(self, gateway_key, available=True,
302  time_since_last_seen=0.0):
303  '''
304  This function is used by the hub to mark if a gateway can be pinged.
305  If a gateway cannot be pinged, the hub indicates how longs has it been
306  since the hub was last seen
307 
308  @param gateway_key : The gateway key (not the name)
309  @type str
310  @param available: If the gateway can be pinged right now
311  @type bool
312  @param time_since_last_seen: If available is false, how long has it
313  been since the gateway was last seen (in seconds)
314  @type float
315  '''
316  pipe = self._redis_server.pipeline()
317  try:
318  available_key = gateway_key + ":available"
319  pipe.set(available_key, available)
320  time_since_last_seen_key = gateway_key + ":time_since_last_seen"
321  pipe.set(time_since_last_seen_key, int(time_since_last_seen))
322  unused_ret_pipe = pipe.execute()
323  except (redis.WatchError, redis.ConnectionError) as e:
324  raise HubConnectionFailedError("Connection Failed while registering hub[%s]" % str(e))
325  finally:
326  pipe.reset()
327 
328  ##########################################################################
329  # Hub Data Retrieval
330  ##########################################################################
331 
332  def remote_gateway_info(self, gateway):
333  '''
334  Return remote gateway information for the specified gateway string id.
335 
336  @param gateways : gateway id string to search for
337  @type string
338  @return remote gateway information
339  @rtype gateway_msgs.RemotGateway or None
340  '''
341  firewall = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'firewall'))
342  if firewall is None:
343  return None # equivalent to saying no gateway of this id found
344  ip = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'ip'))
345  if ip is None:
346  return None # hub information not available/correct
347  remote_gateway = gateway_msgs.RemoteGateway()
348  remote_gateway.name = gateway
349  remote_gateway.ip = ip
350  remote_gateway.firewall = True if int(firewall) else False
351  remote_gateway.public_interface = []
352  encoded_advertisements = self._redis_server.smembers(
353  hub_api.create_rocon_gateway_key(gateway, 'advertisements'))
354  for encoded_advertisement in encoded_advertisements:
355  advertisement = utils.deserialize_connection(encoded_advertisement)
356  remote_gateway.public_interface.append(advertisement.rule)
357  remote_gateway.flipped_interface = []
358  encoded_flips = self._redis_server.smembers(hub_api.create_rocon_gateway_key(gateway, 'flips'))
359  for encoded_flip in encoded_flips:
360  [target_gateway, name, connection_type, node] = utils.deserialize(encoded_flip)
361  remote_rule = gateway_msgs.RemoteRule(target_gateway, gateway_msgs.Rule(connection_type, name, node))
362  remote_gateway.flipped_interface.append(remote_rule)
363  remote_gateway.pulled_interface = []
364  encoded_pulls = self._redis_server.smembers(hub_api.create_rocon_gateway_key(gateway, 'pulls'))
365  for encoded_pull in encoded_pulls:
366  [target_gateway, name, connection_type, node] = utils.deserialize(encoded_pull)
367  remote_rule = gateway_msgs.RemoteRule(target_gateway, gateway_msgs.Rule(connection_type, name, node))
368  remote_gateway.pulled_interface.append(remote_rule)
369 
370  # Gateway health/network connection statistics indicators
371  gateway_available_key = hub_api.create_rocon_gateway_key(gateway, 'available')
372  remote_gateway.conn_stats.gateway_available = \
373  self._parse_redis_bool(self._redis_server.get(gateway_available_key))
374  time_since_last_seen_key = hub_api.create_rocon_gateway_key(gateway, 'time_since_last_seen')
375  remote_gateway.conn_stats.time_since_last_seen = \
376  self._parse_redis_int(self._redis_server.get(time_since_last_seen_key))
377 
378  ping_latency_min_key = hub_api.create_rocon_gateway_key(gateway, 'latency:min')
379  remote_gateway.conn_stats.ping_latency_min = \
380  self._parse_redis_float(self._redis_server.get(ping_latency_min_key))
381  ping_latency_max_key = hub_api.create_rocon_gateway_key(gateway, 'latency:max')
382  remote_gateway.conn_stats.ping_latency_max = \
383  self._parse_redis_float(self._redis_server.get(ping_latency_max_key))
384  ping_latency_avg_key = hub_api.create_rocon_gateway_key(gateway, 'latency:avg')
385  remote_gateway.conn_stats.ping_latency_avg = \
386  self._parse_redis_float(self._redis_server.get(ping_latency_avg_key))
387  ping_latency_mdev_key = hub_api.create_rocon_gateway_key(gateway, 'latency:mdev')
388  remote_gateway.conn_stats.ping_latency_mdev = \
389  self._parse_redis_float(self._redis_server.get(ping_latency_mdev_key))
390 
391  # Gateway network connection indicators
392  network_info_available_key = hub_api.create_rocon_gateway_key(gateway, 'network:info_available')
393  remote_gateway.conn_stats.network_info_available = \
394  self._parse_redis_bool(self._redis_server.get(network_info_available_key))
395  if not remote_gateway.conn_stats.network_info_available:
396  return remote_gateway
397  network_type_key = hub_api.create_rocon_gateway_key(gateway, 'network:type')
398  remote_gateway.conn_stats.network_type = \
399  self._parse_redis_int(self._redis_server.get(network_type_key))
400  if remote_gateway.conn_stats.network_type == gateway_msgs.RemoteGateway.WIRED:
401  return remote_gateway
402  wireless_bitrate_key = hub_api.create_rocon_gateway_key(gateway, 'wireless:bitrate')
403  remote_gateway.conn_stats.wireless_bitrate = \
404  self._parse_redis_float(self._redis_server.get(wireless_bitrate_key))
405  wireless_link_quality_key = hub_api.create_rocon_gateway_key(gateway, 'wireless:quality')
406  remote_gateway.conn_stats.wireless_link_quality = \
407  self._parse_redis_int(self._redis_server.get(wireless_link_quality_key))
408  wireless_signal_level_key = hub_api.create_rocon_gateway_key(gateway, 'wireless:signal_level')
409  remote_gateway.conn_stats.wireless_signal_level = \
410  self._parse_redis_float(self._redis_server.get(wireless_signal_level_key))
411  wireless_noise_level_key = hub_api.create_rocon_gateway_key(gateway, 'wireless:noise_level')
412  remote_gateway.conn_stats.wireless_noise_level = \
413  self._parse_redis_float(self._redis_server.get(wireless_noise_level_key))
414  return remote_gateway
415 
417  '''
418  Return a list of the gateways (name list, not redis keys).
419  e.g. ['gateway32adcda32','pirate21fasdf']. If not connected, just
420  returns an empty list.
421  '''
422  if not self._redis_server:
423  rospy.logerr("Gateway : cannot retrieve remote gateway names [%s][%s]." % (self.name, self.uri))
424  return []
425  gateways = []
426  try:
427  gateway_keys = self._redis_server.smembers(self._redis_keys['gatewaylist'])
428  for gateway in gateway_keys:
429  if hub_api.key_base_name(gateway) != self._unique_gateway_name:
430  # rospy.loginfo("Gateway discovered: [%s][%s]." % (self.name, self.uri))
431  gateways.append(hub_api.key_base_name(gateway))
432  except (redis.ConnectionError, AttributeError) as unused_e:
433  # redis misbehaves a little here, sometimes it doesn't catch a disconnection properly
434  # see https://github.com/robotics-in-concert/rocon_multimaster/issues/251 so it
435  # pops up as an AttributeError
436  pass
437  return gateways
438 
439  def matches_remote_gateway_name(self, gateway):
440  '''
441  Use this when gateway can be a regular expression and
442  we need to check it off against list_remote_gateway_names()
443 
444  @return a list of matches (higher level decides on action for duplicates).
445  @rtype list[str] : list of remote gateway names.
446  '''
447  matches = []
448  try:
449  for remote_gateway in self.list_remote_gateway_names():
450  if re.match(gateway, remote_gateway):
451  matches.append(remote_gateway)
452  except HubConnectionLostError:
453  raise
454  return matches
455 
457  '''
458  Use this when gateway can be a regular expression and
459  we need to check it off against list_remote_gateway_names()
460  '''
461  weak_matches = []
462  try:
463  for remote_gateway in self.list_remote_gateway_names():
464  if re.match(gateway, rocon_gateway_utils.gateway_basename(remote_gateway)):
465  weak_matches.append(remote_gateway)
466  except HubConnectionLostError:
467  raise
468  return weak_matches
469 
470  def get_remote_connection_state(self, remote_gateway):
471  '''
472  Equivalent to get_connection_state, but generates it from the public
473  interface of a remote gateway
474 
475  @param remote_gateway : hash name for a remote gateway
476  @type str
477  @return dictionary of remote advertisements
478  @rtype dictionary of connection type keyed connection values
479  '''
480  connections = utils.create_empty_connection_type_dictionary()
481  key = hub_api.create_rocon_gateway_key(remote_gateway, 'advertisements')
482  try:
483  public_interface = self._redis_server.smembers(key)
484  for connection_str in public_interface:
485  connection = utils.deserialize_connection(connection_str)
486  connections[connection.rule.type].append(connection)
487  except redis.exceptions.ConnectionError:
488  # will arrive here if the hub happens to have been lost last update and arriving here
489  pass
490  return connections
491 
493  '''
494  Returns the value of the remote gateway's firewall (flip)
495  flag.
496 
497  @param gateway : gateway string id
498  @param string
499 
500  @return state of the flag
501  @rtype Bool
502 
503  @raise GatewayUnavailableError when specified gateway is not on the hub
504  '''
505  firewall = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'firewall'))
506  if firewall is not None:
507  return True if int(firewall) else False
508  else:
509  raise GatewayUnavailableError
510 
512  '''
513  Retrieves the local list of advertisements from the hub. This
514  gets used to sync across multiple hubs.
515 
516  @return dictionary of remote advertisements
517  @rtype dictionary of connection type keyed connection values
518  '''
519  connections = utils.create_empty_connection_type_dictionary()
520  key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'advertisements')
521  try:
522  public_interface = self._redis_server.smembers(key)
523  for connection_str in public_interface:
524  connection = utils.deserialize_connection(connection_str)
525  connections[connection.rule.type].append(connection)
526  except redis.exceptions.ConnectionError:
527  # not an error, just means its out of range, and we can't get the list.
528  pass
529  return connections
530 
531  def _parse_redis_float(self, val):
532  if val:
533  return float(val)
534  else:
535  return 0.0
536 
537  def _parse_redis_int(self, val):
538  if val:
539  return int(val)
540  else:
541  return 0.0
542 
543  def _parse_redis_bool(self, val):
544  if val and (val == 'True'):
545  return True
546  else:
547  return False
548 
549  ##########################################################################
550  # Posting Information to the Hub
551  ##########################################################################
552 
553  def advertise(self, connection):
554  '''
555  Places a topic, service or action on the public interface. On the
556  redis server, this representation will always be:
557 
558  - topic : a triple { name, type, xmlrpc node uri }
559  - service : a triple { name, rosrpc uri, xmlrpc node uri }
560  - action : ???
561 
562  @param connection: representation of a connection (topic, service, action)
563  @type connection: str
564  @raise .exceptions.ConnectionTypeError: if connection arg is invalid.
565  '''
566  key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'advertisements')
567  msg_str = utils.serialize_connection(connection)
568  self._redis_server.sadd(key, msg_str)
569 
570  def unadvertise(self, connection):
571  '''
572  Removes a topic, service or action from the public interface.
573 
574  @param connection: representation of a connection (topic, service, action)
575  @type connection: str
576  @raise .exceptions.ConnectionTypeError: if connectionarg is invalid.
577  '''
578  key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'advertisements')
579  msg_str = utils.serialize_connection(connection)
580  self._redis_server.srem(key, msg_str)
581 
582  def post_flip_details(self, gateway, name, connection_type, node):
583  '''
584  Post flip details to the redis server. This has no actual functionality,
585  it is just useful for debugging with the remote_gateway_info service.
586 
587  @param gateway : the target of the flip
588  @type string
589  @param name : the name of the connection
590  @type string
591  @param type : the type of the connection (one of ConnectionType.xxx
592  @type string
593  @param node : the node name it was pulled from
594  @type string
595  '''
596  key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flips')
597  serialized_data = utils.serialize([gateway, name, connection_type, node])
598  self._redis_server.sadd(key, serialized_data)
599 
600  def remove_flip_details(self, gateway, name, connection_type, node):
601  '''
602  Post flip details to the redis server. This has no actual functionality,
603  it is just useful for debugging with the remote_gateway_info service.
604 
605  @param gateway : the target of the flip
606  @type string
607  @param name : the name of the connection
608  @type string
609  @param type : the type of the connection (one of ConnectionType.xxx
610  @type string
611  @param node : the node name it was pulled from
612  @type string
613  '''
614  key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flips')
615  serialized_data = utils.serialize([gateway, name, connection_type, node])
616  self._redis_server.srem(key, serialized_data)
617 
618  def post_pull_details(self, gateway, name, connection_type, node):
619  '''
620  Post pull details to the hub. This has no actual functionality,
621  it is just useful for debugging with the remote_gateway_info service.
622 
623  @param gateway : the gateway it is pulling from
624  @type string
625  @param name : the name of the connection
626  @type string
627  @param type : the type of the connection (one of ConnectionType.xxx
628  @type string
629  @param node : the node name it was pulled from
630  @type string
631  '''
632  key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'pulls')
633  serialized_data = utils.serialize([gateway, name, connection_type, node])
634  self._redis_server.sadd(key, serialized_data)
635 
636  def remove_pull_details(self, gateway, name, connection_type, node):
637  '''
638  Post pull details to the hub. This has no actual functionality,
639  it is just useful for debugging with the remote_gateway_info service.
640 
641  @param gateway : the gateway it was pulling from
642  @type string
643  @param name : the name of the connection
644  @type string
645  @param type : the type of the connection (one of ConnectionType.xxx
646  @type string
647  @param node : the node name it was pulled from
648  @type string
649  '''
650  key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'pulls')
651  serialized_data = utils.serialize([gateway, name, connection_type, node])
652  self._redis_server.srem(key, serialized_data)
653 
654  ##########################################################################
655  # Flip specific communication
656  ##########################################################################
657 
659  '''
660  Marks all flip ins to be resent. Until these flips are resent, they
661  will not be processed
662  '''
663  key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flip_ins')
664  encoded_flip_ins = []
665  try:
666  encoded_flip_ins = self._redis_server.smembers(key)
667  self._redis_server.delete(key)
668  for flip_in in encoded_flip_ins:
669  status, source, connection_list = utils.deserialize_request(flip_in)
670  connection = utils.get_connection_from_list(connection_list)
671  status = FlipStatus.RESEND
672  serialized_data = utils.serialize_connection_request(status,
673  source,
674  connection)
675  self._redis_server.sadd(key, serialized_data)
676  except (redis.ConnectionError, AttributeError) as unused_e:
677  # probably disconnected from the hub
678  pass
679 
681  '''
682  Gets all the flipped in connections listed on the hub that are interesting
683  for this gateway (i.e. all unblocked/pending). This is used by the
684  watcher loop to work out how it needs to update the local registrations.
685 
686  :returns: the flipped in registration strings and status.
687  :rtype: list of (utils.Registration, FlipStatus.XXX) tuples.
688  '''
689  registrations = []
690  key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flip_ins')
691  encoded_flip_ins = []
692  try:
693  encoded_flip_ins = self._redis_server.smembers(key)
694  except (redis.ConnectionError, AttributeError) as unused_e:
695  # probably disconnected from the hub
696  pass
697  for flip_in in encoded_flip_ins:
698  status, source, connection_list = utils.deserialize_request(flip_in)
699  if source not in self.list_remote_gateway_names():
700  continue
701  connection = utils.get_connection_from_list(connection_list)
702  connection = utils.decrypt_connection(connection, self.private_key)
703  if status != FlipStatus.BLOCKED and status != FlipStatus.RESEND:
704  registrations.append((utils.Registration(connection, source), status))
705  return registrations
706 
707  def update_flip_request_status(self, registration_with_status):
708  '''
709  Updates the flip request status for this hub
710 
711  @param registration_with_status : the flip registration for which we are updating status
712  @type (utils.Registration, str) where str is the status
713 
714  @param status : pending/accepted/blocked
715  @type same as gateway_msgs.msg.RemoteRuleWithStatus.status
716 
717  @return True if this hub was used to send the flip request, and the status was updated. False otherwise.
718  @rtype Boolean
719  '''
720  result = self.update_multiple_flip_request_status([registration_with_status])
721  return result[0]
722 
723  def update_multiple_flip_request_status(self, registrations_with_status):
724  '''
725  Updates the flip request status for multiple registrations on this hub
726 
727  @param registrations_with_status : the flip registration for which we are updating status
728  @type list of (utils.Registration, str) where str is the status
729 
730  @param status : pending/accepted/blocked
731  @type same as gateway_msgs.msg.RemoteRuleWithStatus.status
732 
733  @return True if this hub was used to send the flip request, false otherwise.
734  @rtype Boolean
735  '''
736  result = [False] * len(registrations_with_status)
737  update_registrations = []
738  hub_found = False
739  key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flip_ins')
740  try:
741  encoded_flip_ins = self._redis_server.smembers(key)
742  for flip_in in encoded_flip_ins:
743  old_status, source, connection_list = utils.deserialize_request(flip_in)
744  connection = utils.get_connection_from_list(connection_list)
745  connection = utils.decrypt_connection(connection, self.private_key)
746  for index, (registration, new_status) in enumerate(registrations_with_status):
747  if source == registration.remote_gateway and connection == registration.connection:
748  if new_status != old_status:
749  self._redis_server.srem(key, flip_in)
750  update_registrations.append((index, (registration, new_status)))
751  else:
752  result[index] = True
753 
754  for (index, (registration, new_status)) in update_registrations:
755  encrypted_connection = utils.encrypt_connection(registration.connection,
756  self.private_key)
757  serialized_data = utils.serialize_connection_request(new_status,
758  registration.remote_gateway,
759  encrypted_connection)
760  self._redis_server.sadd(key, serialized_data)
761  result[index] = True
762  except redis.exceptions.ConnectionError:
763  # Means the hub has gone down (typically on shutdown so just be quiet)
764  # If we really need to know that a hub is crashed, change this policy
765  pass
766  return result
767 
768  def get_flip_request_status(self, remote_rule):
769  '''
770  Get the status of a flipped registration. If the flip request does not
771  exist (for instance, in the case where this hub was not used to send
772  the request), then None is returned
773 
774  @return the flip status or None
775  @rtype same as gateway_msgs.msg.RemoteRuleWithStatus.status or None
776  '''
777  status = self.get_multiple_flip_request_status([remote_rule])
778  return status[0]
779 
780  def get_multiple_flip_request_status(self, remote_rules):
781  '''
782  Get the status of multiple flipped registration. If the flip request
783  does not exist (for instance, in the case where this hub was not used
784  to send the request), then None is returned. Multiple requests are
785  batched together for efficiency.
786 
787  @return the flip status, ordered as per the input remote rules
788  @rtype list of gateway_msgs.msg.RemoteRuleWithStatus.status or None
789  '''
790  gateway_specific_rules = {}
791  status = [None] * len(remote_rules)
792  for i, remote_rule in enumerate(remote_rules):
793  if remote_rule.gateway not in gateway_specific_rules:
794  gateway_specific_rules[remote_rule.gateway] = []
795  gateway_specific_rules[remote_rule.gateway].append((i, remote_rule))
796 
797  source_gateway = self._unique_gateway_name # me!
798 
799  for gateway in gateway_specific_rules:
800  key = hub_api.create_rocon_gateway_key(gateway, 'flip_ins')
801  encoded_flips = []
802  try:
803  encoded_flips = self._redis_server.smembers(key)
804  except (redis.ConnectionError, AttributeError) as unused_e:
805  # probably disconnected from the hub
806  pass
807  for flip in encoded_flips:
808  rule_status, source, connection_list = utils.deserialize_request(flip)
809  if source != source_gateway:
810  continue
811  connection = utils.get_connection_from_list(connection_list)
812  # Compare rules only as xmlrpc_uri and type_info are encrypted
813 
814  # print(console.cyan + "Connection Rule: " + console.yellow + "%s-%s" % (connection.rule.type, connection.rule.name))
815  for (index, remote_rule) in gateway_specific_rules[gateway]:
816  # print(console.cyan + "Remote Rule: " + console.yellow + "%s-%s" % (remote_rule.rule.type, remote_rule.rule.name))
817  # Important to consider actions - gateway rules can be actions, but connections on the redis server are only
818  # handled as fundamental types (pub, sub, server), so explode the gateway rule and then check
819  exploded_remote_rules = self.rule_explode([remote_rule])
820 
821  # If the connection (from flips) match one of the remote rules (once exploded for handling action)
822  if len([r for r in exploded_remote_rules if connection.rule == r.rule]) > 0:
823  if status[index] is None:
824  # a pub, sub, service or first connection in an exploded action rule will land here
825  status[index] = rule_status
826  elif status[index] != rule_status:
827  # when another part of an exploded action's status doesn't match the status of formely read
828  # parts, it lands here...need some good exception handling logic to represent the combined group
829  if rule_status == FlipStatus.UNKNOWN:
830  # if something unknown whole action connection is unknown
831  status[index] = rule_status
832  break
833  # RESEND or BLOCKED do not follow basic flow so we want to make it obvious at action level
834  # This might have to be improved to distinguish between blocked and resend
835  if ((status[index] == FlipStatus.PENDING or status[index] == FlipStatus.ACCEPTED) and
836  (rule_status == FlipStatus.BLOCKED or rule_status == FlipStatus.RESEND)
837  ):
838  # print(console.green + " Action Connection w/ unsynchronised components : %s/%s" % (connection.rule.name, remote_rule.rule.name) + console.reset)
839  status[index] = rule_status
840  break
841  break
842  return status
843 
844  def send_flip_request(self, remote_gateway, connection, timeout=15.0):
845  '''
846  Sends a message to the remote gateway via redis pubsub channel. This is called from the
847  watcher thread, when a flip rule gets activated.
848 
849  - redis channel name: rocon:<remote_gateway_name>
850  - data : list of [ command, gateway, rule type, type, xmlrpc_uri ]
851  - [0] - command : in this case 'flip'
852  - [1] - gateway : the name of this gateway, i.e. the flipper
853  - [2] - name : local name
854  - [3] - node : local node name
855  - [4] - connection_type : one of ConnectionType.PUBLISHER etc
856  - [5] - type_info : a ros format type (e.g. std_msgs/String or service api)
857  - [6] - xmlrpc_uri : the xmlrpc node uri
858 
859  @param command : string command name - either 'flip' or 'unflip'
860  @type str
861 
862  @param flip_rule : the flip to send
863  @type gateway_msgs.RemoteRule
864 
865  @param type_info : topic type (e.g. std_msgs/String)
866  @param str
867 
868  @param xmlrpc_uri : the node uri
869  @param str
870  '''
871  key = hub_api.create_rocon_gateway_key(remote_gateway, 'flip_ins')
872  source = hub_api.key_base_name(self._redis_keys['gateway'])
873 
874  # Check if a flip request already exists on the hub
875  if self.get_flip_request_status(RemoteRule(remote_gateway, connection.rule)) is not None:
876  # We remove the old one before creating the new one,
877  # to avoid broken flips from previous gateway instance
878  self._send_unflip_request(remote_gateway, connection.rule)
879 
880  # Encrypt the transmission
881  start_time = time.time()
882  while time.time() - start_time <= timeout:
883  remote_gateway_public_key_str = self._redis_server.get(
884  hub_api.create_rocon_gateway_key(remote_gateway, 'public_key'))
885  if remote_gateway_public_key_str is not None:
886  break
887  if remote_gateway_public_key_str is None:
888  rospy.logerr("Gateway : flip to " + remote_gateway +
889  " failed as public key not found")
890  return False
891 
892  remote_gateway_public_key = utils.deserialize_key(remote_gateway_public_key_str)
893  encrypted_connection = utils.encrypt_connection(connection, remote_gateway_public_key)
894 
895  # Send data
896  serialized_data = utils.serialize_connection_request(
897  FlipStatus.PENDING, source, encrypted_connection)
898  self._redis_server.sadd(key, serialized_data)
899  return True
900 
901  def send_unflip_request(self, remote_gateway, rule):
902  unflipped = True
903  exp_rules = self.rule_explode([rule])
904  for r in exp_rules:
905  unflipped = unflipped and self._send_unflip_request(remote_gateway, r)
906  return unflipped
907 
908  def _send_unflip_request(self, remote_gateway, rule):
909  '''
910  Unflip a previously flipped registration. If the flip request does not
911  exist (for instance, in the case where this hub was not used to send
912  the request), then False is returned
913 
914  @return True if the flip existed and was removed, False otherwise
915  @rtype Boolean
916  '''
917  key = hub_api.create_rocon_gateway_key(remote_gateway, 'flip_ins')
918  # rule.node is two parts (node_name, xmlrpc_uri) - but serialised connection rule is only node name
919  # strip the xmlrpc_uri for comparision tests
920  rule.node = rule.node.split(",")[0]
921  try:
922  encoded_flip_ins = self._redis_server.smembers(key)
923  for flip_in in encoded_flip_ins:
924  unused_status, source, connection_list = utils.deserialize_request(flip_in)
925  connection = utils.get_connection_from_list(connection_list)
926  if source == hub_api.key_base_name(self._redis_keys['gateway']) and rule == connection.rule:
927  self._redis_server.srem(key, flip_in)
928  return True
929  except redis.exceptions.ConnectionError:
930  # usually just means the hub has gone down just before us or is in the
931  # middle of doing so let it die nice and peacefully
932  if not rospy.is_shutdown():
933  rospy.logwarn("Gateway : hub connection error while sending unflip request.")
934  return False
935 
936  #TODO : improve design to not need this
937  def rule_explode(self, rule_list):
938  result_list=[]
939  for rule in rule_list:
940  if isinstance(rule, RemoteRule):
941  asm_rule = rule.rule
942  else:
943  asm_rule = rule
944 
945  exp_rules = []
946  if asm_rule.type == gateway_msgs.ConnectionType.ACTION_CLIENT:
947  exp_rules.append( gateway_msgs.Rule(name=asm_rule.name + "/goal", type= gateway_msgs.ConnectionType.PUBLISHER, node= asm_rule.node ))
948  exp_rules.append( gateway_msgs.Rule(name=asm_rule.name + "/cancel", type= gateway_msgs.ConnectionType.PUBLISHER, node= asm_rule.node ))
949  exp_rules.append( gateway_msgs.Rule(name=asm_rule.name + "/feedback", type= gateway_msgs.ConnectionType.SUBSCRIBER, node= asm_rule.node ))
950  exp_rules.append( gateway_msgs.Rule(name=asm_rule.name + "/status", type= gateway_msgs.ConnectionType.SUBSCRIBER, node= asm_rule.node ))
951  exp_rules.append( gateway_msgs.Rule(name=asm_rule.name + "/result", type= gateway_msgs.ConnectionType.SUBSCRIBER, node= asm_rule.node ))
952  elif asm_rule.type == gateway_msgs.ConnectionType.ACTION_SERVER:
953  exp_rules.append( gateway_msgs.Rule(name=asm_rule.name + "/goal", type= gateway_msgs.ConnectionType.SUBSCRIBER, node= asm_rule.node ))
954  exp_rules.append( gateway_msgs.Rule(name=asm_rule.name + "/cancel", type= gateway_msgs.ConnectionType.SUBSCRIBER, node= asm_rule.node ))
955  exp_rules.append( gateway_msgs.Rule(name=asm_rule.name + "/feedback", type= gateway_msgs.ConnectionType.PUBLISHER, node= asm_rule.node ))
956  exp_rules.append( gateway_msgs.Rule(name=asm_rule.name + "/status", type= gateway_msgs.ConnectionType.PUBLISHER, node= asm_rule.node ))
957  exp_rules.append( gateway_msgs.Rule(name=asm_rule.name + "/result", type= gateway_msgs.ConnectionType.PUBLISHER, node= asm_rule.node ))
958  else: # keep calm, no need to explode anything
959  exp_rules.append(asm_rule)
960 
961  if isinstance(rule, RemoteRule):
962  # if it was a remote rule, we replicate remote rules:
963  exp_rules = [RemoteRule(gateway=rule.gateway, rule=r) for r in exp_rules]
964 
965  result_list += exp_rules
966 
967  return result_list
968 
969  # unused yet. but striving for symmetry would make things easier to understand...
970  def rule_assemble(self, rule_list):
971  result_list=[]
972  for rule in rule_list:
973 
974  if isinstance(rule, RemoteRule):
975  exp_rule = rule.rule
976  else:
977  exp_rule = rule
978 
979  # default (non action) case
980  action_name = exp_rule.name
981  action_client = False
982  action_server = False
983 
984  if exp_rule.name.endswith("/goal") and exp_rule.type == gateway_msgs.ConnectionType.PUBLISHER:
985  action_name = exp_rule.name[:-len("/goal")]
986  action_client = True
987  elif exp_rule.name.endswith("/cancel") and exp_rule.type == gateway_msgs.ConnectionType.PUBLISHER:
988  action_name = exp_rule.name[:-len("/cancel")]
989  action_client = True
990  elif exp_rule.name.endswith("/feedback") and exp_rule.type == gateway_msgs.ConnectionType.SUBSCRIBER:
991  action_name = exp_rule.name[:-len("/feedback")]
992  action_client = True
993  elif exp_rule.name.endswith("/status") and exp_rule.type == gateway_msgs.ConnectionType.SUBSCRIBER:
994  action_name = exp_rule.name[:-len("/status")]
995  action_client = True
996  elif exp_rule.name.endswith("/result") and exp_rule.type == gateway_msgs.ConnectionType.SUBSCRIBER:
997  action_name = exp_rule.name[:-len("/result")]
998  action_client = True
999 
1000  if exp_rule.name.endswith("/goal") and exp_rule.type == gateway_msgs.ConnectionType.SUBSCRIBER:
1001  action_name = exp_rule.name[:-len("/goal")]
1002  action_server = True
1003  elif exp_rule.name.endswith("/cancel") and exp_rule.type == gateway_msgs.ConnectionType.SUBSCRIBER:
1004  action_name = exp_rule.name[:-len("/cancel")]
1005  action_server = True
1006  elif exp_rule.name.endswith("/feedback") and exp_rule.type == gateway_msgs.ConnectionType.PUBLISHER:
1007  action_name = exp_rule.name[:-len("/feedback")]
1008  action_server = True
1009  elif exp_rule.name.endswith("/status") and exp_rule.type == gateway_msgs.ConnectionType.PUBLISHER:
1010  action_name = exp_rule.name[:-len("/status")]
1011  action_server = True
1012  elif exp_rule.name.endswith("/result") and exp_rule.type == gateway_msgs.ConnectionType.PUBLISHER:
1013  action_name = exp_rule.name[:-len("/result")]
1014  action_server = True
1015 
1016  result_rule = None
1017  if action_client and len([ a for a in result_list if a.name == action_name and a.type == gateway_msgs.ConnectionType.ACTION_CLIENT and a.node == exp_rule.node]) == 0:
1018  result_rule = gateway_msgs.Rule(name=action_name, type=gateway_msgs.ConnectionType.ACTION_CLIENT, node=exp_rule.node)
1019  elif action_server and len([ a for a in result_list if a.name == action_name and a.type == gateway_msgs.ConnectionType.ACTION_SERVER and a.node == exp_rule.node]) == 0:
1020  result_rule = gateway_msgs.Rule(name=action_name, type=gateway_msgs.ConnectionType.ACTION_SERVER, node=exp_rule.node)
1021  elif not action_client and not action_client: # default case : just include that rule
1022  result_rule = exp_rule
1023 
1024  if result_rule is not None:
1025  if isinstance(rule, RemoteRule):
1026  result_rule = RemoteRule(rule.gateway, result_rule)
1027  result_list.append(result_rule)
1028  # else we skip it
1029 
1030  return result_list
def mark_named_gateway_available(self, gateway_key, available=True, time_since_last_seen=0.0)
Definition: gateway_hub.py:302
def update_named_gateway_latency_stats(self, gateway_name, latency_stats)
Definition: gateway_hub.py:280
def register_gateway(self, firewall, unique_gateway_name, hub_connection_lost_gateway_hook, gateway_ip)
Hub Connections.
Definition: gateway_hub.py:108
def _resend_all_flip_ins(self)
Flip specific communication.
Definition: gateway_hub.py:658
def get_multiple_flip_request_status(self, remote_rules)
Definition: gateway_hub.py:780
def remove_flip_details(self, gateway, name, connection_type, node)
Definition: gateway_hub.py:600
def _send_unflip_request(self, remote_gateway, rule)
Definition: gateway_hub.py:908
def get_flip_request_status(self, remote_rule)
Definition: gateway_hub.py:768
def update_flip_request_status(self, registration_with_status)
Definition: gateway_hub.py:707
def unregister_named_gateway(self, gateway_key)
Definition: gateway_hub.py:257
def send_unflip_request(self, remote_gateway, rule)
Definition: gateway_hub.py:901
def __init__(self, ip, port, whitelist, blacklist)
Definition: gateway_hub.py:80
def matches_remote_gateway_name(self, gateway)
Definition: gateway_hub.py:439
def matches_remote_gateway_basename(self, gateway)
Definition: gateway_hub.py:456
def remote_gateway_info(self, gateway)
Hub Data Retrieval.
Definition: gateway_hub.py:332
def is_named_gateway_registered(self, gateway_key)
Definition: gateway_hub.py:270
def send_flip_request(self, remote_gateway, connection, timeout=15.0)
Definition: gateway_hub.py:844
def publish_network_statistics(self, statistics)
Definition: gateway_hub.py:214
def rule_assemble(self, rule_list)
Definition: gateway_hub.py:970
def unadvertise(self, connection)
Definition: gateway_hub.py:570
def get_remote_gateway_firewall_flag(self, gateway)
Definition: gateway_hub.py:492
def __init__(self, ip, port, hub_connection_lost_hook)
Definition: gateway_hub.py:40
def get_remote_connection_state(self, remote_gateway)
Definition: gateway_hub.py:470
def update_multiple_flip_request_status(self, registrations_with_status)
Definition: gateway_hub.py:723
def advertise(self, connection)
Posting Information to the Hub.
Definition: gateway_hub.py:553
def post_flip_details(self, gateway, name, connection_type, node)
Definition: gateway_hub.py:582
def rule_explode(self, rule_list)
Definition: gateway_hub.py:937
def post_pull_details(self, gateway, name, connection_type, node)
Definition: gateway_hub.py:618
def remove_pull_details(self, gateway, name, connection_type, node)
Definition: gateway_hub.py:636


rocon_gateway
Author(s): Daniel Stonier , Jihoon Lee , Piyush Khandelwal
autogenerated on Mon Jun 10 2019 14:40:10