hub_api.py
Go to the documentation of this file.
00001 #!/usr/bin/env pythonupdate
00002 #
00003 # License: BSD
00004 #   https://raw.github.com/robotics-in-concert/rocon_multimaster/master/rocon_gateway/LICENSE
00005 #
00006 ###############################################################################
00007 # Imports
00008 ###############################################################################
00009 
00010 import redis
00011 import threading
00012 import roslib
00013 roslib.load_manifest('rocon_gateway')
00014 import rospy
00015 import re
00016 import utils
00017 import gateway_msgs.msg
00018 import rosgraph
00019 from .exceptions import UnavailableGatewayError, HubConnectionLostError
00020 
00021 ###############################################################################
00022 # Utility Functions
00023 ###############################################################################
00024 
00025 
00026 def create_key(key):
00027     '''
00028       Root the specified redis key name in our pseudo redis database.
00029     '''
00030     if re.match('rocon:', key):  # checks if leading rocon: is foundupdate
00031         return key
00032     else:
00033         return 'rocon:' + key
00034 
00035 
00036 def create_hub_key(key):
00037     '''
00038       Root the specified redis key name in our pseudo redis database under
00039       the hub namespace
00040     '''
00041     if re.match('rocon:hub:', key):  # checks if leading rocon: is foundupdate
00042         return key
00043     else:
00044         return 'rocon:hub:' + key
00045 
00046 
00047 def create_gateway_key(unique_gateway_name, key):
00048     '''
00049       Root the specified redis key name in our pseudo redis database under
00050       the gateway namespace.
00051 
00052       @note : currently does no checking of the incoming keys
00053     '''
00054     return 'rocon:' + unique_gateway_name + ":" + key
00055 
00056 
00057 def extract_key(key):
00058     '''
00059       Extract the specified redis key name from our pseudo redis database.
00060     '''
00061     if re.match('rocon:', key):  # checks if leading rocon: is found
00062         return re.sub(r'rocon:', '', key)
00063     else:
00064         return key
00065 
00066 
00067 def key_base_name(key):
00068     '''
00069       Extract the base name (i.e. last value) from the key.
00070       e.g. rocon:key:pirate24 -> pirate24
00071     '''
00072     return key.split(':')[-1]
00073 
00074 
00075 def resolve_hub(ip, port):
00076     '''
00077       Pings the hub for identification. We currently use this to check
00078       against the gateway whitelist/blacklists to determine if a rule
00079       should proceed or not.
00080 
00081       @return string - hub name
00082     '''
00083     r = redis.Redis(host=ip, port=port)
00084     return r.get("rocon:hub:name")  # perhaps should store all key names somewhere central
00085 
00086 ###############################################################################
00087 # Redis Callback Handler
00088 ##############################################################################
00089 
00090 
00091 class RedisListenerThread(threading.Thread):
00092     '''
00093       Tunes into the redis channels that have been subscribed to and
00094       calls the apropriate callbacks.
00095     '''
00096     def __init__(self, redis_pubsub_server, remote_gateway_request_callbacks):
00097         threading.Thread.__init__(self)
00098         self._redis_pubsub_server = redis_pubsub_server
00099         self._remote_gateway_request_callbacks = remote_gateway_request_callbacks
00100 
00101     def run(self):
00102         '''
00103           Used as a callback for incoming requests on redis pubsub channels.
00104 
00105           The received argument is a list of strings for 'flip':
00106 
00107             - [0] - command : this one is 'flip'
00108             - [1] - remote_gateway : the name of the gateway that is flipping to us
00109             - [2] - remote_name
00110             - [3] - remote_node
00111             - [4] - type : one of ConnectionType.PUBLISHER etc
00112             - [5] - type_info : a ros format type (e.g. std_msgs/String or service api)
00113             - [6] - xmlrpc_uri : the xmlrpc node uri
00114 
00115           The command 'unflip' is the same, not including args 5 and 6.
00116         '''
00117         try:
00118             for r in self._redis_pubsub_server.listen():
00119                 if r['type'] != 'unsubscribe' and r['type'] != 'subscribe':
00120                     command, source, contents = utils.deserialize_request(r['data'])
00121                     rospy.logdebug("Gateway : redis listener received a channel publication from %s : [%s]" % (source, command))
00122                     if command == 'flip':
00123                         registration = utils.Registration(utils.get_connection_from_list(contents), source)
00124                         self._remote_gateway_request_callbacks['flip'](registration)
00125                     elif command == 'unflip':
00126                         self._remote_gateway_request_callbacks['unflip'](utils.get_rule_from_list(contents), source)
00127                     else:
00128                         rospy.logerr("Gateway : received an unknown command from the hub.")
00129         except redis.exceptions.ConnectionError:
00130             rospy.logwarn("Gateway : lost connection to the hub (probably shut down)")
00131 
00132 ##############################################################################
00133 # Hub Manager - Redis Implementation
00134 ##############################################################################
00135 
00136 
00137 class Hub(object):
00138 
00139     pool = None
00140     server = None
00141     pubsub = None
00142     callback = None
00143 
00144     def __init__(self, remote_gateway_request_callbacks, gateway_name, firewall):
00145         '''
00146           @param remote_gateway_request_callbacks : to handle redis responses
00147           @type list of function pointers (back to GatewaySync class
00148 
00149           @param gateway_name : recommended name for this gateway
00150           @type string
00151 
00152           @param firewall : whether this gateway blocks flips or not (we publish this info)
00153           @type Bool
00154         '''
00155         self.name = ''  # the hub name
00156         self.uri = ''
00157         self._gateway_name = gateway_name  # used to generate the unique name key later
00158         self._unique_gateway_name = ''  # set when gateway is registered
00159         self._firewall = 1 if firewall else 0
00160         self._remote_gateway_request_callbacks = remote_gateway_request_callbacks
00161         self._redis_keys = {}
00162         #self._redis_keys['gateway'] = '' # it's a unique id set when gateway is registered
00163         #self._redis_keys['firewall'] = '' # it's also generated later when gateway is registered
00164         self._redis_keys['index'] = create_hub_key('index')  # used for uniquely id'ing the gateway (client)
00165         self._redis_keys['gatewaylist'] = create_hub_key('gatewaylist')
00166         self._redis_channels = {}
00167         self._redis_pubsub_server = None
00168 
00169     ##########################################################################
00170     # Hub Connections
00171     ##########################################################################
00172 
00173     def connect(self, ip, portarg):
00174         try:
00175             self.pool = redis.ConnectionPool(host=ip, port=portarg, db=0)
00176             self.server = redis.Redis(connection_pool=self.pool)
00177             rospy.logdebug("Gateway : connected to the hub's redis server.")
00178             self._redis_pubsub_server = self.server.pubsub()
00179             self.uri = str(ip) + ":" + str(portarg)
00180         except redis.exceptions.ConnectionError as unused_e:
00181             rospy.logerror("Gateway : failed rule to the hub's redis server.")
00182             raise
00183 
00184     def register_gateway(self, ip):
00185         '''
00186           Register a gateway with the hub. Note that you must have already
00187           connected before calling this function.
00188 
00189           On registration, the hub will provide a unique identifier number
00190           which will be appended to the suggested name of this gateway to
00191           ensure a unique id string and key for this gateway.
00192 
00193           @return: success or failure of the operation
00194           @rtype: bool
00195 
00196           @todo - maybe merge with 'connect', or at the least check if it
00197           is actually connected here first.
00198         '''
00199         if self.server.sadd(self._redis_keys['gatewaylist'], create_key(self._gateway_name)):
00200             self._unique_gateway_name = self._gateway_name
00201             self._redis_keys['gateway'] = create_key(self._unique_gateway_name)
00202         else:
00203             unique_num = self.server.incr(self._redis_keys['index'])
00204             self._unique_gateway_name = self._gateway_name + str(unique_num)
00205             self._redis_keys['gateway'] = create_key(self._unique_gateway_name)
00206             unused_ret = self.server.sadd(self._redis_keys['gatewaylist'], self._redis_keys['gateway'])
00207         unused_ret = self.server.sadd(self._redis_keys['gatewaylist'], self._redis_keys['gateway'])
00208         self._redis_keys['firewall'] = create_gateway_key(self._unique_gateway_name, 'firewall')
00209         self.server.set(self._redis_keys['firewall'], self._firewall)
00210         self._redis_keys['ip'] = create_gateway_key(self._unique_gateway_name, 'ip')
00211         self.server.set(self._redis_keys['ip'], ip)
00212         self._redis_channels['gateway'] = self._redis_keys['gateway']
00213         self._redis_pubsub_server.subscribe(self._redis_channels['gateway'])
00214         self.remote_gateway_listener_thread = RedisListenerThread(self._redis_pubsub_server, self._remote_gateway_request_callbacks)
00215         self.remote_gateway_listener_thread.start()
00216         self.name = key_base_name(self.server.get("rocon:hub:name"))
00217         return key_base_name(self._redis_keys['gateway'])
00218 
00219     def unregister_gateway(self):
00220         '''
00221           Remove all gateway info from the hub.
00222 
00223           @return: success or failure of the operation
00224           @rtype: bool
00225         '''
00226         try:
00227             self._redis_pubsub_server.unsubscribe()
00228             gateway_keys = self.server.keys(self._redis_keys['gateway'] + ":*")
00229             pipe = self.server.pipeline()
00230             pipe.delete(*gateway_keys)
00231             pipe.srem(self._redis_keys['gatewaylist'], self._redis_keys['gateway'])
00232             pipe.execute()
00233             self._redis_channels = {}
00234             self._unique_gateway_name = ''
00235             self.name = ''
00236         except redis.exceptions.ConnectionError:
00237             # usually just means the hub has gone down just before us, let it go quietly
00238             # rospy.logwarn("Gateway : problem unregistering from the hub (likely that hub shutdown before the gateway).")
00239             return False
00240         rospy.loginfo("Gateway : unregistering gateway from the hub.")
00241         return True
00242 
00243     ##########################################################################
00244     # Hub Data Retrieval
00245     ##########################################################################
00246 
00247     def remote_gateway_info(self, gateway):
00248         '''
00249           Return remote gateway information for the specified gateway string id.
00250 
00251           @param gateways : gateway id string to search for
00252           @type string
00253           @return remote gateway information
00254           @rtype gateway_msgs.msg.RemotGateway or None
00255         '''
00256         firewall = self.server.get(create_gateway_key(gateway, 'firewall'))
00257         ip = self.server.get(create_gateway_key(gateway, 'ip'))
00258         if firewall is None:
00259             return None  # equivalent to saying no gateway of this id found
00260         else:
00261             remote_gateway = gateway_msgs.msg.RemoteGateway()
00262             remote_gateway.name = gateway
00263             remote_gateway.ip = ip
00264             remote_gateway.firewall = True if int(firewall) else False
00265             remote_gateway.public_interface = []
00266             encoded_advertisements = self.server.smembers(create_gateway_key(gateway, 'advertisements'))
00267             for encoded_advertisement in encoded_advertisements:
00268                 advertisement = utils.deserialize_connection(encoded_advertisement)
00269                 remote_gateway.public_interface.append(advertisement.rule)
00270             remote_gateway.flipped_interface = []
00271             encoded_flips = self.server.smembers(create_gateway_key(gateway, 'flips'))
00272             for encoded_flip in encoded_flips:
00273                 [target_gateway, name, type, node] = utils.deserialize(encoded_flip)
00274                 remote_rule = gateway_msgs.msg.RemoteRule(target_gateway, gateway_msgs.msg.Rule(name, type, node))
00275                 remote_gateway.flipped_interface.append(remote_rule)
00276             remote_gateway.pulled_interface = []
00277             encoded_pulls = self.server.smembers(create_gateway_key(gateway, 'pulls'))
00278             for encoded_pull in encoded_pulls:
00279                 [target_gateway, name, type, node] = utils.deserialize(encoded_pull)
00280                 remote_rule = gateway_msgs.msg.RemoteRule(target_gateway, gateway_msgs.msg.Rule(name, type, node))
00281                 remote_gateway.pulled_interface.append(remote_rule)
00282             return remote_gateway
00283 
00284     def list_remote_gateway_names(self):
00285         '''
00286           Return a list of the gateways (name list, not redis keys).
00287           e.g. ['gateway32','pirate33']
00288         '''
00289         if not self.server:
00290             rospy.logerr("Gateway : cannot retrive remote gateway names [not connected to a hub]")
00291             return []
00292         try:
00293             gateway_keys = self.server.smembers(self._redis_keys['gatewaylist'])
00294         except redis.ConnectionError as unused_e:
00295             rospy.logwarn("Concert Client : lost connection to the hub (probably shut down)")
00296             raise HubConnectionLostError()
00297         gateways = []
00298         for gateway in gateway_keys:
00299             if key_base_name(gateway) != self._unique_gateway_name:
00300                 gateways.append(key_base_name(gateway))
00301         return gateways
00302 
00303     def matches_remote_gateway_name(self, gateway):
00304         '''
00305           Use this when gateway can be a regular expression and
00306           we need to check it off against list_remote_gateway_names()
00307         '''
00308         try:
00309             for remote_gateway in self.list_remote_gateway_names():
00310                 if re.match(gateway, remote_gateway):
00311                     return True
00312         except HubConnectionLostError:
00313             raise
00314         return False
00315 
00316     def get_remote_connection_state(self, gateway):
00317         '''
00318           Equivalent to getConnectionState, but generates it from the public
00319           interface of a foreign gateway
00320        '''
00321         connections = utils.createEmptyConnectionTypeDictionary()
00322         key = create_gateway_key(gateway, 'advertisements')
00323         public_interface = self.server.smembers(key)
00324         for connection_str in public_interface:
00325             connection = utils.deserialize_connection(connection_str)
00326             connections[connection.rule.type].append(connection)
00327         return connections
00328 
00329     def get_remote_gateway_firewall_flag(self, gateway):
00330         '''
00331           Returns the value of the remote gateway's firewall (flip)
00332           flag.
00333 
00334           @param gateway : gateway string id
00335           @param string
00336 
00337           @return state of the flag
00338           @rtype Bool
00339 
00340           @raise UnavailableGatewayError when specified gateway is not on the hub
00341         '''
00342         firewall = self.server.get(create_gateway_key(gateway, 'firewall'))
00343         if firewall is not None:
00344             return True if int(firewall) else False
00345         else:
00346             raise UnavailableGatewayError
00347 
00348     ##########################################################################
00349     # Posting Information to the Hub
00350     ##########################################################################
00351 
00352     def advertise(self, connection):
00353         '''
00354           Places a topic, service or action on the public interface. On the
00355           redis server, this representation will always be:
00356 
00357            - topic : a triple { name, type, xmlrpc node uri }
00358            - service : a triple { name, rosrpc uri, xmlrpc node uri }
00359            - action : ???
00360 
00361           @param connection: representation of a connection (topic, service, action)
00362           @type  connection: str
00363           @raise .exceptions.ConnectionTypeError: if connection arg is invalid.
00364         '''
00365         key = create_gateway_key(self._unique_gateway_name, 'advertisements')
00366         msg_str = utils.serialize_connection(connection)
00367         self.server.sadd(key, msg_str)
00368 
00369     def unadvertise(self, connection):
00370         '''
00371           Removes a topic, service or action from the public interface.
00372 
00373           @param connection: representation of a connection (topic, service, action)
00374           @type  connection: str
00375           @raise .exceptions.ConnectionTypeError: if connectionarg is invalid.
00376         '''
00377         key = create_gateway_key(self._unique_gateway_name,'advertisements')
00378         msg_str = utils.serialize_connection(connection)
00379         self.server.srem(key,msg_str)
00380 
00381     def post_flip_details(self, gateway, name, type, node):
00382         '''
00383           Post flip details to the redis server. This has no actual functionality,
00384           it is just useful for debugging with the remote_gateway_info service.
00385 
00386           @param gateway : the target of the flip
00387           @type string
00388           @param name : the name of the connection
00389           @type string
00390           @param type : the type of the connection (one of ConnectionType.xxx
00391           @type string
00392           @param node : the node name it was pulled from
00393           @type string
00394         '''
00395         key = create_gateway_key(self._unique_gateway_name, 'flips')
00396         serialized_data = utils.serialize([gateway, name, type, node])
00397         self.server.sadd(key, serialized_data)
00398 
00399     def remove_flip_details(self, gateway, name, type, node):
00400         '''
00401           Post flip details to the redis server. This has no actual functionality,
00402           it is just useful for debugging with the remote_gateway_info service.
00403 
00404           @param gateway : the target of the flip
00405           @type string
00406           @param name : the name of the connection
00407           @type string
00408           @param type : the type of the connection (one of ConnectionType.xxx
00409           @type string
00410           @param node : the node name it was pulled from
00411           @type string
00412         '''
00413         key = create_gateway_key(self._unique_gateway_name, 'flips')
00414         serialized_data = utils.serialize([gateway, name, type, node])
00415         self.server.srem(key, serialized_data)
00416 
00417     def post_pull_details(self, gateway, name, type, node):
00418         '''
00419           Post pull details to the hub. This has no actual functionality,
00420           it is just useful for debugging with the remote_gateway_info service.
00421 
00422           @param gateway : the gateway it is pulling from
00423           @type string
00424           @param name : the name of the connection
00425           @type string
00426           @param type : the type of the connection (one of ConnectionType.xxx
00427           @type string
00428           @param node : the node name it was pulled from
00429           @type string
00430         '''
00431         key = create_gateway_key(self._unique_gateway_name, 'pulls')
00432         serialized_data = utils.serialize([gateway, name, type, node])
00433         self.server.sadd(key, serialized_data)
00434 
00435     def remove_pull_details(self, gateway, name, type, node):
00436         '''
00437           Post pull details to the hub. This has no actual functionality,
00438           it is just useful for debugging with the remote_gateway_info service.
00439 
00440           @param gateway : the gateway it was pulling from
00441           @type string
00442           @param name : the name of the connection
00443           @type string
00444           @param type : the type of the connection (one of ConnectionType.xxx
00445           @type string
00446           @param node : the node name it was pulled from
00447           @type string
00448         '''
00449         key = create_gateway_key(self._unique_gateway_name, 'pulls')
00450         serialized_data = utils.serialize([gateway, name, type, node])
00451         self.server.srem(key, serialized_data)
00452 
00453     ##########################################################################
00454     # Gateway-Gateway Communications
00455     ##########################################################################
00456 
00457     def send_flip_request(self, gateway, connection):
00458         '''
00459           Sends a message to the remote gateway via redis pubsub channel. This is called from the
00460           watcher thread, when a flip rule gets activated.
00461 
00462            - redis channel name: rocon:<remote_gateway_name>
00463            - data : list of [ command, gateway, rule type, type, xmlrpc_uri ]
00464             - [0] - command       : in this case 'flip'
00465             - [1] - gateway       : the name of this gateway, i.e. the flipper
00466             - [2] - name          : local name
00467             - [3] - node          : local node name
00468             - [4] - connection_type : one of ConnectionType.PUBLISHER etc
00469             - [5] - type_info     : a ros format type (e.g. std_msgs/String or service api)
00470             - [6] - xmlrpc_uri    : the xmlrpc node uri
00471 
00472           @param command : string command name - either 'flip' or 'unflip'
00473           @type str
00474 
00475           @param flip_rule : the flip to send
00476           @type gateway_msgs.msg.RemoteRule
00477 
00478           @param type_info : topic type (e.g. std_msgs/String)
00479           @param str
00480 
00481           @param xmlrpc_uri : the node uri
00482           @param str
00483         '''
00484         source = key_base_name(self._redis_keys['gateway'])
00485         cmd = utils.serialize_connection_request('flip', source, connection)
00486         try:
00487             self.server.publish(create_key(gateway),cmd)
00488         except Exception as e:
00489             return False
00490         return True
00491 
00492     def send_unflip_request(self, gateway, rule):
00493         if rule.type == gateway_msgs.msg.ConnectionType.ACTION_CLIENT:
00494             action_name = rule.name
00495             rule.type = gateway_msgs.msg.ConnectionType.PUBLISHER
00496             rule.name = action_name + "/goal"
00497             self._send_unflip_request(gateway, rule)
00498             rule.name = action_name + "/cancel"
00499             self._send_unflip_request(gateway, rule)
00500             rule.type = gateway_msgs.msg.ConnectionType.SUBSCRIBER
00501             rule.name = action_name + "/feedback"
00502             self._send_unflip_request(gateway, rule)
00503             rule.name = action_name + "/status"
00504             self._send_unflip_request(gateway, rule)
00505             rule.name = action_name + "/result"
00506             self._send_unflip_request(gateway, rule)
00507         elif rule.type == gateway_msgs.msg.ConnectionType.ACTION_SERVER:
00508             action_name = rule.name
00509             rule.type = gateway_msgs.msg.ConnectionType.SUBSCRIBER
00510             rule.name = action_name + "/goal"
00511             self._send_unflip_request(gateway, rule)
00512             rule.name = action_name + "/cancel"
00513             self._send_unflip_request(gateway, rule)
00514             rule.type = gateway_msgs.msg.ConnectionType.PUBLISHER
00515             rule.name = action_name + "/feedback"
00516             self._send_unflip_request(gateway, rule)
00517             rule.name = action_name + "/status"
00518             self._send_unflip_request(gateway, rule)
00519             rule.name = action_name + "/result"
00520             self._send_unflip_request(gateway, rule)
00521         else:
00522             self._send_unflip_request(gateway, rule)
00523 
00524     def _send_unflip_request(self, gateway, rule):
00525         source = key_base_name(self._redis_keys['gateway'])
00526         cmd = utils.serialize_rule_request('unflip', source, rule)
00527         try:
00528             self.server.publish(create_key(gateway), cmd)
00529         except Exception as e:
00530             return False
00531         return True
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Properties Friends


rocon_gateway
Author(s): Daniel Stonier, Jihoon Lee,
autogenerated on Tue Jan 15 2013 17:43:24