1
2
3
4
5
6
7
8
9
10
11 import rospy
12 import gateway_msgs.msg as gateway_msgs
13 import gateway_msgs.srv as gateway_srvs
14
15
16 import utils
17 import ros_parameters
18 from .watcher_thread import WatcherThread
19 from .flipped_interface import FlippedInterface
20 from .public_interface import PublicInterface
21 from .pulled_interface import PulledInterface
22 from .master_api import LocalMaster
23
24
25
26
27
28
30 '''
31 Used to synchronise with hubs.
32 '''
33 - def __init__(self, hub_manager, param, unique_name, publish_gateway_info_callback):
72
75
84
86 '''
87 We often check if we're connected to any hubs often just to ensure we
88 don't waste time processing if there is no-one listening.
89
90 @return True if at least one hub is connected, False otherwise
91 @rtype Bool
92 '''
93 return self.hub_manager.is_connected()
94
96 '''
97 Disengage from the specified hub. Don't actually need to clean up connections
98 here like we do in shutdown - that can be handled from the watcher thread itself.
99
100 @param hub : the hub that will be deleted.
101 '''
102 self.hub_manager.disengage_hub(hub)
103
104
105
106
107
109 '''
110 Process the list of local connections and check against
111 the current flip rules and patterns for changes. If a rule
112 has become (un)available take appropriate action.
113
114 @param local_connection_index : list of current local connections parsed from the master
115 @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections
116
117 @param gateways : list of remote gateway string id's
118 @type string
119 '''
120 state_changed = False
121 new_flips, lost_flips = self.flipped_interface.update(local_connection_index, remote_gateway_hub_index, self._unique_name)
122 for connection_type in utils.connection_types:
123 for flip in new_flips[connection_type]:
124 firewall_flag = self.hub_manager.get_remote_gateway_firewall_flag(flip.gateway)
125 if firewall_flag == True:
126 continue
127 state_changed = True
128
129 connections = self.master.generate_connection_details(flip.rule.type, flip.rule.name, flip.rule.node)
130 if connection_type == utils.ConnectionType.ACTION_CLIENT or connection_type == utils.ConnectionType.ACTION_SERVER:
131 rospy.loginfo("Gateway : sending flip request [%s]%s" % (flip.gateway, utils.format_rule(flip.rule)))
132 hub = remote_gateway_hub_index[flip.gateway][0]
133 hub.post_flip_details(flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node)
134 for connection in connections:
135 hub.send_flip_request(flip.gateway, connection)
136 else:
137 for connection in connections:
138 rospy.loginfo("Gateway : sending flip request [%s]%s" % (flip.gateway, utils.format_rule(connection.rule)))
139 hub = remote_gateway_hub_index[flip.gateway][0]
140 hub.send_flip_request(flip.gateway, connection)
141 hub.post_flip_details(flip.gateway, connection.rule.name, connection.rule.type, connection.rule.node)
142 for flip in lost_flips[connection_type]:
143 state_changed = True
144 rospy.loginfo("Gateway : sending unflip request [%s]%s" % (flip.gateway, utils.format_rule(flip.rule)))
145 hub = remote_gateway_hub_index[flip.gateway][0]
146 hub.send_unflip_request(flip.gateway, flip.rule)
147 hub.remove_flip_details(flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node)
148 if state_changed:
149 self._publish_gateway_info()
150
152 '''
153 Process the list of local connections and check against
154 the current pull rules and patterns for changes. If a rule
155 has become (un)available take appropriate action.
156
157 This is called by the watcher thread. The remote_gateway_hub_index
158 is always a full dictionary of all remote gateway and hub key-value
159 pairs - it is only included as an argument here to save
160 processing doubly in the watcher thread.
161
162 @param connections : list of current local connections parsed from the master
163 @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections
164
165 @param remote_gateway_hub_index : key-value remote gateway name-hub list pairs
166 @type dictionary of remote_gateway_name-list of hub_api.Hub objects key-value pairs
167 '''
168 state_changed = False
169 remote_connections = {}
170 for remote_gateway in remote_gateway_hub_index.keys() + self.pulled_interface.list_remote_gateway_names():
171
172
173 try:
174 hub = remote_gateway_hub_index[remote_gateway][0]
175 remote_connections[remote_gateway] = hub.get_remote_connection_state(remote_gateway)
176 except KeyError:
177 pass
178 new_pulls, lost_pulls = self.pulled_interface.update(remote_connections, self._unique_name)
179 for connection_type in utils.connection_types:
180 for pull in new_pulls[connection_type]:
181
182 connection = None
183 for remote_gateway in remote_connections.keys():
184 for c in remote_connections[remote_gateway][pull.rule.type]:
185 if c.rule.name == pull.rule.name and \
186 c.rule.node == pull.rule.node:
187 connection = c
188 break
189 if connection:
190 break
191
192 existing_registration = self.pulled_interface.find_registration_match(pull.gateway, pull.rule.name, pull.rule.node, pull.rule.type)
193 if not existing_registration:
194 rospy.loginfo("Gateway : pulling in connection %s[%s]" % (utils.format_rule(pull.rule), remote_gateway))
195 registration = utils.Registration(connection, pull.gateway)
196 new_registration = self.master.register(registration)
197 self.pulled_interface.registrations[registration.connection.rule.type].append(new_registration)
198 hub = remote_gateway_hub_index[pull.gateway][0]
199 hub.post_pull_details(pull.gateway, pull.rule.name, pull.rule.type, pull.rule.node)
200 state_changed = True
201 for pull in lost_pulls[connection_type]:
202
203 existing_registration = self.pulled_interface.find_registration_match(pull.gateway, pull.rule.name, pull.rule.node, pull.rule.type)
204 if existing_registration:
205 rospy.loginfo("Gateway : abandoning pulled connection %s[%s]" % (utils.format_rule(pull.rule), pull.gateway))
206 self.master.unregister(existing_registration)
207 if hub:
208 hub.remove_pull_details(pull.gateway, pull.rule.name, pull.rule.type, pull.rule.node)
209 self.pulled_interface.registrations[existing_registration.connection.rule.type].remove(existing_registration)
210 state_changed = True
211 if state_changed:
212 self._publish_gateway_info()
213
215 '''
216 Process the list of local connections and check against
217 the current rules and patterns for changes. If a rule
218 has become (un)available take appropriate action.
219
220 @param local_connection_index : list of current local connections parsed from the master
221 @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections
222 '''
223 new_conns, lost_conns = self.public_interface.update(local_connection_index)
224 public_interface = self.public_interface.getInterface()
225 for connection_type in utils.connection_types:
226 for new_connection in new_conns[connection_type]:
227 connection = self.master.generate_advertisement_connection_details(new_connection.rule.type, new_connection.rule.name, new_connection.rule.node)
228 rospy.loginfo("Gateway : adding connection to public interface %s" % utils.format_rule(connection.rule))
229 self.hub_manager.advertise(connection)
230 for lost_connection in lost_conns[connection_type]:
231 rospy.loginfo("Gateway : removing connection from public interface %s" % utils.format_rule(lost_connection.rule))
232 self.hub_manager.unadvertise(lost_connection)
233 if new_conns or lost_conns:
234 self._publish_gateway_info()
235 return public_interface
236
237
238
239
240
242 '''
243 Used as a callback for incoming requests on redis pubsub channels.
244 It gets assigned to RedisManager.callback.
245
246 @param registration : fully detailed registration to be processed
247 @type utils.Registration
248 '''
249 pass
250 if self.flipped_interface.firewall:
251 rospy.logwarn("Gateway : firewalling a flip request %s" % registration)
252 else:
253 rospy.loginfo("Gateway : received a flip request %s" % registration)
254
255 existing_registration = self.flipped_interface.find_registration_match(registration.remote_gateway, registration.connection.rule.name, registration.connection.rule.node, registration.connection.rule.type)
256 if not existing_registration:
257 new_registration = self.master.register(registration)
258 if new_registration:
259 self.flipped_interface.registrations[registration.connection.rule.type].append(new_registration)
260 self._publish_gateway_info()
261
263 rospy.loginfo("Gateway : received an unflip request from gateway %s: %s" % (remote_gateway, utils.format_rule(rule)))
264 existing_registration = self.flipped_interface.find_registration_match(remote_gateway, rule.name, rule.node, rule.type)
265 if existing_registration:
266 self.master.unregister(existing_registration)
267 self.flipped_interface.registrations[existing_registration.connection.rule.type].remove(existing_registration)
268 self._publish_gateway_info()
269
270
271
272
273
275 '''
276 Puts/Removes a number of rules on the public interface watchlist.
277 As local rules matching these rules become available/go away,
278 the public interface is modified accordingly. A manual update is done
279 at the end of the advertise call to quickly capture existing
280 rules
281
282 @param request
283 @type gateway_srvs.AdvertiseRequest
284 @return service response
285 @rtgateway_srvs.srv.AdvertiseReponse
286 '''
287 response = gateway_srvs.AdvertiseResponse()
288 try:
289 if not request.cancel:
290 for rule in request.rules:
291 if not self.public_interface.add_rule(rule):
292 response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_EXISTS
293 response.error_message = "advertisment rule already exists [%s:(%s,%s)]" % (rule.name, rule.type, rule.node)
294 else:
295 for rule in request.rules:
296 if not self.public_interface.remove_rule(rule):
297 response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_NOT_FOUND
298 response.error_message = "advertisment not found [%s:(%s,%s)]" % (rule.name, rule.type, rule.node)
299 except Exception as e:
300 rospy.logerr("Gateway : unknown advertise error [%s]." % str(e))
301 response.result = gateway_msgs.ErrorCodes.UNKNOWN_ADVERTISEMENT_ERROR
302
303
304 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
305 self.watcher_thread.trigger_update = True
306 self._publish_gateway_info()
307 else:
308 rospy.logerr("Gateway : %s." % response.error_message)
309 response.watchlist = self.public_interface.getWatchlist()
310 return response
311
313 '''
314 Toggles the advertise all mode. If advertising all, an additional
315 blacklist parameter can be supplied which includes all the topics that
316 will not be advertised/watched for. This blacklist is added to the
317 default blacklist of the public interface
318
319 @param request
320 @type gateway_srvs.AdvertiseAllRequest
321 @return service response
322 @rtype gateway_srvs.AdvertiseAllReponse
323 '''
324 response = gateway_srvs.AdvertiseAllResponse()
325 try:
326 if not request.cancel:
327 if not self.public_interface.advertise_all(request.blacklist):
328 response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_EXISTS
329 response.error_message = "already advertising all."
330 else:
331 self.public_interface.unadvertise_all()
332 except Exception as e:
333 response.result = gateway_msgs.ErrorCodes.UNKNOWN_ADVERTISEMENT_ERROR
334 response.error_message = "unknown advertise all error [%s]" % (str(e))
335
336
337 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
338 self.watcher_thread.trigger_update = True
339 self._publish_gateway_info()
340 else:
341 rospy.logerr("Gateway : %s." % response.error_message)
342 response.blacklist = self.public_interface.getBlacklist()
343 return response
344
346 '''
347 Puts flip rules on a watchlist which (un)flips them when they
348 become (un)available.
349
350 @param request
351 @type gateway_srvs.RemoteRequest
352 @return service response
353 @rtype gateway_srvs.RemoteResponse
354 '''
355 response = gateway_srvs.RemoteResponse()
356
357 for remote in request.remotes:
358 remote.gateway, response.result, response.error_message = self._ros_service_remote_checks(remote.gateway)
359 if response.result != gateway_msgs.ErrorCodes.SUCCESS:
360 rospy.logerr("Gateway : %s." % response.error_message)
361 return response
362
363 added_rules = []
364 for remote in request.remotes:
365 if not request.cancel:
366 flip_rule = self.flipped_interface.add_rule(remote)
367 if flip_rule:
368 added_rules.append(flip_rule)
369 rospy.loginfo("Gateway : added flip rule [%s:(%s,%s)]" % (flip_rule.gateway, flip_rule.rule.name, flip_rule.rule.type))
370 else:
371 response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS
372 response.error_message = "flip rule already exists [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type)
373 break
374 else:
375 removed_flip_rules = self.flipped_interface.remove_rule(remote)
376 if removed_flip_rules:
377 rospy.loginfo("Gateway : removed flip rule [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type))
378
379 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
380 self._publish_gateway_info()
381 self.watcher_thread.trigger_update = True
382 else:
383 if added_rules:
384 for added_rule in added_rules:
385 self.flipped_interface.remove_rule(added_rule)
386 rospy.logerr("Gateway : %s." % response.error_message)
387 return response
388
390 '''
391 Flips everything except a specified blacklist to a particular gateway,
392 or if the cancel flag is set, clears all flips to that gateway.
393
394 @param request
395 @type gateway_srvs.RemoteAllRequest
396 @return service response
397 @rtype gateway_srvs.RemoteAllResponse
398 '''
399 response = gateway_srvs.RemoteAllResponse()
400 remote_gateway_target_hash_name, response.result, response.error_message = self._ros_service_remote_checks(request.gateway)
401 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
402 if not request.cancel:
403 if self.flipped_interface.flip_all(remote_gateway_target_hash_name, request.blacklist):
404 rospy.loginfo("Gateway : flipping all to gateway '%s'" % (remote_gateway_target_hash_name))
405 else:
406 response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS
407 response.error_message = "already flipping all to gateway '%s' " + remote_gateway_target_hash_name
408 else:
409 self.flipped_interface.unflip_all(remote_gateway_target_hash_name)
410 rospy.loginfo("Gateway : cancelling a previous flip all request [%s]" % (request.gateway))
411 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
412 self._publish_gateway_info()
413 self.watcher_thread.trigger_update = True
414 else:
415 rospy.logerr("Gateway : %s." % response.error_message)
416 return response
417
419 '''
420 Puts a single rule on a watchlist and pulls it from a particular
421 gateway when it becomes (un)available.
422
423 @param request
424 @type gateway_srvs.RemoteRequest
425 @return service response
426 @rtype gateway_srvs.RemoteResponse
427 '''
428 response = gateway_srvs.RemoteResponse()
429
430 for remote in request.remotes:
431 remote.gateway, response.result, response.error_message = self._ros_service_remote_checks(remote.gateway)
432 if response.result != gateway_msgs.ErrorCodes.SUCCESS:
433 rospy.logerr("Gateway : %s." % response.error_message)
434 return response
435
436
437 added_rules = []
438 for remote in request.remotes:
439 if not request.cancel:
440 pull_rule = self.pulled_interface.add_rule(remote)
441 if pull_rule:
442 added_rules.append(pull_rule)
443 rospy.loginfo("Gateway : added pull rule [%s:(%s,%s)]" % (pull_rule.gateway, pull_rule.rule.name, pull_rule.rule.type))
444 else:
445 response.result = gateway_msgs.ErrorCodes.PULL_RULE_ALREADY_EXISTS
446 response.error_message = "pull rule already exists [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type)
447 break
448 else:
449 for remote in request.remotes:
450 removed_pull_rules = self.pulled_interface.remove_rule(remote)
451 if removed_pull_rules:
452 rospy.loginfo("Gateway : removed pull rule [%s:%s]" % (remote.gateway, remote.rule.name))
453 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
454 self._publish_gateway_info()
455 self.watcher_thread.trigger_update = True
456 else:
457 if added_rules:
458 for added_rule in added_rules:
459 self.pulled_interface.remove_rule(added_rule)
460 rospy.logerr("Gateway : %s." % response.error_message)
461 return response
462
464 '''
465 Pull everything except a specified blacklist from a particular gateway,
466 or if the cancel flag is set, clears all pulls from that gateway.
467
468 @param request
469 @type gateway_srvs.RemoteAllRequest
470 @return service response
471 @rtype gateway_srvs.RemoteAllResponse
472 '''
473 response = gateway_srvs.RemoteAllResponse()
474 remote_gateway_target_hash_name, response.result, response.error_message = self._ros_service_remote_checks(request.gateway)
475 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
476 if not request.cancel:
477 if self.pulled_interface.pull_all(remote_gateway_target_hash_name, request.blacklist):
478 rospy.loginfo("Gateway : pulling all from gateway '%s'" % (request.gateway))
479 else:
480 response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS
481 response.error_message = "already pulling all from gateway '%s' " + request.gateway
482 else:
483 self.pulled_interface.unpull_all(remote_gateway_target_hash_name)
484 rospy.loginfo("Gateway : cancelling a previous pull all request [%s]" % (request.gateway))
485 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
486 self._publish_gateway_info()
487 self.watcher_thread.trigger_update = True
488 else:
489 rospy.logerr("Gateway : %s." % response.error_message)
490 return response
491
493 '''
494 Some simple checks when pulling or flipping to make sure that the remote gateway is visible. It
495 does a strict check on the hash names first, then falls back to looking for weak matches on the
496 human friendly name.
497
498 @param gateway : remote gateway target name (can be hash name, basename or regex pattern)
499 @type string
500 @return pair of result type and message
501 @rtype gateway_msgs.ErrorCodes.xxx, string
502 '''
503 if not self.is_connected():
504 return None, gateway_msgs.ErrorCodes.NO_HUB_CONNECTION, "not connected to hub, aborting"
505 if gateway == self._unique_name:
506 return None, gateway_msgs.ErrorCodes.REMOTE_GATEWAY_SELF_IS_NOT, "gateway cannot flip/pull to itself"
507 return gateway, gateway_msgs.ErrorCodes.SUCCESS, ""
508
509
510
511
512
513
514
515
516
517
518
519
520