00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 import redis
00011 import threading
00012 import roslib
00013 roslib.load_manifest('rocon_gateway')
00014 import rospy
00015 import re
00016 import utils
00017 import gateway_msgs.msg
00018 import rosgraph
00019 from .exceptions import UnavailableGatewayError, HubConnectionLostError
00020
00021
00022
00023
00024
00025
00026 def create_key(key):
00027 '''
00028 Root the specified redis key name in our pseudo redis database.
00029 '''
00030 if re.match('rocon:', key):
00031 return key
00032 else:
00033 return 'rocon:' + key
00034
00035
00036 def create_hub_key(key):
00037 '''
00038 Root the specified redis key name in our pseudo redis database under
00039 the hub namespace
00040 '''
00041 if re.match('rocon:hub:', key):
00042 return key
00043 else:
00044 return 'rocon:hub:' + key
00045
00046
00047 def create_gateway_key(unique_gateway_name, key):
00048 '''
00049 Root the specified redis key name in our pseudo redis database under
00050 the gateway namespace.
00051
00052 @note : currently does no checking of the incoming keys
00053 '''
00054 return 'rocon:' + unique_gateway_name + ":" + key
00055
00056
00057 def extract_key(key):
00058 '''
00059 Extract the specified redis key name from our pseudo redis database.
00060 '''
00061 if re.match('rocon:', key):
00062 return re.sub(r'rocon:', '', key)
00063 else:
00064 return key
00065
00066
00067 def key_base_name(key):
00068 '''
00069 Extract the base name (i.e. last value) from the key.
00070 e.g. rocon:key:pirate24 -> pirate24
00071 '''
00072 return key.split(':')[-1]
00073
00074
00075 def resolve_hub(ip, port):
00076 '''
00077 Pings the hub for identification. We currently use this to check
00078 against the gateway whitelist/blacklists to determine if a rule
00079 should proceed or not.
00080
00081 @return string - hub name
00082 '''
00083 r = redis.Redis(host=ip, port=port)
00084 return r.get("rocon:hub:name")
00085
00086
00087
00088
00089
00090
00091 class RedisListenerThread(threading.Thread):
00092 '''
00093 Tunes into the redis channels that have been subscribed to and
00094 calls the apropriate callbacks.
00095 '''
00096 def __init__(self, redis_pubsub_server, remote_gateway_request_callbacks):
00097 threading.Thread.__init__(self)
00098 self._redis_pubsub_server = redis_pubsub_server
00099 self._remote_gateway_request_callbacks = remote_gateway_request_callbacks
00100
00101 def run(self):
00102 '''
00103 Used as a callback for incoming requests on redis pubsub channels.
00104
00105 The received argument is a list of strings for 'flip':
00106
00107 - [0] - command : this one is 'flip'
00108 - [1] - remote_gateway : the name of the gateway that is flipping to us
00109 - [2] - remote_name
00110 - [3] - remote_node
00111 - [4] - type : one of ConnectionType.PUBLISHER etc
00112 - [5] - type_info : a ros format type (e.g. std_msgs/String or service api)
00113 - [6] - xmlrpc_uri : the xmlrpc node uri
00114
00115 The command 'unflip' is the same, not including args 5 and 6.
00116 '''
00117 try:
00118 for r in self._redis_pubsub_server.listen():
00119 if r['type'] != 'unsubscribe' and r['type'] != 'subscribe':
00120 command, source, contents = utils.deserialize_request(r['data'])
00121 rospy.logdebug("Gateway : redis listener received a channel publication from %s : [%s]" % (source, command))
00122 if command == 'flip':
00123 registration = utils.Registration(utils.get_connection_from_list(contents), source)
00124 self._remote_gateway_request_callbacks['flip'](registration)
00125 elif command == 'unflip':
00126 self._remote_gateway_request_callbacks['unflip'](utils.get_rule_from_list(contents), source)
00127 else:
00128 rospy.logerr("Gateway : received an unknown command from the hub.")
00129 except redis.exceptions.ConnectionError:
00130 rospy.logwarn("Gateway : lost connection to the hub (probably shut down)")
00131
00132
00133
00134
00135
00136
00137 class Hub(object):
00138
00139 pool = None
00140 server = None
00141 pubsub = None
00142 callback = None
00143
00144 def __init__(self, remote_gateway_request_callbacks, gateway_name, firewall):
00145 '''
00146 @param remote_gateway_request_callbacks : to handle redis responses
00147 @type list of function pointers (back to GatewaySync class
00148
00149 @param gateway_name : recommended name for this gateway
00150 @type string
00151
00152 @param firewall : whether this gateway blocks flips or not (we publish this info)
00153 @type Bool
00154 '''
00155 self.name = ''
00156 self.uri = ''
00157 self._gateway_name = gateway_name
00158 self._unique_gateway_name = ''
00159 self._firewall = 1 if firewall else 0
00160 self._remote_gateway_request_callbacks = remote_gateway_request_callbacks
00161 self._redis_keys = {}
00162
00163
00164 self._redis_keys['index'] = create_hub_key('index')
00165 self._redis_keys['gatewaylist'] = create_hub_key('gatewaylist')
00166 self._redis_channels = {}
00167 self._redis_pubsub_server = None
00168
00169
00170
00171
00172
00173 def connect(self, ip, portarg):
00174 try:
00175 self.pool = redis.ConnectionPool(host=ip, port=portarg, db=0)
00176 self.server = redis.Redis(connection_pool=self.pool)
00177 rospy.logdebug("Gateway : connected to the hub's redis server.")
00178 self._redis_pubsub_server = self.server.pubsub()
00179 self.uri = str(ip) + ":" + str(portarg)
00180 except redis.exceptions.ConnectionError as unused_e:
00181 rospy.logerror("Gateway : failed rule to the hub's redis server.")
00182 raise
00183
00184 def register_gateway(self, ip):
00185 '''
00186 Register a gateway with the hub. Note that you must have already
00187 connected before calling this function.
00188
00189 On registration, the hub will provide a unique identifier number
00190 which will be appended to the suggested name of this gateway to
00191 ensure a unique id string and key for this gateway.
00192
00193 @return: success or failure of the operation
00194 @rtype: bool
00195
00196 @todo - maybe merge with 'connect', or at the least check if it
00197 is actually connected here first.
00198 '''
00199 if self.server.sadd(self._redis_keys['gatewaylist'], create_key(self._gateway_name)):
00200 self._unique_gateway_name = self._gateway_name
00201 self._redis_keys['gateway'] = create_key(self._unique_gateway_name)
00202 else:
00203 unique_num = self.server.incr(self._redis_keys['index'])
00204 self._unique_gateway_name = self._gateway_name + str(unique_num)
00205 self._redis_keys['gateway'] = create_key(self._unique_gateway_name)
00206 unused_ret = self.server.sadd(self._redis_keys['gatewaylist'], self._redis_keys['gateway'])
00207 unused_ret = self.server.sadd(self._redis_keys['gatewaylist'], self._redis_keys['gateway'])
00208 self._redis_keys['firewall'] = create_gateway_key(self._unique_gateway_name, 'firewall')
00209 self.server.set(self._redis_keys['firewall'], self._firewall)
00210 self._redis_keys['ip'] = create_gateway_key(self._unique_gateway_name, 'ip')
00211 self.server.set(self._redis_keys['ip'], ip)
00212 self._redis_channels['gateway'] = self._redis_keys['gateway']
00213 self._redis_pubsub_server.subscribe(self._redis_channels['gateway'])
00214 self.remote_gateway_listener_thread = RedisListenerThread(self._redis_pubsub_server, self._remote_gateway_request_callbacks)
00215 self.remote_gateway_listener_thread.start()
00216 self.name = key_base_name(self.server.get("rocon:hub:name"))
00217 return key_base_name(self._redis_keys['gateway'])
00218
00219 def unregister_gateway(self):
00220 '''
00221 Remove all gateway info from the hub.
00222
00223 @return: success or failure of the operation
00224 @rtype: bool
00225 '''
00226 try:
00227 self._redis_pubsub_server.unsubscribe()
00228 gateway_keys = self.server.keys(self._redis_keys['gateway'] + ":*")
00229 pipe = self.server.pipeline()
00230 pipe.delete(*gateway_keys)
00231 pipe.srem(self._redis_keys['gatewaylist'], self._redis_keys['gateway'])
00232 pipe.execute()
00233 self._redis_channels = {}
00234 self._unique_gateway_name = ''
00235 self.name = ''
00236 except redis.exceptions.ConnectionError:
00237
00238
00239 return False
00240 rospy.loginfo("Gateway : unregistering gateway from the hub.")
00241 return True
00242
00243
00244
00245
00246
00247 def remote_gateway_info(self, gateway):
00248 '''
00249 Return remote gateway information for the specified gateway string id.
00250
00251 @param gateways : gateway id string to search for
00252 @type string
00253 @return remote gateway information
00254 @rtype gateway_msgs.msg.RemotGateway or None
00255 '''
00256 firewall = self.server.get(create_gateway_key(gateway, 'firewall'))
00257 ip = self.server.get(create_gateway_key(gateway, 'ip'))
00258 if firewall is None:
00259 return None
00260 else:
00261 remote_gateway = gateway_msgs.msg.RemoteGateway()
00262 remote_gateway.name = gateway
00263 remote_gateway.ip = ip
00264 remote_gateway.firewall = True if int(firewall) else False
00265 remote_gateway.public_interface = []
00266 encoded_advertisements = self.server.smembers(create_gateway_key(gateway, 'advertisements'))
00267 for encoded_advertisement in encoded_advertisements:
00268 advertisement = utils.deserialize_connection(encoded_advertisement)
00269 remote_gateway.public_interface.append(advertisement.rule)
00270 remote_gateway.flipped_interface = []
00271 encoded_flips = self.server.smembers(create_gateway_key(gateway, 'flips'))
00272 for encoded_flip in encoded_flips:
00273 [target_gateway, name, type, node] = utils.deserialize(encoded_flip)
00274 remote_rule = gateway_msgs.msg.RemoteRule(target_gateway, gateway_msgs.msg.Rule(name, type, node))
00275 remote_gateway.flipped_interface.append(remote_rule)
00276 remote_gateway.pulled_interface = []
00277 encoded_pulls = self.server.smembers(create_gateway_key(gateway, 'pulls'))
00278 for encoded_pull in encoded_pulls:
00279 [target_gateway, name, type, node] = utils.deserialize(encoded_pull)
00280 remote_rule = gateway_msgs.msg.RemoteRule(target_gateway, gateway_msgs.msg.Rule(name, type, node))
00281 remote_gateway.pulled_interface.append(remote_rule)
00282 return remote_gateway
00283
00284 def list_remote_gateway_names(self):
00285 '''
00286 Return a list of the gateways (name list, not redis keys).
00287 e.g. ['gateway32','pirate33']
00288 '''
00289 if not self.server:
00290 rospy.logerr("Gateway : cannot retrive remote gateway names [not connected to a hub]")
00291 return []
00292 try:
00293 gateway_keys = self.server.smembers(self._redis_keys['gatewaylist'])
00294 except redis.ConnectionError as unused_e:
00295 rospy.logwarn("Concert Client : lost connection to the hub (probably shut down)")
00296 raise HubConnectionLostError()
00297 gateways = []
00298 for gateway in gateway_keys:
00299 if key_base_name(gateway) != self._unique_gateway_name:
00300 gateways.append(key_base_name(gateway))
00301 return gateways
00302
00303 def matches_remote_gateway_name(self, gateway):
00304 '''
00305 Use this when gateway can be a regular expression and
00306 we need to check it off against list_remote_gateway_names()
00307 '''
00308 try:
00309 for remote_gateway in self.list_remote_gateway_names():
00310 if re.match(gateway, remote_gateway):
00311 return True
00312 except HubConnectionLostError:
00313 raise
00314 return False
00315
00316 def get_remote_connection_state(self, gateway):
00317 '''
00318 Equivalent to getConnectionState, but generates it from the public
00319 interface of a foreign gateway
00320 '''
00321 connections = utils.createEmptyConnectionTypeDictionary()
00322 key = create_gateway_key(gateway, 'advertisements')
00323 public_interface = self.server.smembers(key)
00324 for connection_str in public_interface:
00325 connection = utils.deserialize_connection(connection_str)
00326 connections[connection.rule.type].append(connection)
00327 return connections
00328
00329 def get_remote_gateway_firewall_flag(self, gateway):
00330 '''
00331 Returns the value of the remote gateway's firewall (flip)
00332 flag.
00333
00334 @param gateway : gateway string id
00335 @param string
00336
00337 @return state of the flag
00338 @rtype Bool
00339
00340 @raise UnavailableGatewayError when specified gateway is not on the hub
00341 '''
00342 firewall = self.server.get(create_gateway_key(gateway, 'firewall'))
00343 if firewall is not None:
00344 return True if int(firewall) else False
00345 else:
00346 raise UnavailableGatewayError
00347
00348
00349
00350
00351
00352 def advertise(self, connection):
00353 '''
00354 Places a topic, service or action on the public interface. On the
00355 redis server, this representation will always be:
00356
00357 - topic : a triple { name, type, xmlrpc node uri }
00358 - service : a triple { name, rosrpc uri, xmlrpc node uri }
00359 - action : ???
00360
00361 @param connection: representation of a connection (topic, service, action)
00362 @type connection: str
00363 @raise .exceptions.ConnectionTypeError: if connection arg is invalid.
00364 '''
00365 key = create_gateway_key(self._unique_gateway_name, 'advertisements')
00366 msg_str = utils.serialize_connection(connection)
00367 self.server.sadd(key, msg_str)
00368
00369 def unadvertise(self, connection):
00370 '''
00371 Removes a topic, service or action from the public interface.
00372
00373 @param connection: representation of a connection (topic, service, action)
00374 @type connection: str
00375 @raise .exceptions.ConnectionTypeError: if connectionarg is invalid.
00376 '''
00377 key = create_gateway_key(self._unique_gateway_name,'advertisements')
00378 msg_str = utils.serialize_connection(connection)
00379 self.server.srem(key,msg_str)
00380
00381 def post_flip_details(self, gateway, name, type, node):
00382 '''
00383 Post flip details to the redis server. This has no actual functionality,
00384 it is just useful for debugging with the remote_gateway_info service.
00385
00386 @param gateway : the target of the flip
00387 @type string
00388 @param name : the name of the connection
00389 @type string
00390 @param type : the type of the connection (one of ConnectionType.xxx
00391 @type string
00392 @param node : the node name it was pulled from
00393 @type string
00394 '''
00395 key = create_gateway_key(self._unique_gateway_name, 'flips')
00396 serialized_data = utils.serialize([gateway, name, type, node])
00397 self.server.sadd(key, serialized_data)
00398
00399 def remove_flip_details(self, gateway, name, type, node):
00400 '''
00401 Post flip details to the redis server. This has no actual functionality,
00402 it is just useful for debugging with the remote_gateway_info service.
00403
00404 @param gateway : the target of the flip
00405 @type string
00406 @param name : the name of the connection
00407 @type string
00408 @param type : the type of the connection (one of ConnectionType.xxx
00409 @type string
00410 @param node : the node name it was pulled from
00411 @type string
00412 '''
00413 key = create_gateway_key(self._unique_gateway_name, 'flips')
00414 serialized_data = utils.serialize([gateway, name, type, node])
00415 self.server.srem(key, serialized_data)
00416
00417 def post_pull_details(self, gateway, name, type, node):
00418 '''
00419 Post pull details to the hub. This has no actual functionality,
00420 it is just useful for debugging with the remote_gateway_info service.
00421
00422 @param gateway : the gateway it is pulling from
00423 @type string
00424 @param name : the name of the connection
00425 @type string
00426 @param type : the type of the connection (one of ConnectionType.xxx
00427 @type string
00428 @param node : the node name it was pulled from
00429 @type string
00430 '''
00431 key = create_gateway_key(self._unique_gateway_name, 'pulls')
00432 serialized_data = utils.serialize([gateway, name, type, node])
00433 self.server.sadd(key, serialized_data)
00434
00435 def remove_pull_details(self, gateway, name, type, node):
00436 '''
00437 Post pull details to the hub. This has no actual functionality,
00438 it is just useful for debugging with the remote_gateway_info service.
00439
00440 @param gateway : the gateway it was pulling from
00441 @type string
00442 @param name : the name of the connection
00443 @type string
00444 @param type : the type of the connection (one of ConnectionType.xxx
00445 @type string
00446 @param node : the node name it was pulled from
00447 @type string
00448 '''
00449 key = create_gateway_key(self._unique_gateway_name, 'pulls')
00450 serialized_data = utils.serialize([gateway, name, type, node])
00451 self.server.srem(key, serialized_data)
00452
00453
00454
00455
00456
00457 def send_flip_request(self, gateway, connection):
00458 '''
00459 Sends a message to the remote gateway via redis pubsub channel. This is called from the
00460 watcher thread, when a flip rule gets activated.
00461
00462 - redis channel name: rocon:<remote_gateway_name>
00463 - data : list of [ command, gateway, rule type, type, xmlrpc_uri ]
00464 - [0] - command : in this case 'flip'
00465 - [1] - gateway : the name of this gateway, i.e. the flipper
00466 - [2] - name : local name
00467 - [3] - node : local node name
00468 - [4] - connection_type : one of ConnectionType.PUBLISHER etc
00469 - [5] - type_info : a ros format type (e.g. std_msgs/String or service api)
00470 - [6] - xmlrpc_uri : the xmlrpc node uri
00471
00472 @param command : string command name - either 'flip' or 'unflip'
00473 @type str
00474
00475 @param flip_rule : the flip to send
00476 @type gateway_msgs.msg.RemoteRule
00477
00478 @param type_info : topic type (e.g. std_msgs/String)
00479 @param str
00480
00481 @param xmlrpc_uri : the node uri
00482 @param str
00483 '''
00484 source = key_base_name(self._redis_keys['gateway'])
00485 cmd = utils.serialize_connection_request('flip', source, connection)
00486 try:
00487 self.server.publish(create_key(gateway),cmd)
00488 except Exception as e:
00489 return False
00490 return True
00491
00492 def send_unflip_request(self, gateway, rule):
00493 if rule.type == gateway_msgs.msg.ConnectionType.ACTION_CLIENT:
00494 action_name = rule.name
00495 rule.type = gateway_msgs.msg.ConnectionType.PUBLISHER
00496 rule.name = action_name + "/goal"
00497 self._send_unflip_request(gateway, rule)
00498 rule.name = action_name + "/cancel"
00499 self._send_unflip_request(gateway, rule)
00500 rule.type = gateway_msgs.msg.ConnectionType.SUBSCRIBER
00501 rule.name = action_name + "/feedback"
00502 self._send_unflip_request(gateway, rule)
00503 rule.name = action_name + "/status"
00504 self._send_unflip_request(gateway, rule)
00505 rule.name = action_name + "/result"
00506 self._send_unflip_request(gateway, rule)
00507 elif rule.type == gateway_msgs.msg.ConnectionType.ACTION_SERVER:
00508 action_name = rule.name
00509 rule.type = gateway_msgs.msg.ConnectionType.SUBSCRIBER
00510 rule.name = action_name + "/goal"
00511 self._send_unflip_request(gateway, rule)
00512 rule.name = action_name + "/cancel"
00513 self._send_unflip_request(gateway, rule)
00514 rule.type = gateway_msgs.msg.ConnectionType.PUBLISHER
00515 rule.name = action_name + "/feedback"
00516 self._send_unflip_request(gateway, rule)
00517 rule.name = action_name + "/status"
00518 self._send_unflip_request(gateway, rule)
00519 rule.name = action_name + "/result"
00520 self._send_unflip_request(gateway, rule)
00521 else:
00522 self._send_unflip_request(gateway, rule)
00523
00524 def _send_unflip_request(self, gateway, rule):
00525 source = key_base_name(self._redis_keys['gateway'])
00526 cmd = utils.serialize_rule_request('unflip', source, rule)
00527 try:
00528 self.server.publish(create_key(gateway), cmd)
00529 except Exception as e:
00530 return False
00531 return True