Package rocon_gateway :: Module gateway_hub
[frames] | no frames]

Source Code for Module rocon_gateway.gateway_hub

  1  #!/usr/bin/env python 
  2  # 
  3  # License: BSD 
  4  #   https://raw.github.com/robotics-in-concert/rocon_multimaster/master/rocon_gateway/LICENSE 
  5  # 
  6  ############################################################################### 
  7  # Imports 
  8  ############################################################################### 
  9   
 10  import redis 
 11  import threading 
 12  import rospy 
 13  import re 
 14  import utils 
 15  import gateway_msgs.msg as gateway_msgs 
 16  import rocon_utilities 
 17  import rocon_hub_client 
 18  from rocon_hub_client import hub_api 
 19  from rocon_hub_client.exceptions import HubConnectionLostError, \ 
 20                HubNameNotFoundError, HubNotFoundError 
 21   
 22   
 23  # local imports 
 24  from .exceptions import GatewayUnavailableError 
 25   
 26   
 27  ############################################################################### 
 28  # Redis Callback Handler 
 29  ############################################################################## 
 30   
class RedisListenerThread(threading.Thread):
    '''
      Tunes into the redis channels that have been subscribed to and
      calls the appropriate callbacks.
    '''

    def __init__(self, redis_pubsub_server, remote_gateway_request_callbacks, hub_connection_lost_hook):
        '''
          @param redis_pubsub_server : pubsub object whose listen() generator yields the channel messages
          @param remote_gateway_request_callbacks : callbacks looked up by command name ('flip', 'unflip')
          @param hub_connection_lost_hook : called without arguments when the redis connection is lost
        '''
        threading.Thread.__init__(self)
        self._redis_pubsub_server = redis_pubsub_server
        self._remote_gateway_request_callbacks = remote_gateway_request_callbacks
        self._hub_connection_lost_hook = hub_connection_lost_hook

    def run(self):
        '''
          Used as a callback for incoming requests on redis pubsub channels.

          The received argument is a list of strings for 'flip':

            - [0] - command       : this one is 'flip'
            - [1] - remote_gateway : the name of the gateway that is flipping to us
            - [2] - remote_name
            - [3] - remote_node
            - [4] - type          : one of ConnectionType.PUBLISHER etc
            - [5] - type_info     : a ros format type (e.g. std_msgs/String or service api)
            - [6] - xmlrpc_uri    : the xmlrpc node uri

          The command 'unflip' is the same, not including args 5 and 6.

          A redis connection error ends the loop and triggers the hub connection lost hook.
        '''
        try:
            # This is a generator so it will keep spitting out (alt. to having a while loop here)
            for r in self._redis_pubsub_server.listen():
                # Only real publications carry a serialised request. Subscription
                # confirmations (subscribe, unsubscribe, psubscribe, punsubscribe)
                # carry a counter as data and must never reach the deserialiser.
                if r['type'] not in ('message', 'pmessage'):
                    continue
                command, source, contents = utils.deserialize_request(r['data'])
                rospy.logdebug("Gateway : redis listener received a channel publication from %s : [%s]" % (source, command))
                if command == 'flip':
                    registration = utils.Registration(utils.get_connection_from_list(contents), source)
                    self._remote_gateway_request_callbacks['flip'](registration)
                elif command == 'unflip':
                    self._remote_gateway_request_callbacks['unflip'](utils.get_rule_from_list(contents), source)
                else:
                    rospy.logerr("Gateway : received an unknown command from the hub.")
        except redis.exceptions.ConnectionError:
            self._hub_connection_lost_hook()

##############################################################################
# Hub
##############################################################################


