1
2
3
4
5
6
7
8
9
10 import redis
11 import threading
12 import rospy
13 import re
14 import utils
15 import gateway_msgs.msg as gateway_msgs
16 import rocon_utilities
17 import rocon_hub_client
18 from rocon_hub_client import hub_api
19 from rocon_hub_client.exceptions import HubConnectionLostError, \
20 HubNameNotFoundError, HubNotFoundError
21
22
23
24 from .exceptions import GatewayUnavailableError
25
26
27
28
29
30
32 '''
33 Tunes into the redis channels that have been subscribed to and
34 calls the apropriate callbacks.
35 '''
36 - def __init__(self, redis_pubsub_server, remote_gateway_request_callbacks, hub_connection_lost_hook):
37 threading.Thread.__init__(self)
38 self._redis_pubsub_server = redis_pubsub_server
39 self._remote_gateway_request_callbacks = remote_gateway_request_callbacks
40 self._hub_connection_lost_hook = hub_connection_lost_hook
41
43 '''
44 Used as a callback for incoming requests on redis pubsub channels.
45
46 The received argument is a list of strings for 'flip':
47
48 - [0] - command : this one is 'flip'
49 - [1] - remote_gateway : the name of the gateway that is flipping to us
50 - [2] - remote_name
51 - [3] - remote_node
52 - [4] - type : one of ConnectionType.PUBLISHER etc
53 - [5] - type_info : a ros format type (e.g. std_msgs/String or service api)
54 - [6] - xmlrpc_uri : the xmlrpc node uri
55
56 The command 'unflip' is the same, not including args 5 and 6.
57 '''
58 try:
59
60 for r in self._redis_pubsub_server.listen():
61 if r['type'] != 'unsubscribe' and r['type'] != 'subscribe':
62 command, source, contents = utils.deserialize_request(r['data'])
63 rospy.logdebug("Gateway : redis listener received a channel publication from %s : [%s]" % (source, command))
64 if command == 'flip':
65 registration = utils.Registration(utils.get_connection_from_list(contents), source)
66 self._remote_gateway_request_callbacks['flip'](registration)
67 elif command == 'unflip':
68 self._remote_gateway_request_callbacks['unflip'](utils.get_rule_from_list(contents), source)
69 else:
70 rospy.logerr("Gateway : received an unknown command from the hub.")
71 except redis.exceptions.ConnectionError:
72 self._hub_connection_lost_hook()
73
74
75
76
77
78
80
81 - def __init__(self, ip, port, whitelist, blacklist):
82 '''
83 @param remote_gateway_request_callbacks : to handle redis responses
84 @type list of function pointers (back to GatewaySync class
85
86 @param ip : redis server ip
87 @param port : redis server port
88
89 @raise HubNameNotFoundError, HubNotFoundError
90 '''
91 try:
92 super(GatewayHub, self).__init__(ip, port, whitelist, blacklist)
93 except HubNotFoundError:
94 raise
95 except HubNameNotFoundError:
96 raise
97 self._hub_connection_lost_gateway_hook = None
98 self._firewall = 0
99
100
101
102
103
104 - def register_gateway(self, firewall, unique_gateway_name, remote_gateway_request_callbacks, hub_connection_lost_gateway_hook, gateway_ip):
105 '''
106 Register a gateway with the hub.
107
108 @param firewall
109 @param unique_gateway_name
110 @param remote_gateway_request_callbacks
111 @param hub_connection_lost_hook : used to trigger Gateway.disengage_hub(hub) on lost hub connections in redis pubsub listener thread.
112 @gateway_ip
113
114 @raise HubConnectionLostError if for some reason, the redis server has become unavailable.
115 '''
116 if not self._redis_server:
117 raise HubConnectionLostError()
118 self._unique_gateway_name = unique_gateway_name
119 self._redis_keys['gateway'] = hub_api.create_rocon_key(unique_gateway_name)
120 self._redis_keys['firewall'] = hub_api.create_rocon_gateway_key(unique_gateway_name, 'firewall')
121 self._firewall = 1 if firewall else 0
122 self._redis_keys['gatewaylist'] = hub_api.create_rocon_hub_key('gatewaylist')
123 self._remote_gateway_request_callbacks = remote_gateway_request_callbacks
124 self._hub_connection_lost_gateway_hook = hub_connection_lost_gateway_hook
125 if not self._redis_server.sadd(self._redis_keys['gatewaylist'], self._redis_keys['gateway']):
126
127 pass
128 unused_ret = self._redis_server.sadd(self._redis_keys['gatewaylist'], self._redis_keys['gateway'])
129 self._redis_server.set(self._redis_keys['firewall'], self._firewall)
130
131 self._redis_keys['ip'] = hub_api.create_rocon_gateway_key(unique_gateway_name, 'ip')
132 self._redis_server.set(self._redis_keys['ip'], gateway_ip)
133 self._redis_channels['gateway'] = self._redis_keys['gateway']
134 self._redis_pubsub_server.subscribe(self._redis_channels['gateway'])
135 self.remote_gateway_listener_thread = RedisListenerThread(self._redis_pubsub_server, self._remote_gateway_request_callbacks, self._hub_connection_lost_hook)
136 self.remote_gateway_listener_thread.start()
137
139 '''
140 This gets triggered by the redis pubsub listener when the hub connection is lost.
141 The trigger is passed to the gateway who needs to remove the hub.
142 '''
143 if self._hub_connection_lost_gateway_hook is not None:
144 self._hub_connection_lost_gateway_hook(self)
145
147 '''
148 Remove all gateway info from the hub.
149
150 @return: success or failure of the operation
151 @rtype: bool
152 '''
153 try:
154 self._redis_pubsub_server.unsubscribe()
155 gateway_keys = self._redis_server.keys(self._redis_keys['gateway'] + ":*")
156 pipe = self._redis_server.pipeline()
157 pipe.delete(*gateway_keys)
158 pipe.srem(self._redis_keys['gatewaylist'], self._redis_keys['gateway'])
159 pipe.execute()
160 self._redis_channels = {}
161 except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
162
163
164
165 pass
166
167 rospy.loginfo("Gateway : unregistered from the hub [%s]" % self.name)
168
169
170
171
172
174 '''
175 Return remote gateway information for the specified gateway string id.
176
177 @param gateways : gateway id string to search for
178 @type string
179 @return remote gateway information
180 @rtype gateway_msgs.RemotGateway or None
181 '''
182 firewall = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'firewall'))
183 ip = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'ip'))
184 if firewall is None:
185 return None
186 else:
187 remote_gateway = gateway_msgs.RemoteGateway()
188 remote_gateway.name = gateway
189 remote_gateway.ip = ip
190 remote_gateway.firewall = True if int(firewall) else False
191 remote_gateway.public_interface = []
192 encoded_advertisements = self._redis_server.smembers(hub_api.create_rocon_gateway_key(gateway, 'advertisements'))
193 for encoded_advertisement in encoded_advertisements:
194 advertisement = utils.deserialize_connection(encoded_advertisement)
195 remote_gateway.public_interface.append(advertisement.rule)
196 remote_gateway.flipped_interface = []
197 encoded_flips = self._redis_server.smembers(hub_api.create_rocon_gateway_key(gateway, 'flips'))
198 for encoded_flip in encoded_flips:
199 [target_gateway, name, connection_type, node] = utils.deserialize(encoded_flip)
200 remote_rule = gateway_msgs.RemoteRule(target_gateway, gateway_msgs.Rule(connection_type, name, node))
201 remote_gateway.flipped_interface.append(remote_rule)
202 remote_gateway.pulled_interface = []
203 encoded_pulls = self._redis_server.smembers(hub_api.create_rocon_gateway_key(gateway, 'pulls'))
204 for encoded_pull in encoded_pulls:
205 [target_gateway, name, connection_type, node] = utils.deserialize(encoded_pull)
206 remote_rule = gateway_msgs.RemoteRule(target_gateway, gateway_msgs.Rule(connection_type, name, node))
207 remote_gateway.pulled_interface.append(remote_rule)
208 return remote_gateway
209
211 '''
212 Return a list of the gateways (name list, not redis keys).
213 e.g. ['gateway32adcda32','pirate21fasdf']. If not connected, just
214 returns an empty list.
215 '''
216 if not self._redis_server:
217 rospy.logerr("Gateway : cannot retrieve remote gateway names [%s][%s]." % (self.name, self.uri))
218 return []
219 gateways = []
220 try:
221 gateway_keys = self._redis_server.smembers(self._redis_keys['gatewaylist'])
222 for gateway in gateway_keys:
223 if hub_api.key_base_name(gateway) != self._unique_gateway_name:
224 gateways.append(hub_api.key_base_name(gateway))
225 except redis.ConnectionError as unused_e:
226 pass
227 return gateways
228
230 '''
231 Use this when gateway can be a regular expression and
232 we need to check it off against list_remote_gateway_names()
233
234 @return a list of matches (higher level decides on action for duplicates).
235 @rtype list[str] : list of remote gateway names.
236 '''
237 matches = []
238 try:
239 for remote_gateway in self.list_remote_gateway_names():
240 if re.match(gateway, remote_gateway):
241 matches.append(remote_gateway)
242 except HubConnectionLostError:
243 raise
244 return matches
245
247 '''
248 Use this when gateway can be a regular expression and
249 we need to check it off against list_remote_gateway_names()
250 '''
251 weak_matches = []
252 try:
253 for remote_gateway in self.list_remote_gateway_names():
254 if re.match(gateway, rocon_utilities.gateway_basename(remote_gateway)):
255 weak_matches.append(remote_gateway)
256 except HubConnectionLostError:
257 raise
258 return weak_matches
259
261 '''
262 Equivalent to get_connection_state, but generates it from the public
263 interface of a remote gateway
264
265 @param remote_gateway : hash name for a remote gateway
266 @type str
267 @return dictionary of remote advertisements
268 @rtype dictionary of connection type keyed connection values
269 '''
270 connections = utils.create_empty_connection_type_dictionary()
271 key = hub_api.create_rocon_gateway_key(remote_gateway, 'advertisements')
272 public_interface = self._redis_server.smembers(key)
273 for connection_str in public_interface:
274 connection = utils.deserialize_connection(connection_str)
275 connections[connection.rule.type].append(connection)
276 return connections
277
279 '''
280 Returns the value of the remote gateway's firewall (flip)
281 flag.
282
283 @param gateway : gateway string id
284 @param string
285
286 @return state of the flag
287 @rtype Bool
288
289 @raise GatewayUnavailableError when specified gateway is not on the hub
290 '''
291 firewall = self._redis_server.get(hub_api.create_rocon_gateway_key(gateway, 'firewall'))
292 if firewall is not None:
293 return True if int(firewall) else False
294 else:
295 raise GatewayUnavailableError
296
297
298
299
300
302 '''
303 Places a topic, service or action on the public interface. On the
304 redis server, this representation will always be:
305
306 - topic : a triple { name, type, xmlrpc node uri }
307 - service : a triple { name, rosrpc uri, xmlrpc node uri }
308 - action : ???
309
310 @param connection: representation of a connection (topic, service, action)
311 @type connection: str
312 @raise .exceptions.ConnectionTypeError: if connection arg is invalid.
313 '''
314 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'advertisements')
315 msg_str = utils.serialize_connection(connection)
316 self._redis_server.sadd(key, msg_str)
317
319 '''
320 Removes a topic, service or action from the public interface.
321
322 @param connection: representation of a connection (topic, service, action)
323 @type connection: str
324 @raise .exceptions.ConnectionTypeError: if connectionarg is invalid.
325 '''
326 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'advertisements')
327 msg_str = utils.serialize_connection(connection)
328 self._redis_server.srem(key, msg_str)
329
330 - def post_flip_details(self, gateway, name, connection_type, node):
331 '''
332 Post flip details to the redis server. This has no actual functionality,
333 it is just useful for debugging with the remote_gateway_info service.
334
335 @param gateway : the target of the flip
336 @type string
337 @param name : the name of the connection
338 @type string
339 @param type : the type of the connection (one of ConnectionType.xxx
340 @type string
341 @param node : the node name it was pulled from
342 @type string
343 '''
344 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flips')
345 serialized_data = utils.serialize([gateway, name, connection_type, node])
346 self._redis_server.sadd(key, serialized_data)
347
349 '''
350 Post flip details to the redis server. This has no actual functionality,
351 it is just useful for debugging with the remote_gateway_info service.
352
353 @param gateway : the target of the flip
354 @type string
355 @param name : the name of the connection
356 @type string
357 @param type : the type of the connection (one of ConnectionType.xxx
358 @type string
359 @param node : the node name it was pulled from
360 @type string
361 '''
362 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'flips')
363 serialized_data = utils.serialize([gateway, name, connection_type, node])
364 self._redis_server.srem(key, serialized_data)
365
366 - def post_pull_details(self, gateway, name, connection_type, node):
367 '''
368 Post pull details to the hub. This has no actual functionality,
369 it is just useful for debugging with the remote_gateway_info service.
370
371 @param gateway : the gateway it is pulling from
372 @type string
373 @param name : the name of the connection
374 @type string
375 @param type : the type of the connection (one of ConnectionType.xxx
376 @type string
377 @param node : the node name it was pulled from
378 @type string
379 '''
380 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'pulls')
381 serialized_data = utils.serialize([gateway, name, connection_type, node])
382 self._redis_server.sadd(key, serialized_data)
383
385 '''
386 Post pull details to the hub. This has no actual functionality,
387 it is just useful for debugging with the remote_gateway_info service.
388
389 @param gateway : the gateway it was pulling from
390 @type string
391 @param name : the name of the connection
392 @type string
393 @param type : the type of the connection (one of ConnectionType.xxx
394 @type string
395 @param node : the node name it was pulled from
396 @type string
397 '''
398 key = hub_api.create_rocon_gateway_key(self._unique_gateway_name, 'pulls')
399 serialized_data = utils.serialize([gateway, name, connection_type, node])
400 self._redis_server.srem(key, serialized_data)
401
402
403
404
405
407 '''
408 Sends a message to the remote gateway via redis pubsub channel. This is called from the
409 watcher thread, when a flip rule gets activated.
410
411 - redis channel name: rocon:<remote_gateway_name>
412 - data : list of [ command, gateway, rule type, type, xmlrpc_uri ]
413 - [0] - command : in this case 'flip'
414 - [1] - gateway : the name of this gateway, i.e. the flipper
415 - [2] - name : local name
416 - [3] - node : local node name
417 - [4] - connection_type : one of ConnectionType.PUBLISHER etc
418 - [5] - type_info : a ros format type (e.g. std_msgs/String or service api)
419 - [6] - xmlrpc_uri : the xmlrpc node uri
420
421 @param command : string command name - either 'flip' or 'unflip'
422 @type str
423
424 @param flip_rule : the flip to send
425 @type gateway_msgs.RemoteRule
426
427 @param type_info : topic type (e.g. std_msgs/String)
428 @param str
429
430 @param xmlrpc_uri : the node uri
431 @param str
432 '''
433 source = hub_api.key_base_name(self._redis_keys['gateway'])
434 cmd = utils.serialize_connection_request('flip', source, connection)
435 try:
436 self._redis_server.publish(hub_api.create_rocon_key(remote_gateway), cmd)
437 except Exception as unused_e:
438 return False
439 return True
440
442 if rule.type == gateway_msgs.ConnectionType.ACTION_CLIENT:
443 action_name = rule.name
444 rule.type = gateway_msgs.ConnectionType.PUBLISHER
445 rule.name = action_name + "/goal"
446 self._send_unflip_request(remote_gateway, rule)
447 rule.name = action_name + "/cancel"
448 self._send_unflip_request(remote_gateway, rule)
449 rule.type = gateway_msgs.ConnectionType.SUBSCRIBER
450 rule.name = action_name + "/feedback"
451 self._send_unflip_request(remote_gateway, rule)
452 rule.name = action_name + "/status"
453 self._send_unflip_request(remote_gateway, rule)
454 rule.name = action_name + "/result"
455 self._send_unflip_request(remote_gateway, rule)
456 elif rule.type == gateway_msgs.ConnectionType.ACTION_SERVER:
457 action_name = rule.name
458 rule.type = gateway_msgs.ConnectionType.SUBSCRIBER
459 rule.name = action_name + "/goal"
460 self._send_unflip_request(remote_gateway, rule)
461 rule.name = action_name + "/cancel"
462 self._send_unflip_request(remote_gateway, rule)
463 rule.type = gateway_msgs.ConnectionType.PUBLISHER
464 rule.name = action_name + "/feedback"
465 self._send_unflip_request(remote_gateway, rule)
466 rule.name = action_name + "/status"
467 self._send_unflip_request(remote_gateway, rule)
468 rule.name = action_name + "/result"
469 self._send_unflip_request(remote_gateway, rule)
470 else:
471 self._send_unflip_request(remote_gateway, rule)
472
474 source = hub_api.key_base_name(self._redis_keys['gateway'])
475 cmd = utils.serialize_rule_request('unflip', source, rule)
476 try:
477 self._redis_server.publish(hub_api.create_rocon_key(remote_gateway), cmd)
478 except Exception as unused_e:
479 return False
480 return True
481