gateway_hub.py
Go to the documentation of this file.
00001 #
00002 # License: BSD
00003 #
00004 #   https://raw.github.com/robotics-in-concert/rocon_multimaster/license/LICENSE
00005 #
00006 ###############################################################################
00007 # Imports
00008 ###############################################################################
00009 
00010 import threading
00011 import rospy
00012 import re
00013 import utils
00014 from gateway_msgs.msg import RemoteRuleWithStatus as FlipStatus, RemoteRule
00015 import gateway_msgs.msg as gateway_msgs
00016 import rocon_python_comms
00017 import rocon_python_utils
00018 import rocon_gateway_utils
00019 import rocon_hub_client
00020 import rocon_python_redis as redis
00021 import time
00022 from rocon_hub_client import hub_api, hub_client
00023 from rocon_hub_client.exceptions import HubConnectionLostError, \
00024     HubNameNotFoundError, HubNotFoundError
00025 
00026 from .exceptions import GatewayUnavailableError
00027 
00028 ###############################################################################
00029 # Redis Connection Checker
00030 ##############################################################################
00031 
00032 
00033 class HubConnectionCheckerThread(threading.Thread):
00034 
00035     '''
00036       Pings redis periodically to figure out if redis is still alive.
00037     '''
00038 
00039     def __init__(self, ip, port, hub_connection_lost_hook):
00040         threading.Thread.__init__(self)
00041         self.daemon = True  # clean shut down of thread when hub connection is lost
00042         self.ping_frequency = 0.2  # Too spammy? # TODO Need to parametrize
00043         self._hub_connection_lost_hook = hub_connection_lost_hook
00044         self.ip = ip
00045         self.port = port
00046         self.pinger = rocon_python_utils.network.Pinger(self.ip, self.ping_frequency)
00047 
00048     def get_latency(self):
00049         return self.pinger.get_latency()
00050 
00051     def run(self):
00052         # This runs in the background to gather the latest connection statistics
00053         # Note - it's not used in the keep alive check
00054         self.pinger.start()
00055         rate = rocon_python_comms.WallRate(self.ping_frequency)
00056         alive = True
00057         timeout = 1 / self.ping_frequency
00058         while alive:
00059             alive, message = hub_client.ping_hub(self.ip, self.port, timeout)
00060             rate.sleep()
00061         rospy.logwarn("Gatew Hub pinger : %s"%message)
00062         self._hub_connection_lost_hook()
00063 
00064 ##############################################################################
00065 # Hub
00066 ##############################################################################
00067 
00068 
00069 class GatewayHub(rocon_hub_client.Hub):
00070 
00071     def __init__(self, ip, port, whitelist, blacklist):
00072         '''
00073           @param remote_gateway_request_callbacks : to handle redis responses
00074           @type list of function pointers (back to GatewaySync class
00075 
00076           @param ip : redis server ip
00077           @param port : redis server port
00078 
00079           @raise HubNameNotFoundError, HubNotFoundError
00080         '''
00081         try:
00082             super(GatewayHub, self).__init__(ip, port, whitelist, blacklist)  # can just do super() in python3
00083         except HubNotFoundError:
00084             raise
00085         except HubNameNotFoundError:
00086             raise
00087         self._hub_connection_lost_gateway_hook = None
00088         self._firewall = 0
00089 
00090         # Setting up some basic parameters in-case we use this API without registering a gateway
00091         self._redis_keys['gatewaylist'] = hub_api.create_rocon_hub_key('gatewaylist')
00092         self._unique_gateway_name = ''
00093 
00094     ##########################################################################
00095     # Hub Connections
00096     ##########################################################################
00097 
00098     def register_gateway(self, firewall, unique_gateway_name, hub_connection_lost_gateway_hook, gateway_ip):
00099         '''
00100           Register a gateway with the hub.
00101 
00102           @param firewall
00103           @param unique_gateway_name
00104           @param hub_connection_lost_gateway_hook : used to trigger Gateway.disengage_hub(hub)
00105                  on lost hub connections in redis pubsub listener thread.
00106           @gateway_ip
00107 
00108           @raise HubConnectionLostError if for some reason, the redis server has become unavailable.
00109         '''
00110         if not self._redis_server:
00111             raise HubConnectionLostError()
00112         self._unique_gateway_name = unique_gateway_name
00113         self._redis_keys['gateway'] = hub_api.create_rocon_key(unique_gateway_name)
00114         self._redis_keys['firewall'] = hub_api.create_rocon_gateway_key(unique_gateway_name, 'firewall')
00115         self._firewall = 1 if firewall else 0
00116         self._hub_connection_lost_gateway_hook = hub_connection_lost_gateway_hook
00117         if not self._redis_server.sadd(self._redis_keys['gatewaylist'], self._redis_keys['gateway']):
00118             # should never get here - unique should be unique
00119             pass
00120         self.mark_named_gateway_available(self._redis_keys['gateway'])
00121         self._redis_server.set(self._redis_keys['firewall'], self._firewall)
00122         # I think we just used this for debugging, but we might want to hide it in
00123         # future (it's the ros master hostname/ip)
00124         self._redis_keys['ip'] = hub_api.create_rocon_gateway_key(unique_gateway_name, 'ip')
00125         self._redis_server.set(self._redis_keys['ip'], gateway_ip)
00126 
00127         self.private_key, public_key = utils.generate_private_public_key()
00128         self._redis_keys['public_key'] = hub_api.create_rocon_gateway_key(unique_gateway_name, 'public_key')
00129         old_key = self._redis_server.get(self._redis_keys['public_key'])
00130         serialized_public_key = utils.serialize_key(public_key)
00131         self._redis_server.set(self._redis_keys['public_key'], serialized_public_key)
00132         if serialized_public_key != old_key:
00133             rospy.loginfo('Gateway : Found existing mismatched public key on the hub. ' +
00134                           'Requesting resend for all flip-ins.')
00135             self._resend_all_flip_ins()
00136 
00137         # Mark this gateway as now available
00138         self._redis_server.sadd(self._redis_keys['gatewaylist'], self._redis_keys['gateway'])
00139         self.hub_connection_checker_thread = HubConnectionCheckerThread(
00140             self.ip, self.port, self._hub_connection_lost_hook)
00141         self.hub_connection_checker_thread.start()
00142         self.connection_lost_lock = threading.Lock()
00143 
00144         # Let hub know we are alive
00145         ping_key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, ':ping')
00146         self._redis_server.set(ping_key, True)
00147         self._redis_server.expire(ping_key, gateway_msgs.ConnectionStatistics.MAX_TTL)
00148 
00149     def _hub_connection_lost_hook(self):
00150         '''
00151           This gets triggered by the redis connection checker thread when the hub connection is lost.
00152           The trigger is passed to the gateway who needs to remove the hub.
00153         '''
00154         self.connection_lost_lock.acquire()
00155         # should probably have a try: except AttributeError here as the following is not atomic.
00156         if self._hub_connection_lost_gateway_hook is not None:
00157             rospy.loginfo("Gateway : lost connection with hub, attempting to disconnect...")
00158             self._hub_connection_lost_gateway_hook(self)
00159             self._hub_connection_lost_gateway_hook = None
00160         self.connection_lost_lock.release()
00161 
00162     def unregister_gateway(self):
00163         '''
00164           Remove all gateway info from the hub.
00165 
00166           @return: success or failure of the operation
00167           @rtype: bool
00168         '''
00169         try:
00170             self.unregister_named_gateway(self._redis_keys['gateway'])
00171         except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
00172             # usually just means the hub has gone down just before us or is in the
00173             # middle of doing so let it die nice and peacefully
00174             # rospy.logwarn("Gateway : problem unregistering from the hub " +
00175             #               "(likely that hub shutdown before the gateway).")
00176             pass
00177         # should we not also shut down self.remote_gatew
00178         rospy.loginfo("Gateway : unregistered from the hub [%s]" % self.name)
00179 
00180     def publish_network_statistics(self, statistics):
00181         '''
00182           Publish network interface information to the hub
00183 
00184           @param statistics
00185           @type gateway_msgs.RemoteGateway
00186         '''
00187         try:
00188             # this should probably be posted independently  of whether the hub is contactable or not
00189             # refer to https://github.com/robotics-in-concert/rocon_multimaster/pull/273/files#diff-22b726fec736c73a96fd98c957d9de1aL189
00190             if not statistics.network_info_available:
00191                 rospy.logdebug("Gateway : unable to publish network statistics [network info unavailable]")
00192                 return
00193             network_info_available = hub_api.create_rocon_gateway_key(
00194                 self._unique_gateway_name, 'network:info_available')
00195             self._redis_server.set(network_info_available, statistics.network_info_available)
00196             network_type = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'network:type')
00197             self._redis_server.set(network_type, statistics.network_type)
00198             # Let hub know that we are alive - even for wired connections. Perhaps something can
00199             # go wrong for them too, though no idea what. Anyway, writing one entry is low cost
00200             # and it makes the logic easier on the hub side.
00201             ping_key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, ':ping')
00202             self._redis_server.set(ping_key, True)
00203             self._redis_server.expire(ping_key, gateway_msgs.ConnectionStatistics.MAX_TTL)
00204             # Update latency statistics
00205             latency = self.hub_connection_checker_thread.get_latency()
00206             self.update_named_gateway_latency_stats(self._unique_gateway_name, latency)
00207             # If wired, don't worry about wireless statistics.
00208             if statistics.network_type == gateway_msgs.RemoteGateway.WIRED:
00209                 return
00210             wireless_bitrate_key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'wireless:bitrate')
00211             self._redis_server.set(wireless_bitrate_key, statistics.wireless_bitrate)
00212             wireless_link_quality = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'wireless:quality')
00213             self._redis_server.set(wireless_link_quality, statistics.wireless_link_quality)
00214             wireless_signal_level = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'wireless:signal_level')
00215             self._redis_server.set(wireless_signal_level, statistics.wireless_signal_level)
00216             wireless_noise_level = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'wireless:noise_level')
00217             self._redis_server.set(wireless_noise_level, statistics.wireless_noise_level)
00218         except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
00219             rospy.logerr("Gateway : unable to publish network statistics [no connection to the hub]")
00220 
00221     def unregister_named_gateway(self, gateway_key):
00222         '''
00223           Remove all gateway info for given gateway key from the hub.
00224         '''
00225         try:
00226             gateway_keys = self._redis_server.keys(gateway_key + ":*")
00227             pipe = self._redis_server.pipeline()
00228             pipe.delete(*gateway_keys)
00229             pipe.srem(self._redis_keys['gatewaylist'], gateway_key)
00230             pipe.execute()
00231         except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
00232             pass
00233 
00234     def update_named_gateway_latency_stats(self, gateway_name, latency_stats):
00235         '''
00236           For a given gateway, update the latency statistics
00237 
00238           #param gateway_name : gateway name, not the redis key
00239           @type str
00240           @param latency_stats : ping statistics to the gateway from the hub
00241           @type list : 4-tuple of float values [min, avg, max, mean deviation]
00242         '''
00243         try:
00244             min_latency_key = hub_api.create_rocon_gateway_key(gateway_name, 'latency:min')
00245             avg_latency_key = hub_api.create_rocon_gateway_key(gateway_name, 'latency:avg')
00246             max_latency_key = hub_api.create_rocon_gateway_key(gateway_name, 'latency:max')
00247             mdev_latency_key = hub_api.create_rocon_gateway_key(gateway_name, 'latency:mdev')
00248             self._redis_server.set(min_latency_key, latency_stats[0])
00249             self._redis_server.set(avg_latency_key, latency_stats[1])
00250             self._redis_server.set(max_latency_key, latency_stats[2])
00251             self._redis_server.set(mdev_latency_key, latency_stats[3])
00252         except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
00253             rospy.logerr("Gateway: unable to update latency stats for " + gateway_name)
00254 
00255     def mark_named_gateway_available(self, gateway_key, available=True,
00256                                      time_since_last_seen=0.0):
00257         '''
00258           This function is used by the hub to mark if a gateway can be pinged.
00259           If a gateway cannot be pinged, the hub indicates how longs has it been
00260           since the hub was last seen
00261 
00262           @param gateway_key : The gateway key (not the name)
00263           @type str
00264           @param available: If the gateway can be pinged right now
00265           @type bool
00266           @param time_since_last_seen: If available is false, how long has it
00267                  been since the gateway was last seen (in seconds)
00268           @type float
00269         '''
00270         available_key = gateway_key + ":available"
00271         self._redis_server.set(available_key, available)
00272         time_since_last_seen_key = gateway_key + ":time_since_last_seen"
00273         self._redis_server.set(time_since_last_seen_key, int(time_since_last_seen))
00274 
00275     ##########################################################################
00276     # Hub Data Retrieval
00277     ##########################################################################
00278 
00279     def remote_gateway_info(self, gateway):
00280         '''
00281           Return remote gateway information for the specified gateway string id.
00282 
00283           @param gateways : gateway id string to search for
00284           @type string
00285           @return remote gateway information
00286           @rtype gateway_msgs.RemotGateway or None
00287         '''
00288         firewall = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'firewall'))
00289         if firewall is None:
00290             return None  # equivalent to saying no gateway of this id found
00291         ip = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'ip'))
00292         if ip is None:
00293             return None  # hub information not available/correct
00294         remote_gateway = gateway_msgs.RemoteGateway()
00295         remote_gateway.name = gateway
00296         remote_gateway.ip = ip
00297         remote_gateway.firewall = True if int(firewall) else False
00298         remote_gateway.public_interface = []
00299         encoded_advertisements = self._redis_server.smembers(
00300             hub_api.create_rocon_gateway_key(gateway, 'advertisements'))
00301         for encoded_advertisement in encoded_advertisements:
00302             advertisement = utils.deserialize_connection(encoded_advertisement)
00303             remote_gateway.public_interface.append(advertisement.rule)
00304         remote_gateway.flipped_interface = []
00305         encoded_flips = self._redis_server.smembers(hub_api.create_rocon_gateway_key(gateway, 'flips'))
00306         for encoded_flip in encoded_flips:
00307             [target_gateway, name, connection_type, node] = utils.deserialize(encoded_flip)
00308             remote_rule = gateway_msgs.RemoteRule(target_gateway, gateway_msgs.Rule(connection_type, name, node))
00309             remote_gateway.flipped_interface.append(remote_rule)
00310         remote_gateway.pulled_interface = []
00311         encoded_pulls = self._redis_server.smembers(hub_api.create_rocon_gateway_key(gateway, 'pulls'))
00312         for encoded_pull in encoded_pulls:
00313             [target_gateway, name, connection_type, node] = utils.deserialize(encoded_pull)
00314             remote_rule = gateway_msgs.RemoteRule(target_gateway, gateway_msgs.Rule(connection_type, name, node))
00315             remote_gateway.pulled_interface.append(remote_rule)
00316 
00317         # Gateway health/network connection statistics indicators
00318         gateway_available_key = hub_api.create_rocon_gateway_key(gateway, 'available')
00319         remote_gateway.conn_stats.gateway_available = \
00320             self._parse_redis_bool(self._redis_server.get(gateway_available_key))
00321         time_since_last_seen_key = hub_api.create_rocon_gateway_key(gateway, 'time_since_last_seen')
00322         remote_gateway.conn_stats.time_since_last_seen = \
00323             self._parse_redis_int(self._redis_server.get(time_since_last_seen_key))
00324 
00325         ping_latency_min_key = hub_api.create_rocon_gateway_key(gateway, 'latency:min')
00326         remote_gateway.conn_stats.ping_latency_min = \
00327             self._parse_redis_float(self._redis_server.get(ping_latency_min_key))
00328         ping_latency_max_key = hub_api.create_rocon_gateway_key(gateway, 'latency:max')
00329         remote_gateway.conn_stats.ping_latency_max = \
00330             self._parse_redis_float(self._redis_server.get(ping_latency_max_key))
00331         ping_latency_avg_key = hub_api.create_rocon_gateway_key(gateway, 'latency:avg')
00332         remote_gateway.conn_stats.ping_latency_avg = \
00333             self._parse_redis_float(self._redis_server.get(ping_latency_avg_key))
00334         ping_latency_mdev_key = hub_api.create_rocon_gateway_key(gateway, 'latency:mdev')
00335         remote_gateway.conn_stats.ping_latency_mdev = \
00336             self._parse_redis_float(self._redis_server.get(ping_latency_mdev_key))
00337 
00338         # Gateway network connection indicators
00339         network_info_available_key = hub_api.create_rocon_gateway_key(gateway, 'network:info_available')
00340         remote_gateway.conn_stats.network_info_available = \
00341             self._parse_redis_bool(self._redis_server.get(network_info_available_key))
00342         if not remote_gateway.conn_stats.network_info_available:
00343             return remote_gateway
00344         network_type_key = hub_api.create_rocon_gateway_key(gateway, 'network:type')
00345         remote_gateway.conn_stats.network_type = \
00346             self._parse_redis_int(self._redis_server.get(network_type_key))
00347         if remote_gateway.conn_stats.network_type == gateway_msgs.RemoteGateway.WIRED:
00348             return remote_gateway
00349         wireless_bitrate_key = hub_api.create_rocon_gateway_key(gateway, 'wireless:bitrate')
00350         remote_gateway.conn_stats.wireless_bitrate = \
00351             self._parse_redis_float(self._redis_server.get(wireless_bitrate_key))
00352         wireless_link_quality_key = hub_api.create_rocon_gateway_key(gateway, 'wireless:quality')
00353         remote_gateway.conn_stats.wireless_link_quality = \
00354             self._parse_redis_int(self._redis_server.get(wireless_link_quality_key))
00355         wireless_signal_level_key = hub_api.create_rocon_gateway_key(gateway, 'wireless:signal_level')
00356         remote_gateway.conn_stats.wireless_signal_level = \
00357             self._parse_redis_float(self._redis_server.get(wireless_signal_level_key))
00358         wireless_noise_level_key = hub_api.create_rocon_gateway_key(gateway, 'wireless:noise_level')
00359         remote_gateway.conn_stats.wireless_noise_level = \
00360             self._parse_redis_float(self._redis_server.get(wireless_noise_level_key))
00361         return remote_gateway
00362 
00363     def list_remote_gateway_names(self):
00364         '''
00365           Return a list of the gateways (name list, not redis keys).
00366           e.g. ['gateway32adcda32','pirate21fasdf']. If not connected, just
00367           returns an empty list.
00368         '''
00369         if not self._redis_server:
00370             rospy.logerr("Gateway : cannot retrieve remote gateway names [%s][%s]." % (self.name, self.uri))
00371             return []
00372         gateways = []
00373         try:
00374             gateway_keys = self._redis_server.smembers(self._redis_keys['gatewaylist'])
00375             for gateway in gateway_keys:
00376                 if hub_api.key_base_name(gateway) != self._unique_gateway_name:
00377                     gateways.append(hub_api.key_base_name(gateway))
00378         except (redis.ConnectionError, AttributeError) as unused_e:
00379             # redis misbehaves a little here, sometimes it doesn't catch a disconnection properly
00380             # see https://github.com/robotics-in-concert/rocon_multimaster/issues/251 so it
00381             # pops up as an AttributeError
00382             pass
00383         return gateways
00384 
00385     def matches_remote_gateway_name(self, gateway):
00386         '''
00387           Use this when gateway can be a regular expression and
00388           we need to check it off against list_remote_gateway_names()
00389 
00390           @return a list of matches (higher level decides on action for duplicates).
00391           @rtype list[str] : list of remote gateway names.
00392         '''
00393         matches = []
00394         try:
00395             for remote_gateway in self.list_remote_gateway_names():
00396                 if re.match(gateway, remote_gateway):
00397                     matches.append(remote_gateway)
00398         except HubConnectionLostError:
00399             raise
00400         return matches
00401 
00402     def matches_remote_gateway_basename(self, gateway):
00403         '''
00404           Use this when gateway can be a regular expression and
00405           we need to check it off against list_remote_gateway_names()
00406         '''
00407         weak_matches = []
00408         try:
00409             for remote_gateway in self.list_remote_gateway_names():
00410                 if re.match(gateway, rocon_gateway_utils.gateway_basename(remote_gateway)):
00411                     weak_matches.append(remote_gateway)
00412         except HubConnectionLostError:
00413             raise
00414         return weak_matches
00415 
00416     def get_remote_connection_state(self, remote_gateway):
00417         '''
00418           Equivalent to get_connection_state, but generates it from the public
00419           interface of a remote gateway
00420 
00421           @param remote_gateway : hash name for a remote gateway
00422           @type str
00423           @return dictionary of remote advertisements
00424           @rtype dictionary of connection type keyed connection values
00425        '''
00426         connections = utils.create_empty_connection_type_dictionary()
00427         key = hub_api.create_rocon_gateway_key(remote_gateway, 'advertisements')
00428         try:
00429             public_interface = self._redis_server.smembers(key)
00430             for connection_str in public_interface:
00431                 connection = utils.deserialize_connection(connection_str)
00432                 connections[connection.rule.type].append(connection)
00433         except redis.exceptions.ConnectionError:
00434             # will arrive here if the hub happens to have been lost last update and arriving here
00435             pass
00436         return connections
00437 
00438     def get_remote_gateway_firewall_flag(self, gateway):
00439         '''
00440           Returns the value of the remote gateway's firewall (flip)
00441           flag.
00442 
00443           @param gateway : gateway string id
00444           @param string
00445 
00446           @return state of the flag
00447           @rtype Bool
00448 
00449           @raise GatewayUnavailableError when specified gateway is not on the hub
00450         '''
00451         firewall = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'firewall'))
00452         if firewall is not None:
00453             return True if int(firewall) else False
00454         else:
00455             raise GatewayUnavailableError
00456 
00457     def get_local_advertisements(self):
00458         '''
00459           Retrieves the local list of advertisements from the hub. This
00460           gets used to sync across multiple hubs.
00461 
00462           @return dictionary of remote advertisements
00463           @rtype dictionary of connection type keyed connection values
00464        '''
00465         connections = utils.create_empty_connection_type_dictionary()
00466         key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'advertisements')
00467         public_interface = self._redis_server.smembers(key)
00468         for connection_str in public_interface:
00469             connection = utils.deserialize_connection(connection_str)
00470             connections[connection.rule.type].append(connection)
00471         return connections
00472 
00473     def _parse_redis_float(self, val):
00474         if val:
00475             return float(val)
00476         else:
00477             return 0.0
00478 
00479     def _parse_redis_int(self, val):
00480         if val:
00481             return int(val)
00482         else:
00483             return 0.0
00484 
00485     def _parse_redis_bool(self, val):
00486         if val and (val == 'True'):
00487             return True
00488         else:
00489             return False
00490 
00491     ##########################################################################
00492     # Posting Information to the Hub
00493     ##########################################################################
00494 
00495     def advertise(self, connection):
00496         '''
00497           Places a topic, service or action on the public interface. On the
00498           redis server, this representation will always be:
00499 
00500            - topic : a triple { name, type, xmlrpc node uri }
00501            - service : a triple { name, rosrpc uri, xmlrpc node uri }
00502            - action : ???
00503 
00504           @param connection: representation of a connection (topic, service, action)
00505           @type  connection: str
00506           @raise .exceptions.ConnectionTypeError: if connection arg is invalid.
00507         '''
00508         key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'advertisements')
00509         msg_str = utils.serialize_connection(connection)
00510         self._redis_server.sadd(key, msg_str)
00511 
00512     def unadvertise(self, connection):
00513         '''
00514           Removes a topic, service or action from the public interface.
00515 
00516           @param connection: representation of a connection (topic, service, action)
00517           @type  connection: str
00518           @raise .exceptions.ConnectionTypeError: if connectionarg is invalid.
00519         '''
00520         key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'advertisements')
00521         msg_str = utils.serialize_connection(connection)
00522         self._redis_server.srem(key, msg_str)
00523 
00524     def post_flip_details(self, gateway, name, connection_type, node):
00525         '''
00526           Post flip details to the redis server. This has no actual functionality,
00527           it is just useful for debugging with the remote_gateway_info service.
00528 
00529           @param gateway : the target of the flip
00530           @type string
00531           @param name : the name of the connection
00532           @type string
00533           @param type : the type of the connection (one of ConnectionType.xxx
00534           @type string
00535           @param node : the node name it was pulled from
00536           @type string
00537         '''
00538         key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flips')
00539         serialized_data = utils.serialize([gateway, name, connection_type, node])
00540         self._redis_server.sadd(key, serialized_data)
00541 
00542     def remove_flip_details(self, gateway, name, connection_type, node):
00543         '''
00544           Post flip details to the redis server. This has no actual functionality,
00545           it is just useful for debugging with the remote_gateway_info service.
00546 
00547           @param gateway : the target of the flip
00548           @type string
00549           @param name : the name of the connection
00550           @type string
00551           @param type : the type of the connection (one of ConnectionType.xxx
00552           @type string
00553           @param node : the node name it was pulled from
00554           @type string
00555         '''
00556         key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flips')
00557         serialized_data = utils.serialize([gateway, name, connection_type, node])
00558         self._redis_server.srem(key, serialized_data)
00559 
00560     def post_pull_details(self, gateway, name, connection_type, node):
00561         '''
00562           Post pull details to the hub. This has no actual functionality,
00563           it is just useful for debugging with the remote_gateway_info service.
00564 
00565           @param gateway : the gateway it is pulling from
00566           @type string
00567           @param name : the name of the connection
00568           @type string
00569           @param type : the type of the connection (one of ConnectionType.xxx
00570           @type string
00571           @param node : the node name it was pulled from
00572           @type string
00573         '''
00574         key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'pulls')
00575         serialized_data = utils.serialize([gateway, name, connection_type, node])
00576         self._redis_server.sadd(key, serialized_data)
00577 
00578     def remove_pull_details(self, gateway, name, connection_type, node):
00579         '''
00580           Post pull details to the hub. This has no actual functionality,
00581           it is just useful for debugging with the remote_gateway_info service.
00582 
00583           @param gateway : the gateway it was pulling from
00584           @type string
00585           @param name : the name of the connection
00586           @type string
00587           @param type : the type of the connection (one of ConnectionType.xxx
00588           @type string
00589           @param node : the node name it was pulled from
00590           @type string
00591         '''
00592         key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'pulls')
00593         serialized_data = utils.serialize([gateway, name, connection_type, node])
00594         self._redis_server.srem(key, serialized_data)
00595 
00596     ##########################################################################
00597     # Flip specific communication
00598     ##########################################################################
00599 
00600     def _resend_all_flip_ins(self):
00601         '''
00602           Marks all flip ins to be resent. Until these flips are resent, they
00603           will not be processed
00604         '''
00605         key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flip_ins')
00606         encoded_flip_ins = []
00607         try:
00608             encoded_flip_ins = self._redis_server.smembers(key)
00609             self._redis_server.delete(key)
00610             for flip_in in encoded_flip_ins:
00611                 status, source, connection_list = utils.deserialize_request(flip_in)
00612                 connection = utils.get_connection_from_list(connection_list)
00613                 status = FlipStatus.RESEND
00614                 serialized_data = utils.serialize_connection_request(status,
00615                                                                      source,
00616                                                                      connection)
00617                 self._redis_server.sadd(key, serialized_data)
00618         except (redis.ConnectionError, AttributeError) as unused_e:
00619             # probably disconnected from the hub
00620             pass
00621 
00622     def get_unblocked_flipped_in_connections(self):
00623         '''
00624           Gets all the flipped in connections listed on the hub that are interesting
00625           for this gateway (i.e. all unblocked/pending). This is used by the
00626           watcher loop to work out how it needs to update the local registrations.
00627 
00628           :returns: the flipped in registration strings and status.
00629           :rtype: list of (utils.Registration, FlipStatus.XXX) tuples.
00630         '''
00631         registrations = []
00632         key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flip_ins')
00633         encoded_flip_ins = []
00634         try:
00635             encoded_flip_ins = self._redis_server.smembers(key)
00636         except (redis.ConnectionError, AttributeError) as unused_e:
00637             # probably disconnected from the hub
00638             pass
00639         for flip_in in encoded_flip_ins:
00640             status, source, connection_list = utils.deserialize_request(flip_in)
00641             if source not in self.list_remote_gateway_names():
00642                 continue
00643             connection = utils.get_connection_from_list(connection_list)
00644             connection = utils.decrypt_connection(connection, self.private_key)
00645             if status != FlipStatus.BLOCKED and status != FlipStatus.RESEND:
00646                 registrations.append((utils.Registration(connection, source), status))
00647         return registrations
00648 
00649     def update_flip_request_status(self, registration_with_status):
00650         '''
00651           Updates the flip request status for this hub
00652 
00653           @param registration_with_status : the flip registration for which we are updating status
00654           @type (utils.Registration, str) where str is the status
00655 
00656           @param status : pending/accepted/blocked
00657           @type same as gateway_msgs.msg.RemoteRuleWithStatus.status
00658 
00659           @return True if this hub was used to send the flip request, and the status was updated. False otherwise.
00660           @rtype Boolean
00661         '''
00662         result = self.update_multiple_flip_request_status([registration_with_status])
00663         return result[0]
00664 
00665     def update_multiple_flip_request_status(self, registrations_with_status):
00666         '''
00667           Updates the flip request status for multiple registrations on this hub
00668 
00669           @param registrations_with_status : the flip registration for which we are updating status
00670           @type list of (utils.Registration, str) where str is the status
00671 
00672           @param status : pending/accepted/blocked
00673           @type same as gateway_msgs.msg.RemoteRuleWithStatus.status
00674 
00675           @return True if this hub was used to send the flip request, false otherwise.
00676           @rtype Boolean
00677         '''
00678         result = [False] * len(registrations_with_status)
00679         update_registrations = []
00680         hub_found = False
00681         key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flip_ins')
00682         try:
00683             encoded_flip_ins = self._redis_server.smembers(key)
00684             for flip_in in encoded_flip_ins:
00685                 old_status, source, connection_list = utils.deserialize_request(flip_in)
00686                 connection = utils.get_connection_from_list(connection_list)
00687                 connection = utils.decrypt_connection(connection, self.private_key)
00688                 for index, (registration, new_status) in enumerate(registrations_with_status):
00689                     if source == registration.remote_gateway and connection == registration.connection:
00690                         if new_status != old_status:
00691                             self._redis_server.srem(key, flip_in)
00692                             update_registrations.append((index, (registration, new_status)))
00693                         else:
00694                             result[index] = True
00695 
00696             for (index, (registration, new_status)) in update_registrations:
00697                 encrypted_connection = utils.encrypt_connection(registration.connection,
00698                                                                 self.private_key)
00699                 serialized_data = utils.serialize_connection_request(new_status,
00700                                                                      registration.remote_gateway,
00701                                                                      encrypted_connection)
00702                 self._redis_server.sadd(key, serialized_data)
00703                 result[index] = True
00704         except redis.exceptions.ConnectionError:
00705             # Means the hub has gone down (typically on shutdown so just be quiet)
00706             # If we really need to know that a hub is crashed, change this policy
00707             pass
00708         return result
00709 
00710     def get_flip_request_status(self, remote_rule):
00711         '''
00712           Get the status of a flipped registration. If the flip request does not
00713           exist (for instance, in the case where this hub was not used to send
00714           the request), then None is returned
00715 
00716           @return the flip status or None
00717           @rtype same as gateway_msgs.msg.RemoteRuleWithStatus.status or None
00718         '''
00719         status = self.get_multiple_flip_request_status([remote_rule])
00720         return status[0]
00721 
00722     def get_multiple_flip_request_status(self, remote_rules):
00723         '''
00724           Get the status of multiple flipped registration. If the flip request
00725           does not exist (for instance, in the case where this hub was not used
00726           to send the request), then None is returned. Multiple requests are
00727           batched together for efficiency.
00728 
00729           @return the flip status, ordered as per the input remote rules
00730           @rtype list of gateway_msgs.msg.RemoteRuleWithStatus.status or None
00731         '''
00732         gateway_specific_rules = {}
00733         status = [None] * len(remote_rules)
00734         for i, remote_rule in enumerate(remote_rules):
00735             if remote_rule.gateway not in gateway_specific_rules:
00736                 gateway_specific_rules[remote_rule.gateway] = []
00737             gateway_specific_rules[remote_rule.gateway].append((i, remote_rule))
00738 
00739         source_gateway = self._unique_gateway_name  # me!
00740 
00741         for gateway in gateway_specific_rules:
00742             key = hub_api.create_rocon_gateway_key(gateway, 'flip_ins')
00743             encoded_flips = []
00744             try:
00745                 encoded_flips = self._redis_server.smembers(key)
00746             except (redis.ConnectionError, AttributeError) as unused_e:
00747                 # probably disconnected from the hub
00748                 pass
00749             for flip in encoded_flips:
00750                 rule_status, source, connection_list = utils.deserialize_request(flip)
00751                 if source != source_gateway:
00752                     continue
00753                 connection = utils.get_connection_from_list(connection_list)
00754                 # Compare rules only as xmlrpc_uri and type_info are encrypted
00755                 for (index, remote_rule) in gateway_specific_rules[gateway]:
00756                     if connection.rule == remote_rule.rule:
00757                         status[index] = rule_status
00758                         break
00759         return status
00760 
00761     def send_flip_request(self, remote_gateway, connection, timeout=15.0):
00762         '''
00763           Sends a message to the remote gateway via redis pubsub channel. This is called from the
00764           watcher thread, when a flip rule gets activated.
00765 
00766            - redis channel name: rocon:<remote_gateway_name>
00767            - data : list of [ command, gateway, rule type, type, xmlrpc_uri ]
00768             - [0] - command       : in this case 'flip'
00769             - [1] - gateway       : the name of this gateway, i.e. the flipper
00770             - [2] - name          : local name
00771             - [3] - node          : local node name
00772             - [4] - connection_type : one of ConnectionType.PUBLISHER etc
00773             - [5] - type_info     : a ros format type (e.g. std_msgs/String or service api)
00774             - [6] - xmlrpc_uri    : the xmlrpc node uri
00775 
00776           @param command : string command name - either 'flip' or 'unflip'
00777           @type str
00778 
00779           @param flip_rule : the flip to send
00780           @type gateway_msgs.RemoteRule
00781 
00782           @param type_info : topic type (e.g. std_msgs/String)
00783           @param str
00784 
00785           @param xmlrpc_uri : the node uri
00786           @param str
00787         '''
00788         key = hub_api.create_rocon_gateway_key(remote_gateway, 'flip_ins')
00789         source = hub_api.key_base_name(self._redis_keys['gateway'])
00790 
00791         # Check if a flip request already exists on the hub
00792         if self.get_flip_request_status(RemoteRule(remote_gateway, connection.rule)) is not None:
00793             return True
00794 
00795         # Encrypt the transmission
00796         start_time = time.time()
00797         while time.time() - start_time <= timeout:
00798             remote_gateway_public_key_str = self._redis_server.get(
00799                 hub_api.create_rocon_gateway_key(remote_gateway, 'public_key'))
00800             if remote_gateway_public_key_str is not None:
00801                 break
00802         if remote_gateway_public_key_str is None:
00803             rospy.logerr("Gateway : flip to " + remote_gateway +
00804                          " failed as public key not found")
00805             return False
00806 
00807         remote_gateway_public_key = utils.deserialize_key(remote_gateway_public_key_str)
00808         encrypted_connection = utils.encrypt_connection(connection, remote_gateway_public_key)
00809 
00810         # Send data
00811         serialized_data = utils.serialize_connection_request(
00812             FlipStatus.PENDING, source, encrypted_connection)
00813         self._redis_server.sadd(key, serialized_data)
00814         return True
00815 
00816     def send_unflip_request(self, remote_gateway, rule):
00817         if rule.type == gateway_msgs.ConnectionType.ACTION_CLIENT:
00818             action_name = rule.name
00819             rule.type = gateway_msgs.ConnectionType.PUBLISHER
00820             rule.name = action_name + "/goal"
00821             self._send_unflip_request(remote_gateway, rule)
00822             rule.name = action_name + "/cancel"
00823             self._send_unflip_request(remote_gateway, rule)
00824             rule.type = gateway_msgs.ConnectionType.SUBSCRIBER
00825             rule.name = action_name + "/feedback"
00826             self._send_unflip_request(remote_gateway, rule)
00827             rule.name = action_name + "/status"
00828             self._send_unflip_request(remote_gateway, rule)
00829             rule.name = action_name + "/result"
00830             self._send_unflip_request(remote_gateway, rule)
00831         elif rule.type == gateway_msgs.ConnectionType.ACTION_SERVER:
00832             action_name = rule.name
00833             rule.type = gateway_msgs.ConnectionType.SUBSCRIBER
00834             rule.name = action_name + "/goal"
00835             self._send_unflip_request(remote_gateway, rule)
00836             rule.name = action_name + "/cancel"
00837             self._send_unflip_request(remote_gateway, rule)
00838             rule.type = gateway_msgs.ConnectionType.PUBLISHER
00839             rule.name = action_name + "/feedback"
00840             self._send_unflip_request(remote_gateway, rule)
00841             rule.name = action_name + "/status"
00842             self._send_unflip_request(remote_gateway, rule)
00843             rule.name = action_name + "/result"
00844             self._send_unflip_request(remote_gateway, rule)
00845         else:
00846             self._send_unflip_request(remote_gateway, rule)
00847 
00848     def _send_unflip_request(self, remote_gateway, rule):
00849         '''
00850           Unflip a previously flipped registration. If the flip request does not
00851           exist (for instance, in the case where this hub was not used to send
00852           the request), then False is returned
00853 
00854           @return True if the flip existed and was removed, False otherwise
00855           @rtype Boolean
00856         '''
00857         key = hub_api.create_rocon_gateway_key(remote_gateway, 'flip_ins')
00858         try:
00859             encoded_flip_ins = self._redis_server.smembers(key)
00860             for flip_in in encoded_flip_ins:
00861                 unused_status, source, connection_list = utils.deserialize_request(flip_in)
00862                 connection = utils.get_connection_from_list(connection_list)
00863                 if source == hub_api.key_base_name(self._redis_keys['gateway']) and \
00864                    rule == connection.rule:
00865                     self._redis_server.srem(key, flip_in)
00866                     return True
00867         except redis.exceptions.ConnectionError:
00868             # usually just means the hub has gone down just before us or is in the
00869             # middle of doing so let it die nice and peacefully
00870             if not rospy.is_shutdown():
00871                 rospy.logwarn("Gateway : hub connection error while sending unflip request.")
00872         return False


rocon_gateway
Author(s): Daniel Stonier , Jihoon Lee , Piyush Khandelwal
autogenerated on Sat Jun 8 2019 18:48:44