class GatewayHub(rocon_hub_client.Hub):
    '''
      Hub client used by a gateway: registers the gateway on the hub's redis
      server, reads information about remote gateways from it, posts this
      gateway's own advertisements/flips/pulls to it and sends flip/unflip
      requests to remote gateways over redis pubsub channels.
    '''

    def __init__(self, ip, port, whitelist, blacklist):
        '''
          @param ip : redis server ip
          @param port : redis server port
          @param whitelist : handed unchanged to the rocon_hub_client.Hub constructor
          @param blacklist : handed unchanged to the rocon_hub_client.Hub constructor

          @raise HubNameNotFoundError, HubNotFoundError
        '''
        # Errors raised by the base constructor simply propagate to the caller.
        super(GatewayHub, self).__init__(ip, port, whitelist, blacklist)  # can just do super() in python3
        self._hub_connection_lost_gateway_hook = None
        self._firewall = 0

    ##########################################################################
    # Hub Connections
    ##########################################################################

    def register_gateway(self, firewall, unique_gateway_name, remote_gateway_request_callbacks, hub_connection_lost_gateway_hook, gateway_ip):
        '''
          Register a gateway with the hub.

          @param firewall : truthy if this gateway is firewalled (stored on the hub as 1, otherwise 0)
          @param unique_gateway_name : name under which this gateway is registered
          @param remote_gateway_request_callbacks : callbacks handed to the redis listener thread
          @param hub_connection_lost_gateway_hook : used to trigger Gateway.disengage_hub(hub)
                 on lost hub connections in redis pubsub listener thread.
          @param gateway_ip : value stored under this gateway's 'ip' key on the hub

          @raise HubConnectionLostError if for some reason, the redis server has become unavailable.
        '''
        if not self._redis_server:
            raise HubConnectionLostError()
        self._unique_gateway_name = unique_gateway_name
        self._redis_keys['gateway'] = hub_api.create_rocon_key(unique_gateway_name)
        self._redis_keys['firewall'] = hub_api.create_rocon_gateway_key(unique_gateway_name, 'firewall')
        self._firewall = 1 if firewall else 0
        self._redis_keys['gatewaylist'] = hub_api.create_rocon_hub_key('gatewaylist')
        self._remote_gateway_request_callbacks = remote_gateway_request_callbacks
        self._hub_connection_lost_gateway_hook = hub_connection_lost_gateway_hook
        # A single sadd is enough; a falsy result would mean the name was already
        # listed, which should never happen - unique should be unique.
        self._redis_server.sadd(self._redis_keys['gatewaylist'], self._redis_keys['gateway'])
        self._redis_server.set(self._redis_keys['firewall'], self._firewall)
        # I think we just used this for debugging, but we might want to hide it in future (it's the ros master hostname/ip)
        self._redis_keys['ip'] = hub_api.create_rocon_gateway_key(unique_gateway_name, 'ip')
        self._redis_server.set(self._redis_keys['ip'], gateway_ip)
        self._redis_channels['gateway'] = self._redis_keys['gateway']
        self._redis_pubsub_server.subscribe(self._redis_channels['gateway'])
        self.remote_gateway_listener_thread = RedisListenerThread(self._redis_pubsub_server, self._remote_gateway_request_callbacks, self._hub_connection_lost_hook)
        self.remote_gateway_listener_thread.start()

    def _hub_connection_lost_hook(self):
        '''
          This gets triggered by the redis pubsub listener when the hub connection is lost.
          The trigger is passed to the gateway who needs to remove the hub.
        '''
        if self._hub_connection_lost_gateway_hook is not None:
            self._hub_connection_lost_gateway_hook(self)

    def unregister_gateway(self):
        '''
          Remove all gateway info from the hub.

          Redis connection/response errors are swallowed (the hub has usually
          gone down just before us), so no success flag is returned.
        '''
        try:
            self._redis_pubsub_server.unsubscribe()
            gateway_keys = self._redis_server.keys(self._redis_keys['gateway'] + ":*")
            pipe = self._redis_server.pipeline()
            # Deleting with no key names is an invalid redis command and would
            # stop the gateway list entry below from being removed.
            if gateway_keys:
                pipe.delete(*gateway_keys)
            pipe.srem(self._redis_keys['gatewaylist'], self._redis_keys['gateway'])
            pipe.execute()
            self._redis_channels = {}
        except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
            # usually just means the hub has gone down just before us or is in the
            # middle of doing so let it die nice and peacefully
            # rospy.logwarn("Gateway : problem unregistering from the hub (likely that hub shutdown before the gateway).")
            pass
        # TODO : should we not also shut down self.remote_gateway_listener_thread here?
        rospy.loginfo("Gateway : unregistered from the hub [%s]" % self.name)

    ##########################################################################
    # Hub Data Retrieval
    ##########################################################################

    def remote_gateway_info(self, gateway):
        '''
          Return remote gateway information for the specified gateway string id.

          @param gateway : gateway id string to search for
          @type string
          @return remote gateway information
          @rtype gateway_msgs.RemoteGateway or None
        '''
        firewall = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'firewall'))
        if firewall is None:
            return None  # equivalent to saying no gateway of this id found
        remote_gateway = gateway_msgs.RemoteGateway()
        remote_gateway.name = gateway
        remote_gateway.ip = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'ip'))
        remote_gateway.firewall = bool(int(firewall))
        encoded_advertisements = self._redis_server.smembers(hub_api.create_rocon_gateway_key(gateway, 'advertisements'))
        remote_gateway.public_interface = [utils.deserialize_connection(encoded_advertisement).rule
                                           for encoded_advertisement in encoded_advertisements]
        remote_gateway.flipped_interface = self._remote_rules(gateway, 'flips')
        remote_gateway.pulled_interface = self._remote_rules(gateway, 'pulls')
        return remote_gateway

    def _remote_rules(self, gateway, field):
        '''
          Decode one of a gateway's rule sets on the hub into remote rules.

          @param gateway : gateway id string
          @param field : name of the set stored for that gateway ('flips' or 'pulls')
          @return list of gateway_msgs.RemoteRule
        '''
        remote_rules = []
        for encoded_rule in self._redis_server.smembers(hub_api.create_rocon_gateway_key(gateway, field)):
            [target_gateway, name, connection_type, node] = utils.deserialize(encoded_rule)
            remote_rules.append(gateway_msgs.RemoteRule(target_gateway, gateway_msgs.Rule(connection_type, name, node)))
        return remote_rules

    def list_remote_gateway_names(self):
        '''
          Return a list of the gateways (name list, not redis keys).
          e.g. ['gateway32adcda32','pirate21fasdf']. If not connected, just
          returns an empty list. This gateway's own name is left out.
        '''
        if not self._redis_server:
            rospy.logerr("Gateway : cannot retrieve remote gateway names [%s][%s]." % (self.name, self.uri))
            return []
        gateways = []
        try:
            gateway_keys = self._redis_server.smembers(self._redis_keys['gatewaylist'])
            for gateway_key in gateway_keys:
                gateway_name = hub_api.key_base_name(gateway_key)
                if gateway_name != self._unique_gateway_name:
                    gateways.append(gateway_name)
        except redis.ConnectionError:
            # a lost connection is reported as (the names collected so far, usually) nothing
            pass
        return gateways

    def matches_remote_gateway_name(self, gateway):
        '''
          Use this when gateway can be a regular expression and
          we need to check it off against list_remote_gateway_names()

          @return a list of matches (higher level decides on action for duplicates).
          @rtype list[str] : list of remote gateway names.
        '''
        return [remote_gateway for remote_gateway in self.list_remote_gateway_names()
                if re.match(gateway, remote_gateway)]

    def matches_remote_gateway_basename(self, gateway):
        '''
          Use this when gateway can be a regular expression and
          we need to check it off against the basenames (as computed by
          rocon_utilities.gateway_basename) of list_remote_gateway_names()

          @return the full remote gateway names whose basename matched
          @rtype list[str]
        '''
        return [remote_gateway for remote_gateway in self.list_remote_gateway_names()
                if re.match(gateway, rocon_utilities.gateway_basename(remote_gateway))]

    def get_remote_connection_state(self, remote_gateway):
        '''
          Equivalent to get_connection_state, but generates it from the public
          interface of a remote gateway

          @param remote_gateway : hash name for a remote gateway
          @type str
          @return dictionary of remote advertisements
          @rtype dictionary of connection type keyed connection values
        '''
        connections = utils.create_empty_connection_type_dictionary()
        key = hub_api.create_rocon_gateway_key(remote_gateway, 'advertisements')
        public_interface = self._redis_server.smembers(key)
        for connection_str in public_interface:
            connection = utils.deserialize_connection(connection_str)
            connections[connection.rule.type].append(connection)
        return connections

    def get_remote_gateway_firewall_flag(self, gateway):
        '''
          Returns the value of the remote gateway's firewall (flip)
          flag.

          @param gateway : gateway string id
          @type string

          @return state of the flag
          @rtype Bool

          @raise GatewayUnavailableError when specified gateway is not on the hub
        '''
        firewall = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'firewall'))
        if firewall is None:
            raise GatewayUnavailableError
        return bool(int(firewall))

    ##########################################################################
    # Posting Information to the Hub
    ##########################################################################

    def advertise(self, connection):
        '''
          Places a topic, service or action on the public interface. On the
          redis server, this representation will always be:

           - topic : a triple { name, type, xmlrpc node uri }
           - service : a triple { name, rosrpc uri, xmlrpc node uri }
           - action : ???

          @param connection: representation of a connection (topic, service, action),
                 in whatever form utils.serialize_connection accepts
          @raise .exceptions.ConnectionTypeError: if connection arg is invalid.
        '''
        key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'advertisements')
        msg_str = utils.serialize_connection(connection)
        self._redis_server.sadd(key, msg_str)

    def unadvertise(self, connection):
        '''
          Removes a topic, service or action from the public interface.

          @param connection: representation of a connection (topic, service, action),
                 in whatever form utils.serialize_connection accepts
          @raise .exceptions.ConnectionTypeError: if connection arg is invalid.
        '''
        key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'advertisements')
        msg_str = utils.serialize_connection(connection)
        self._redis_server.srem(key, msg_str)

    def post_flip_details(self, gateway, name, connection_type, node):
        '''
          Post flip details to the redis server. This has no actual functionality,
          it is just useful for debugging with the remote_gateway_info service.

          @param gateway : the target of the flip
          @type string
          @param name : the name of the connection
          @type string
          @param connection_type : the type of the connection (one of ConnectionType.xxx)
          @type string
          @param node : the node name it was pulled from
          @type string
        '''
        key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flips')
        serialized_data = utils.serialize([gateway, name, connection_type, node])
        self._redis_server.sadd(key, serialized_data)

    def remove_flip_details(self, gateway, name, connection_type, node):
        '''
          Remove flip details from the redis server. This has no actual functionality,
          it is just useful for debugging with the remote_gateway_info service.

          @param gateway : the target of the flip
          @type string
          @param name : the name of the connection
          @type string
          @param connection_type : the type of the connection (one of ConnectionType.xxx)
          @type string
          @param node : the node name it was pulled from
          @type string
        '''
        key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flips')
        serialized_data = utils.serialize([gateway, name, connection_type, node])
        self._redis_server.srem(key, serialized_data)

    def post_pull_details(self, gateway, name, connection_type, node):
        '''
          Post pull details to the hub. This has no actual functionality,
          it is just useful for debugging with the remote_gateway_info service.

          @param gateway : the gateway it is pulling from
          @type string
          @param name : the name of the connection
          @type string
          @param connection_type : the type of the connection (one of ConnectionType.xxx)
          @type string
          @param node : the node name it was pulled from
          @type string
        '''
        key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'pulls')
        serialized_data = utils.serialize([gateway, name, connection_type, node])
        self._redis_server.sadd(key, serialized_data)

    def remove_pull_details(self, gateway, name, connection_type, node):
        '''
          Remove pull details from the hub. This has no actual functionality,
          it is just useful for debugging with the remote_gateway_info service.

          @param gateway : the gateway it was pulling from
          @type string
          @param name : the name of the connection
          @type string
          @param connection_type : the type of the connection (one of ConnectionType.xxx)
          @type string
          @param node : the node name it was pulled from
          @type string
        '''
        key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'pulls')
        serialized_data = utils.serialize([gateway, name, connection_type, node])
        self._redis_server.srem(key, serialized_data)

    ##########################################################################
    # Gateway-Gateway Communications
    ##########################################################################

    def send_flip_request(self, remote_gateway, connection):
        '''
          Sends a message to the remote gateway via redis pubsub channel. This is called from the
          watcher thread, when a flip rule gets activated.

           - redis channel name: rocon:<remote_gateway_name>
           - data : list of [ command, gateway, rule type, type, xmlrpc_uri ]
            - [0] - command       : in this case 'flip'
            - [1] - gateway       : the name of this gateway, i.e. the flipper
            - [2] - name          : local name
            - [3] - node          : local node name
            - [4] - connection_type : one of ConnectionType.PUBLISHER etc
            - [5] - type_info     : a ros format type (e.g. std_msgs/String or service api)
            - [6] - xmlrpc_uri    : the xmlrpc node uri

          @param remote_gateway : name of the gateway to send the flip request to
          @type str

          @param connection : the connection to flip, serialised with utils.serialize_connection_request

          @return True if the request was published, False if publishing raised
          @rtype bool
        '''
        source = hub_api.key_base_name(self._redis_keys['gateway'])
        cmd = utils.serialize_connection_request('flip', source, connection)
        try:
            self._redis_server.publish(hub_api.create_rocon_key(remote_gateway), cmd)
        except Exception:
            # best effort - any publishing problem is reported to the caller as a failure
            return False
        return True

    def send_unflip_request(self, remote_gateway, rule):
        '''
          Sends unflip request(s) for a rule to the remote gateway.

          Action client/server rules are broken down into their five underlying
          topics (goal, cancel, feedback, status, result) and one unflip request
          is sent for each; any other rule is sent as is. The rule passed in is
          never modified - the per-topic requests are built on copies.

          @param remote_gateway : name of the gateway to send the unflip request(s) to
          @type str

          @param rule : the rule to unflip (its type and name attributes are read)
        '''
        import copy  # only needed here, so kept local to the method

        if rule.type == gateway_msgs.ConnectionType.ACTION_CLIENT:
            goal_side_type = gateway_msgs.ConnectionType.PUBLISHER
            feedback_side_type = gateway_msgs.ConnectionType.SUBSCRIBER
        elif rule.type == gateway_msgs.ConnectionType.ACTION_SERVER:
            goal_side_type = gateway_msgs.ConnectionType.SUBSCRIBER
            feedback_side_type = gateway_msgs.ConnectionType.PUBLISHER
        else:
            self._send_unflip_request(remote_gateway, rule)
            return
        action_topics = (
            (goal_side_type, ("/goal", "/cancel")),
            (feedback_side_type, ("/feedback", "/status", "/result")),
        )
        for connection_type, suffixes in action_topics:
            for suffix in suffixes:
                # work on a copy so the caller's rule keeps its action type and name
                topic_rule = copy.copy(rule)
                topic_rule.type = connection_type
                topic_rule.name = rule.name + suffix
                self._send_unflip_request(remote_gateway, topic_rule)

    def _send_unflip_request(self, remote_gateway, rule):
        '''
          Publish a single 'unflip' request for the rule on the remote gateway's channel.

          @return True if the request was published, False if publishing raised
          @rtype bool
        '''
        source = hub_api.key_base_name(self._redis_keys['gateway'])
        cmd = utils.serialize_rule_request('unflip', source, rule)
        try:
            self._redis_server.publish(hub_api.create_rocon_key(remote_gateway), cmd)
        except Exception:
            # best effort - any publishing problem is reported to the caller as a failure
            return False
        return True