14 from gateway_msgs.msg
import RemoteRuleWithStatus
as FlipStatus, RemoteRule
15 import gateway_msgs.msg
as gateway_msgs
16 import rocon_python_comms
17 import rocon_python_utils
18 import rocon_gateway_utils
19 import rocon_hub_client
20 import rocon_python_redis
as redis
22 from rocon_hub_client
import hub_api, hub_client
24 HubNameNotFoundError, HubNotFoundError, HubConnectionFailedError
26 from .exceptions
import GatewayUnavailableError
28 import rocon_console.console
as console
37 Pings redis periodically to figure out if the redis connection is still alive. 40 def __init__(self, ip, port, hub_connection_lost_hook):
41 threading.Thread.__init__(self)
51 return self.pinger.get_latency()
55 This runs in the background to gather the latest connection statistics 56 Note - it's not used in the keep alive check 63 alive, message = hub_client.ping_hub(self.
ip, self.
port, timeout)
66 rospy.logwarn(
"Gateway : hub connection no longer alive, disengaging [%s]" % message)
78 This is used both by HubManager for the gateway node, and by the rocon hub watcher. 80 def __init__(self, ip, port, whitelist, blacklist):
82 @param remote_gateway_request_callbacks : to handle redis responses 83 @type list of function pointers (back to GatewaySync class 85 @param ip : redis server ip 86 @param port : redis server port 88 @raise HubNameNotFoundError, HubNotFoundError 91 super(GatewayHub, self).
__init__(ip, port, whitelist, blacklist)
92 except HubNotFoundError:
94 except HubNameNotFoundError:
100 self._redis_keys[
'gatewaylist'] = hub_api.create_rocon_hub_key(
'gatewaylist')
108 def register_gateway(self, firewall, unique_gateway_name, hub_connection_lost_gateway_hook, gateway_ip):
110 Register a gateway with the hub. 113 @param unique_gateway_name 114 @param hub_connection_lost_gateway_hook : used to trigger Gateway.disengage_hub(hub) 115 on lost hub connections in redis pubsub listener thread. 118 @raise HubConnectionLostError if for some reason, the redis server has become unavailable. 120 if not self._redis_server:
123 self.private_key, public_key = utils.generate_private_public_key()
125 serialized_public_key = utils.serialize_key(public_key)
129 self._redis_keys[
'ip'] = hub_api.create_rocon_gateway_key(unique_gateway_name,
'ip')
130 self._redis_keys[
'gateway'] = hub_api.create_rocon_key(unique_gateway_name)
131 self._redis_keys[
'firewall'] = hub_api.create_rocon_gateway_key(unique_gateway_name,
'firewall')
132 self._redis_keys[
'public_key'] = hub_api.create_rocon_gateway_key(unique_gateway_name,
'public_key')
137 pipe = self._redis_server.pipeline()
140 pipe.sadd(self._redis_keys[
'gatewaylist'], self._redis_keys[
'gateway'])
141 pipe.set(self._redis_keys[
'firewall'], self.
_firewall)
145 pipe.set(self._redis_keys[
'ip'], gateway_ip)
147 pipe.get(self._redis_keys[
'public_key'])
148 pipe.set(self._redis_keys[
'public_key'], serialized_public_key)
149 pipe.sadd(self._redis_keys[
'gatewaylist'], self._redis_keys[
'gateway'])
152 pipe.set(ping_key,
True)
153 pipe.expire(ping_key, gateway_msgs.ConnectionStatistics.MAX_TTL)
155 ret_pipe = pipe.execute()
156 [r_check_gateway, r_firewall, r_ip, r_oldkey, r_newkey, r_add_gateway, r_ping, r_expire] = ret_pipe
158 except (redis.WatchError, redis.ConnectionError)
as e:
159 raise HubConnectionFailedError(
"Connection Failed while registering hub[%s]" % str(e))
169 if serialized_public_key != r_oldkey:
170 rospy.loginfo(
'Gateway : found existing mismatched public key on the hub, ' +
171 'requesting resend for all flip-ins.')
177 self.hub_connection_checker_thread.start()
182 This gets triggered by the redis connection checker thread when the hub connection is lost. 183 It then passes the trigger to the gateway who needs to remove the hub. 185 self.connection_lost_lock.acquire()
188 rospy.loginfo(
"Gateway : lost connection with hub, attempting to disconnect...")
191 self.connection_lost_lock.release()
195 Checks if gateway info is on the hub. 197 @return: success or failure of the operation 207 except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
216 Publish network interface information to the hub 219 @type gateway_msgs.RemoteGateway 226 self._redis_server.set(ping_key,
True)
227 self._redis_server.expire(ping_key, gateway_msgs.ConnectionStatistics.MAX_TTL)
231 if not statistics.network_info_available:
232 rospy.logdebug(
"Gateway : unable to publish network statistics [network info unavailable]")
234 network_info_available = hub_api.create_rocon_gateway_key(
236 self._redis_server.set(network_info_available, statistics.network_info_available)
238 self._redis_server.set(network_type, statistics.network_type)
241 latency = self.hub_connection_checker_thread.get_latency()
244 if statistics.network_type == gateway_msgs.RemoteGateway.WIRED:
246 wireless_bitrate_key = hub_api.create_rocon_gateway_key(self.
_unique_gateway_name,
'wireless:bitrate')
247 self._redis_server.set(wireless_bitrate_key, statistics.wireless_bitrate)
248 wireless_link_quality = hub_api.create_rocon_gateway_key(self.
_unique_gateway_name,
'wireless:quality')
249 self._redis_server.set(wireless_link_quality, statistics.wireless_link_quality)
250 wireless_signal_level = hub_api.create_rocon_gateway_key(self.
_unique_gateway_name,
'wireless:signal_level')
251 self._redis_server.set(wireless_signal_level, statistics.wireless_signal_level)
252 wireless_noise_level = hub_api.create_rocon_gateway_key(self.
_unique_gateway_name,
'wireless:noise_level')
253 self._redis_server.set(wireless_noise_level, statistics.wireless_noise_level)
254 except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
255 rospy.logdebug(
"Gateway : unable to publish network statistics [no connection to the hub]")
259 Remove all gateway info for given gateway key from the hub. 262 gateway_keys = self._redis_server.keys(gateway_key +
":*")
263 pipe = self._redis_server.pipeline()
264 pipe.delete(*gateway_keys)
265 pipe.srem(self._redis_keys[
'gatewaylist'], gateway_key)
267 except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
272 Check if the gateway exists in this hub 273 because sometimes the gateway ping can be there but all info has been wiped by the hub 276 return self._redis_server.sismember(self._redis_keys[
'gatewaylist'], gateway_key)
277 except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
282 For a given gateway, update the latency statistics 284 @param gateway_name : gateway name, not the redis key 286 @param latency_stats : ping statistics to the gateway from the hub 287 @type list : 4-tuple of float values [min, avg, max, mean deviation] 290 min_latency_key = hub_api.create_rocon_gateway_key(gateway_name,
'latency:min')
291 avg_latency_key = hub_api.create_rocon_gateway_key(gateway_name,
'latency:avg')
292 max_latency_key = hub_api.create_rocon_gateway_key(gateway_name,
'latency:max')
293 mdev_latency_key = hub_api.create_rocon_gateway_key(gateway_name,
'latency:mdev')
294 self._redis_server.set(min_latency_key, latency_stats[0])
295 self._redis_server.set(avg_latency_key, latency_stats[1])
296 self._redis_server.set(max_latency_key, latency_stats[2])
297 self._redis_server.set(mdev_latency_key, latency_stats[3])
298 except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
299 rospy.logerr(
"Gateway: unable to update latency stats for " + gateway_name)
302 time_since_last_seen=0.0):
304 This function is used by the hub to mark if a gateway can be pinged. 305 If a gateway cannot be pinged, the hub indicates how long it has been 306 since the gateway was last seen 308 @param gateway_key : The gateway key (not the name) 310 @param available: If the gateway can be pinged right now 312 @param time_since_last_seen: If available is false, how long has it 313 been since the gateway was last seen (in seconds) 316 pipe = self._redis_server.pipeline()
318 available_key = gateway_key +
":available" 319 pipe.set(available_key, available)
320 time_since_last_seen_key = gateway_key +
":time_since_last_seen" 321 pipe.set(time_since_last_seen_key, int(time_since_last_seen))
322 unused_ret_pipe = pipe.execute()
323 except (redis.WatchError, redis.ConnectionError)
as e:
324 raise HubConnectionFailedError(
"Connection Failed while registering hub[%s]" % str(e))
334 Return remote gateway information for the specified gateway string id. 336 @param gateway : gateway id string to search for 338 @return remote gateway information 339 @rtype gateway_msgs.RemoteGateway or None 341 firewall = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway,
'firewall'))
344 ip = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway,
'ip'))
347 remote_gateway = gateway_msgs.RemoteGateway()
348 remote_gateway.name = gateway
349 remote_gateway.ip = ip
350 remote_gateway.firewall =
True if int(firewall)
else False 351 remote_gateway.public_interface = []
352 encoded_advertisements = self._redis_server.smembers(
353 hub_api.create_rocon_gateway_key(gateway,
'advertisements'))
354 for encoded_advertisement
in encoded_advertisements:
355 advertisement = utils.deserialize_connection(encoded_advertisement)
356 remote_gateway.public_interface.append(advertisement.rule)
357 remote_gateway.flipped_interface = []
358 encoded_flips = self._redis_server.smembers(hub_api.create_rocon_gateway_key(gateway,
'flips'))
359 for encoded_flip
in encoded_flips:
360 [target_gateway, name, connection_type, node] = utils.deserialize(encoded_flip)
361 remote_rule = gateway_msgs.RemoteRule(target_gateway, gateway_msgs.Rule(connection_type, name, node))
362 remote_gateway.flipped_interface.append(remote_rule)
363 remote_gateway.pulled_interface = []
364 encoded_pulls = self._redis_server.smembers(hub_api.create_rocon_gateway_key(gateway,
'pulls'))
365 for encoded_pull
in encoded_pulls:
366 [target_gateway, name, connection_type, node] = utils.deserialize(encoded_pull)
367 remote_rule = gateway_msgs.RemoteRule(target_gateway, gateway_msgs.Rule(connection_type, name, node))
368 remote_gateway.pulled_interface.append(remote_rule)
371 gateway_available_key = hub_api.create_rocon_gateway_key(gateway,
'available')
372 remote_gateway.conn_stats.gateway_available = \
374 time_since_last_seen_key = hub_api.create_rocon_gateway_key(gateway,
'time_since_last_seen')
375 remote_gateway.conn_stats.time_since_last_seen = \
378 ping_latency_min_key = hub_api.create_rocon_gateway_key(gateway,
'latency:min')
379 remote_gateway.conn_stats.ping_latency_min = \
381 ping_latency_max_key = hub_api.create_rocon_gateway_key(gateway,
'latency:max')
382 remote_gateway.conn_stats.ping_latency_max = \
384 ping_latency_avg_key = hub_api.create_rocon_gateway_key(gateway,
'latency:avg')
385 remote_gateway.conn_stats.ping_latency_avg = \
387 ping_latency_mdev_key = hub_api.create_rocon_gateway_key(gateway,
'latency:mdev')
388 remote_gateway.conn_stats.ping_latency_mdev = \
392 network_info_available_key = hub_api.create_rocon_gateway_key(gateway,
'network:info_available')
393 remote_gateway.conn_stats.network_info_available = \
395 if not remote_gateway.conn_stats.network_info_available:
396 return remote_gateway
397 network_type_key = hub_api.create_rocon_gateway_key(gateway,
'network:type')
398 remote_gateway.conn_stats.network_type = \
400 if remote_gateway.conn_stats.network_type == gateway_msgs.RemoteGateway.WIRED:
401 return remote_gateway
402 wireless_bitrate_key = hub_api.create_rocon_gateway_key(gateway,
'wireless:bitrate')
403 remote_gateway.conn_stats.wireless_bitrate = \
405 wireless_link_quality_key = hub_api.create_rocon_gateway_key(gateway,
'wireless:quality')
406 remote_gateway.conn_stats.wireless_link_quality = \
408 wireless_signal_level_key = hub_api.create_rocon_gateway_key(gateway,
'wireless:signal_level')
409 remote_gateway.conn_stats.wireless_signal_level = \
411 wireless_noise_level_key = hub_api.create_rocon_gateway_key(gateway,
'wireless:noise_level')
412 remote_gateway.conn_stats.wireless_noise_level = \
414 return remote_gateway
418 Return a list of the gateways (name list, not redis keys). 419 e.g. ['gateway32adcda32','pirate21fasdf']. If not connected, just 420 returns an empty list. 422 if not self._redis_server:
423 rospy.logerr(
"Gateway : cannot retrieve remote gateway names [%s][%s]." % (self.name, self.uri))
427 gateway_keys = self._redis_server.smembers(self._redis_keys[
'gatewaylist'])
428 for gateway
in gateway_keys:
431 gateways.append(hub_api.key_base_name(gateway))
432 except (redis.ConnectionError, AttributeError)
as unused_e:
441 Use this when gateway can be a regular expression and 442 we need to check it off against list_remote_gateway_names() 444 @return a list of matches (higher level decides on action for duplicates). 445 @rtype list[str] : list of remote gateway names. 450 if re.match(gateway, remote_gateway):
451 matches.append(remote_gateway)
452 except HubConnectionLostError:
458 Use this when gateway can be a regular expression and 459 we need to check it off against list_remote_gateway_names() 464 if re.match(gateway, rocon_gateway_utils.gateway_basename(remote_gateway)):
465 weak_matches.append(remote_gateway)
466 except HubConnectionLostError:
472 Equivalent to get_connection_state, but generates it from the public 473 interface of a remote gateway 475 @param remote_gateway : hash name for a remote gateway 477 @return dictionary of remote advertisements 478 @rtype dictionary of connection type keyed connection values 480 connections = utils.create_empty_connection_type_dictionary()
481 key = hub_api.create_rocon_gateway_key(remote_gateway,
'advertisements')
483 public_interface = self._redis_server.smembers(key)
484 for connection_str
in public_interface:
485 connection = utils.deserialize_connection(connection_str)
486 connections[connection.rule.type].append(connection)
487 except redis.exceptions.ConnectionError:
494 Returns the value of the remote gateway's firewall (flip) 497 @param gateway : gateway string id 500 @return state of the flag 503 @raise GatewayUnavailableError when specified gateway is not on the hub 505 firewall = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway,
'firewall'))
506 if firewall
is not None:
507 return True if int(firewall)
else False 509 raise GatewayUnavailableError
513 Retrieves the local list of advertisements from the hub. This 514 gets used to sync across multiple hubs. 516 @return dictionary of remote advertisements 517 @rtype dictionary of connection type keyed connection values 519 connections = utils.create_empty_connection_type_dictionary()
522 public_interface = self._redis_server.smembers(key)
523 for connection_str
in public_interface:
524 connection = utils.deserialize_connection(connection_str)
525 connections[connection.rule.type].append(connection)
526 except redis.exceptions.ConnectionError:
544 if val
and (val ==
'True'):
555 Places a topic, service or action on the public interface. On the 556 redis server, this representation will always be: 558 - topic : a triple { name, type, xmlrpc node uri } 559 - service : a triple { name, rosrpc uri, xmlrpc node uri } 562 @param connection: representation of a connection (topic, service, action) 563 @type connection: str 564 @raise .exceptions.ConnectionTypeError: if connection arg is invalid. 567 msg_str = utils.serialize_connection(connection)
568 self._redis_server.sadd(key, msg_str)
572 Removes a topic, service or action from the public interface. 574 @param connection: representation of a connection (topic, service, action) 575 @type connection: str 576 @raise .exceptions.ConnectionTypeError: if connection arg is invalid. 579 msg_str = utils.serialize_connection(connection)
580 self._redis_server.srem(key, msg_str)
584 Post flip details to the redis server. This has no actual functionality, 585 it is just useful for debugging with the remote_gateway_info service. 587 @param gateway : the target of the flip 589 @param name : the name of the connection 591 @param type : the type of the connection (one of ConnectionType.xxx 593 @param node : the node name it was pulled from 597 serialized_data = utils.serialize([gateway, name, connection_type, node])
598 self._redis_server.sadd(key, serialized_data)
602 Remove flip details from the redis server. This has no actual functionality, 603 it is just useful for debugging with the remote_gateway_info service. 605 @param gateway : the target of the flip 607 @param name : the name of the connection 609 @param type : the type of the connection (one of ConnectionType.xxx 611 @param node : the node name it was pulled from 615 serialized_data = utils.serialize([gateway, name, connection_type, node])
616 self._redis_server.srem(key, serialized_data)
620 Post pull details to the hub. This has no actual functionality, 621 it is just useful for debugging with the remote_gateway_info service. 623 @param gateway : the gateway it is pulling from 625 @param name : the name of the connection 627 @param type : the type of the connection (one of ConnectionType.xxx 629 @param node : the node name it was pulled from 633 serialized_data = utils.serialize([gateway, name, connection_type, node])
634 self._redis_server.sadd(key, serialized_data)
638 Remove pull details from the hub. This has no actual functionality, 639 it is just useful for debugging with the remote_gateway_info service. 641 @param gateway : the gateway it was pulling from 643 @param name : the name of the connection 645 @param type : the type of the connection (one of ConnectionType.xxx 647 @param node : the node name it was pulled from 651 serialized_data = utils.serialize([gateway, name, connection_type, node])
652 self._redis_server.srem(key, serialized_data)
660 Marks all flip ins to be resent. Until these flips are resent, they 661 will not be processed 664 encoded_flip_ins = []
666 encoded_flip_ins = self._redis_server.smembers(key)
667 self._redis_server.delete(key)
668 for flip_in
in encoded_flip_ins:
669 status, source, connection_list = utils.deserialize_request(flip_in)
670 connection = utils.get_connection_from_list(connection_list)
671 status = FlipStatus.RESEND
672 serialized_data = utils.serialize_connection_request(status,
675 self._redis_server.sadd(key, serialized_data)
676 except (redis.ConnectionError, AttributeError)
as unused_e:
682 Gets all the flipped in connections listed on the hub that are interesting 683 for this gateway (i.e. all unblocked/pending). This is used by the 684 watcher loop to work out how it needs to update the local registrations. 686 :returns: the flipped in registration strings and status. 687 :rtype: list of (utils.Registration, FlipStatus.XXX) tuples. 691 encoded_flip_ins = []
693 encoded_flip_ins = self._redis_server.smembers(key)
694 except (redis.ConnectionError, AttributeError)
as unused_e:
697 for flip_in
in encoded_flip_ins:
698 status, source, connection_list = utils.deserialize_request(flip_in)
701 connection = utils.get_connection_from_list(connection_list)
702 connection = utils.decrypt_connection(connection, self.private_key)
703 if status != FlipStatus.BLOCKED
and status != FlipStatus.RESEND:
709 Updates the flip request status for this hub 711 @param registration_with_status : the flip registration for which we are updating status 712 @type (utils.Registration, str) where str is the status 714 @param status : pending/accepted/blocked 715 @type same as gateway_msgs.msg.RemoteRuleWithStatus.status 717 @return True if this hub was used to send the flip request, and the status was updated. False otherwise. 725 Updates the flip request status for multiple registrations on this hub 727 @param registrations_with_status : the flip registration for which we are updating status 728 @type list of (utils.Registration, str) where str is the status 730 @param status : pending/accepted/blocked 731 @type same as gateway_msgs.msg.RemoteRuleWithStatus.status 733 @return True if this hub was used to send the flip request, false otherwise. 736 result = [
False] * len(registrations_with_status)
737 update_registrations = []
741 encoded_flip_ins = self._redis_server.smembers(key)
742 for flip_in
in encoded_flip_ins:
743 old_status, source, connection_list = utils.deserialize_request(flip_in)
744 connection = utils.get_connection_from_list(connection_list)
745 connection = utils.decrypt_connection(connection, self.private_key)
746 for index, (registration, new_status)
in enumerate(registrations_with_status):
747 if source == registration.remote_gateway
and connection == registration.connection:
748 if new_status != old_status:
749 self._redis_server.srem(key, flip_in)
750 update_registrations.append((index, (registration, new_status)))
754 for (index, (registration, new_status))
in update_registrations:
755 encrypted_connection = utils.encrypt_connection(registration.connection,
757 serialized_data = utils.serialize_connection_request(new_status,
758 registration.remote_gateway,
759 encrypted_connection)
760 self._redis_server.sadd(key, serialized_data)
762 except redis.exceptions.ConnectionError:
770 Get the status of a flipped registration. If the flip request does not 771 exist (for instance, in the case where this hub was not used to send 772 the request), then None is returned 774 @return the flip status or None 775 @rtype same as gateway_msgs.msg.RemoteRuleWithStatus.status or None 782 Get the status of multiple flipped registration. If the flip request 783 does not exist (for instance, in the case where this hub was not used 784 to send the request), then None is returned. Multiple requests are 785 batched together for efficiency. 787 @return the flip status, ordered as per the input remote rules 788 @rtype list of gateway_msgs.msg.RemoteRuleWithStatus.status or None 790 gateway_specific_rules = {}
791 status = [
None] * len(remote_rules)
792 for i, remote_rule
in enumerate(remote_rules):
793 if remote_rule.gateway
not in gateway_specific_rules:
794 gateway_specific_rules[remote_rule.gateway] = []
795 gateway_specific_rules[remote_rule.gateway].append((i, remote_rule))
799 for gateway
in gateway_specific_rules:
800 key = hub_api.create_rocon_gateway_key(gateway,
'flip_ins')
803 encoded_flips = self._redis_server.smembers(key)
804 except (redis.ConnectionError, AttributeError)
as unused_e:
807 for flip
in encoded_flips:
808 rule_status, source, connection_list = utils.deserialize_request(flip)
809 if source != source_gateway:
811 connection = utils.get_connection_from_list(connection_list)
815 for (index, remote_rule)
in gateway_specific_rules[gateway]:
819 exploded_remote_rules = self.
rule_explode([remote_rule])
822 if len([r
for r
in exploded_remote_rules
if connection.rule == r.rule]) > 0:
823 if status[index]
is None:
825 status[index] = rule_status
826 elif status[index] != rule_status:
829 if rule_status == FlipStatus.UNKNOWN:
831 status[index] = rule_status
835 if ((status[index] == FlipStatus.PENDING
or status[index] == FlipStatus.ACCEPTED)
and 836 (rule_status == FlipStatus.BLOCKED
or rule_status == FlipStatus.RESEND)
839 status[index] = rule_status
846 Sends a message to the remote gateway via redis pubsub channel. This is called from the 847 watcher thread, when a flip rule gets activated. 849 - redis channel name: rocon:<remote_gateway_name> 850 - data : list of [ command, gateway, rule type, type, xmlrpc_uri ] 851 - [0] - command : in this case 'flip' 852 - [1] - gateway : the name of this gateway, i.e. the flipper 853 - [2] - name : local name 854 - [3] - node : local node name 855 - [4] - connection_type : one of ConnectionType.PUBLISHER etc 856 - [5] - type_info : a ros format type (e.g. std_msgs/String or service api) 857 - [6] - xmlrpc_uri : the xmlrpc node uri 859 @param command : string command name - either 'flip' or 'unflip' 862 @param flip_rule : the flip to send 863 @type gateway_msgs.RemoteRule 865 @param type_info : topic type (e.g. std_msgs/String) 868 @param xmlrpc_uri : the node uri 871 key = hub_api.create_rocon_gateway_key(remote_gateway,
'flip_ins')
872 source = hub_api.key_base_name(self._redis_keys[
'gateway'])
881 start_time = time.time()
882 while time.time() - start_time <= timeout:
883 remote_gateway_public_key_str = self._redis_server.get(
884 hub_api.create_rocon_gateway_key(remote_gateway,
'public_key'))
885 if remote_gateway_public_key_str
is not None:
887 if remote_gateway_public_key_str
is None:
888 rospy.logerr(
"Gateway : flip to " + remote_gateway +
889 " failed as public key not found")
892 remote_gateway_public_key = utils.deserialize_key(remote_gateway_public_key_str)
893 encrypted_connection = utils.encrypt_connection(connection, remote_gateway_public_key)
896 serialized_data = utils.serialize_connection_request(
897 FlipStatus.PENDING, source, encrypted_connection)
898 self._redis_server.sadd(key, serialized_data)
910 Unflip a previously flipped registration. If the flip request does not 911 exist (for instance, in the case where this hub was not used to send 912 the request), then False is returned 914 @return True if the flip existed and was removed, False otherwise 917 key = hub_api.create_rocon_gateway_key(remote_gateway,
'flip_ins')
920 rule.node = rule.node.split(
",")[0]
922 encoded_flip_ins = self._redis_server.smembers(key)
923 for flip_in
in encoded_flip_ins:
924 unused_status, source, connection_list = utils.deserialize_request(flip_in)
925 connection = utils.get_connection_from_list(connection_list)
926 if source == hub_api.key_base_name(self._redis_keys[
'gateway'])
and rule == connection.rule:
927 self._redis_server.srem(key, flip_in)
929 except redis.exceptions.ConnectionError:
932 if not rospy.is_shutdown():
933 rospy.logwarn(
"Gateway : hub connection error while sending unflip request.")
def rule_explode(self, rule_list):
    """
    Expand action rules into the topic-level rules that implement them.

    Every ACTION_CLIENT / ACTION_SERVER rule is replaced by five topic rules
    (goal, cancel, feedback, status, result - in that order) carrying the
    same node. For an action client, goal and cancel are PUBLISHER rules and
    the rest are SUBSCRIBER rules; for an action server the directions are
    mirrored. Rules of any other type are passed through untouched.

    Entries that are RemoteRule instances are unwrapped before expansion and
    each resulting rule is wrapped again with the original gateway.

    @param rule_list : rules to expand
    @type list of gateway_msgs.Rule and/or RemoteRule

    @return the expanded rules, in input order
    @rtype list
    """
    # (topic suffix, True when the action *client* publishes on that topic)
    action_topics = (
        ("/goal", True),
        ("/cancel", True),
        ("/feedback", False),
        ("/status", False),
        ("/result", False),
    )

    result_list = []
    for rule in rule_list:
        is_remote = isinstance(rule, RemoteRule)
        asm_rule = rule.rule if is_remote else rule

        if asm_rule.type == gateway_msgs.ConnectionType.ACTION_CLIENT:
            is_client = True
        elif asm_rule.type == gateway_msgs.ConnectionType.ACTION_SERVER:
            is_client = False
        else:
            is_client = None  # not an action, nothing to expand

        if is_client is None:
            exp_rules = [asm_rule]
        else:
            publisher = gateway_msgs.ConnectionType.PUBLISHER
            subscriber = gateway_msgs.ConnectionType.SUBSCRIBER
            # A side publishes a topic exactly when it is the side the table
            # marks as publisher (client_publishes == is_client).
            exp_rules = [
                gateway_msgs.Rule(
                    name=asm_rule.name + suffix,
                    type=publisher if client_publishes == is_client else subscriber,
                    node=asm_rule.node)
                for suffix, client_publishes in action_topics
            ]

        if is_remote:
            # keep the gateway of the rule that was expanded
            exp_rules = [RemoteRule(gateway=rule.gateway, rule=r) for r in exp_rules]

        result_list += exp_rules
    return result_list
def rule_assemble(self, rule_list):
    """
    Collapse action topic rules back into single action rules.

    A rule whose name ends in /goal, /cancel, /feedback, /status or /result
    and whose type matches the direction an action client (or server) uses
    for that topic is treated as part of an action. The first such rule seen
    for a given (action name, node) produces one ACTION_CLIENT or
    ACTION_SERVER rule; later topic rules belonging to an action that is
    already in the result are dropped. All other rules are passed through
    unchanged.

    Entries that are RemoteRule instances are unwrapped first and the
    resulting rule is wrapped again with the original gateway.

    @param rule_list : rules to assemble
    @type list of gateway_msgs.Rule and/or RemoteRule

    @return the assembled rules
    @rtype list
    """
    # (topic suffix, True when the action *client* publishes on that topic)
    action_topics = (
        ("/goal", True),
        ("/cancel", True),
        ("/feedback", False),
        ("/status", False),
        ("/result", False),
    )

    result_list = []
    for rule in rule_list:
        is_remote = isinstance(rule, RemoteRule)
        exp_rule = rule.rule if is_remote else rule

        publisher = gateway_msgs.ConnectionType.PUBLISHER
        subscriber = gateway_msgs.ConnectionType.SUBSCRIBER

        # Work out whether this rule is one of the topics of an action.
        action_name = exp_rule.name
        action_client = False
        action_server = False
        for suffix, client_publishes in action_topics:
            if not exp_rule.name.endswith(suffix):
                continue
            client_side = publisher if client_publishes else subscriber
            server_side = subscriber if client_publishes else publisher
            if exp_rule.type == client_side:
                action_name = exp_rule.name[:-len(suffix)]
                action_client = True
            elif exp_rule.type == server_side:
                action_name = exp_rule.name[:-len(suffix)]
                action_server = True
            # no suffix is a suffix of another, so at most one can match
            break

        result_rule = None
        if action_client or action_server:
            if action_client:
                action_type = gateway_msgs.ConnectionType.ACTION_CLIENT
            else:
                action_type = gateway_msgs.ConnectionType.ACTION_SERVER
            # NOTE(review): entries appended for RemoteRule inputs are
            # RemoteRule wrappers; this lookup reads name/type/node directly
            # off each entry - confirm those attributes exist on RemoteRule.
            already_assembled = any(
                a.name == action_name and
                a.type == action_type and
                a.node == exp_rule.node
                for a in result_list)
            if not already_assembled:
                result_rule = gateway_msgs.Rule(name=action_name,
                                                type=action_type,
                                                node=exp_rule.node)
            # else: the action rule is already present, drop this topic rule.
        else:
            # Pass-through only for rules that belong to neither side of an
            # action. The previous condition tested action_client twice, so
            # leftover action-server topics leaked through as plain rules.
            result_rule = exp_rule

        if result_rule is not None:
            if is_remote:
                result_rule = RemoteRule(rule.gateway, result_rule)
            result_list.append(result_rule)
    return result_list
def mark_named_gateway_available(self, gateway_key, available=True, time_since_last_seen=0.0)
def update_named_gateway_latency_stats(self, gateway_name, latency_stats)
def register_gateway(self, firewall, unique_gateway_name, hub_connection_lost_gateway_hook, gateway_ip)
Hub Connections.
def _resend_all_flip_ins(self)
Flip specific communication.
def get_multiple_flip_request_status(self, remote_rules)
def _parse_redis_float(self, val)
def remove_flip_details(self, gateway, name, connection_type, node)
def _send_unflip_request(self, remote_gateway, rule)
hub_connection_checker_thread
def get_flip_request_status(self, remote_rule)
def update_flip_request_status(self, registration_with_status)
def get_unblocked_flipped_in_connections(self)
def unregister_named_gateway(self, gateway_key)
def send_unflip_request(self, remote_gateway, rule)
def __init__(self, ip, port, whitelist, blacklist)
def matches_remote_gateway_name(self, gateway)
def _parse_redis_int(self, val)
_hub_connection_lost_gateway_hook
def _hub_connection_lost_hook(self)
def matches_remote_gateway_basename(self, gateway)
def list_remote_gateway_names(self)
def remote_gateway_info(self, gateway)
Hub Data Retrieval.
def is_named_gateway_registered(self, gateway_key)
def send_flip_request(self, remote_gateway, connection, timeout=15.0)
def is_gateway_registered(self)
def publish_network_statistics(self, statistics)
def rule_assemble(self, rule_list)
Redis Connection Checker.
def _parse_redis_bool(self, val)
def unadvertise(self, connection)
def get_remote_gateway_firewall_flag(self, gateway)
def __init__(self, ip, port, hub_connection_lost_hook)
def get_remote_connection_state(self, remote_gateway)
def update_multiple_flip_request_status(self, registrations_with_status)
def get_local_advertisements(self)
def advertise(self, connection)
Posting Information to the Hub.
def post_flip_details(self, gateway, name, connection_type, node)
_hub_connection_lost_hook
def rule_explode(self, rule_list)
def post_pull_details(self, gateway, name, connection_type, node)
def remove_pull_details(self, gateway, name, connection_type, node)