1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 import random
35 import socket
36 import threading
37 import time
38 import traceback
39 try:
40 import xmlrpclib
41 except ImportError:
42 import xmlrpc.client as xmlrpclib
43
44 from multimaster_msgs_fkie.msg import SyncTopicInfo, SyncServiceInfo, SyncMasterInfo
45 import rospy
46
47 from master_discovery_fkie.common import masteruri_from_ros
48 from master_discovery_fkie.filter_interface import FilterInterface
49
50
52 '''
53 A thread to synchronize the local ROS master with a remote master. While the
54 synchronization only the topic of the remote ROS master will be registered by
55 the local ROS master. The remote ROS master will be keep unchanged.
56 '''
57
58 MAX_UPDATE_DELAY = 5
59
60 MSG_ANY_TYPE = '*'
61
62 - def __init__(self, name, uri, discoverer_name, monitoruri, timestamp, sync_on_demand=False):
63 '''
64 Initialization method for the SyncThread.
65 @param name: the name of the ROS master synchronized with.
66 @type name: C{str}
67 @param uri: the URI of the ROS master synchronized with
68 @type uri: C{str}
69 @param discoverer_name: the name of the discovery node running on ROS master synchronized with.
70 @type discoverer_name: C{str}
71 @param monitoruri: The URI of RPC server of the discovery node to get the ROS master state by calling a method only once.
72 @type monitoruri: C{str}
73 @param timestamp: The timestamp of the current state of the ROS master info.
74 @type timestamp: C{float64}
75 @param sync_on_demand: Synchronize topics on demand
76 @type sync_on_demand: bool
77 '''
78 self.name = name
79 self.uri = uri
80 self.discoverer_name = discoverer_name
81 self.monitoruri = monitoruri
82 self.timestamp = timestamp
83 self.timestamp_local = 0.
84 self.timestamp_remote = 0.
85 self._online = True
86 self._offline_ts = 0
87
88 self.masteruri_local = masteruri_from_ros()
89 rospy.logdebug("SyncThread[%s]: create this sync thread", self.name)
90
91 self.__lock_info = threading.RLock()
92 self.__lock_intern = threading.RLock()
93 self._use_filtered_method = None
94
95 self.__sync_info = None
96 self.__unregistered = False
97
98 self.__publisher = []
99
100 self.__subscriber = []
101
102 self.__services = []
103
104
105 self.__own_state = None
106
107
108 self._filter = FilterInterface()
109 self._filter.load(self.name,
110 ['/rosout', rospy.get_name(), self.discoverer_name, '/node_manager', '/zeroconf'], [],
111 ['/rosout', '/rosout_agg'], ['/'] if sync_on_demand else [],
112 ['/*get_loggers', '/*set_logger_level'], [],
113
114 ['bond/Status'],
115 [], [],
116 [])
117
118
119
120
121 self._update_timer = None
122 self._delayed_update = 0
123 self.__on_update = False
124
126 '''
127 Returns the synchronized publisher, subscriber and services.
128 @rtype: SyncMasterInfo
129 '''
130 with self.__lock_info:
131 if self.__sync_info is None:
132
133 result_set = set()
134 result_publisher = []
135 result_subscriber = []
136 result_services = []
137 for (t_n, n_n, n_uri) in self.__publisher:
138 result_publisher.append(SyncTopicInfo(t_n, n_n, n_uri))
139 result_set.add(n_n)
140 for (t_n, n_n, n_uri) in self.__subscriber:
141 result_subscriber.append(SyncTopicInfo(t_n, n_n, n_uri))
142 result_set.add(n_n)
143 for (s_n, s_uri, n_n, n_uri) in self.__services:
144 result_services.append(SyncServiceInfo(s_n, s_uri, n_n, n_uri))
145 result_set.add(n_n)
146 self.__sync_info = SyncMasterInfo(self.uri, list(result_set), result_publisher, result_subscriber, result_services)
147 return self.__sync_info
148
149 - def set_online(self, value, resync_on_reconnect_timeout=0.):
150 if value:
151 if not self._online:
152 with self.__lock_intern:
153 self._online = True
154 offline_duration = time.time() - self._offline_ts
155 if offline_duration >= resync_on_reconnect_timeout:
156 rospy.loginfo("SyncThread[%s]: perform resync after the host was offline (unregister and register again to avoid connection losses to python topic. These does not suppot reconnection!)", self.name)
157 if self._update_timer is not None:
158 self._update_timer.cancel()
159 self._unreg_on_finish()
160 self.__unregistered = False
161 self.__publisher = []
162 self.__subscriber = []
163 self.__services = []
164 self.timestamp = 0.
165 self.timestamp_local = 0.
166 self.timestamp_remote = 0.
167 else:
168 rospy.loginfo("SyncThread[%s]: skip resync after the host was offline because of resync_on_reconnect_timeout=%.2f and the host was only %.2f sec offline", self.name, resync_on_reconnect_timeout, offline_duration)
169 else:
170 self._online = False
171 self._offline_ts = time.time()
172
173 - def update(self, name, uri, discoverer_name, monitoruri, timestamp):
174 '''
175 Sets a request to synchronize the local ROS master with this ROS master.
176 @note: If currently a synchronization is running this request will be ignored!
177 @param name: the name of the ROS master synchronized with.
178 @type name: C{str}
179 @param uri: the URI of the ROS master synchronized with
180 @type uri: C{str}
181 @param discoverer_name: the name of the discovery node running on ROS master synchronized with.
182 @type discoverer_name: C{str}
183 @param monitoruri: The URI of RPC server of the discovery node to get the ROS master state by calling a method only once.
184 @type monitoruri: C{str}
185 @param timestamp: The timestamp of the current state of the ROS master info.
186 @type timestamp: C{float64}
187 '''
188
189 with self.__lock_intern:
190 self.timestamp_remote = timestamp
191 if (self.timestamp_local != timestamp):
192 rospy.logdebug("SyncThread[%s]: update notify new timestamp(%.9f), old(%.9f)", self.name, timestamp, self.timestamp_local)
193 self.name = name
194 self.uri = uri
195 self.discoverer_name = discoverer_name
196 self.monitoruri = monitoruri
197 self._request_update()
198
199
200
202 '''
203 Sets the state of the local ROS master state. If this state is not None, the topics on demand will be synchronized.
204 @param own_state: the state of the local ROS master state
205 @type own_state: C{master_discovery_fkie/MasterInfo}
206 @param sync_on_demand: if True, sync only topic, which are also local exists (Default: False)
207 @type sync_on_demand: bool
208 '''
209 with self.__lock_intern:
210 timestamp_local = own_state.timestamp_local
211 if self.__own_state is None or (self.__own_state.timestamp_local != timestamp_local):
212 ownstate_ts = self.__own_state.timestamp_local if self.__own_state is not None else float('nan')
213 rospy.logdebug("SyncThread[%s]: local state update notify new timestamp(%.9f), old(%.9f)", self.name, timestamp_local, ownstate_ts)
214 self.__own_state = own_state
215 if sync_on_demand:
216 self._filter.update_sync_topics_pattern(self.__own_state.topic_names)
217 self._request_update()
218
220 '''
221 Stops running thread.
222 '''
223 rospy.logdebug(" SyncThread[%s]: stop request", self.name)
224 with self.__lock_intern:
225 if self._update_timer is not None:
226 self._update_timer.cancel()
227 self._unreg_on_finish()
228 rospy.logdebug(" SyncThread[%s]: stop exit", self.name)
229
231 with self.__lock_intern:
232 r = random.random() * 2.
233
234 if self._update_timer is None or not self._update_timer.isAlive():
235 del self._update_timer
236 self._update_timer = threading.Timer(r, self._request_remote_state, args=(self._apply_remote_state,))
237 self._update_timer.start()
238 else:
239 if self._delayed_update < self.MAX_UPDATE_DELAY:
240
241 self._update_timer.cancel()
242
243 if not self._update_timer.isAlive() or not self.__on_update:
244 self._delayed_update += 1
245 self._update_timer = threading.Timer(r, self._request_remote_state, args=(self._apply_remote_state,))
246 self._update_timer.start()
247
249 self._delayed_update = 0
250 self.__on_update = True
251 try:
252
253 socket.setdefaulttimeout(20)
254 remote_monitor = xmlrpclib.ServerProxy(self.monitoruri)
255
256 if self._use_filtered_method is None:
257 try:
258 self._use_filtered_method = 'masterInfoFiltered' in remote_monitor.system.listMethods()
259 except:
260 self._use_filtered_method = False
261 remote_state = None
262
263 rospy.loginfo("SyncThread[%s] Requesting remote state from '%s'", self.name, self.monitoruri)
264 if self._use_filtered_method:
265 remote_state = remote_monitor.masterInfoFiltered(self._filter.to_list())
266 else:
267 remote_state = remote_monitor.masterInfo()
268 if not self.__unregistered:
269 handler(remote_state)
270 except:
271 rospy.logerr("SyncThread[%s] ERROR: %s", self.name, traceback.format_exc())
272 finally:
273 self.__on_update = False
274 socket.setdefaulttimeout(None)
275
277 rospy.loginfo("SyncThread[%s] Applying remote state...", self.name)
278 try:
279 stamp = float(remote_state[0])
280 stamp_local = float(remote_state[1])
281 remote_masteruri = remote_state[2]
282
283 publishers = remote_state[4]
284 subscribers = remote_state[5]
285 rservices = remote_state[6]
286 topicTypes = remote_state[7]
287 nodeProviders = remote_state[8]
288 serviceProviders = remote_state[9]
289
290
291 own_master = xmlrpclib.ServerProxy(self.masteruri_local)
292 own_master_multi = xmlrpclib.MultiCall(own_master)
293
294 handler = []
295
296 publisher = []
297 publisher_to_register = []
298 for (topic, nodes) in publishers:
299 for node in nodes:
300 topictype = self._get_topictype(topic, topicTypes)
301 nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri)
302 if topictype and nodeuri and not self._do_ignore_ntp(node, topic, topictype):
303
304 if not ((topic, node, nodeuri) in self.__publisher):
305 publisher_to_register.append((topic, topictype, node, nodeuri))
306 publisher.append((topic, node, nodeuri))
307
308 for (topic, node, nodeuri) in set(self.__publisher) - set(publisher):
309 own_master_multi.unregisterPublisher(node, topic, nodeuri)
310 rospy.logdebug("SyncThread[%s]: UNPUB %s[%s] %s",
311 self.name, node, nodeuri, topic)
312 handler.append(('upub', topic, node, nodeuri))
313
314 for (topic, topictype, node, nodeuri) in publisher_to_register:
315 own_master_multi.registerPublisher(node, topic, topictype, nodeuri)
316 rospy.logdebug("SyncThread[%s]: PUB %s[%s] %s[%s]",
317 self.name, node, nodeuri, topic, topictype)
318 handler.append(('pub', topic, topictype, node, nodeuri))
319
320
321 subscriber = []
322 subscriber_to_register = []
323 for (topic, nodes) in subscribers:
324 for node in nodes:
325 topictype = self._get_topictype(topic, topicTypes)
326 nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri)
327
328
329
330
331 if not topictype:
332 topictype = self.MSG_ANY_TYPE
333 if topictype and nodeuri and not self._do_ignore_nts(node, topic, topictype):
334
335 if not ((topic, node, nodeuri) in self.__subscriber):
336 subscriber_to_register.append((topic, topictype, node, nodeuri))
337 subscriber.append((topic, node, nodeuri))
338
339 for (topic, node, nodeuri) in set(self.__subscriber) - set(subscriber):
340 own_master_multi.unregisterSubscriber(node, topic, nodeuri)
341 rospy.logdebug("SyncThread[%s]: UNSUB %s[%s] %s",
342 self.name, node, nodeuri, topic)
343 handler.append(('usub', topic, node, nodeuri))
344
345 for (topic, topictype, node, nodeuri) in subscriber_to_register:
346 own_master_multi.registerSubscriber(node, topic, topictype, nodeuri)
347 rospy.logdebug("SyncThread[%s]: SUB %s[%s] %s[%s]",
348 self.name, node, nodeuri, topic, topictype)
349 handler.append(('sub', topic, topictype, node, nodeuri))
350
351
352 services = []
353 services_to_register = []
354 for (service, nodes) in rservices:
355 for node in nodes:
356 serviceuri = self._get_serviceuri(service, serviceProviders, remote_masteruri)
357 nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri)
358 if serviceuri and nodeuri and not self._do_ignore_ns(node, service):
359
360 if not ((service, serviceuri, node, nodeuri) in self.__services):
361 services_to_register.append((service, serviceuri, node, nodeuri))
362 services.append((service, serviceuri, node, nodeuri))
363
364 for (service, serviceuri, node, nodeuri) in set(self.__services) - set(services):
365 own_master_multi.unregisterService(node, service, serviceuri)
366 rospy.logdebug("SyncThread[%s]: UNSRV %s[%s] %s[%s]",
367 self.name, node, nodeuri, service, serviceuri)
368 handler.append(('usrv', service, serviceuri, node, nodeuri))
369
370 for (service, serviceuri, node, nodeuri) in services_to_register:
371 own_master_multi.registerService(node, service, serviceuri, nodeuri)
372 rospy.logdebug("SyncThread[%s]: SRV %s[%s] %s[%s]",
373 self.name, node, nodeuri, service, serviceuri)
374 handler.append(('srv', service, serviceuri, node, nodeuri))
375
376
377 if not self.__unregistered:
378 with self.__lock_info:
379 self.__sync_info = None
380 self.__publisher = publisher
381 self.__subscriber = subscriber
382 self.__services = services
383
384 socket.setdefaulttimeout(3)
385 result = own_master_multi()
386
387
388 hack_pub = set()
389 for h, (code, statusMessage, r) in zip(handler, result):
390 try:
391 if h[0] == 'sub':
392 if code == -1:
393 rospy.logwarn("SyncThread[%s] topic subscription error: %s (%s), %s %s, node: %s", self.name, h[1], h[2], str(code), str(statusMessage), h[3])
394 else:
395 rospy.logdebug("SyncThread[%s] topic subscribed: %s, %s %s, node: %s", self.name, h[1], str(code), str(statusMessage), h[3])
396 if h[0] == 'sub' and code == 1 and len(r) > 0:
397
398 hack_pub.add(h[1])
399 elif h[0] == 'pub':
400 if code == -1:
401 rospy.logwarn("SyncThread[%s] topic advertise error: %s (%s), %s %s", self.name, h[1], h[2], str(code), str(statusMessage))
402 else:
403 rospy.logdebug("SyncThread[%s] topic advertised: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
404 elif h[0] == 'usub':
405 rospy.logdebug("SyncThread[%s] topic unsubscribed: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
406 elif h[0] == 'upub':
407 rospy.logdebug("SyncThread[%s] topic unadvertised: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
408 elif h[0] == 'srv':
409 if code == -1:
410 rospy.logwarn("SyncThread[%s] service registration error: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
411 else:
412 rospy.logdebug("SyncThread[%s] service registered: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
413 elif h[0] == 'usrv':
414 rospy.logdebug("SyncThread[%s] service unregistered: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
415 except:
416 rospy.logerr("SyncThread[%s] ERROR while analyzing the results of the registration call [%s]: %s", self.name, h[1], traceback.format_exc())
417
418
419
420
421
422
423
424
425 for m in hack_pub:
426 try:
427 rospy.loginfo("SyncThread[%s] Create and delete publisher to trigger publisherUpdate for %s", self.name, m)
428 topicPub = rospy.Publisher(m, rospy.msg.AnyMsg, queue_size=1)
429 topicPub.unregister()
430 del topicPub
431 except:
432 rospy.logerr("SyncThread[%s] ERROR while hack subscriber[%s]: %s", self.name, h[1], traceback.format_exc())
433
434 self.timestamp = stamp
435 self.timestamp_local = stamp_local
436 rospy.logdebug("SyncThread[%s]: current timestamp %.9f, local %.9f", self.name, stamp, stamp_local)
437 if self.timestamp_remote > stamp_local:
438 rospy.logdebug("SyncThread[%s]: invoke next update, remote ts: %.9f", self.name, self.timestamp_remote)
439 self._update_timer = threading.Timer(random.random() * 2., self._request_remote_state, args=(self._apply_remote_state,))
440 self._update_timer.start()
441 except:
442 rospy.logerr("SyncThread[%s] ERROR: %s", self.name, traceback.format_exc())
443 finally:
444 socket.setdefaulttimeout(None)
445 rospy.loginfo("SyncThread[%s] remote state applied.", self.name)
446
448 with self.__lock_info:
449 self.__unregistered = True
450 try:
451 rospy.logdebug(" SyncThread[%s] clear all registrations", self.name)
452 socket.setdefaulttimeout(5)
453 own_master = xmlrpclib.ServerProxy(self.masteruri_local)
454 own_master_multi = xmlrpclib.MultiCall(own_master)
455
456 for topic, node, uri in self.__subscriber:
457 own_master_multi.unregisterSubscriber(node, topic, uri)
458 for topic, node, uri in self.__publisher:
459 own_master_multi.unregisterPublisher(node, topic, uri)
460 for service, serviceuri, node, uri in self.__services:
461 own_master_multi.unregisterService(node, service, serviceuri)
462 rospy.logdebug(" SyncThread[%s] execute a MultiCall", self.name)
463 _ = own_master_multi()
464 rospy.logdebug(" SyncThread[%s] finished", self.name)
465 except:
466 rospy.logerr("SyncThread[%s] ERROR while ending: %s", self.name, traceback.format_exc())
467 socket.setdefaulttimeout(None)
468
470 return self._filter.is_ignored_publisher(node, topic, topictype)
471
473 return self._filter.is_ignored_subscriber(node, topic, topictype)
474
476 return self._filter.is_ignored_service(node, service)
477
479 for (topicname, topic_type) in topic_types:
480 if (topicname == topic):
481 return topic_type.replace('None', '')
482 return None
483
485 for (nodename, uri, masteruri, pid, local) in nodes:
486 if (nodename == node) and ((self._filter.sync_remote_nodes() and masteruri == remote_masteruri) or local == 'local'):
487
488 if masteruri != self.masteruri_local:
489 return uri
490 return None
491
493 for (servicename, uri, masteruri, topic_type, local) in nodes:
494 if (servicename == service) and ((self._filter.sync_remote_nodes() and masteruri == remote_masteruri) or local == 'local'):
495 if masteruri != self.masteruri_local:
496 return uri
497 return None
498