1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 import random
35 import socket
36 import threading
37 import time
38 import traceback
39 import xmlrpclib
40
41 from multimaster_msgs_fkie.msg import SyncTopicInfo, SyncServiceInfo, SyncMasterInfo
42 import rospy
43
44 from master_discovery_fkie.common import masteruri_from_ros
45 from master_discovery_fkie.filter_interface import FilterInterface
46
47
49 '''
50 A thread to synchronize the local ROS master with a remote master. While the
51 synchronization only the topic of the remote ROS master will be registered by
52 the local ROS master. The remote ROS master will be keep unchanged.
53 '''
54
55 MAX_UPDATE_DELAY = 5
56
57 MSG_ANY_TYPE = '*'
58
59 - def __init__(self, name, uri, discoverer_name, monitoruri, timestamp, sync_on_demand=False):
60 '''
61 Initialization method for the SyncThread.
62 @param name: the name of the ROS master synchronized with.
63 @type name: C{str}
64 @param uri: the URI of the ROS master synchronized with
65 @type uri: C{str}
66 @param discoverer_name: the name of the discovery node running on ROS master synchronized with.
67 @type discoverer_name: C{str}
68 @param monitoruri: The URI of RPC server of the discovery node to get the ROS master state by calling a method only once.
69 @type monitoruri: C{str}
70 @param timestamp: The timestamp of the current state of the ROS master info.
71 @type timestamp: C{float64}
72 @param sync_on_demand: Synchronize topics on demand
73 @type sync_on_demand: bool
74 '''
75 self.name = name
76 self.uri = uri
77 self.discoverer_name = discoverer_name
78 self.monitoruri = monitoruri
79 self.timestamp = timestamp
80 self.timestamp_local = 0.
81 self.timestamp_remote = 0.
82 self._online = True
83 self._offline_ts = 0
84
85 self.masteruri_local = masteruri_from_ros()
86 rospy.logdebug("SyncThread[%s]: create this sync thread", self.name)
87
88 self.__lock_info = threading.RLock()
89 self.__lock_intern = threading.RLock()
90 self._use_filtered_method = None
91
92 self.__sync_info = None
93 self.__unregistered = False
94
95 self.__publisher = []
96
97 self.__subscriber = []
98
99 self.__services = []
100
101
102 self.__own_state = None
103
104
105 self._filter = FilterInterface()
106 self._filter.load(self.name,
107 ['/rosout', rospy.get_name(), self.discoverer_name, '/node_manager', '/zeroconf'], [],
108 ['/rosout', '/rosout_agg'], ['/'] if sync_on_demand else [],
109 ['/*get_loggers', '/*set_logger_level'], [],
110
111 ['bond/Status'],
112 [], [],
113 [])
114
115
116
117
118 self._update_timer = None
119 self._delayed_update = 0
120 self.__on_update = False
121
123 '''
124 Returns the synchronized publisher, subscriber and services.
125 @rtype: SyncMasterInfo
126 '''
127 with self.__lock_info:
128 if self.__sync_info is None:
129
130 result_set = set()
131 result_publisher = []
132 result_subscriber = []
133 result_services = []
134 for (t_n, n_n, n_uri) in self.__publisher:
135 result_publisher.append(SyncTopicInfo(t_n, n_n, n_uri))
136 result_set.add(n_n)
137 for (t_n, n_n, n_uri) in self.__subscriber:
138 result_subscriber.append(SyncTopicInfo(t_n, n_n, n_uri))
139 result_set.add(n_n)
140 for (s_n, s_uri, n_n, n_uri) in self.__services:
141 result_services.append(SyncServiceInfo(s_n, s_uri, n_n, n_uri))
142 result_set.add(n_n)
143 self.__sync_info = SyncMasterInfo(self.uri, list(result_set), result_publisher, result_subscriber, result_services)
144 return self.__sync_info
145
146 - def set_online(self, value, resync_on_reconnect_timeout=0.):
147 if value:
148 if not self._online:
149 with self.__lock_intern:
150 self._online = True
151 offline_duration = time.time() - self._offline_ts
152 if offline_duration >= resync_on_reconnect_timeout:
153 rospy.loginfo("SyncThread[%s]: perform resync after the host was offline (unregister and register again to avoid connection losses to python topic. These does not suppot reconnection!)", self.name)
154 if self._update_timer is not None:
155 self._update_timer.cancel()
156 self._unreg_on_finish()
157 self.__unregistered = False
158 self.__publisher = []
159 self.__subscriber = []
160 self.__services = []
161 self.timestamp = 0.
162 self.timestamp_local = 0.
163 self.timestamp_remote = 0.
164 else:
165 rospy.loginfo("SyncThread[%s]: skip resync after the host was offline because of resync_on_reconnect_timeout=%.2f and the host was only %.2f sec offline", self.name, resync_on_reconnect_timeout, offline_duration)
166 else:
167 self._online = False
168 self._offline_ts = time.time()
169
170 - def update(self, name, uri, discoverer_name, monitoruri, timestamp):
171 '''
172 Sets a request to synchronize the local ROS master with this ROS master.
173 @note: If currently a synchronization is running this request will be ignored!
174 @param name: the name of the ROS master synchronized with.
175 @type name: C{str}
176 @param uri: the URI of the ROS master synchronized with
177 @type uri: C{str}
178 @param discoverer_name: the name of the discovery node running on ROS master synchronized with.
179 @type discoverer_name: C{str}
180 @param monitoruri: The URI of RPC server of the discovery node to get the ROS master state by calling a method only once.
181 @type monitoruri: C{str}
182 @param timestamp: The timestamp of the current state of the ROS master info.
183 @type timestamp: C{float64}
184 '''
185
186 with self.__lock_intern:
187 self.timestamp_remote = timestamp
188 if (self.timestamp_local != timestamp):
189 rospy.logdebug("SyncThread[%s]: update notify new timestamp(%.9f), old(%.9f)", self.name, timestamp, self.timestamp_local)
190 self.name = name
191 self.uri = uri
192 self.discoverer_name = discoverer_name
193 self.monitoruri = monitoruri
194 self._request_update()
195
196
197
199 '''
200 Sets the state of the local ROS master state. If this state is not None, the topics on demand will be synchronized.
201 @param own_state: the state of the local ROS master state
202 @type own_state: C{master_discovery_fkie/MasterInfo}
203 @param sync_on_demand: if True, sync only topic, which are also local exists (Default: False)
204 @type sync_on_demand: bool
205 '''
206 with self.__lock_intern:
207 timestamp_local = own_state.timestamp_local
208 if self.__own_state is None or (self.__own_state.timestamp_local != timestamp_local):
209 ownstate_ts = self.__own_state.timestamp_local if self.__own_state is not None else float('nan')
210 rospy.logdebug("SyncThread[%s]: local state update notify new timestamp(%.9f), old(%.9f)", self.name, timestamp_local, ownstate_ts)
211 self.__own_state = own_state
212 if sync_on_demand:
213 self._filter.update_sync_topics_pattern(self.__own_state.topic_names)
214 self._request_update()
215
217 '''
218 Stops running thread.
219 '''
220 rospy.logdebug(" SyncThread[%s]: stop request", self.name)
221 with self.__lock_intern:
222 if self._update_timer is not None:
223 self._update_timer.cancel()
224 self._unreg_on_finish()
225 rospy.logdebug(" SyncThread[%s]: stop exit", self.name)
226
228 with self.__lock_intern:
229 r = random.random() * 2.
230
231 if self._update_timer is None or not self._update_timer.isAlive():
232 del self._update_timer
233 self._update_timer = threading.Timer(r, self._request_remote_state, args=(self._apply_remote_state,))
234 self._update_timer.start()
235 else:
236 if self._delayed_update < self.MAX_UPDATE_DELAY:
237
238 self._update_timer.cancel()
239
240 if not self._update_timer.isAlive() or not self.__on_update:
241 self._delayed_update += 1
242 self._update_timer = threading.Timer(r, self._request_remote_state, args=(self._apply_remote_state,))
243 self._update_timer.start()
244
246 self._delayed_update = 0
247 self.__on_update = True
248 try:
249
250 socket.setdefaulttimeout(20)
251 remote_monitor = xmlrpclib.ServerProxy(self.monitoruri)
252
253 if self._use_filtered_method is None:
254 try:
255 self._use_filtered_method = 'masterInfoFiltered' in remote_monitor.system.listMethods()
256 except:
257 self._use_filtered_method = False
258 remote_state = None
259
260 rospy.loginfo("SyncThread[%s] Requesting remote state from '%s'", self.name, self.monitoruri)
261 if self._use_filtered_method:
262 remote_state = remote_monitor.masterInfoFiltered(self._filter.to_list())
263 else:
264 remote_state = remote_monitor.masterInfo()
265 if not self.__unregistered:
266 handler(remote_state)
267 except:
268 rospy.logerr("SyncThread[%s] ERROR: %s", self.name, traceback.format_exc())
269 finally:
270 self.__on_update = False
271 socket.setdefaulttimeout(None)
272
274 rospy.loginfo("SyncThread[%s] Applying remote state...", self.name)
275 try:
276 stamp = float(remote_state[0])
277 stamp_local = float(remote_state[1])
278 remote_masteruri = remote_state[2]
279
280 publishers = remote_state[4]
281 subscribers = remote_state[5]
282 rservices = remote_state[6]
283 topicTypes = remote_state[7]
284 nodeProviders = remote_state[8]
285 serviceProviders = remote_state[9]
286
287
288 own_master = xmlrpclib.ServerProxy(self.masteruri_local)
289 own_master_multi = xmlrpclib.MultiCall(own_master)
290
291 handler = []
292
293 publisher = []
294 publisher_to_register = []
295 for (topic, nodes) in publishers:
296 for node in nodes:
297 topictype = self._get_topictype(topic, topicTypes)
298 nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri)
299 if topictype and nodeuri and not self._do_ignore_ntp(node, topic, topictype):
300
301 if not ((topic, node, nodeuri) in self.__publisher):
302 publisher_to_register.append((topic, topictype, node, nodeuri))
303 publisher.append((topic, node, nodeuri))
304
305 for (topic, node, nodeuri) in set(self.__publisher) - set(publisher):
306 own_master_multi.unregisterPublisher(node, topic, nodeuri)
307 rospy.logdebug("SyncThread[%s]: UNPUB %s[%s] %s",
308 self.name, node, nodeuri, topic)
309 handler.append(('upub', topic, node, nodeuri))
310
311 for (topic, topictype, node, nodeuri) in publisher_to_register:
312 own_master_multi.registerPublisher(node, topic, topictype, nodeuri)
313 rospy.logdebug("SyncThread[%s]: PUB %s[%s] %s[%s]",
314 self.name, node, nodeuri, topic, topictype)
315 handler.append(('pub', topic, topictype, node, nodeuri))
316
317
318 subscriber = []
319 subscriber_to_register = []
320 for (topic, nodes) in subscribers:
321 for node in nodes:
322 topictype = self._get_topictype(topic, topicTypes)
323 nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri)
324
325
326
327
328 if not topictype:
329 topictype = self.MSG_ANY_TYPE
330 if topictype and nodeuri and not self._do_ignore_nts(node, topic, topictype):
331
332 if not ((topic, node, nodeuri) in self.__subscriber):
333 subscriber_to_register.append((topic, topictype, node, nodeuri))
334 subscriber.append((topic, node, nodeuri))
335
336 for (topic, node, nodeuri) in set(self.__subscriber) - set(subscriber):
337 own_master_multi.unregisterSubscriber(node, topic, nodeuri)
338 rospy.logdebug("SyncThread[%s]: UNSUB %s[%s] %s",
339 self.name, node, nodeuri, topic)
340 handler.append(('usub', topic, node, nodeuri))
341
342 for (topic, topictype, node, nodeuri) in subscriber_to_register:
343 own_master_multi.registerSubscriber(node, topic, topictype, nodeuri)
344 rospy.logdebug("SyncThread[%s]: SUB %s[%s] %s[%s]",
345 self.name, node, nodeuri, topic, topictype)
346 handler.append(('sub', topic, topictype, node, nodeuri))
347
348
349 services = []
350 services_to_register = []
351 for (service, nodes) in rservices:
352 for node in nodes:
353 serviceuri = self._get_serviceuri(service, serviceProviders, remote_masteruri)
354 nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri)
355 if serviceuri and nodeuri and not self._do_ignore_ns(node, service):
356
357 if not ((service, serviceuri, node, nodeuri) in self.__services):
358 services_to_register.append((service, serviceuri, node, nodeuri))
359 services.append((service, serviceuri, node, nodeuri))
360
361 for (service, serviceuri, node, nodeuri) in set(self.__services) - set(services):
362 own_master_multi.unregisterService(node, service, serviceuri)
363 rospy.logdebug("SyncThread[%s]: UNSRV %s[%s] %s[%s]",
364 self.name, node, nodeuri, service, serviceuri)
365 handler.append(('usrv', service, serviceuri, node, nodeuri))
366
367 for (service, serviceuri, node, nodeuri) in services_to_register:
368 own_master_multi.registerService(node, service, serviceuri, nodeuri)
369 rospy.logdebug("SyncThread[%s]: SRV %s[%s] %s[%s]",
370 self.name, node, nodeuri, service, serviceuri)
371 handler.append(('srv', service, serviceuri, node, nodeuri))
372
373
374 if not self.__unregistered:
375 with self.__lock_info:
376 self.__sync_info = None
377 self.__publisher = publisher
378 self.__subscriber = subscriber
379 self.__services = services
380
381 socket.setdefaulttimeout(3)
382 result = own_master_multi()
383
384
385 hack_pub = set()
386 for h, (code, statusMessage, r) in zip(handler, result):
387 try:
388 if h[0] == 'sub':
389 if code == -1:
390 rospy.logwarn("SyncThread[%s] topic subscription error: %s (%s), %s %s, node: %s", self.name, h[1], h[2], str(code), str(statusMessage), h[3])
391 else:
392 rospy.logdebug("SyncThread[%s] topic subscribed: %s, %s %s, node: %s", self.name, h[1], str(code), str(statusMessage), h[3])
393 if h[0] == 'sub' and code == 1 and len(r) > 0:
394
395 hack_pub.add(h[1])
396 elif h[0] == 'pub':
397 if code == -1:
398 rospy.logwarn("SyncThread[%s] topic advertise error: %s (%s), %s %s", self.name, h[1], h[2], str(code), str(statusMessage))
399 else:
400 rospy.logdebug("SyncThread[%s] topic advertised: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
401 elif h[0] == 'usub':
402 rospy.logdebug("SyncThread[%s] topic unsubscribed: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
403 elif h[0] == 'upub':
404 rospy.logdebug("SyncThread[%s] topic unadvertised: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
405 elif h[0] == 'srv':
406 if code == -1:
407 rospy.logwarn("SyncThread[%s] service registration error: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
408 else:
409 rospy.logdebug("SyncThread[%s] service registered: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
410 elif h[0] == 'usrv':
411 rospy.logdebug("SyncThread[%s] service unregistered: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
412 except:
413 rospy.logerr("SyncThread[%s] ERROR while analyzing the results of the registration call [%s]: %s", self.name, h[1], traceback.format_exc())
414
415
416
417
418
419
420
421
422 for m in hack_pub:
423 try:
424 rospy.loginfo("SyncThread[%s] Create and delete publisher to trigger publisherUpdate for %s", self.name, m)
425 topicPub = rospy.Publisher(m, rospy.msg.AnyMsg, queue_size=1)
426 topicPub.unregister()
427 del topicPub
428 except:
429 rospy.logerr("SyncThread[%s] ERROR while hack subscriber[%s]: %s", self.name, h[1], traceback.format_exc())
430
431 self.timestamp = stamp
432 self.timestamp_local = stamp_local
433 rospy.logdebug("SyncThread[%s]: current timestamp %.9f, local %.9f", self.name, stamp, stamp_local)
434 if self.timestamp_remote > stamp_local:
435 rospy.logdebug("SyncThread[%s]: invoke next update, remote ts: %.9f", self.name, self.timestamp_remote)
436 self._update_timer = threading.Timer(random.random() * 2., self._request_remote_state, args=(self._apply_remote_state,))
437 self._update_timer.start()
438 except:
439 rospy.logerr("SyncThread[%s] ERROR: %s", self.name, traceback.format_exc())
440 finally:
441 socket.setdefaulttimeout(None)
442 rospy.loginfo("SyncThread[%s] remote state applied.", self.name)
443
445 with self.__lock_info:
446 self.__unregistered = True
447 try:
448 rospy.logdebug(" SyncThread[%s] clear all registrations", self.name)
449 socket.setdefaulttimeout(5)
450 own_master = xmlrpclib.ServerProxy(self.masteruri_local)
451 own_master_multi = xmlrpclib.MultiCall(own_master)
452
453 for topic, node, uri in self.__subscriber:
454 own_master_multi.unregisterSubscriber(node, topic, uri)
455 for topic, node, uri in self.__publisher:
456 own_master_multi.unregisterPublisher(node, topic, uri)
457 for service, serviceuri, node, uri in self.__services:
458 own_master_multi.unregisterService(node, service, serviceuri)
459 rospy.logdebug(" SyncThread[%s] execute a MultiCall", self.name)
460 _ = own_master_multi()
461 rospy.logdebug(" SyncThread[%s] finished", self.name)
462 except:
463 rospy.logerr("SyncThread[%s] ERROR while ending: %s", self.name, traceback.format_exc())
464 socket.setdefaulttimeout(None)
465
467 return self._filter.is_ignored_publisher(node, topic, topictype)
468
470 return self._filter.is_ignored_subscriber(node, topic, topictype)
471
473 return self._filter.is_ignored_service(node, service)
474
476 for (topicname, topic_type) in topic_types:
477 if (topicname == topic):
478 return topic_type.replace('None', '')
479 return None
480
482 for (nodename, uri, masteruri, pid, local) in nodes:
483 if (nodename == node) and ((self._filter.sync_remote_nodes() and masteruri == remote_masteruri) or local == 'local'):
484
485 if masteruri != self.masteruri_local:
486 return uri
487 return None
488
490 for (servicename, uri, masteruri, topic_type, local) in nodes:
491 if (servicename == service) and ((self._filter.sync_remote_nodes() and masteruri == remote_masteruri) or local == 'local'):
492 if masteruri != self.masteruri_local:
493 return uri
494 return None
495