1
2
3
4
5
6
7
8
9
10
11 import rospy
12 import gateway_msgs.msg as gateway_msgs
13 import gateway_msgs.srv as gateway_srvs
14
15
16 import utils
17 import ros_parameters
18 from .watcher_thread import WatcherThread
19 from .flipped_interface import FlippedInterface
20 from .public_interface import PublicInterface
21 from .pulled_interface import PulledInterface
22 from .master_api import LocalMaster
23
24
25
26
27
28
30 '''
31 Used to synchronise with hubs.
32 '''
33 - def __init__(self, hub_manager, param, unique_name, publish_gateway_info_callback):
72
75
84
86 '''
87 We often check if we're connected to any hubs often just to ensure we
88 don't waste time processing if there is no-one listening.
89
90 @return True if at least one hub is connected, False otherwise
91 @rtype Bool
92 '''
93 return self.hub_manager.is_connected()
94
96 '''
97 Disengage from the specified hub. Don't actually need to clean up connections
98 here like we do in shutdown - that can be handled from the watcher thread itself.
99
100 @param hub : the hub that will be deleted.
101 '''
102 self.hub_manager.disengage_hub(hub)
103
104
105
106
107
109 '''
110 Process the list of local connections and check against
111 the current flip rules and patterns for changes. If a rule
112 has become (un)available take appropriate action.
113
114 @param local_connection_index : list of current local connections parsed from the master
115 @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections
116
117 @param gateways : list of remote gateway string id's
118 @type string
119 '''
120 state_changed = False
121 new_flips, lost_flips = self.flipped_interface.update(local_connection_index, remote_gateway_hub_index, self._unique_name)
122 for connection_type in utils.connection_types:
123 for flip in new_flips[connection_type]:
124 firewall_flag = self.hub_manager.get_remote_gateway_firewall_flag(flip.gateway)
125 if firewall_flag == True:
126 continue
127 state_changed = True
128
129 connections = self.master.generate_connection_details(flip.rule.type, flip.rule.name, flip.rule.node)
130 if connection_type == utils.ConnectionType.ACTION_CLIENT or connection_type == utils.ConnectionType.ACTION_SERVER:
131 rospy.loginfo("Gateway : sending flip request [%s]%s" % (flip.gateway, utils.format_rule(flip.rule)))
132 hub = remote_gateway_hub_index[flip.gateway][0]
133 hub.post_flip_details(flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node)
134 for connection in connections:
135 hub.send_flip_request(flip.gateway, connection)
136 else:
137 for connection in connections:
138 rospy.loginfo("Gateway : sending flip request [%s]%s" % (flip.gateway, utils.format_rule(connection.rule)))
139 hub = remote_gateway_hub_index[flip.gateway][0]
140 hub.send_flip_request(flip.gateway, connection)
141 hub.post_flip_details(flip.gateway, connection.rule.name, connection.rule.type, connection.rule.node)
142 for flip in lost_flips[connection_type]:
143 state_changed = True
144 rospy.loginfo("Gateway : sending unflip request [%s]%s" % (flip.gateway, utils.format_rule(flip.rule)))
145 hub = remote_gateway_hub_index[flip.gateway][0]
146 hub.send_unflip_request(flip.gateway, flip.rule)
147 hub.remove_flip_details(flip.gateway, flip.rule.name, flip.rule.type, flip.rule.node)
148 if state_changed:
149 self._publish_gateway_info()
150
152 '''
153 Process the list of local connections and check against
154 the current pull rules and patterns for changes. If a rule
155 has become (un)available take appropriate action.
156
157 This is called by the watcher thread. The remote_gateway_hub_index
158 is always a full dictionary of all remote gateway and hub key-value
159 pairs - it is only included as an argument here to save
160 processing doubly in the watcher thread.
161
162 @param connections : list of current local connections parsed from the master
163 @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections
164
165 @param remote_gateway_hub_index : key-value remote gateway name-hub list pairs
166 @type dictionary of remote_gateway_name-list of hub_api.Hub objects key-value pairs
167 '''
168 state_changed = False
169 remote_connections = {}
170 for remote_gateway in remote_gateway_hub_index.keys() + self.pulled_interface.list_remote_gateway_names():
171
172
173 try:
174 hub = remote_gateway_hub_index[remote_gateway][0]
175 remote_connections[remote_gateway] = hub.get_remote_connection_state(remote_gateway)
176 except KeyError:
177 pass
178 new_pulls, lost_pulls = self.pulled_interface.update(remote_connections, self._unique_name)
179 for connection_type in utils.connection_types:
180 for pull in new_pulls[connection_type]:
181
182 connection = None
183 for remote_gateway in remote_connections.keys():
184 for c in remote_connections[remote_gateway][pull.rule.type]:
185 if c.rule.name == pull.rule.name and \
186 c.rule.node == pull.rule.node:
187 connection = c
188 break
189 if connection:
190 break
191
192 existing_registration = self.pulled_interface.find_registration_match(pull.gateway, pull.rule.name, pull.rule.node, pull.rule.type)
193 if not existing_registration:
194 rospy.loginfo("Gateway : pulling in connection %s[%s]" % (utils.format_rule(pull.rule), remote_gateway))
195 registration = utils.Registration(connection, pull.gateway)
196 new_registration = self.master.register(registration)
197 self.pulled_interface.registrations[registration.connection.rule.type].append(new_registration)
198 hub = remote_gateway_hub_index[pull.gateway][0]
199 hub.post_pull_details(pull.gateway, pull.rule.name, pull.rule.type, pull.rule.node)
200 state_changed = True
201 for pull in lost_pulls[connection_type]:
202
203 existing_registration = self.pulled_interface.find_registration_match(pull.gateway, pull.rule.name, pull.rule.node, pull.rule.type)
204 if existing_registration:
205 rospy.loginfo("Gateway : abandoning pulled connection %s[%s]" % (utils.format_rule(pull.rule), pull.gateway))
206 self.master.unregister(existing_registration)
207
208
209
210
211
212 self.pulled_interface.registrations[existing_registration.connection.rule.type].remove(existing_registration)
213 state_changed = True
214 if state_changed:
215 self._publish_gateway_info()
216
218 '''
219 Process the list of local connections and check against
220 the current rules and patterns for changes. If a rule
221 has become (un)available take appropriate action.
222
223 @param local_connection_index : list of current local connections parsed from the master
224 @type : dictionary of ConnectionType.xxx keyed lists of utils.Connections
225 '''
226 new_conns, lost_conns = self.public_interface.update(local_connection_index)
227 public_interface = self.public_interface.getInterface()
228 for connection_type in utils.connection_types:
229 for new_connection in new_conns[connection_type]:
230 connection = self.master.generate_advertisement_connection_details(new_connection.rule.type, new_connection.rule.name, new_connection.rule.node)
231 rospy.loginfo("Gateway : adding connection to public interface %s" % utils.format_rule(connection.rule))
232 self.hub_manager.advertise(connection)
233 for lost_connection in lost_conns[connection_type]:
234 rospy.loginfo("Gateway : removing connection from public interface %s" % utils.format_rule(lost_connection.rule))
235 self.hub_manager.unadvertise(lost_connection)
236 if new_conns or lost_conns:
237 self._publish_gateway_info()
238 return public_interface
239
240
241
242
243
245 '''
246 Used as a callback for incoming requests on redis pubsub channels.
247 It gets assigned to RedisManager.callback.
248
249 @param registration : fully detailed registration to be processed
250 @type utils.Registration
251 '''
252 pass
253 if self.flipped_interface.firewall:
254 rospy.logwarn("Gateway : firewalling a flip request %s" % registration)
255 else:
256 rospy.loginfo("Gateway : received a flip request %s" % registration)
257
258 existing_registration = self.flipped_interface.find_registration_match(registration.remote_gateway, registration.connection.rule.name, registration.connection.rule.node, registration.connection.rule.type)
259 if not existing_registration:
260 new_registration = self.master.register(registration)
261 if new_registration:
262 self.flipped_interface.registrations[registration.connection.rule.type].append(new_registration)
263 self._publish_gateway_info()
264
266 rospy.loginfo("Gateway : received an unflip request from gateway %s: %s" % (remote_gateway, utils.format_rule(rule)))
267 existing_registration = self.flipped_interface.find_registration_match(remote_gateway, rule.name, rule.node, rule.type)
268 if existing_registration:
269 self.master.unregister(existing_registration)
270 self.flipped_interface.registrations[existing_registration.connection.rule.type].remove(existing_registration)
271 self._publish_gateway_info()
272
273
274
275
276
278 '''
279 Configures the watcher period. This is useful to slow/speed up the
280 watcher loop. Quite often you want it polling quickly early while
281 configuring connections, but on long loops later when it does not have
282 to do very much except look for shutdown.
283
284 @param request
285 @type gateway_srvs.SetWatcherPeriodRequest
286 @return service response
287 @rtgateway_srvs.srv.SetWatcherPeriodResponse
288 '''
289 self.watcher_thread.set_watch_loop_period(request.period)
290 return gateway_srvs.SetWatcherPeriodResponse(self.watcher_thread.get_watch_loop_period().to_sec())
291
293 '''
294 Trigger a watcher loop update
295 '''
296 self.watcher_thread.trigger_update = True
297
299 '''
300 Puts/Removes a number of rules on the public interface watchlist.
301 As local rules matching these rules become available/go away,
302 the public interface is modified accordingly. A manual update is done
303 at the end of the advertise call to quickly capture existing
304 rules
305
306 @param request
307 @type gateway_srvs.AdvertiseRequest
308 @return service response
309 @rtgateway_srvs.srv.AdvertiseReponse
310 '''
311 response = gateway_srvs.AdvertiseResponse()
312 try:
313 if not request.cancel:
314 for rule in request.rules:
315 if not self.public_interface.add_rule(rule):
316 response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_EXISTS
317 response.error_message = "advertisment rule already exists [%s:(%s,%s)]" % (rule.name, rule.type, rule.node)
318 else:
319 for rule in request.rules:
320 if not self.public_interface.remove_rule(rule):
321 response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_NOT_FOUND
322 response.error_message = "advertisment not found [%s:(%s,%s)]" % (rule.name, rule.type, rule.node)
323 except Exception as e:
324 rospy.logerr("Gateway : unknown advertise error [%s]." % str(e))
325 response.result = gateway_msgs.ErrorCodes.UNKNOWN_ADVERTISEMENT_ERROR
326
327
328 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
329 self.watcher_thread.trigger_update = True
330 self._publish_gateway_info()
331 else:
332 rospy.logerr("Gateway : %s." % response.error_message)
333 response.watchlist = self.public_interface.getWatchlist()
334 return response
335
337 '''
338 Toggles the advertise all mode. If advertising all, an additional
339 blacklist parameter can be supplied which includes all the topics that
340 will not be advertised/watched for. This blacklist is added to the
341 default blacklist of the public interface
342
343 @param request
344 @type gateway_srvs.AdvertiseAllRequest
345 @return service response
346 @rtype gateway_srvs.AdvertiseAllReponse
347 '''
348 response = gateway_srvs.AdvertiseAllResponse()
349 try:
350 if not request.cancel:
351 if not self.public_interface.advertise_all(request.blacklist):
352 response.result = gateway_msgs.ErrorCodes.ADVERTISEMENT_EXISTS
353 response.error_message = "already advertising all."
354 else:
355 self.public_interface.unadvertise_all()
356 except Exception as e:
357 response.result = gateway_msgs.ErrorCodes.UNKNOWN_ADVERTISEMENT_ERROR
358 response.error_message = "unknown advertise all error [%s]" % (str(e))
359
360
361 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
362 self.watcher_thread.trigger_update = True
363 self._publish_gateway_info()
364 else:
365 rospy.logerr("Gateway : %s." % response.error_message)
366 response.blacklist = self.public_interface.getBlacklist()
367 return response
368
370 '''
371 Puts flip rules on a watchlist which (un)flips them when they
372 become (un)available.
373
374 @param request
375 @type gateway_srvs.RemoteRequest
376 @return service response
377 @rtype gateway_srvs.RemoteResponse
378 '''
379 response = gateway_srvs.RemoteResponse()
380
381 for remote in request.remotes:
382 remote.gateway, response.result, response.error_message = self._ros_service_remote_checks(remote.gateway)
383 if response.result != gateway_msgs.ErrorCodes.SUCCESS:
384 rospy.logerr("Gateway : %s." % response.error_message)
385 return response
386
387 added_rules = []
388 for remote in request.remotes:
389 if not request.cancel:
390 flip_rule = self.flipped_interface.add_rule(remote)
391 if flip_rule:
392 added_rules.append(flip_rule)
393 rospy.loginfo("Gateway : added flip rule [%s:(%s,%s)]" % (flip_rule.gateway, flip_rule.rule.name, flip_rule.rule.type))
394 else:
395 response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS
396 response.error_message = "flip rule already exists [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type)
397 break
398 else:
399 removed_flip_rules = self.flipped_interface.remove_rule(remote)
400 if removed_flip_rules:
401 rospy.loginfo("Gateway : removed flip rule [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type))
402
403 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
404 self._publish_gateway_info()
405 self.watcher_thread.trigger_update = True
406 else:
407 if added_rules:
408 for added_rule in added_rules:
409 self.flipped_interface.remove_rule(added_rule)
410 rospy.logerr("Gateway : %s." % response.error_message)
411 return response
412
414 '''
415 Flips everything except a specified blacklist to a particular gateway,
416 or if the cancel flag is set, clears all flips to that gateway.
417
418 @param request
419 @type gateway_srvs.RemoteAllRequest
420 @return service response
421 @rtype gateway_srvs.RemoteAllResponse
422 '''
423 response = gateway_srvs.RemoteAllResponse()
424 remote_gateway_target_hash_name, response.result, response.error_message = self._ros_service_remote_checks(request.gateway)
425 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
426 if not request.cancel:
427 if self.flipped_interface.flip_all(remote_gateway_target_hash_name, request.blacklist):
428 rospy.loginfo("Gateway : flipping all to gateway '%s'" % (remote_gateway_target_hash_name))
429 else:
430 response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS
431 response.error_message = "already flipping all to gateway '%s' " + remote_gateway_target_hash_name
432 else:
433 self.flipped_interface.unflip_all(remote_gateway_target_hash_name)
434 rospy.loginfo("Gateway : cancelling a previous flip all request [%s]" % (request.gateway))
435 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
436 self._publish_gateway_info()
437 self.watcher_thread.trigger_update = True
438 else:
439 rospy.logerr("Gateway : %s." % response.error_message)
440 return response
441
443 '''
444 Puts a single rule on a watchlist and pulls it from a particular
445 gateway when it becomes (un)available.
446
447 @param request
448 @type gateway_srvs.RemoteRequest
449 @return service response
450 @rtype gateway_srvs.RemoteResponse
451 '''
452 response = gateway_srvs.RemoteResponse()
453
454 for remote in request.remotes:
455 remote.gateway, response.result, response.error_message = self._ros_service_remote_checks(remote.gateway)
456 if response.result != gateway_msgs.ErrorCodes.SUCCESS:
457 rospy.logerr("Gateway : %s." % response.error_message)
458 return response
459
460
461 added_rules = []
462 for remote in request.remotes:
463 if not request.cancel:
464 pull_rule = self.pulled_interface.add_rule(remote)
465 if pull_rule:
466 added_rules.append(pull_rule)
467 rospy.loginfo("Gateway : added pull rule [%s:(%s,%s)]" % (pull_rule.gateway, pull_rule.rule.name, pull_rule.rule.type))
468 else:
469 response.result = gateway_msgs.ErrorCodes.PULL_RULE_ALREADY_EXISTS
470 response.error_message = "pull rule already exists [%s:(%s,%s)]" % (remote.gateway, remote.rule.name, remote.rule.type)
471 break
472 else:
473 for remote in request.remotes:
474 removed_pull_rules = self.pulled_interface.remove_rule(remote)
475 if removed_pull_rules:
476 rospy.loginfo("Gateway : removed pull rule [%s:%s]" % (remote.gateway, remote.rule.name))
477 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
478 self._publish_gateway_info()
479 self.watcher_thread.trigger_update = True
480 else:
481 if added_rules:
482 for added_rule in added_rules:
483 self.pulled_interface.remove_rule(added_rule)
484 rospy.logerr("Gateway : %s." % response.error_message)
485 return response
486
488 '''
489 Pull everything except a specified blacklist from a particular gateway,
490 or if the cancel flag is set, clears all pulls from that gateway.
491
492 @param request
493 @type gateway_srvs.RemoteAllRequest
494 @return service response
495 @rtype gateway_srvs.RemoteAllResponse
496 '''
497 response = gateway_srvs.RemoteAllResponse()
498 remote_gateway_target_hash_name, response.result, response.error_message = self._ros_service_remote_checks(request.gateway)
499 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
500 if not request.cancel:
501 if self.pulled_interface.pull_all(remote_gateway_target_hash_name, request.blacklist):
502 rospy.loginfo("Gateway : pulling all from gateway '%s'" % (request.gateway))
503 else:
504 response.result = gateway_msgs.ErrorCodes.FLIP_RULE_ALREADY_EXISTS
505 response.error_message = "already pulling all from gateway '%s' " + request.gateway
506 else:
507 self.pulled_interface.unpull_all(remote_gateway_target_hash_name)
508 rospy.loginfo("Gateway : cancelling a previous pull all request [%s]" % (request.gateway))
509 if response.result == gateway_msgs.ErrorCodes.SUCCESS:
510 self._publish_gateway_info()
511 self.watcher_thread.trigger_update = True
512 else:
513 rospy.logerr("Gateway : %s." % response.error_message)
514 return response
515
517 '''
518 Some simple checks when pulling or flipping to make sure that the remote gateway is visible. It
519 does a strict check on the hash names first, then falls back to looking for weak matches on the
520 human friendly name.
521
522 @param gateway : remote gateway target name (can be hash name, basename or regex pattern)
523 @type string
524 @return pair of result type and message
525 @rtype gateway_msgs.ErrorCodes.xxx, string
526 '''
527 if not self.is_connected():
528 return None, gateway_msgs.ErrorCodes.NO_HUB_CONNECTION, "not connected to hub, aborting"
529 if gateway == self._unique_name:
530 return None, gateway_msgs.ErrorCodes.REMOTE_GATEWAY_SELF_IS_NOT, "gateway cannot flip/pull to itself"
531 return gateway, gateway_msgs.ErrorCodes.SUCCESS, ""
532
533
534
535
536
537
538
539
540
541
542
543
544