16 import gateway_msgs.msg
as gateway_msgs
17 import gateway_msgs.srv
as gateway_srvs
18 import rocon_python_comms
20 from gateway_msgs.msg
import RemoteRuleWithStatus
as FlipStatus
23 from .
import ros_parameters
25 from .flipped_interface
import FlippedInterface
26 from .public_interface
import PublicInterface
27 from .pulled_interface
import PulledInterface
28 from .master_api
import LocalMaster
29 from .network_interface_manager
import NetworkInterfaceManager
39 Used to synchronise with hubs. 42 def __init__(self, hub_manager, param, unique_name, publish_gateway_info_callback):
44 @param hub_manager : container for all the hubs this gateway connects to 45 @type hub_api.HubManager 47 @param param : parameters set by ros_parameters.py 48 @type : dictionary of parameter key-value pairs 50 @param unique_name : gateway name (param['name']) with unique uuid hash appended 52 @param publish_gateway_info_callback : callback for publishing gateway info 59 self.
master = LocalMaster()
60 except rocon_python_comms.NotFoundException
as exc:
61 rospy.logwarn(str(exc))
62 rospy.logwarn(
"Cannot create Gateway's LocalMaster. Retrying...")
64 rospy.rostime.wallsleep(1)
66 self.
ip = self.master.get_ros_ip()
70 default_rule_blacklist = ros_parameters.generate_rules(self.
_param[
"default_blacklist"])
71 default_rules, all_targets = ros_parameters.generate_remote_rules(self.
_param[
"default_flips"])
73 firewall=self.
_param[
'firewall'],
74 default_rule_blacklist=default_rule_blacklist,
75 default_rules=default_rules,
76 all_targets=all_targets)
77 default_rules, all_targets = ros_parameters.generate_remote_rules(self.
_param[
"default_pulls"])
79 default_rules=default_rules,
80 all_targets=all_targets)
82 default_rule_blacklist=default_rule_blacklist,
83 default_rules=ros_parameters.generate_rules(self.
_param[
'default_advertisements']))
84 if self.
_param[
'advertise_all']:
86 self.public_interface.advertise_all([])
92 if not rospy.core.is_initialized():
93 raise rospy.exceptions.ROSInitException(
"client code must call rospy.init_node() first")
94 rospy.logdebug(
"node[%s, %s] entering spin(), pid[%s]", rospy.core.get_caller_id(), rospy.core.get_node_uri(), os.getpid())
96 while not rospy.core.is_shutdown():
98 remote_gateway_hub_index = self.hub_manager.create_remote_gateway_hub_index()
100 with self.master.get_connection_state()
as connections:
105 registrations = self.hub_manager.get_flip_requests()
107 rospy.rostime.wallsleep(1)
108 except KeyboardInterrupt:
109 rospy.logdebug(
"keyboard interrupt, shutting down")
110 rospy.core.signal_shutdown(
'keyboard interrupt')
114 We often check whether we're connected to any hubs, just to ensure we 115 don't waste time processing if there is no-one listening. 117 @return True if at least one hub is connected, False otherwise 120 return self.hub_manager.is_connected()
124 Disengage from the specified hub. Don't actually need to clean up connections 125 here like we do in shutdown - that can be handled from the watcher thread itself. 127 @param hub : the hub that will be deleted. 129 self.hub_manager.disengage_hub(hub)
138 Process the list of local connections and check against 139 the current flip rules and patterns for changes. If a rule 140 has become (un)available take appropriate action. 142 @param local_connection_index : list of current local connections 143 @type : dictionary of ConnectionType.xxx keyed sets of utils.Connections 145 @param remote_gateway_hub_index : key-value remote gateway name-hub list pairs 148 state_changed =
False 151 flipped_connections = self.flipped_interface.get_flipped_connections()
152 for flip
in flipped_connections:
153 if flip.remote_rule.gateway
in remote_gateway_hub_index:
154 for hub
in remote_gateway_hub_index[flip.remote_rule.gateway]:
155 status = hub.get_flip_request_status(flip.remote_rule)
156 if status == FlipStatus.RESEND:
157 rospy.loginfo(
"Gateway : resend requested for flip request [%s]%s" %
158 (flip.remote_rule.gateway, utils.format_rule(flip.remote_rule.rule)))
160 self.flipped_interface.remove_flip(flip.remote_rule)
161 hub.send_unflip_request(flip.remote_rule.gateway, flip.remote_rule.rule)
162 hub.remove_flip_details(flip.remote_rule.gateway,
163 flip.remote_rule.rule.name,
164 flip.remote_rule.rule.type,
165 flip.remote_rule.rule.node)
168 new_flips, lost_flips = self.flipped_interface.update(
170 for connection_type
in utils.connection_types:
171 for flip
in new_flips[connection_type]:
172 firewall_flag = self.hub_manager.get_remote_gateway_firewall_flag(flip.gateway)
177 connections = self.master.generate_connection_details(flip.rule.type, flip.rule.name, flip.rule.node)
178 if (connection_type == gateway_msgs.ConnectionType.ACTION_CLIENT
or 179 connection_type == gateway_msgs.ConnectionType.ACTION_SERVER):
180 rospy.loginfo(
"Gateway : sending flip request [%s]%s" %
181 (flip.gateway, utils.format_rule(flip.rule)))
182 hub = remote_gateway_hub_index[flip.gateway][0]
183 hub.post_flip_details(flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node)
184 for connection
in connections:
185 hub.send_flip_request(flip.gateway, connection)
187 for connection
in connections:
188 rospy.loginfo(
"Gateway : sending flip request [%s]%s" %
189 (flip.gateway, utils.format_rule(flip.rule)))
190 hub = remote_gateway_hub_index[flip.gateway][0]
191 hub.send_flip_request(flip.gateway, connection)
192 hub.post_flip_details(
193 flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node)
194 for flip
in lost_flips[connection_type]:
196 rospy.loginfo(
"Gateway : sending unflip request [%s]%s" % (flip.gateway, utils.format_rule(flip.rule)))
197 for hub
in remote_gateway_hub_index[flip.gateway]:
198 rule = copy.deepcopy(flip.rule)
199 if hub.send_unflip_request(flip.gateway, rule):
201 hub.remove_flip_details(flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node)
205 flipped_connections = self.flipped_interface.get_flipped_connections()
207 for flip
in flipped_connections:
208 for hub
in remote_gateway_hub_index[flip.remote_rule.gateway]:
209 remote_rule = copy.deepcopy(flip.remote_rule)
210 remote_rule.rule.node = remote_rule.rule.node.split(
",")[0]
211 status = hub.get_flip_request_status(remote_rule)
212 if status
is not None:
213 flip_state_changed = self.flipped_interface.update_flip_status(flip.remote_rule, status)
214 state_changed = state_changed
or flip_state_changed
222 Process the list of local connections and check against 223 the current pull rules and patterns for changes. If a rule 224 has become (un)available take appropriate action. 226 This is called by the watcher thread. The remote_gateway_hub_index 227 is always a full dictionary of all remote gateway and hub key-value 228 pairs - it is only included as an argument here to save 229 processing doubly in the watcher thread. 231 @param connections : list of current local connections parsed from the master 232 @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections 234 @param remote_gateway_hub_index : key-value remote gateway name-hub list pairs 235 @type dictionary of remote_gateway_name-list of hub_api.Hub objects key-value pairs 237 state_changed =
False 238 remote_connections = {}
239 for remote_gateway
in remote_gateway_hub_index.keys() + self.pulled_interface.list_remote_gateway_names():
240 remote_connections[remote_gateway] = {}
242 for hub
in remote_gateway_hub_index[remote_gateway]:
243 remote_connections[remote_gateway].update(hub.get_remote_connection_state(remote_gateway))
246 new_pulls, lost_pulls = self.pulled_interface.update(remote_connections, self.
_unique_name)
247 for connection_type
in utils.connection_types:
248 for pull
in new_pulls[connection_type]:
251 for remote_gateway
in remote_connections.keys():
252 for c
in remote_connections[remote_gateway][pull.rule.type]:
253 if c.rule.name == pull.rule.name
and \
254 c.rule.node == pull.rule.node:
260 existing_registration = self.pulled_interface.find_registration_match(
261 pull.gateway, pull.rule.name, pull.rule.node, pull.rule.type)
262 if not existing_registration:
263 rospy.loginfo(
"Gateway : pulling in connection %s[%s]" %
264 (utils.format_rule(pull.rule), remote_gateway))
266 new_registration = self.master.register(registration)
267 if new_registration
is not None:
268 self.pulled_interface.registrations[registration.connection.rule.type].append(new_registration)
269 hub = remote_gateway_hub_index[pull.gateway][0]
270 hub.post_pull_details(pull.gateway, pull.rule.name, pull.rule.type, pull.rule.node)
272 for pull
in lost_pulls[connection_type]:
274 existing_registration = self.pulled_interface.find_registration_match(
275 pull.gateway, pull.rule.name, pull.rule.node, pull.rule.type)
276 if existing_registration:
277 rospy.loginfo(
"Gateway : abandoning pulled connection %s[%s]" % (
278 utils.format_rule(pull.rule), pull.gateway))
279 self.master.unregister(existing_registration)
285 self.pulled_interface.registrations[
286 existing_registration.connection.rule.type].remove(existing_registration)
293 Process the list of local connections and check against 294 the current rules and patterns for changes. If a rule 295 has become (un)available take appropriate action. 297 @param local_connection_index : list of current local connections parsed from the master 298 @type : { utils.ConnectionType.xxx : utils.Connection[] } dictionaries 300 state_changed =
False 302 new_conns, lost_conns = self.public_interface.update(
303 local_connection_index, self.master.generate_advertisement_connection_details)
305 public_interface = self.public_interface.getInterface()
306 for connection_type
in utils.connection_types:
307 for new_connection
in new_conns[connection_type]:
308 rospy.loginfo(
"Gateway : adding connection to public interface %s" %
309 utils.format_rule(new_connection.rule))
310 self.hub_manager.advertise(new_connection)
312 for lost_connection
in lost_conns[connection_type]:
313 rospy.loginfo(
"Gateway : removing connection from public interface %s" %
314 utils.format_rule(lost_connection.rule))
315 self.hub_manager.unadvertise(lost_connection)
319 return public_interface
323 Match the flipped-in connections against the 324 supplied registrations, flipping and unflipping as necessary. 326 @param registrations : registrations (with status) to be processed 327 @type list of (utils.Registration, str) where the str contains the status 331 for gateway
in remote_gateway_hub_index:
332 for hub
in remote_gateway_hub_index[gateway]:
335 update_flip_status = {}
336 if self.flipped_interface.firewall:
337 if len(registrations) != 0:
338 rospy.logwarn(
"Gateway : firewalled, but received flip requests...")
339 for (registration, status)
in registrations:
340 for hub
in remote_gateway_hub_index[registration.remote_gateway]:
341 if hub.uri
not in update_flip_status:
342 update_flip_status[hub.uri] = []
343 update_flip_status[hub.uri].append((registration, FlipStatus.BLOCKED))
346 for hub_uri, hub
in hubs.iteritems():
347 if hub_uri
in update_flip_status:
348 hub.update_multiple_flip_request_status(update_flip_status[hub_uri])
351 state_changed =
False 354 for (registration, status)
in registrations:
356 existing_registration = self.flipped_interface.find_registration_match(
357 registration.remote_gateway,
358 registration.connection.rule.name,
359 registration.connection.rule.node,
360 registration.connection.rule.type)
361 if not existing_registration:
362 rospy.loginfo(
"Gateway : received a flip request %s" % str(registration))
364 new_registration = self.master.register(registration)
365 if new_registration
is not None:
366 self.flipped_interface.registrations[registration.connection.rule.type].append(new_registration)
368 if status != FlipStatus.ACCEPTED:
369 for hub
in remote_gateway_hub_index[registration.remote_gateway]:
370 if hub.uri
not in update_flip_status:
371 update_flip_status[hub.uri] = []
372 update_flip_status[hub.uri].append((registration, FlipStatus.ACCEPTED))
375 for hub_uri, hub
in hubs.iteritems():
376 if hub_uri
in update_flip_status:
377 hub.update_multiple_flip_request_status(update_flip_status[hub_uri])
380 local_registrations = copy.deepcopy(self.flipped_interface.registrations)
381 for connection_type
in utils.connection_types:
382 for local_registration
in local_registrations[connection_type]:
383 matched_registration =
None 384 for (registration, status)
in registrations:
385 if registration.connection == local_registration.connection
and \
386 registration.remote_gateway == local_registration.remote_gateway:
387 matched_registration = registration
391 if matched_registration
is None:
393 rospy.loginfo(
"Gateway : unflipping received flip %s" % str(local_registration))
394 self.master.unregister(local_registration)
395 self.flipped_interface.registrations[connection_type].remove(local_registration)
402 If we are running over a wired connection, then do nothing. 403 If over wireless, update the data transfer rate and signal strength 404 for this gateway on the hub. 406 statistics = self.network_interface_manager.get_statistics()
407 self.hub_manager.publish_network_statistics(statistics)
436 Puts/Removes a number of rules on the public interface watchlist. 437 As local rules matching these rules become available/go away, 438 the public interface is modified accordingly. A manual update is done 439 at the end of the advertise call to quickly capture existing 443 @type gateway_srvs.AdvertiseRequest 444 @return service response 445 @rtype gateway_srvs.AdvertiseResponse 447 response = gateway_srvs.AdvertiseResponse()
449 if not request.cancel:
450 for rule
in request.rules:
451 if not self.public_interface.add_rule(rule):
452 response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_EXISTS
453 response.error_message =
"advertisment rule already exists [%s:(%s,%s)]" % (
454 rule.name, rule.type, rule.node)
456 for rule
in request.rules:
457 if not self.public_interface.remove_rule(rule):
458 response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_NOT_FOUND
459 response.error_message =
"advertisment not found [%s:(%s,%s)]" % (
460 rule.name, rule.type, rule.node)
461 except Exception
as e:
462 rospy.logerr(
"Gateway : unknown advertise error [%s]." % str(e))
463 response.result = gateway_msgs.ErrorCodes.UNKNOWN_ADVERTISEMENT_ERROR
466 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
470 rospy.logerr(
"Gateway : %s." % response.error_message)
471 response.watchlist = self.public_interface.getWatchlist()
476 Toggles the advertise all mode. If advertising all, an additional 477 blacklist parameter can be supplied which includes all the topics that 478 will not be advertised/watched for. This blacklist is added to the 479 default blacklist of the public interface. 482 @type gateway_srvs.AdvertiseAllRequest 483 @return service response 484 @rtype gateway_srvs.AdvertiseAllResponse 486 response = gateway_srvs.AdvertiseAllResponse()
488 if not request.cancel:
489 if not self.public_interface.advertise_all(request.blacklist):
490 response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_EXISTS
491 response.error_message =
"already advertising all." 493 self.public_interface.unadvertise_all()
494 except Exception
as e:
495 response.result = gateway_msgs.ErrorCodes.UNKNOWN_ADVERTISEMENT_ERROR
496 response.error_message =
"unknown advertise all error [%s]" % (str(e))
499 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
503 rospy.logerr(
"Gateway : %s." % response.error_message)
504 response.blacklist = self.public_interface.getBlacklist()
509 Puts flip rules on a watchlist which (un)flips them when they 510 become (un)available. 513 @type gateway_srvs.RemoteRequest 514 @return service response 515 @rtype gateway_srvs.RemoteResponse 523 response = gateway_srvs.RemoteResponse(gateway_msgs.ErrorCodes.SUCCESS,
"")
527 if not request.cancel:
533 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
537 rospy.logerr(
"Gateway : %s." % response.error_message)
542 Flips everything except a specified blacklist to a particular gateway, 543 or if the cancel flag is set, clears all flips to that gateway. 546 @type gateway_srvs.RemoteAllRequest 547 @return service response 548 @rtype gateway_srvs.RemoteAllResponse 550 response = gateway_srvs.RemoteAllResponse()
553 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
554 if not request.cancel:
555 if self.flipped_interface.flip_all(remote_gateway_target_hash_name, request.blacklist):
556 rospy.loginfo(
"Gateway : flipping all to gateway '%s'" % (remote_gateway_target_hash_name))
558 response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS
559 response.error_message =
"already flipping all to gateway '%s' " + remote_gateway_target_hash_name
561 self.flipped_interface.unflip_all(remote_gateway_target_hash_name)
562 rospy.loginfo(
"Gateway : cancelling a previous flip all request [%s]" % (request.gateway))
563 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
567 rospy.logerr(
"Gateway : %s." % response.error_message)
572 Puts a single rule on a watchlist and pulls it from a particular 573 gateway when it becomes (un)available. 576 @type gateway_srvs.RemoteRequest 577 @return service response 578 @rtype gateway_srvs.RemoteResponse 586 response = gateway_srvs.RemoteResponse(gateway_msgs.ErrorCodes.SUCCESS,
"")
590 for remote
in request.remotes:
591 if not request.cancel:
592 pull_rule = self.pulled_interface.add_rule(remote)
594 added_rules.append(pull_rule)
595 rospy.loginfo(
"Gateway : added pull rule [%s:(%s,%s)]" %
596 (pull_rule.gateway, pull_rule.rule.name, pull_rule.rule.type))
598 response.result = gateway_msgs.ErrorCodes.PULL_RULE_ALREADY_EXISTS
599 response.error_message =
"pull rule already exists [%s:(%s,%s)]" % (
600 remote.gateway, remote.rule.name, remote.rule.type)
603 for remote
in request.remotes:
604 removed_pull_rules = self.pulled_interface.remove_rule(remote)
605 if removed_pull_rules:
606 rospy.loginfo(
"Gateway : removed pull rule [%s:%s]" % (remote.gateway, remote.rule.name))
607 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
612 for added_rule
in added_rules:
613 self.pulled_interface.remove_rule(added_rule)
614 rospy.logerr(
"Gateway : %s." % response.error_message)
619 Pull everything except a specified blacklist from a particular gateway, 620 or if the cancel flag is set, clears all pulls from that gateway. 623 @type gateway_srvs.RemoteAllRequest 624 @return service response 625 @rtype gateway_srvs.RemoteAllResponse 627 response = gateway_srvs.RemoteAllResponse()
630 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
631 if not request.cancel:
632 if self.pulled_interface.pull_all(remote_gateway_target_hash_name, request.blacklist):
633 rospy.loginfo(
"Gateway : pulling all from gateway [%s]" % (request.gateway))
635 response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS
636 response.error_message =
"already pulling all from gateway [%s] " % (request.gateway)
638 self.pulled_interface.unpull_all(remote_gateway_target_hash_name)
639 rospy.loginfo(
"Gateway : cancelling a previous pull all request [%s]" % (request.gateway))
640 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
644 rospy.logerr(
"Gateway : %s." % response.error_message)
649 Some simple checks when pulling or flipping to make sure that the remote gateway is visible. It 650 does a strict check on the hash names first, then falls back to looking for weak matches on the 653 @param gateway : remote gateway target name (can be hash name, basename or regex pattern) 655 @return pair of result type and message 656 @rtype gateway_msgs.ErrorCodes.xxx, string 659 return None, gateway_msgs.ErrorCodes.NO_HUB_CONNECTION,
"not connected to hub, aborting" 661 return None, gateway_msgs.ErrorCodes.REMOTE_GATEWAY_SELF_IS_NOT,
"gateway cannot flip/pull to itself" 662 return gateway, gateway_msgs.ErrorCodes.SUCCESS,
"" 681 Check given gateways in remote rules are valid 683 :param remotes: remote rules 684 :type remotes: gateway_msgs.RemoteRule[] 686 :return: whether it is valid, error message if it fails 687 :rtypes: None or gateway_srvs.RemoteResponse 689 response = gateway_srvs.RemoteResponse()
690 for remote
in remotes:
692 if response.result != gateway_msgs.ErrorCodes.SUCCESS:
693 rospy.logerr(
"Gateway : %s." % response.error_message)
699 Add the given rules to the watcher list 701 :param remotes: remote rules 702 :type remotes: gateway_msgs.RemoteRule[] 703 :return: whether it is successful 704 :rtype: gateway_srvs.RemoteResponse 706 response = gateway_srvs.RemoteResponse()
707 response.result = gateway_msgs.ErrorCodes.SUCCESS
710 for remote
in remotes:
711 flip_rule = self.flipped_interface.add_rule(remote)
713 added_rules.append(flip_rule)
714 rospy.loginfo(
"Gateway : added flip rule [%s:(%s,%s)]" % (flip_rule.gateway, flip_rule.rule.name, flip_rule.rule.type))
716 response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS
717 response.error_message =
"flip rule already exists [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type)
719 if response.result != gateway_msgs.ErrorCodes.SUCCESS:
721 for added_rule
in added_rules:
722 self.flipped_interface.remove_rule(added_rule)
727 Remove the given rules from the watcher list 729 :param remotes: remote rules 730 :type remotes: gateway_msgs.RemoteRule[] 731 :return: whether it is successful 732 :rtype: gateway_srvs.RemoteResponse 734 response = gateway_srvs.RemoteResponse()
735 response.result = gateway_msgs.ErrorCodes.SUCCESS
737 for remote
in remotes:
738 removed_flip_rules = self.flipped_interface.remove_rule(remote)
739 if removed_flip_rules:
740 rospy.loginfo(
"Gateway : removed flip rule [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type))
def ros_service_flip_all(self, request)
def ros_service_flip(self, request)
def ros_service_pull_all(self, request)
network_interface_manager
def _ros_service_remote_checks(self, gateway)
def disengage_hub(self, hub)
def update_public_interface(self, local_connection_index)
def update_pulled_interface(self, unused_connections, remote_gateway_hub_index)
def _check_remote_gateways(self, remotes)
def __init__(self, hub_manager, param, unique_name, publish_gateway_info_callback)
def update_flipped_in_interface(self, registrations, remote_gateway_hub_index)
def ros_service_advertise(self, request)
Incoming commands from local system (ros service callbacks)
def ros_service_pull(self, request)
def ros_service_advertise_all(self, request)
def _add_flip_rules(self, remotes)
def update_flipped_interface(self, local_connection_index, remote_gateway_hub_index)
Update interface states (jobs assigned from connection_cache callback thread)
def _remove_flip_rules(self, remotes)
def update_network_information(self)