00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 import httplib
00012
00013 import roslib
00014 roslib.load_manifest('rocon_gateway')
00015 import rospy
00016
00017
00018 import gateway_msgs.msg
00019 import gateway_msgs.srv
00020 from gateway_msgs.msg import Rule
00021 from gateway_msgs.srv import AdvertiseResponse
00022 from gateway_msgs.srv import AdvertiseAllResponse
00023
00024
00025 import utils
00026 import ros_parameters
00027 from .hub_api import Hub
00028 from .master_api import LocalMaster
00029 from .watcher_thread import WatcherThread
00030 from .exceptions import UnavailableGatewayError
00031 from .flipped_interface import FlippedInterface
00032 from .public_interface import PublicInterface
00033 from .pulled_interface import PulledInterface
00034
00035
00036
00037
00038
00039
00040 class GatewaySync(object):
00041 '''
00042 The gateway between ros system and redis server
00043 '''
00044
00045 def __init__(self, param):
00046 self.param = param
00047 self.unresolved_name = self.param['name']
00048 self.unique_name = None
00049 self._ip = None
00050 self.is_connected = False
00051 default_rule_blacklist = ros_parameters.generate_rules(self.param["default_blacklist"])
00052
00053 default_rules, all_targets = ros_parameters.generate_remote_rules(self.param["default_flips"])
00054 self.flipped_interface = FlippedInterface(
00055 firewall = self.param['firewall'],
00056 default_rule_blacklist = default_rule_blacklist,
00057 default_rules = default_rules,
00058 all_targets = all_targets)
00059 default_rules, all_targets = ros_parameters.generate_remote_rules(self.param["default_pulls"])
00060 self.pulled_interface = PulledInterface(default_rule_blacklist = default_rule_blacklist,
00061 default_rules = default_rules,
00062 all_targets = all_targets)
00063 self.public_interface = PublicInterface(default_rule_blacklist=default_rule_blacklist,
00064 default_rules = ros_parameters.generate_rules(self.param['default_advertisements'])
00065 )
00066 if self.param['advertise_all']:
00067 self.public_interface.advertiseAll([])
00068 self.master = LocalMaster()
00069 self.remote_gateway_request_callbacks = {}
00070 self.remote_gateway_request_callbacks['flip'] = self.process_remote_gateway_flip_request
00071 self.remote_gateway_request_callbacks['unflip'] = self.process_remote_gateway_unflip_request
00072 self.hub = Hub(self.remote_gateway_request_callbacks, self.unresolved_name, firewall=self.param['firewall'])
00073
00074
00075 self.watcher_thread = WatcherThread(self, self.param['watch_loop_period'])
00076
00077
00078
00079
00080
00081 def connect_to_hub(self, ip, port):
00082 try:
00083 self.hub.connect(ip, port)
00084 self._ip = self.master.get_ros_ip()
00085 self.unique_name = self.hub.register_gateway(self._ip)
00086 self.is_connected = True
00087 except Exception as e:
00088 print "Exception"
00089 rospy.logerr("Gateway : error connecting to the hub %s" % str(e))
00090 return False
00091 return True
00092
00093 def shutdown(self):
00094 self.watcher_thread.shutdown()
00095 for connection_type in utils.connection_types:
00096 for flip in self.flipped_interface.flipped[connection_type]:
00097 self.hub.send_unflip_request(flip.gateway, flip.rule)
00098 for registration in self.flipped_interface.registrations[connection_type]:
00099 self.master.unregister(registration)
00100 self.hub.unregister_gateway()
00101
00102
00103
00104
00105
00106 def ros_service_advertise(self, request):
00107 '''
00108 Puts/Removes a number of rules on the public interface watchlist.
00109 As local rules matching these rules become available/go away,
00110 the public interface is modified accordingly. A manual update is done
00111 at the end of the advertise call to quickly capture existing
00112 rules
00113
00114 @param request
00115 @type gateway_msgs.srv.AdvertiseRequest
00116 @return service response
00117 @rtype gateway_msgs.srv.AdvertiseReponse
00118 '''
00119 response = gateway_msgs.srv.AdvertiseResponse()
00120 response.result, response.error_message = self._ros_service_advertise_checks()
00121 if response.result == gateway_msgs.msg.Result.SUCCESS:
00122 try:
00123 if not request.cancel:
00124 for rule in request.rules:
00125 if not self.public_interface.add_rule(rule):
00126 response.result = gateway_msgs.msg.Result.ADVERTISEMENT_EXISTS
00127 response.error_message = "advertisment rule already exists [%s:(%s,%s)]" % (rule.name, rule.type, rule.node)
00128 else:
00129 for rule in request.rules:
00130 if not self.public_interface.remove_rule(rule):
00131 response.result = gateway_msgs.msg.Result.ADVERTISEMENT_NOT_FOUND
00132 response.error_message = "advertisment not found [%s:(%s,%s)]" % (rule.name, rule.type, rule.node)
00133 except Exception as e:
00134 rospy.logerr("Gateway : unknown advertise error [%s]." % str(e))
00135 response.result = gateway_msgs.msg.Result.UNKNOWN_ADVERTISEMENT_ERROR
00136
00137
00138 if response.result == gateway_msgs.msg.Result.SUCCESS:
00139 self.watcher_thread.trigger_update = True
00140 else:
00141 rospy.logerr("Gateway : %s." % response.error_message)
00142 response.watchlist = self.public_interface.getWatchlist()
00143 return response
00144
00145 def ros_service_advertise_all(self, request):
00146 '''
00147 Toggles the advertise all mode. If advertising all, an additional
00148 blacklist parameter can be supplied which includes all the topics that
00149 will not be advertised/watched for. This blacklist is added to the
00150 default blacklist of the public interface
00151
00152 @param request
00153 @type gateway_msgs.srv.AdvertiseAllRequest
00154 @return service response
00155 @rtype gateway_msgs.srv.AdvertiseAllReponse
00156 '''
00157 response = gateway_msgs.srv.AdvertiseAllResponse()
00158 response.result, response.error_message = self._ros_service_advertise_checks()
00159 if response.result == gateway_msgs.msg.Result.SUCCESS:
00160 try:
00161 if not request.cancel:
00162 if not self.public_interface.advertiseAll(request.blacklist):
00163 response.result = gateway_msgs.msg.Result.ADVERTISEMENT_EXISTS
00164 response.error_message = "already advertising all."
00165 else:
00166 self.public_interface.unadvertiseAll()
00167 except Exception as e:
00168 response.result = gateway_msgs.msg.Result.UNKNOWN_ADVERTISEMENT_ERROR
00169 response.error_message = "unknown advertise all error [%s]" % (str(e))
00170
00171
00172 if response.result == gateway_msgs.msg.Result.SUCCESS:
00173 self.watcher_thread.trigger_update = True
00174 else:
00175 rospy.logerr("Gateway : %s." % response.error_message)
00176 response.blacklist = self.public_interface.getBlacklist()
00177 return response
00178
00179 def ros_service_flip(self, request):
00180 '''
00181 Puts flip rules on a watchlist which (un)flips them when they
00182 become (un)available.
00183
00184 @param request
00185 @type gateway_msgs.srv.RemoteRequest
00186 @return service response
00187 @rtype gateway_msgs.srv.RemoteResponse
00188 '''
00189 response = gateway_msgs.srv.RemoteResponse()
00190 for remote in request.remotes:
00191 response.result, response.error_message = self._ros_service_flip_checks(remote.gateway)
00192 if response.result != gateway_msgs.msg.Result.SUCCESS:
00193 rospy.logerr("Gateway : %s." % response.error_message)
00194 return response
00195
00196
00197 added_rules = []
00198 for remote in request.remotes:
00199 if not request.cancel:
00200 flip_rule = self.flipped_interface.add_rule(remote)
00201 if flip_rule:
00202 added_rules.append(flip_rule)
00203 rospy.loginfo("Gateway : added flip rule [%s:(%s,%s)]" % (flip_rule.gateway, flip_rule.rule.name, flip_rule.rule.type))
00204 else:
00205 response.result = gateway_msgs.msg.Result.FLIP_RULE_ALREADY_EXISTS
00206 response.error_message = "flip rule already exists [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type)
00207 break
00208 else:
00209 removed_flip_rules = self.flipped_interface.remove_rule(remote)
00210 if removed_flip_rules:
00211 rospy.loginfo("Gateway : removed flip rule [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type))
00212
00213 if response.result == gateway_msgs.msg.Result.SUCCESS:
00214 self.watcher_thread.trigger_update = True
00215 else:
00216 if added_rules:
00217 for added_rule in added_rules:
00218 self.flipped_interface.remove_rule(added_rule)
00219 rospy.logerr("Gateway : %s." % response.error_message)
00220 return response
00221
00222 def ros_service_flip_all(self, request):
00223 '''
00224 Flips everything except a specified blacklist to a particular gateway,
00225 or if the cancel flag is set, clears all flips to that gateway.
00226
00227 @param request
00228 @type gateway_msgs.srv.RemoteAllRequest
00229 @return service response
00230 @rtype gateway_msgs.srv.RemoteAllResponse
00231 '''
00232 response = gateway_msgs.srv.RemoteAllResponse()
00233 response.result, response.error_message = self._ros_service_flip_checks(request.gateway)
00234 if response.result == gateway_msgs.msg.Result.SUCCESS:
00235 if not request.cancel:
00236 if self.flipped_interface.flip_all(request.gateway, request.blacklist):
00237 rospy.loginfo("Gateway : flipping all to gateway '%s'" % (request.gateway))
00238 else:
00239 response.result = gateway_msgs.msg.Result.FLIP_RULE_ALREADY_EXISTS
00240 response.error_message = "already flipping all to gateway '%s' " + request.gateway
00241 else:
00242 self.flipped_interface.un_flip_all(request.gateway)
00243 rospy.loginfo("Gateway : cancelling a previous flip all request [%s]" % (request.gateway))
00244 if response.result == gateway_msgs.msg.Result.SUCCESS:
00245 self.watcher_thread.trigger_update = True
00246 else:
00247 rospy.logerr("Gateway : %s." % response.error_message)
00248 return response
00249
00250 def ros_service_pull(self, request):
00251 '''
00252 Puts a single rule on a watchlist and (un)flips it to a particular
00253 gateway when it becomes (un)available. Note that this can also
00254 completely reconfigure the fully qualified name for the rule when
00255 flipping (remapping). If not specified, it will simply reroot rule
00256 under <unique_gateway_name>.
00257
00258 @param request
00259 @type gateway_msgs.srv.RemoteRequest
00260 @return service response
00261 @rtype gateway_msgs.srv.RemoteResponse
00262 '''
00263 response = gateway_msgs.srv.RemoteResponse()
00264 for remote in request.remotes:
00265 response.result, response.error_message = self._ros_service_flip_checks(remote.gateway)
00266 if response.result != gateway_msgs.msg.Result.SUCCESS:
00267 rospy.logerr("Gateway : %s." % response.error_message)
00268 return response
00269
00270
00271 added_rules = []
00272 for remote in request.remotes:
00273 if not request.cancel:
00274 pull_rule = self.pulled_interface.add_rule(remote)
00275 if pull_rule:
00276 added_rules.append(pull_rule)
00277 rospy.loginfo("Gateway : added pull rule [%s:(%s,%s)]" % (pull_rule.gateway, pull_rule.rule.name, pull_rule.rule.type))
00278 else:
00279 response.result = gateway_msgs.msg.Result.PULL_RULE_ALREADY_EXISTS
00280 response.error_message = "pull rule already exists [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type)
00281 break
00282 else:
00283 for remote in request.remotes:
00284 removed_pull_rules = self.pulled_interface.remove_rule(remote)
00285 if removed_pull_rules:
00286 rospy.loginfo("Gateway : removed pull rule [%s:%s]" % (remote.gateway, remote.rule.name))
00287 if response.result == gateway_msgs.msg.Result.SUCCESS:
00288 self.watcher_thread.trigger_update = True
00289 else:
00290 if added_rules:
00291 for added_rule in added_rules:
00292 self.pulled_interface.remove_rule(added_rule)
00293 rospy.logerr("Gateway : %s." % response.error_message)
00294 return response
00295
00296 def ros_service_pull_all(self, request):
00297 '''
00298 Pull everything except a specified blacklist from a particular gateway,
00299 or if the cancel flag is set, clears all pulls from that gateway.
00300
00301 @param request
00302 @type gateway_msgs.srv.RemoteAllRequest
00303 @return service response
00304 @rtype gateway_msgs.srv.RemoteAllResponse
00305 '''
00306 response = gateway_msgs.srv.RemoteAllResponse()
00307 response.result, response.error_message = self._ros_service_remote_checks(request.gateway)
00308 if response.result == gateway_msgs.msg.Result.SUCCESS:
00309 if not request.cancel:
00310 if self.pulled_interface.pull_all(request.gateway, request.blacklist):
00311 rospy.loginfo("Gateway : pulling all from gateway '%s'" % (request.gateway))
00312 else:
00313 response.result = gateway_msgs.msg.Result.FLIP_RULE_ALREADY_EXISTS
00314 response.error_message = "already pulling all from gateway '%s' " + request.gateway
00315 else:
00316 self.pulled_interface.unpull_all(request.gateway)
00317 rospy.loginfo("Gateway : cancelling a previous pull all request [%s]" % (request.gateway))
00318 if response.result == gateway_msgs.msg.Result.SUCCESS:
00319 self.watcher_thread.trigger_update = True
00320 else:
00321 rospy.logerr("Gateway : %s." % response.error_message)
00322 return response
00323
00324 def _ros_service_advertise_checks(self):
00325 if not self.is_connected:
00326 return gateway_msgs.msg.Result.NO_HUB_CONNECTION, "not connected to hub, aborting"
00327 else:
00328 return gateway_msgs.msg.Result.SUCCESS, ""
00329
00330 def _ros_service_flip_checks(self, gateway):
00331 '''
00332 Some simple checks for ros service flips.
00333
00334 @param gateway : target gateway string of the flip
00335 @type string
00336 @return pair of result type and message
00337 @rtype gateway_msgs.msg.Result.xxx, string
00338 '''
00339 result, error_message = self._ros_service_remote_checks(gateway)
00340 if result == gateway_msgs.msg.Result.SUCCESS:
00341 firewall_flag = False
00342 try:
00343 firewall_flag = self.hub.get_remote_gateway_firewall_flag(gateway)
00344 if firewall_flag:
00345 return gateway_msgs.msg.Result.FLIP_REMOTE_GATEWAY_FIREWALLING, "remote gateway is firewalling flip requests, aborting [%s]" % gateway
00346 except UnavailableGatewayError:
00347 pass
00348 return result, error_message
00349
00350 def _ros_service_remote_checks(self, gateway):
00351 '''
00352 Some simple checks for ros service pulls
00353
00354 @param gateway : target gateway string of the pull
00355 @type string
00356 @return pair of result type and message
00357 @rtype gateway_msgs.msg.Result.xxx, string
00358 '''
00359 if not self.is_connected:
00360 return gateway_msgs.msg.Result.NO_HUB_CONNECTION, "not connected to hub, aborting"
00361 elif gateway == self.unique_name:
00362 return gateway_msgs.msg.Result.FLIP_NO_TO_SELF, "gateway cannot flip to itself"
00363 elif not self.hub.matches_remote_gateway_name(gateway):
00364 return gateway_msgs.msg.Result.FLIP_REMOTE_GATEWAY_NOT_CONNECTED, "remote gateway is currently not connected [%s]" % gateway
00365 else:
00366 return gateway_msgs.msg.Result.SUCCESS, ""
00367
00368
00369
00370
00371
00372 def update_flip_interface(self, connections, gateways):
00373 '''
00374 Process the list of local connections and check against
00375 the current flip rules and patterns for changes. If a rule
00376 has become (un)available take appropriate action.
00377
00378 @param connections : list of current local connections parsed from the master
00379 @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections
00380
00381 @param gateways : list of remote gateway string id's
00382 @type string
00383 '''
00384 new_flips, lost_flips = self.flipped_interface.update(connections, gateways, self.unique_name)
00385 for connection_type in connections:
00386 for flip in new_flips[connection_type]:
00387
00388 connections = self.master.generate_connection_details(flip.rule.type, flip.rule.name, flip.rule.node)
00389 if connection_type == utils.ConnectionType.ACTION_CLIENT or connection_type == utils.ConnectionType.ACTION_SERVER:
00390 rospy.loginfo("Flipping to %s : %s" % (flip.gateway, utils.formatRule(flip.rule)))
00391 self.hub.post_flip_details(flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node)
00392 for connection in connections:
00393 self.hub.send_flip_request(flip.gateway, connection)
00394 else:
00395 for connection in connections:
00396 rospy.loginfo("Flipping to %s : %s" % (flip.gateway, utils.formatRule(connection.rule)))
00397 self.hub.send_flip_request(flip.gateway, connection)
00398 self.hub.post_flip_details(flip.gateway, connection.rule.name, connection.rule.type, connection.rule.node)
00399 for flip in lost_flips[connection_type]:
00400 rospy.loginfo("Unflipping to %s : %s" % (flip.gateway, utils.formatRule(flip.rule)))
00401 self.hub.send_unflip_request(flip.gateway, flip.rule)
00402 self.hub.remove_flip_details(flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node)
00403
00404 def update_pulled_interface(self, connections, gateways):
00405 '''
00406 Process the list of local connections and check against
00407 the current pull rules and patterns for changes. If a rule
00408 has become (un)available take appropriate action.
00409
00410 @param connections : list of current local connections parsed from the master
00411 @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections
00412
00413 @param gateways : list of remote gateway string id's
00414 @type string
00415 '''
00416 for gateway in gateways + self.pulled_interface.list_remote_gateway_names():
00417 connections = self.hub.get_remote_connection_state(gateway)
00418 new_pulls, lost_pulls = self.pulled_interface.update(connections, gateway, self.unique_name)
00419 for connection_type in connections:
00420 for pull in new_pulls[connection_type]:
00421 for connection in connections[pull.rule.type]:
00422 if connection.rule.name == pull.rule.name and \
00423 connection.rule.node == pull.rule.node:
00424 corresponding_connection = connection
00425 break
00426
00427 existing_registration = self.pulled_interface.findRegistrationMatch(gateway, pull.rule.name, pull.rule.node, pull.rule.type)
00428 if not existing_registration:
00429 registration = utils.Registration(connection, gateway)
00430 new_registration = self.master.register(registration)
00431 self.pulled_interface.registrations[registration.connection.rule.type].append(new_registration)
00432 self.hub.post_pull_details(gateway, pull.rule.name, pull.rule.type, pull.rule.node)
00433 for pull in lost_pulls[connection_type]:
00434
00435 existing_registration = self.pulled_interface.findRegistrationMatch(gateway, pull.rule.name, pull.rule.node, pull.rule.type)
00436 if existing_registration:
00437 self.master.unregister(existing_registration)
00438 self.hub.remove_pull_details(gateway, pull.rule.name, pull.rule.type, pull.rule.node)
00439 self.pulled_interface.registrations[existing_registration.connection.rule.type].remove(existing_registration)
00440
00441 def update_public_interface(self, connections=None):
00442 '''
00443 Process the list of local connections and check against
00444 the current rules and patterns for changes. If a rule
00445 has become (un)available take appropriate action.
00446
00447 @param connections : list of current local connections parsed from the master
00448 @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections
00449 '''
00450 if not self.is_connected:
00451 rospy.logerr("Gateway : advertise call failed [no hub rule].")
00452 return None
00453 if not connections:
00454 try:
00455 connections = self.master.getConnectionState()
00456 except httplib.ResponseNotReady as unused_e:
00457 rospy.logwarn("Received ResponseNotReady from master api")
00458 return None
00459 new_conns, lost_conns = self.public_interface.update(connections)
00460 public_interface = self.public_interface.getInterface()
00461 for connection_type in utils.connection_types:
00462 for connection in new_conns[connection_type]:
00463 rospy.loginfo("Gateway : adding rule to public interface %s" % utils.formatRule(connection.rule))
00464 self.hub.advertise(connection)
00465 for connection in lost_conns[connection_type]:
00466 rospy.loginfo("Gateway : removing rule to public interface %s" % utils.formatRule(connection.rule))
00467 self.hub.unadvertise(connection)
00468 return public_interface
00469
00470
00471
00472
00473
00474 def process_remote_gateway_flip_request(self, registration):
00475 '''
00476 Used as a callback for incoming requests on redis pubsub channels.
00477 It gets assigned to RedisManager.callback.
00478
00479 @param registration : fully detailed registration to be processed
00480 @type utils.Registration
00481 '''
00482 if self.flipped_interface.firewall:
00483 rospy.logwarn("Gateway : firewalling a flip request %s" % registration)
00484 else:
00485 rospy.loginfo("Gateway : received a flip request %s" % registration)
00486
00487 existing_registration = self.flipped_interface.findRegistrationMatch(registration.remote_gateway, registration.connection.rule.name, registration.connection.rule.node, registration.connection.rule.type)
00488 if not existing_registration:
00489 new_registration = self.master.register(registration)
00490 if new_registration:
00491 self.flipped_interface.registrations[registration.connection.rule.type].append(new_registration)
00492
00493 def process_remote_gateway_unflip_request(self, rule, remote_gateway):
00494 rospy.loginfo("Gateway : received an unflip request from gateway %s: %s" % (remote_gateway, utils.formatRule(rule)))
00495 existing_registration = self.flipped_interface.findRegistrationMatch(remote_gateway, rule.name, rule.node, rule.type)
00496 if existing_registration:
00497 self.master.unregister(existing_registration)
00498 self.flipped_interface.registrations[existing_registration.connection.rule.type].remove(existing_registration)