00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 import threading
00011 import rospy
00012 import re
00013 import utils
00014 from gateway_msgs.msg import RemoteRuleWithStatus as FlipStatus, RemoteRule
00015 import gateway_msgs.msg as gateway_msgs
00016 import rocon_python_comms
00017 import rocon_python_utils
00018 import rocon_gateway_utils
00019 import rocon_hub_client
00020 import rocon_python_redis as redis
00021 import time
00022 from rocon_hub_client import hub_api, hub_client
00023 from rocon_hub_client.exceptions import HubConnectionLostError, \
00024 HubNameNotFoundError, HubNotFoundError
00025
00026 from .exceptions import GatewayUnavailableError
00027
00028
00029
00030
00031
00032
00033 class HubConnectionCheckerThread(threading.Thread):
00034
00035 '''
00036 Pings redis periodically to figure out if redis is still alive.
00037 '''
00038
00039 def __init__(self, ip, port, hub_connection_lost_hook):
00040 threading.Thread.__init__(self)
00041 self.daemon = True
00042 self.ping_frequency = 0.2
00043 self._hub_connection_lost_hook = hub_connection_lost_hook
00044 self.ip = ip
00045 self.port = port
00046 self.pinger = rocon_python_utils.network.Pinger(self.ip, self.ping_frequency)
00047
00048 def get_latency(self):
00049 return self.pinger.get_latency()
00050
00051 def run(self):
00052
00053
00054 self.pinger.start()
00055 rate = rocon_python_comms.WallRate(self.ping_frequency)
00056 alive = True
00057 timeout = 1 / self.ping_frequency
00058 while alive:
00059 alive, message = hub_client.ping_hub(self.ip, self.port, timeout)
00060 rate.sleep()
00061 rospy.logwarn("Gatew Hub pinger : %s"%message)
00062 self._hub_connection_lost_hook()
00063
00064
00065
00066
00067
00068
00069 class GatewayHub(rocon_hub_client.Hub):
00070
00071 def __init__(self, ip, port, whitelist, blacklist):
00072 '''
00073 @param remote_gateway_request_callbacks : to handle redis responses
00074 @type list of function pointers (back to GatewaySync class
00075
00076 @param ip : redis server ip
00077 @param port : redis server port
00078
00079 @raise HubNameNotFoundError, HubNotFoundError
00080 '''
00081 try:
00082 super(GatewayHub, self).__init__(ip, port, whitelist, blacklist)
00083 except HubNotFoundError:
00084 raise
00085 except HubNameNotFoundError:
00086 raise
00087 self._hub_connection_lost_gateway_hook = None
00088 self._firewall = 0
00089
00090
00091 self._redis_keys['gatewaylist'] = hub_api.create_rocon_hub_key('gatewaylist')
00092 self._unique_gateway_name = ''
00093
00094
00095
00096
00097
00098 def register_gateway(self, firewall, unique_gateway_name, hub_connection_lost_gateway_hook, gateway_ip):
00099 '''
00100 Register a gateway with the hub.
00101
00102 @param firewall
00103 @param unique_gateway_name
00104 @param hub_connection_lost_gateway_hook : used to trigger Gateway.disengage_hub(hub)
00105 on lost hub connections in redis pubsub listener thread.
00106 @gateway_ip
00107
00108 @raise HubConnectionLostError if for some reason, the redis server has become unavailable.
00109 '''
00110 if not self._redis_server:
00111 raise HubConnectionLostError()
00112 self._unique_gateway_name = unique_gateway_name
00113 self._redis_keys['gateway'] = hub_api.create_rocon_key(unique_gateway_name)
00114 self._redis_keys['firewall'] = hub_api.create_rocon_gateway_key(unique_gateway_name, 'firewall')
00115 self._firewall = 1 if firewall else 0
00116 self._hub_connection_lost_gateway_hook = hub_connection_lost_gateway_hook
00117 if not self._redis_server.sadd(self._redis_keys['gatewaylist'], self._redis_keys['gateway']):
00118
00119 pass
00120 self.mark_named_gateway_available(self._redis_keys['gateway'])
00121 self._redis_server.set(self._redis_keys['firewall'], self._firewall)
00122
00123
00124 self._redis_keys['ip'] = hub_api.create_rocon_gateway_key(unique_gateway_name, 'ip')
00125 self._redis_server.set(self._redis_keys['ip'], gateway_ip)
00126
00127 self.private_key, public_key = utils.generate_private_public_key()
00128 self._redis_keys['public_key'] = hub_api.create_rocon_gateway_key(unique_gateway_name, 'public_key')
00129 old_key = self._redis_server.get(self._redis_keys['public_key'])
00130 serialized_public_key = utils.serialize_key(public_key)
00131 self._redis_server.set(self._redis_keys['public_key'], serialized_public_key)
00132 if serialized_public_key != old_key:
00133 rospy.loginfo('Gateway : Found existing mismatched public key on the hub. ' +
00134 'Requesting resend for all flip-ins.')
00135 self._resend_all_flip_ins()
00136
00137
00138 self._redis_server.sadd(self._redis_keys['gatewaylist'], self._redis_keys['gateway'])
00139 self.hub_connection_checker_thread = HubConnectionCheckerThread(
00140 self.ip, self.port, self._hub_connection_lost_hook)
00141 self.hub_connection_checker_thread.start()
00142 self.connection_lost_lock = threading.Lock()
00143
00144
00145 ping_key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, ':ping')
00146 self._redis_server.set(ping_key, True)
00147 self._redis_server.expire(ping_key, gateway_msgs.ConnectionStatistics.MAX_TTL)
00148
00149 def _hub_connection_lost_hook(self):
00150 '''
00151 This gets triggered by the redis connection checker thread when the hub connection is lost.
00152 The trigger is passed to the gateway who needs to remove the hub.
00153 '''
00154 self.connection_lost_lock.acquire()
00155
00156 if self._hub_connection_lost_gateway_hook is not None:
00157 rospy.loginfo("Gateway : lost connection with hub, attempting to disconnect...")
00158 self._hub_connection_lost_gateway_hook(self)
00159 self._hub_connection_lost_gateway_hook = None
00160 self.connection_lost_lock.release()
00161
00162 def unregister_gateway(self):
00163 '''
00164 Remove all gateway info from the hub.
00165
00166 @return: success or failure of the operation
00167 @rtype: bool
00168 '''
00169 try:
00170 self.unregister_named_gateway(self._redis_keys['gateway'])
00171 except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
00172
00173
00174
00175
00176 pass
00177
00178 rospy.loginfo("Gateway : unregistered from the hub [%s]" % self.name)
00179
00180 def publish_network_statistics(self, statistics):
00181 '''
00182 Publish network interface information to the hub
00183
00184 @param statistics
00185 @type gateway_msgs.RemoteGateway
00186 '''
00187 try:
00188
00189
00190 if not statistics.network_info_available:
00191 rospy.logdebug("Gateway : unable to publish network statistics [network info unavailable]")
00192 return
00193 network_info_available = hub_api.create_rocon_gateway_key(
00194 self._unique_gateway_name, 'network:info_available')
00195 self._redis_server.set(network_info_available, statistics.network_info_available)
00196 network_type = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'network:type')
00197 self._redis_server.set(network_type, statistics.network_type)
00198
00199
00200
00201 ping_key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, ':ping')
00202 self._redis_server.set(ping_key, True)
00203 self._redis_server.expire(ping_key, gateway_msgs.ConnectionStatistics.MAX_TTL)
00204
00205 latency = self.hub_connection_checker_thread.get_latency()
00206 self.update_named_gateway_latency_stats(self._unique_gateway_name, latency)
00207
00208 if statistics.network_type == gateway_msgs.RemoteGateway.WIRED:
00209 return
00210 wireless_bitrate_key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'wireless:bitrate')
00211 self._redis_server.set(wireless_bitrate_key, statistics.wireless_bitrate)
00212 wireless_link_quality = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'wireless:quality')
00213 self._redis_server.set(wireless_link_quality, statistics.wireless_link_quality)
00214 wireless_signal_level = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'wireless:signal_level')
00215 self._redis_server.set(wireless_signal_level, statistics.wireless_signal_level)
00216 wireless_noise_level = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'wireless:noise_level')
00217 self._redis_server.set(wireless_noise_level, statistics.wireless_noise_level)
00218 except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
00219 rospy.logerr("Gateway : unable to publish network statistics [no connection to the hub]")
00220
00221 def unregister_named_gateway(self, gateway_key):
00222 '''
00223 Remove all gateway info for given gateway key from the hub.
00224 '''
00225 try:
00226 gateway_keys = self._redis_server.keys(gateway_key + ":*")
00227 pipe = self._redis_server.pipeline()
00228 pipe.delete(*gateway_keys)
00229 pipe.srem(self._redis_keys['gatewaylist'], gateway_key)
00230 pipe.execute()
00231 except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
00232 pass
00233
00234 def update_named_gateway_latency_stats(self, gateway_name, latency_stats):
00235 '''
00236 For a given gateway, update the latency statistics
00237
00238 #param gateway_name : gateway name, not the redis key
00239 @type str
00240 @param latency_stats : ping statistics to the gateway from the hub
00241 @type list : 4-tuple of float values [min, avg, max, mean deviation]
00242 '''
00243 try:
00244 min_latency_key = hub_api.create_rocon_gateway_key(gateway_name, 'latency:min')
00245 avg_latency_key = hub_api.create_rocon_gateway_key(gateway_name, 'latency:avg')
00246 max_latency_key = hub_api.create_rocon_gateway_key(gateway_name, 'latency:max')
00247 mdev_latency_key = hub_api.create_rocon_gateway_key(gateway_name, 'latency:mdev')
00248 self._redis_server.set(min_latency_key, latency_stats[0])
00249 self._redis_server.set(avg_latency_key, latency_stats[1])
00250 self._redis_server.set(max_latency_key, latency_stats[2])
00251 self._redis_server.set(mdev_latency_key, latency_stats[3])
00252 except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
00253 rospy.logerr("Gateway: unable to update latency stats for " + gateway_name)
00254
00255 def mark_named_gateway_available(self, gateway_key, available=True,
00256 time_since_last_seen=0.0):
00257 '''
00258 This function is used by the hub to mark if a gateway can be pinged.
00259 If a gateway cannot be pinged, the hub indicates how longs has it been
00260 since the hub was last seen
00261
00262 @param gateway_key : The gateway key (not the name)
00263 @type str
00264 @param available: If the gateway can be pinged right now
00265 @type bool
00266 @param time_since_last_seen: If available is false, how long has it
00267 been since the gateway was last seen (in seconds)
00268 @type float
00269 '''
00270 available_key = gateway_key + ":available"
00271 self._redis_server.set(available_key, available)
00272 time_since_last_seen_key = gateway_key + ":time_since_last_seen"
00273 self._redis_server.set(time_since_last_seen_key, int(time_since_last_seen))
00274
00275
00276
00277
00278
00279 def remote_gateway_info(self, gateway):
00280 '''
00281 Return remote gateway information for the specified gateway string id.
00282
00283 @param gateways : gateway id string to search for
00284 @type string
00285 @return remote gateway information
00286 @rtype gateway_msgs.RemotGateway or None
00287 '''
00288 firewall = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'firewall'))
00289 if firewall is None:
00290 return None
00291 ip = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'ip'))
00292 if ip is None:
00293 return None
00294 remote_gateway = gateway_msgs.RemoteGateway()
00295 remote_gateway.name = gateway
00296 remote_gateway.ip = ip
00297 remote_gateway.firewall = True if int(firewall) else False
00298 remote_gateway.public_interface = []
00299 encoded_advertisements = self._redis_server.smembers(
00300 hub_api.create_rocon_gateway_key(gateway, 'advertisements'))
00301 for encoded_advertisement in encoded_advertisements:
00302 advertisement = utils.deserialize_connection(encoded_advertisement)
00303 remote_gateway.public_interface.append(advertisement.rule)
00304 remote_gateway.flipped_interface = []
00305 encoded_flips = self._redis_server.smembers(hub_api.create_rocon_gateway_key(gateway, 'flips'))
00306 for encoded_flip in encoded_flips:
00307 [target_gateway, name, connection_type, node] = utils.deserialize(encoded_flip)
00308 remote_rule = gateway_msgs.RemoteRule(target_gateway, gateway_msgs.Rule(connection_type, name, node))
00309 remote_gateway.flipped_interface.append(remote_rule)
00310 remote_gateway.pulled_interface = []
00311 encoded_pulls = self._redis_server.smembers(hub_api.create_rocon_gateway_key(gateway, 'pulls'))
00312 for encoded_pull in encoded_pulls:
00313 [target_gateway, name, connection_type, node] = utils.deserialize(encoded_pull)
00314 remote_rule = gateway_msgs.RemoteRule(target_gateway, gateway_msgs.Rule(connection_type, name, node))
00315 remote_gateway.pulled_interface.append(remote_rule)
00316
00317
00318 gateway_available_key = hub_api.create_rocon_gateway_key(gateway, 'available')
00319 remote_gateway.conn_stats.gateway_available = \
00320 self._parse_redis_bool(self._redis_server.get(gateway_available_key))
00321 time_since_last_seen_key = hub_api.create_rocon_gateway_key(gateway, 'time_since_last_seen')
00322 remote_gateway.conn_stats.time_since_last_seen = \
00323 self._parse_redis_int(self._redis_server.get(time_since_last_seen_key))
00324
00325 ping_latency_min_key = hub_api.create_rocon_gateway_key(gateway, 'latency:min')
00326 remote_gateway.conn_stats.ping_latency_min = \
00327 self._parse_redis_float(self._redis_server.get(ping_latency_min_key))
00328 ping_latency_max_key = hub_api.create_rocon_gateway_key(gateway, 'latency:max')
00329 remote_gateway.conn_stats.ping_latency_max = \
00330 self._parse_redis_float(self._redis_server.get(ping_latency_max_key))
00331 ping_latency_avg_key = hub_api.create_rocon_gateway_key(gateway, 'latency:avg')
00332 remote_gateway.conn_stats.ping_latency_avg = \
00333 self._parse_redis_float(self._redis_server.get(ping_latency_avg_key))
00334 ping_latency_mdev_key = hub_api.create_rocon_gateway_key(gateway, 'latency:mdev')
00335 remote_gateway.conn_stats.ping_latency_mdev = \
00336 self._parse_redis_float(self._redis_server.get(ping_latency_mdev_key))
00337
00338
00339 network_info_available_key = hub_api.create_rocon_gateway_key(gateway, 'network:info_available')
00340 remote_gateway.conn_stats.network_info_available = \
00341 self._parse_redis_bool(self._redis_server.get(network_info_available_key))
00342 if not remote_gateway.conn_stats.network_info_available:
00343 return remote_gateway
00344 network_type_key = hub_api.create_rocon_gateway_key(gateway, 'network:type')
00345 remote_gateway.conn_stats.network_type = \
00346 self._parse_redis_int(self._redis_server.get(network_type_key))
00347 if remote_gateway.conn_stats.network_type == gateway_msgs.RemoteGateway.WIRED:
00348 return remote_gateway
00349 wireless_bitrate_key = hub_api.create_rocon_gateway_key(gateway, 'wireless:bitrate')
00350 remote_gateway.conn_stats.wireless_bitrate = \
00351 self._parse_redis_float(self._redis_server.get(wireless_bitrate_key))
00352 wireless_link_quality_key = hub_api.create_rocon_gateway_key(gateway, 'wireless:quality')
00353 remote_gateway.conn_stats.wireless_link_quality = \
00354 self._parse_redis_int(self._redis_server.get(wireless_link_quality_key))
00355 wireless_signal_level_key = hub_api.create_rocon_gateway_key(gateway, 'wireless:signal_level')
00356 remote_gateway.conn_stats.wireless_signal_level = \
00357 self._parse_redis_float(self._redis_server.get(wireless_signal_level_key))
00358 wireless_noise_level_key = hub_api.create_rocon_gateway_key(gateway, 'wireless:noise_level')
00359 remote_gateway.conn_stats.wireless_noise_level = \
00360 self._parse_redis_float(self._redis_server.get(wireless_noise_level_key))
00361 return remote_gateway
00362
00363 def list_remote_gateway_names(self):
00364 '''
00365 Return a list of the gateways (name list, not redis keys).
00366 e.g. ['gateway32adcda32','pirate21fasdf']. If not connected, just
00367 returns an empty list.
00368 '''
00369 if not self._redis_server:
00370 rospy.logerr("Gateway : cannot retrieve remote gateway names [%s][%s]." % (self.name, self.uri))
00371 return []
00372 gateways = []
00373 try:
00374 gateway_keys = self._redis_server.smembers(self._redis_keys['gatewaylist'])
00375 for gateway in gateway_keys:
00376 if hub_api.key_base_name(gateway) != self._unique_gateway_name:
00377 gateways.append(hub_api.key_base_name(gateway))
00378 except (redis.ConnectionError, AttributeError) as unused_e:
00379
00380
00381
00382 pass
00383 return gateways
00384
00385 def matches_remote_gateway_name(self, gateway):
00386 '''
00387 Use this when gateway can be a regular expression and
00388 we need to check it off against list_remote_gateway_names()
00389
00390 @return a list of matches (higher level decides on action for duplicates).
00391 @rtype list[str] : list of remote gateway names.
00392 '''
00393 matches = []
00394 try:
00395 for remote_gateway in self.list_remote_gateway_names():
00396 if re.match(gateway, remote_gateway):
00397 matches.append(remote_gateway)
00398 except HubConnectionLostError:
00399 raise
00400 return matches
00401
00402 def matches_remote_gateway_basename(self, gateway):
00403 '''
00404 Use this when gateway can be a regular expression and
00405 we need to check it off against list_remote_gateway_names()
00406 '''
00407 weak_matches = []
00408 try:
00409 for remote_gateway in self.list_remote_gateway_names():
00410 if re.match(gateway, rocon_gateway_utils.gateway_basename(remote_gateway)):
00411 weak_matches.append(remote_gateway)
00412 except HubConnectionLostError:
00413 raise
00414 return weak_matches
00415
00416 def get_remote_connection_state(self, remote_gateway):
00417 '''
00418 Equivalent to get_connection_state, but generates it from the public
00419 interface of a remote gateway
00420
00421 @param remote_gateway : hash name for a remote gateway
00422 @type str
00423 @return dictionary of remote advertisements
00424 @rtype dictionary of connection type keyed connection values
00425 '''
00426 connections = utils.create_empty_connection_type_dictionary()
00427 key = hub_api.create_rocon_gateway_key(remote_gateway, 'advertisements')
00428 try:
00429 public_interface = self._redis_server.smembers(key)
00430 for connection_str in public_interface:
00431 connection = utils.deserialize_connection(connection_str)
00432 connections[connection.rule.type].append(connection)
00433 except redis.exceptions.ConnectionError:
00434
00435 pass
00436 return connections
00437
00438 def get_remote_gateway_firewall_flag(self, gateway):
00439 '''
00440 Returns the value of the remote gateway's firewall (flip)
00441 flag.
00442
00443 @param gateway : gateway string id
00444 @param string
00445
00446 @return state of the flag
00447 @rtype Bool
00448
00449 @raise GatewayUnavailableError when specified gateway is not on the hub
00450 '''
00451 firewall = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'firewall'))
00452 if firewall is not None:
00453 return True if int(firewall) else False
00454 else:
00455 raise GatewayUnavailableError
00456
00457 def get_local_advertisements(self):
00458 '''
00459 Retrieves the local list of advertisements from the hub. This
00460 gets used to sync across multiple hubs.
00461
00462 @return dictionary of remote advertisements
00463 @rtype dictionary of connection type keyed connection values
00464 '''
00465 connections = utils.create_empty_connection_type_dictionary()
00466 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'advertisements')
00467 public_interface = self._redis_server.smembers(key)
00468 for connection_str in public_interface:
00469 connection = utils.deserialize_connection(connection_str)
00470 connections[connection.rule.type].append(connection)
00471 return connections
00472
00473 def _parse_redis_float(self, val):
00474 if val:
00475 return float(val)
00476 else:
00477 return 0.0
00478
00479 def _parse_redis_int(self, val):
00480 if val:
00481 return int(val)
00482 else:
00483 return 0.0
00484
00485 def _parse_redis_bool(self, val):
00486 if val and (val == 'True'):
00487 return True
00488 else:
00489 return False
00490
00491
00492
00493
00494
00495 def advertise(self, connection):
00496 '''
00497 Places a topic, service or action on the public interface. On the
00498 redis server, this representation will always be:
00499
00500 - topic : a triple { name, type, xmlrpc node uri }
00501 - service : a triple { name, rosrpc uri, xmlrpc node uri }
00502 - action : ???
00503
00504 @param connection: representation of a connection (topic, service, action)
00505 @type connection: str
00506 @raise .exceptions.ConnectionTypeError: if connection arg is invalid.
00507 '''
00508 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'advertisements')
00509 msg_str = utils.serialize_connection(connection)
00510 self._redis_server.sadd(key, msg_str)
00511
00512 def unadvertise(self, connection):
00513 '''
00514 Removes a topic, service or action from the public interface.
00515
00516 @param connection: representation of a connection (topic, service, action)
00517 @type connection: str
00518 @raise .exceptions.ConnectionTypeError: if connectionarg is invalid.
00519 '''
00520 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'advertisements')
00521 msg_str = utils.serialize_connection(connection)
00522 self._redis_server.srem(key, msg_str)
00523
00524 def post_flip_details(self, gateway, name, connection_type, node):
00525 '''
00526 Post flip details to the redis server. This has no actual functionality,
00527 it is just useful for debugging with the remote_gateway_info service.
00528
00529 @param gateway : the target of the flip
00530 @type string
00531 @param name : the name of the connection
00532 @type string
00533 @param type : the type of the connection (one of ConnectionType.xxx
00534 @type string
00535 @param node : the node name it was pulled from
00536 @type string
00537 '''
00538 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flips')
00539 serialized_data = utils.serialize([gateway, name, connection_type, node])
00540 self._redis_server.sadd(key, serialized_data)
00541
00542 def remove_flip_details(self, gateway, name, connection_type, node):
00543 '''
00544 Post flip details to the redis server. This has no actual functionality,
00545 it is just useful for debugging with the remote_gateway_info service.
00546
00547 @param gateway : the target of the flip
00548 @type string
00549 @param name : the name of the connection
00550 @type string
00551 @param type : the type of the connection (one of ConnectionType.xxx
00552 @type string
00553 @param node : the node name it was pulled from
00554 @type string
00555 '''
00556 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flips')
00557 serialized_data = utils.serialize([gateway, name, connection_type, node])
00558 self._redis_server.srem(key, serialized_data)
00559
00560 def post_pull_details(self, gateway, name, connection_type, node):
00561 '''
00562 Post pull details to the hub. This has no actual functionality,
00563 it is just useful for debugging with the remote_gateway_info service.
00564
00565 @param gateway : the gateway it is pulling from
00566 @type string
00567 @param name : the name of the connection
00568 @type string
00569 @param type : the type of the connection (one of ConnectionType.xxx
00570 @type string
00571 @param node : the node name it was pulled from
00572 @type string
00573 '''
00574 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'pulls')
00575 serialized_data = utils.serialize([gateway, name, connection_type, node])
00576 self._redis_server.sadd(key, serialized_data)
00577
00578 def remove_pull_details(self, gateway, name, connection_type, node):
00579 '''
00580 Post pull details to the hub. This has no actual functionality,
00581 it is just useful for debugging with the remote_gateway_info service.
00582
00583 @param gateway : the gateway it was pulling from
00584 @type string
00585 @param name : the name of the connection
00586 @type string
00587 @param type : the type of the connection (one of ConnectionType.xxx
00588 @type string
00589 @param node : the node name it was pulled from
00590 @type string
00591 '''
00592 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'pulls')
00593 serialized_data = utils.serialize([gateway, name, connection_type, node])
00594 self._redis_server.srem(key, serialized_data)
00595
00596
00597
00598
00599
00600 def _resend_all_flip_ins(self):
00601 '''
00602 Marks all flip ins to be resent. Until these flips are resent, they
00603 will not be processed
00604 '''
00605 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flip_ins')
00606 encoded_flip_ins = []
00607 try:
00608 encoded_flip_ins = self._redis_server.smembers(key)
00609 self._redis_server.delete(key)
00610 for flip_in in encoded_flip_ins:
00611 status, source, connection_list = utils.deserialize_request(flip_in)
00612 connection = utils.get_connection_from_list(connection_list)
00613 status = FlipStatus.RESEND
00614 serialized_data = utils.serialize_connection_request(status,
00615 source,
00616 connection)
00617 self._redis_server.sadd(key, serialized_data)
00618 except (redis.ConnectionError, AttributeError) as unused_e:
00619
00620 pass
00621
00622 def get_unblocked_flipped_in_connections(self):
00623 '''
00624 Gets all the flipped in connections listed on the hub that are interesting
00625 for this gateway (i.e. all unblocked/pending). This is used by the
00626 watcher loop to work out how it needs to update the local registrations.
00627
00628 :returns: the flipped in registration strings and status.
00629 :rtype: list of (utils.Registration, FlipStatus.XXX) tuples.
00630 '''
00631 registrations = []
00632 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flip_ins')
00633 encoded_flip_ins = []
00634 try:
00635 encoded_flip_ins = self._redis_server.smembers(key)
00636 except (redis.ConnectionError, AttributeError) as unused_e:
00637
00638 pass
00639 for flip_in in encoded_flip_ins:
00640 status, source, connection_list = utils.deserialize_request(flip_in)
00641 if source not in self.list_remote_gateway_names():
00642 continue
00643 connection = utils.get_connection_from_list(connection_list)
00644 connection = utils.decrypt_connection(connection, self.private_key)
00645 if status != FlipStatus.BLOCKED and status != FlipStatus.RESEND:
00646 registrations.append((utils.Registration(connection, source), status))
00647 return registrations
00648
00649 def update_flip_request_status(self, registration_with_status):
00650 '''
00651 Updates the flip request status for this hub
00652
00653 @param registration_with_status : the flip registration for which we are updating status
00654 @type (utils.Registration, str) where str is the status
00655
00656 @param status : pending/accepted/blocked
00657 @type same as gateway_msgs.msg.RemoteRuleWithStatus.status
00658
00659 @return True if this hub was used to send the flip request, and the status was updated. False otherwise.
00660 @rtype Boolean
00661 '''
00662 result = self.update_multiple_flip_request_status([registration_with_status])
00663 return result[0]
00664
00665 def update_multiple_flip_request_status(self, registrations_with_status):
00666 '''
00667 Updates the flip request status for multiple registrations on this hub
00668
00669 @param registrations_with_status : the flip registration for which we are updating status
00670 @type list of (utils.Registration, str) where str is the status
00671
00672 @param status : pending/accepted/blocked
00673 @type same as gateway_msgs.msg.RemoteRuleWithStatus.status
00674
00675 @return True if this hub was used to send the flip request, false otherwise.
00676 @rtype Boolean
00677 '''
00678 result = [False] * len(registrations_with_status)
00679 update_registrations = []
00680 hub_found = False
00681 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flip_ins')
00682 try:
00683 encoded_flip_ins = self._redis_server.smembers(key)
00684 for flip_in in encoded_flip_ins:
00685 old_status, source, connection_list = utils.deserialize_request(flip_in)
00686 connection = utils.get_connection_from_list(connection_list)
00687 connection = utils.decrypt_connection(connection, self.private_key)
00688 for index, (registration, new_status) in enumerate(registrations_with_status):
00689 if source == registration.remote_gateway and connection == registration.connection:
00690 if new_status != old_status:
00691 self._redis_server.srem(key, flip_in)
00692 update_registrations.append((index, (registration, new_status)))
00693 else:
00694 result[index] = True
00695
00696 for (index, (registration, new_status)) in update_registrations:
00697 encrypted_connection = utils.encrypt_connection(registration.connection,
00698 self.private_key)
00699 serialized_data = utils.serialize_connection_request(new_status,
00700 registration.remote_gateway,
00701 encrypted_connection)
00702 self._redis_server.sadd(key, serialized_data)
00703 result[index] = True
00704 except redis.exceptions.ConnectionError:
00705
00706
00707 pass
00708 return result
00709
00710 def get_flip_request_status(self, remote_rule):
00711 '''
00712 Get the status of a flipped registration. If the flip request does not
00713 exist (for instance, in the case where this hub was not used to send
00714 the request), then None is returned
00715
00716 @return the flip status or None
00717 @rtype same as gateway_msgs.msg.RemoteRuleWithStatus.status or None
00718 '''
00719 status = self.get_multiple_flip_request_status([remote_rule])
00720 return status[0]
00721
00722 def get_multiple_flip_request_status(self, remote_rules):
00723 '''
00724 Get the status of multiple flipped registration. If the flip request
00725 does not exist (for instance, in the case where this hub was not used
00726 to send the request), then None is returned. Multiple requests are
00727 batched together for efficiency.
00728
00729 @return the flip status, ordered as per the input remote rules
00730 @rtype list of gateway_msgs.msg.RemoteRuleWithStatus.status or None
00731 '''
00732 gateway_specific_rules = {}
00733 status = [None] * len(remote_rules)
00734 for i, remote_rule in enumerate(remote_rules):
00735 if remote_rule.gateway not in gateway_specific_rules:
00736 gateway_specific_rules[remote_rule.gateway] = []
00737 gateway_specific_rules[remote_rule.gateway].append((i, remote_rule))
00738
00739 source_gateway = self._unique_gateway_name
00740
00741 for gateway in gateway_specific_rules:
00742 key = hub_api.create_rocon_gateway_key(gateway, 'flip_ins')
00743 encoded_flips = []
00744 try:
00745 encoded_flips = self._redis_server.smembers(key)
00746 except (redis.ConnectionError, AttributeError) as unused_e:
00747
00748 pass
00749 for flip in encoded_flips:
00750 rule_status, source, connection_list = utils.deserialize_request(flip)
00751 if source != source_gateway:
00752 continue
00753 connection = utils.get_connection_from_list(connection_list)
00754
00755 for (index, remote_rule) in gateway_specific_rules[gateway]:
00756 if connection.rule == remote_rule.rule:
00757 status[index] = rule_status
00758 break
00759 return status
00760
00761 def send_flip_request(self, remote_gateway, connection, timeout=15.0):
00762 '''
00763 Sends a message to the remote gateway via redis pubsub channel. This is called from the
00764 watcher thread, when a flip rule gets activated.
00765
00766 - redis channel name: rocon:<remote_gateway_name>
00767 - data : list of [ command, gateway, rule type, type, xmlrpc_uri ]
00768 - [0] - command : in this case 'flip'
00769 - [1] - gateway : the name of this gateway, i.e. the flipper
00770 - [2] - name : local name
00771 - [3] - node : local node name
00772 - [4] - connection_type : one of ConnectionType.PUBLISHER etc
00773 - [5] - type_info : a ros format type (e.g. std_msgs/String or service api)
00774 - [6] - xmlrpc_uri : the xmlrpc node uri
00775
00776 @param command : string command name - either 'flip' or 'unflip'
00777 @type str
00778
00779 @param flip_rule : the flip to send
00780 @type gateway_msgs.RemoteRule
00781
00782 @param type_info : topic type (e.g. std_msgs/String)
00783 @param str
00784
00785 @param xmlrpc_uri : the node uri
00786 @param str
00787 '''
00788 key = hub_api.create_rocon_gateway_key(remote_gateway, 'flip_ins')
00789 source = hub_api.key_base_name(self._redis_keys['gateway'])
00790
00791
00792 if self.get_flip_request_status(RemoteRule(remote_gateway, connection.rule)) is not None:
00793 return True
00794
00795
00796 start_time = time.time()
00797 while time.time() - start_time <= timeout:
00798 remote_gateway_public_key_str = self._redis_server.get(
00799 hub_api.create_rocon_gateway_key(remote_gateway, 'public_key'))
00800 if remote_gateway_public_key_str is not None:
00801 break
00802 if remote_gateway_public_key_str is None:
00803 rospy.logerr("Gateway : flip to " + remote_gateway +
00804 " failed as public key not found")
00805 return False
00806
00807 remote_gateway_public_key = utils.deserialize_key(remote_gateway_public_key_str)
00808 encrypted_connection = utils.encrypt_connection(connection, remote_gateway_public_key)
00809
00810
00811 serialized_data = utils.serialize_connection_request(
00812 FlipStatus.PENDING, source, encrypted_connection)
00813 self._redis_server.sadd(key, serialized_data)
00814 return True
00815
00816 def send_unflip_request(self, remote_gateway, rule):
00817 if rule.type == gateway_msgs.ConnectionType.ACTION_CLIENT:
00818 action_name = rule.name
00819 rule.type = gateway_msgs.ConnectionType.PUBLISHER
00820 rule.name = action_name + "/goal"
00821 self._send_unflip_request(remote_gateway, rule)
00822 rule.name = action_name + "/cancel"
00823 self._send_unflip_request(remote_gateway, rule)
00824 rule.type = gateway_msgs.ConnectionType.SUBSCRIBER
00825 rule.name = action_name + "/feedback"
00826 self._send_unflip_request(remote_gateway, rule)
00827 rule.name = action_name + "/status"
00828 self._send_unflip_request(remote_gateway, rule)
00829 rule.name = action_name + "/result"
00830 self._send_unflip_request(remote_gateway, rule)
00831 elif rule.type == gateway_msgs.ConnectionType.ACTION_SERVER:
00832 action_name = rule.name
00833 rule.type = gateway_msgs.ConnectionType.SUBSCRIBER
00834 rule.name = action_name + "/goal"
00835 self._send_unflip_request(remote_gateway, rule)
00836 rule.name = action_name + "/cancel"
00837 self._send_unflip_request(remote_gateway, rule)
00838 rule.type = gateway_msgs.ConnectionType.PUBLISHER
00839 rule.name = action_name + "/feedback"
00840 self._send_unflip_request(remote_gateway, rule)
00841 rule.name = action_name + "/status"
00842 self._send_unflip_request(remote_gateway, rule)
00843 rule.name = action_name + "/result"
00844 self._send_unflip_request(remote_gateway, rule)
00845 else:
00846 self._send_unflip_request(remote_gateway, rule)
00847
00848 def _send_unflip_request(self, remote_gateway, rule):
00849 '''
00850 Unflip a previously flipped registration. If the flip request does not
00851 exist (for instance, in the case where this hub was not used to send
00852 the request), then False is returned
00853
00854 @return True if the flip existed and was removed, False otherwise
00855 @rtype Boolean
00856 '''
00857 key = hub_api.create_rocon_gateway_key(remote_gateway, 'flip_ins')
00858 try:
00859 encoded_flip_ins = self._redis_server.smembers(key)
00860 for flip_in in encoded_flip_ins:
00861 unused_status, source, connection_list = utils.deserialize_request(flip_in)
00862 connection = utils.get_connection_from_list(connection_list)
00863 if source == hub_api.key_base_name(self._redis_keys['gateway']) and \
00864 rule == connection.rule:
00865 self._redis_server.srem(key, flip_in)
00866 return True
00867 except redis.exceptions.ConnectionError:
00868
00869
00870 if not rospy.is_shutdown():
00871 rospy.logwarn("Gateway : hub connection error while sending unflip request.")
00872 return False