gateway_sync.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #       
00003 # License: BSD
00004 #   https://raw.github.com/robotics-in-concert/rocon_multimaster/master/rocon_gateway/LICENSE 
00005 #
00006 
00007 ##############################################################################
00008 # Imports
00009 ##############################################################################
00010 
00011 import httplib
00012 
00013 import roslib
00014 roslib.load_manifest('rocon_gateway')
00015 import rospy
00016 
00017 # Ros msgs
00018 import gateway_msgs.msg
00019 import gateway_msgs.srv
00020 from gateway_msgs.msg import Rule
00021 from gateway_msgs.srv import AdvertiseResponse
00022 from gateway_msgs.srv import AdvertiseAllResponse
00023 
00024 # Local imports
00025 import utils
00026 import ros_parameters
00027 from .hub_api import Hub
00028 from .master_api import LocalMaster
00029 from .watcher_thread import WatcherThread
00030 from .exceptions import UnavailableGatewayError
00031 from .flipped_interface import FlippedInterface
00032 from .public_interface import PublicInterface
00033 from .pulled_interface import PulledInterface
00034 
00035 ##############################################################################
00036 # Gateway
00037 ##############################################################################
00038 
00039 
00040 class GatewaySync(object):
00041     '''
00042     The gateway between ros system and redis server
00043     '''
00044 
00045     def __init__(self, param):
00046         self.param = param
00047         self.unresolved_name = self.param['name']  # This gets used to build unique names after rule to the hub
00048         self.unique_name = None  # single string value set after hub rule (note: it is not a redis rocon:: rooted key!)
00049         self._ip = None
00050         self.is_connected = False
00051         default_rule_blacklist = ros_parameters.generate_rules(self.param["default_blacklist"])
00052 
00053         default_rules, all_targets = ros_parameters.generate_remote_rules(self.param["default_flips"])
00054         self.flipped_interface = FlippedInterface(
00055                                                   firewall = self.param['firewall'],
00056                                                   default_rule_blacklist = default_rule_blacklist,
00057                                                   default_rules = default_rules,
00058                                                   all_targets = all_targets)
00059         default_rules, all_targets = ros_parameters.generate_remote_rules(self.param["default_pulls"])
00060         self.pulled_interface = PulledInterface(default_rule_blacklist = default_rule_blacklist,
00061                                                 default_rules = default_rules,
00062                                                 all_targets = all_targets)
00063         self.public_interface = PublicInterface(default_rule_blacklist=default_rule_blacklist,
00064                                                 default_rules = ros_parameters.generate_rules(self.param['default_advertisements'])
00065                                                 )
00066         if self.param['advertise_all']:
00067             self.public_interface.advertiseAll([])  # no extra blacklist beyond the default (keeping it simple in yaml for now)
00068         self.master = LocalMaster()
00069         self.remote_gateway_request_callbacks = {}
00070         self.remote_gateway_request_callbacks['flip'] = self.process_remote_gateway_flip_request
00071         self.remote_gateway_request_callbacks['unflip'] = self.process_remote_gateway_unflip_request
00072         self.hub = Hub(self.remote_gateway_request_callbacks, self.unresolved_name, firewall=self.param['firewall'])
00073 
00074         # create a thread to watch local rule states
00075         self.watcher_thread = WatcherThread(self, self.param['watch_loop_period'])
00076 
00077     ##########################################################################
00078     # Rule Logic
00079     ##########################################################################
00080 
00081     def connect_to_hub(self, ip, port):
00082         try:
00083             self.hub.connect(ip, port)
00084             self._ip = self.master.get_ros_ip()
00085             self.unique_name = self.hub.register_gateway(self._ip)
00086             self.is_connected = True
00087         except Exception as e:
00088             print "Exception"
00089             rospy.logerr("Gateway : error connecting to the hub %s" % str(e))
00090             return False
00091         return True
00092 
00093     def shutdown(self):
00094         self.watcher_thread.shutdown()
00095         for connection_type in utils.connection_types:
00096             for flip in self.flipped_interface.flipped[connection_type]:
00097                 self.hub.send_unflip_request(flip.gateway, flip.rule)
00098             for registration in self.flipped_interface.registrations[connection_type]:
00099                 self.master.unregister(registration)
00100         self.hub.unregister_gateway()
00101 
00102     ##########################################################################
00103     # Incoming commands from local system (ros service callbacks)
00104     ##########################################################################
00105 
00106     def ros_service_advertise(self, request):
00107         '''
00108           Puts/Removes a number of rules on the public interface watchlist.
00109           As local rules matching these rules become available/go away,
00110           the public interface is modified accordingly. A manual update is done
00111           at the end of the advertise call to quickly capture existing
00112           rules
00113 
00114           @param request
00115           @type gateway_msgs.srv.AdvertiseRequest
00116           @return service response
00117           @rtype gateway_msgs.srv.AdvertiseReponse
00118         '''
00119         response = gateway_msgs.srv.AdvertiseResponse()
00120         response.result, response.error_message = self._ros_service_advertise_checks()
00121         if response.result == gateway_msgs.msg.Result.SUCCESS:
00122             try:
00123                 if not request.cancel:
00124                     for rule in request.rules:
00125                         if not self.public_interface.add_rule(rule):
00126                             response.result = gateway_msgs.msg.Result.ADVERTISEMENT_EXISTS
00127                             response.error_message = "advertisment rule already exists [%s:(%s,%s)]" % (rule.name, rule.type, rule.node)
00128                 else:
00129                     for rule in request.rules:
00130                         if not self.public_interface.remove_rule(rule):
00131                             response.result = gateway_msgs.msg.Result.ADVERTISEMENT_NOT_FOUND
00132                             response.error_message = "advertisment not found [%s:(%s,%s)]" % (rule.name, rule.type, rule.node)
00133             except Exception as e:
00134                 rospy.logerr("Gateway : unknown advertise error [%s]." % str(e))
00135                 response.result = gateway_msgs.msg.Result.UNKNOWN_ADVERTISEMENT_ERROR
00136 
00137         # Let the watcher get on with the update asap
00138         if response.result == gateway_msgs.msg.Result.SUCCESS:
00139             self.watcher_thread.trigger_update = True
00140         else:
00141             rospy.logerr("Gateway : %s." % response.error_message)
00142         response.watchlist = self.public_interface.getWatchlist()
00143         return response
00144 
00145     def ros_service_advertise_all(self, request):
00146         '''
00147           Toggles the advertise all mode. If advertising all, an additional
00148           blacklist parameter can be supplied which includes all the topics that
00149           will not be advertised/watched for. This blacklist is added to the
00150           default blacklist of the public interface
00151 
00152           @param request
00153           @type gateway_msgs.srv.AdvertiseAllRequest
00154           @return service response
00155           @rtype gateway_msgs.srv.AdvertiseAllReponse
00156         '''
00157         response = gateway_msgs.srv.AdvertiseAllResponse()
00158         response.result, response.error_message = self._ros_service_advertise_checks()
00159         if response.result == gateway_msgs.msg.Result.SUCCESS:
00160             try:
00161                 if not request.cancel:
00162                     if not self.public_interface.advertiseAll(request.blacklist):
00163                         response.result = gateway_msgs.msg.Result.ADVERTISEMENT_EXISTS
00164                         response.error_message = "already advertising all."
00165                 else:
00166                     self.public_interface.unadvertiseAll()
00167             except Exception as e:
00168                 response.result = gateway_msgs.msg.Result.UNKNOWN_ADVERTISEMENT_ERROR
00169                 response.error_message = "unknown advertise all error [%s]" % (str(e))
00170 
00171         # Let the watcher get on with the update asap
00172         if response.result == gateway_msgs.msg.Result.SUCCESS:
00173             self.watcher_thread.trigger_update = True
00174         else:
00175             rospy.logerr("Gateway : %s." % response.error_message)
00176         response.blacklist = self.public_interface.getBlacklist()
00177         return response
00178 
00179     def ros_service_flip(self, request):
00180         '''
00181           Puts flip rules on a watchlist which (un)flips them when they
00182           become (un)available.
00183 
00184           @param request
00185           @type gateway_msgs.srv.RemoteRequest
00186           @return service response
00187           @rtype gateway_msgs.srv.RemoteResponse
00188         '''
00189         response = gateway_msgs.srv.RemoteResponse()
00190         for remote in request.remotes:
00191             response.result, response.error_message = self._ros_service_flip_checks(remote.gateway)
00192             if response.result != gateway_msgs.msg.Result.SUCCESS:
00193                 rospy.logerr("Gateway : %s." % response.error_message)
00194                 return response
00195 
00196         # result is currently SUCCESS
00197         added_rules = []
00198         for remote in request.remotes:
00199             if not request.cancel:
00200                 flip_rule = self.flipped_interface.add_rule(remote)
00201                 if flip_rule:
00202                     added_rules.append(flip_rule)
00203                     rospy.loginfo("Gateway : added flip rule [%s:(%s,%s)]" % (flip_rule.gateway, flip_rule.rule.name, flip_rule.rule.type))
00204                 else:
00205                     response.result = gateway_msgs.msg.Result.FLIP_RULE_ALREADY_EXISTS
00206                     response.error_message = "flip rule already exists [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type)
00207                     break
00208             else:  # request.cancel
00209                 removed_flip_rules = self.flipped_interface.remove_rule(remote)
00210                 if removed_flip_rules:
00211                     rospy.loginfo("Gateway : removed flip rule [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type))
00212 
00213         if response.result == gateway_msgs.msg.Result.SUCCESS:
00214             self.watcher_thread.trigger_update = True
00215         else:
00216             if added_rules:  # completely abort any added rules
00217                 for added_rule in added_rules:
00218                     self.flipped_interface.remove_rule(added_rule)
00219             rospy.logerr("Gateway : %s." % response.error_message)
00220         return response
00221 
00222     def ros_service_flip_all(self, request):
00223         '''
00224           Flips everything except a specified blacklist to a particular gateway,
00225           or if the cancel flag is set, clears all flips to that gateway.
00226 
00227           @param request
00228           @type gateway_msgs.srv.RemoteAllRequest
00229           @return service response
00230           @rtype gateway_msgs.srv.RemoteAllResponse
00231         '''
00232         response = gateway_msgs.srv.RemoteAllResponse()
00233         response.result, response.error_message = self._ros_service_flip_checks(request.gateway)
00234         if response.result == gateway_msgs.msg.Result.SUCCESS:
00235             if not request.cancel:
00236                 if self.flipped_interface.flip_all(request.gateway, request.blacklist):
00237                     rospy.loginfo("Gateway : flipping all to gateway '%s'" % (request.gateway))
00238                 else:
00239                     response.result = gateway_msgs.msg.Result.FLIP_RULE_ALREADY_EXISTS
00240                     response.error_message = "already flipping all to gateway '%s' " + request.gateway
00241             else:  # request.cancel
00242                 self.flipped_interface.un_flip_all(request.gateway)
00243                 rospy.loginfo("Gateway : cancelling a previous flip all request [%s]" % (request.gateway))
00244         if response.result == gateway_msgs.msg.Result.SUCCESS:
00245             self.watcher_thread.trigger_update = True
00246         else:
00247             rospy.logerr("Gateway : %s." % response.error_message)
00248         return response
00249 
00250     def ros_service_pull(self, request):
00251         '''
00252           Puts a single rule on a watchlist and (un)flips it to a particular
00253           gateway when it becomes (un)available. Note that this can also
00254           completely reconfigure the fully qualified name for the rule when
00255           flipping (remapping). If not specified, it will simply reroot rule
00256           under <unique_gateway_name>.
00257 
00258           @param request
00259           @type gateway_msgs.srv.RemoteRequest
00260           @return service response
00261           @rtype gateway_msgs.srv.RemoteResponse
00262         '''
00263         response = gateway_msgs.srv.RemoteResponse()
00264         for remote in request.remotes:
00265             response.result, response.error_message = self._ros_service_flip_checks(remote.gateway)
00266             if response.result != gateway_msgs.msg.Result.SUCCESS:
00267                 rospy.logerr("Gateway : %s." % response.error_message)
00268                 return response
00269 
00270         # result is currently SUCCESS
00271         added_rules = []
00272         for remote in request.remotes:
00273             if not request.cancel:
00274                 pull_rule = self.pulled_interface.add_rule(remote)
00275                 if pull_rule:
00276                     added_rules.append(pull_rule)
00277                     rospy.loginfo("Gateway : added pull rule [%s:(%s,%s)]" % (pull_rule.gateway, pull_rule.rule.name, pull_rule.rule.type))
00278                 else:
00279                     response.result = gateway_msgs.msg.Result.PULL_RULE_ALREADY_EXISTS
00280                     response.error_message = "pull rule already exists [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type)
00281                     break
00282             else:  # request.cancel
00283                 for remote in request.remotes:
00284                     removed_pull_rules = self.pulled_interface.remove_rule(remote)
00285                     if removed_pull_rules:
00286                         rospy.loginfo("Gateway : removed pull rule [%s:%s]" % (remote.gateway, remote.rule.name))
00287         if response.result == gateway_msgs.msg.Result.SUCCESS:
00288             self.watcher_thread.trigger_update = True
00289         else:
00290             if added_rules:  # completely abort any added rules
00291                 for added_rule in added_rules:
00292                     self.pulled_interface.remove_rule(added_rule)
00293             rospy.logerr("Gateway : %s." % response.error_message)
00294         return response
00295 
00296     def ros_service_pull_all(self, request):
00297         '''
00298           Pull everything except a specified blacklist from a particular gateway,
00299           or if the cancel flag is set, clears all pulls from that gateway.
00300 
00301           @param request
00302           @type gateway_msgs.srv.RemoteAllRequest
00303           @return service response
00304           @rtype gateway_msgs.srv.RemoteAllResponse
00305         '''
00306         response = gateway_msgs.srv.RemoteAllResponse()
00307         response.result, response.error_message = self._ros_service_remote_checks(request.gateway)
00308         if response.result == gateway_msgs.msg.Result.SUCCESS:
00309             if not request.cancel:
00310                 if self.pulled_interface.pull_all(request.gateway, request.blacklist):
00311                     rospy.loginfo("Gateway : pulling all from gateway '%s'" % (request.gateway))
00312                 else:
00313                     response.result = gateway_msgs.msg.Result.FLIP_RULE_ALREADY_EXISTS
00314                     response.error_message = "already pulling all from gateway '%s' " + request.gateway
00315             else:  # request.cancel
00316                 self.pulled_interface.unpull_all(request.gateway)
00317                 rospy.loginfo("Gateway : cancelling a previous pull all request [%s]" % (request.gateway))
00318         if response.result == gateway_msgs.msg.Result.SUCCESS:
00319             self.watcher_thread.trigger_update = True
00320         else:
00321             rospy.logerr("Gateway : %s." % response.error_message)
00322         return response
00323 
00324     def _ros_service_advertise_checks(self):
00325         if not self.is_connected:
00326             return gateway_msgs.msg.Result.NO_HUB_CONNECTION, "not connected to hub, aborting"
00327         else:
00328             return gateway_msgs.msg.Result.SUCCESS, ""
00329 
00330     def _ros_service_flip_checks(self, gateway):
00331         '''
00332           Some simple checks for ros service flips.
00333 
00334           @param gateway : target gateway string of the flip
00335           @type string
00336           @return pair of result type and message
00337           @rtype gateway_msgs.msg.Result.xxx, string
00338         '''
00339         result, error_message = self._ros_service_remote_checks(gateway)
00340         if result == gateway_msgs.msg.Result.SUCCESS:
00341             firewall_flag = False
00342             try:
00343                 firewall_flag = self.hub.get_remote_gateway_firewall_flag(gateway)
00344                 if firewall_flag:
00345                     return gateway_msgs.msg.Result.FLIP_REMOTE_GATEWAY_FIREWALLING, "remote gateway is firewalling flip requests, aborting [%s]" % gateway
00346             except UnavailableGatewayError:
00347                 pass  # handled earlier in rosServiceRemoteChecks
00348         return result, error_message
00349 
00350     def _ros_service_remote_checks(self, gateway):
00351         '''
00352           Some simple checks for ros service pulls
00353 
00354           @param gateway : target gateway string of the pull
00355           @type string
00356           @return pair of result type and message
00357           @rtype gateway_msgs.msg.Result.xxx, string
00358         '''
00359         if not self.is_connected:
00360             return gateway_msgs.msg.Result.NO_HUB_CONNECTION, "not connected to hub, aborting"
00361         elif gateway == self.unique_name:
00362             return gateway_msgs.msg.Result.FLIP_NO_TO_SELF, "gateway cannot flip to itself"
00363         elif not self.hub.matches_remote_gateway_name(gateway):
00364             return gateway_msgs.msg.Result.FLIP_REMOTE_GATEWAY_NOT_CONNECTED, "remote gateway is currently not connected [%s]" % gateway
00365         else:
00366             return gateway_msgs.msg.Result.SUCCESS, ""
00367 
00368     ##########################################################################
00369     # Update interface states (usually from watcher thread)
00370     ##########################################################################
00371 
00372     def update_flip_interface(self, connections, gateways):
00373         '''
00374           Process the list of local connections and check against
00375           the current flip rules and patterns for changes. If a rule
00376           has become (un)available take appropriate action.
00377 
00378           @param connections : list of current local connections parsed from the master
00379           @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections
00380 
00381           @param gateways : list of remote gateway string id's
00382           @type string
00383         '''
00384         new_flips, lost_flips = self.flipped_interface.update(connections, gateways, self.unique_name)
00385         for connection_type in connections:
00386             for flip in new_flips[connection_type]:
00387                 # for actions, need to post flip details here
00388                 connections = self.master.generate_connection_details(flip.rule.type, flip.rule.name, flip.rule.node)
00389                 if connection_type == utils.ConnectionType.ACTION_CLIENT or connection_type == utils.ConnectionType.ACTION_SERVER:
00390                     rospy.loginfo("Flipping to %s : %s" % (flip.gateway, utils.formatRule(flip.rule)))
00391                     self.hub.post_flip_details(flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node)
00392                     for connection in connections:
00393                         self.hub.send_flip_request(flip.gateway, connection)  # flip the individual pubs/subs
00394                 else:
00395                     for connection in connections:
00396                         rospy.loginfo("Flipping to %s : %s" % (flip.gateway, utils.formatRule(connection.rule)))
00397                         self.hub.send_flip_request(flip.gateway, connection)
00398                         self.hub.post_flip_details(flip.gateway, connection.rule.name, connection.rule.type, connection.rule.node)
00399             for flip in lost_flips[connection_type]:
00400                 rospy.loginfo("Unflipping to %s : %s" % (flip.gateway, utils.formatRule(flip.rule)))
00401                 self.hub.send_unflip_request(flip.gateway, flip.rule)
00402                 self.hub.remove_flip_details(flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node)
00403 
00404     def update_pulled_interface(self, connections, gateways):
00405         '''
00406           Process the list of local connections and check against
00407           the current pull rules and patterns for changes. If a rule
00408           has become (un)available take appropriate action.
00409 
00410           @param connections : list of current local connections parsed from the master
00411           @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections
00412 
00413           @param gateways : list of remote gateway string id's
00414           @type string
00415         '''
00416         for gateway in gateways + self.pulled_interface.list_remote_gateway_names():
00417             connections = self.hub.get_remote_connection_state(gateway)
00418             new_pulls, lost_pulls = self.pulled_interface.update(connections, gateway, self.unique_name)
00419             for connection_type in connections:
00420                 for pull in new_pulls[connection_type]:
00421                     for connection in connections[pull.rule.type]:
00422                         if connection.rule.name == pull.rule.name and \
00423                            connection.rule.node == pull.rule.node:
00424                             corresponding_connection = connection
00425                             break
00426                     # Register this pull
00427                     existing_registration = self.pulled_interface.findRegistrationMatch(gateway, pull.rule.name, pull.rule.node, pull.rule.type)
00428                     if not existing_registration:
00429                         registration = utils.Registration(connection, gateway)
00430                         new_registration = self.master.register(registration)
00431                         self.pulled_interface.registrations[registration.connection.rule.type].append(new_registration)
00432                         self.hub.post_pull_details(gateway, pull.rule.name, pull.rule.type, pull.rule.node)
00433                 for pull in lost_pulls[connection_type]:
00434                     # Unregister this pull
00435                     existing_registration = self.pulled_interface.findRegistrationMatch(gateway, pull.rule.name, pull.rule.node, pull.rule.type)
00436                     if existing_registration:
00437                         self.master.unregister(existing_registration)
00438                         self.hub.remove_pull_details(gateway, pull.rule.name, pull.rule.type, pull.rule.node)
00439                         self.pulled_interface.registrations[existing_registration.connection.rule.type].remove(existing_registration)
00440 
00441     def update_public_interface(self, connections=None):
00442         '''
00443           Process the list of local connections and check against
00444           the current rules and patterns for changes. If a rule
00445           has become (un)available take appropriate action.
00446 
00447           @param connections : list of current local connections parsed from the master
00448           @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections
00449         '''
00450         if not self.is_connected:
00451             rospy.logerr("Gateway : advertise call failed [no hub rule].")
00452             return None
00453         if not connections:
00454             try:
00455                 connections = self.master.getConnectionState()
00456             except httplib.ResponseNotReady as unused_e:
00457                 rospy.logwarn("Received ResponseNotReady from master api")
00458                 return None
00459         new_conns, lost_conns = self.public_interface.update(connections)
00460         public_interface = self.public_interface.getInterface()
00461         for connection_type in utils.connection_types:
00462             for connection in new_conns[connection_type]:
00463                 rospy.loginfo("Gateway : adding rule to public interface %s" % utils.formatRule(connection.rule))
00464                 self.hub.advertise(connection)
00465             for connection in lost_conns[connection_type]:
00466                 rospy.loginfo("Gateway : removing rule to public interface %s" % utils.formatRule(connection.rule))
00467                 self.hub.unadvertise(connection)
00468         return public_interface
00469 
00470     ##########################################################################
00471     # Incoming commands from remote gateways
00472     ##########################################################################
00473 
00474     def process_remote_gateway_flip_request(self, registration):
00475         '''
00476           Used as a callback for incoming requests on redis pubsub channels.
00477           It gets assigned to RedisManager.callback.
00478 
00479           @param registration : fully detailed registration to be processed
00480           @type utils.Registration
00481         '''
00482         if self.flipped_interface.firewall:
00483             rospy.logwarn("Gateway : firewalling a flip request %s" % registration)
00484         else:
00485             rospy.loginfo("Gateway : received a flip request %s" % registration)
00486             # probably not necessary as the flipping gateway will already check this
00487             existing_registration = self.flipped_interface.findRegistrationMatch(registration.remote_gateway, registration.connection.rule.name, registration.connection.rule.node, registration.connection.rule.type)
00488             if not existing_registration:
00489                 new_registration = self.master.register(registration)
00490                 if new_registration:
00491                     self.flipped_interface.registrations[registration.connection.rule.type].append(new_registration)
00492 
00493     def process_remote_gateway_unflip_request(self, rule, remote_gateway):
00494         rospy.loginfo("Gateway : received an unflip request from gateway %s: %s" % (remote_gateway, utils.formatRule(rule)))
00495         existing_registration = self.flipped_interface.findRegistrationMatch(remote_gateway, rule.name, rule.node, rule.type)
00496         if existing_registration:
00497             self.master.unregister(existing_registration)
00498             self.flipped_interface.registrations[existing_registration.connection.rule.type].remove(existing_registration)
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Properties Friends


rocon_gateway
Author(s): Daniel Stonier, Jihoon Lee,
autogenerated on Tue Jan 15 2013 17:43:24