1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 import time
35 import math
36 import threading
37 import xmlrpclib
38 import random
39 import socket
40
41 import roslib; roslib.load_manifest('master_sync_fkie')
42 import roslib.message
43 import rospy
44 import rosgraph.masterapi
45
46 from master_discovery_fkie.common import masteruri_from_ros
47 from master_discovery_fkie.filter_interface import FilterInterface
48 from multimaster_msgs_fkie.msg import SyncTopicInfo, SyncServiceInfo, SyncMasterInfo
49
51 '''
52 A thread to synchronize the local ROS master with a remote master. While the
53 synchronization only the topic of the remote ROS master will be registered by
54 the local ROS master. The remote ROS master will be keep unchanged.
55 '''
56
57 MAX_UPDATE_DELAY = 5
58
59 - def __init__(self, name, uri, discoverer_name, monitoruri, timestamp, sync_on_demand=False):
60 '''
61 Initialization method for the SyncThread.
62 @param name: the name of the ROS master synchronized with.
63 @type name: C{str}
64 @param uri: the URI of the ROS master synchronized with
65 @type uri: C{str}
66 @param discoverer_name: the name of the discovery node running on ROS master synchronized with.
67 @type discoverer_name: C{str}
68 @param monitoruri: The URI of RPC server of the discovery node to get the ROS master state by calling a method only once.
69 @type monitoruri: C{str}
70 @param timestamp: The timestamp of the current state of the ROS master info.
71 @type timestamp: C{float64}
72 @param sync_on_demand: Synchronize topics on demand
73 @type sync_on_demand: bool
74 '''
75 self.name = name
76 self.uri = uri
77 self.discoverer_name = discoverer_name
78 self.monitoruri = monitoruri
79 self.timestamp = timestamp
80 self.timestamp_local = 0.
81 self.timestamp_remote = 0.
82
83 self.localMasteruri = masteruri_from_ros()
84 rospy.logdebug("SyncThread[%s]: create this sync thread", self.name)
85
86 self.__lock_info = threading.RLock()
87 self.__lock_intern = threading.RLock()
88 self._use_filtered_method = None
89
90 self.__sync_info = None
91 self.__unregistered = False
92
93 self.__publisher = []
94
95 self.__subscriber = []
96
97 self.__services = []
98
99
100 self.__own_state = None
101
102
103 self._filter = FilterInterface()
104 self._filter.load(self.name,
105 ['/rosout', rospy.get_name().replace('/', '/*')+'*', self.discoverer_name.replace('/', '/*')+'*', '/*node_manager', '/*zeroconf'], [],
106 ['/rosout', '/rosout_agg'], ['/'] if sync_on_demand else [],
107 ['/*get_loggers', '/*set_logger_level'], [],
108
109 ['bond/Status'])
110
111
112
113
114 self._update_timer = None
115 self._delayed_update = 0
116 self.__on_update = False
117
119 '''
120 Returns the synchronized publisher, subscriber and services.
121 @rtype: SyncMasterInfo
122 '''
123 with self.__lock_info:
124 if self.__sync_info is None:
125
126 result_set = set()
127 result_publisher = []
128 result_subscriber = []
129 result_services = []
130 for (t_n, n_n, n_uri) in self.__publisher:
131 result_publisher.append(SyncTopicInfo(t_n, n_n, n_uri))
132 result_set.add(n_n)
133 for (t_n, n_n, n_uri) in self.__subscriber:
134 result_subscriber.append(SyncTopicInfo(t_n, n_n, n_uri))
135 result_set.add(n_n)
136 for (s_n, s_uri, n_n, n_uri) in self.__services:
137 result_services.append(SyncServiceInfo(s_n, s_uri, n_n, n_uri))
138 result_set.add(n_n)
139 self.__sync_info = SyncMasterInfo(self.uri, list(result_set), result_publisher, result_subscriber, result_services)
140 return self.__sync_info
141
142 - def update(self, name, uri, discoverer_name, monitoruri, timestamp):
143 '''
144 Sets a request to synchronize the local ROS master with this ROS master.
145 @note: If currently a synchronization is running this request will be ignored!
146 @param name: the name of the ROS master synchronized with.
147 @type name: C{str}
148 @param uri: the URI of the ROS master synchronized with
149 @type uri: C{str}
150 @param discoverer_name: the name of the discovery node running on ROS master synchronized with.
151 @type discoverer_name: C{str}
152 @param monitoruri: The URI of RPC server of the discovery node to get the ROS master state by calling a method only once.
153 @type monitoruri: C{str}
154 @param timestamp: The timestamp of the current state of the ROS master info.
155 @type timestamp: C{float64}
156 '''
157
158 with self.__lock_intern:
159 if (self.timestamp_local != timestamp):
160 rospy.logdebug("SyncThread[%s]: update notify new timestamp(%.9f), old(%.9f)", self.name, timestamp, self.timestamp_local)
161 self.name = name
162 self.uri = uri
163 self.discoverer_name = discoverer_name
164 self.monitoruri = monitoruri
165 self.timestamp_remote = timestamp
166 self._request_update()
167
168
170 '''
171 Sets the state of the local ROS master state. If this state is not None, the topics on demand will be synchronized.
172 @param own_state: the state of the local ROS master state
173 @type own_state: C{master_discovery_fkie/MasterInfo}
174 @param sync_on_demand: if True, sync only topic, which are also local exists (Default: False)
175 @type sync_on_demand: bool
176 '''
177
178 with self.__lock_intern:
179 timestamp_local = own_state.timestamp_local
180 if self.__own_state is None or (self.__own_state.timestamp_local != timestamp_local):
181 rospy.logdebug("SyncThread[%s]: local state update notify new timestamp(%.9f), old(%.9f)", self.name, timestamp_local, self.__own_state.timestamp_local if not self.__own_state is None else float('nan'))
182 self.__own_state = own_state
183 if sync_on_demand:
184 self._filter.update_sync_topics_pattern(self.__own_state.topic_names)
185 self._request_update()
186
187
189 '''
190 Stops running thread.
191 '''
192 rospy.logdebug(" SyncThread[%s]: stop request", self.name)
193 with self.__lock_intern:
194 if not self._update_timer is None:
195 self._update_timer.cancel()
196 self._unreg_on_finish()
197 rospy.logdebug(" SyncThread[%s]: stop exit", self.name)
198
200 with self.__lock_intern:
201 r = random.random() * 2.
202
203 if self._update_timer is None or not self._update_timer.isAlive():
204 del self._update_timer
205 self._update_timer = threading.Timer(r, self._request_remote_state, args=(self._apply_remote_state,))
206 self._update_timer.start()
207 else:
208 if self._delayed_update < self.MAX_UPDATE_DELAY:
209
210 self._update_timer.cancel()
211
212 if not self._update_timer.isAlive() or not self.__on_update:
213 self._delayed_update += 1
214 self._update_timer = threading.Timer(r, self._request_remote_state, args=(self._apply_remote_state,))
215 self._update_timer.start()
216
218 self._delayed_update = 0
219 self.__on_update = True
220 try:
221
222 socket.setdefaulttimeout(20)
223 remote_monitor = xmlrpclib.ServerProxy(self.monitoruri)
224
225 if self._use_filtered_method is None:
226 try:
227 self._use_filtered_method = 'masterInfoFiltered' in remote_monitor.system.listMethods()
228 except:
229 self._use_filtered_method = False
230 remote_state = None
231
232 if self._use_filtered_method:
233 remote_state = remote_monitor.masterInfoFiltered(self._filter.to_list())
234 else:
235 remote_state = remote_monitor.masterInfo()
236 if not self.__unregistered:
237 handler(remote_state)
238 except:
239 import traceback
240 rospy.logwarn("SyncThread[%s] ERROR: %s", self.name, traceback.format_exc())
241 finally:
242 self.__on_update = False
243 socket.setdefaulttimeout(None)
244
246 try:
247 stamp = float(remote_state[0])
248 stamp_local = float(remote_state[1])
249 remote_masteruri = remote_state[2]
250
251 publishers = remote_state[4]
252 subscribers = remote_state[5]
253 rservices = remote_state[6]
254 topicTypes = remote_state[7]
255 nodeProviders = remote_state[8]
256 serviceProviders = remote_state[9]
257
258
259 own_master = xmlrpclib.ServerProxy(self.localMasteruri)
260 own_master_multi = xmlrpclib.MultiCall(own_master)
261
262 handler = []
263
264 publisher = []
265 publisher_to_register = []
266 for (topic, nodes) in publishers:
267 for node in nodes:
268 topictype = self._getTopicType(topic, topicTypes)
269 nodeuri = self._getNodeUri(node, nodeProviders, remote_masteruri)
270 if topictype and nodeuri and not self._doIgnoreNT(node, topic, topictype):
271
272 if not ((topic, node, nodeuri) in self.__publisher):
273 publisher_to_register.append((topic, topictype, node, nodeuri))
274 publisher.append((topic, node, nodeuri))
275
276 for (topic, node, nodeuri) in set(self.__publisher)-set(publisher):
277 own_master_multi.unregisterPublisher(node, topic, nodeuri)
278 handler.append(('upub', topic, node, nodeuri))
279
280 for (topic, topictype, node, nodeuri) in publisher_to_register:
281 own_master_multi.registerPublisher(node, topic, topictype, nodeuri)
282 handler.append(('pub', topic, topictype, node, nodeuri))
283
284
285 subscriber = []
286 subscriber_to_register = []
287 for (topic, nodes) in subscribers:
288 for node in nodes:
289 topictype = self._getTopicType(topic, topicTypes)
290 nodeuri = self._getNodeUri(node, nodeProviders, remote_masteruri)
291
292 if not topictype and not self.__own_state is None:
293 if topic in self.__own_state.topics:
294 topictype = self.__own_state.topics[topic].type
295 if topictype and nodeuri and not self._doIgnoreNT(node, topic, topictype):
296
297 if not ((topic, node, nodeuri) in self.__subscriber):
298 subscriber_to_register.append((topic, topictype, node, nodeuri))
299 subscriber.append((topic, node, nodeuri))
300
301 for (topic, node, nodeuri) in set(self.__subscriber)-set(subscriber):
302 own_master_multi.unregisterSubscriber(node, topic, nodeuri)
303 handler.append(('usub', topic, node, nodeuri))
304
305 for (topic, topictype, node, nodeuri) in subscriber_to_register:
306 own_master_multi.registerSubscriber(node, topic, topictype, nodeuri)
307 handler.append(('sub', topic, topictype, node, nodeuri))
308
309
310 services = []
311 services_to_register = []
312 for (service, nodes) in rservices:
313 for node in nodes:
314 serviceuri = self._getServiceUri(service, serviceProviders, remote_masteruri)
315 nodeuri = self._getNodeUri(node, nodeProviders, remote_masteruri)
316 if serviceuri and nodeuri and not self._doIgnoreNS(node, service):
317
318 if not ((service, serviceuri, node, nodeuri) in self.__services):
319 services_to_register.append((service, serviceuri, node, nodeuri))
320 services.append((service, serviceuri, node, nodeuri))
321
322 for (service, serviceuri, node, nodeuri) in set(self.__services)-set(services):
323 own_master_multi.unregisterService(node, service, serviceuri)
324 handler.append(('usrv', service, serviceuri, node, nodeuri))
325
326 for (service, serviceuri, node, nodeuri) in services_to_register:
327 own_master_multi.registerService(node, service, serviceuri, nodeuri)
328 handler.append(('srv', service, serviceuri, node, nodeuri))
329
330
331 if not self.__unregistered:
332 with self.__lock_info:
333 self.__sync_info = None
334 self.__publisher = publisher
335 self.__subscriber = subscriber
336 self.__services = services
337
338 socket.setdefaulttimeout(3)
339 result = own_master_multi()
340
341 for h,(code, statusMessage, r) in zip(handler, result):
342 try:
343 if h[0] == 'sub':
344 if code == -1:
345 rospy.logwarn("SyncThread[%s] topic subscription error: %s (%s), %s %s", self.name, h[1], h[2], str(code), str(statusMessage))
346 else:
347 rospy.loginfo("SyncThread[%s] topic subscribed: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
348 if h[0] == 'sub' and code == 1 and len(r) > 0:
349
350
351
352
353
354
355
356
357 topicPub = rospy.Publisher(h[1], roslib.message.get_message_class(h[2]))
358 topicPub.unregister()
359 del topicPub
360 elif h[0] == 'pub':
361 if code == -1:
362 rospy.logwarn("SyncThread[%s] topic advertise error: %s (%s), %s %s", self.name, h[1], h[2], str(code), str(statusMessage))
363 else:
364 rospy.loginfo("SyncThread[%s] topic advertised: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
365 elif h[0] == 'usub':
366 rospy.loginfo("SyncThread[%s] topic unsubscribed: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
367 elif h[0] == 'upub':
368 rospy.loginfo("SyncThread[%s] topic unadvertised: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
369 elif h[0] == 'srv':
370 if code == -1:
371 rospy.logwarn("SyncThread[%s] service registration error: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
372 else:
373 rospy.loginfo("SyncThread[%s] service registered: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
374 elif h[0] == 'usrv':
375 rospy.loginfo("SyncThread[%s] service unregistered: %s, %s %s", self.name, h[1], str(code), str(statusMessage))
376 except:
377 import traceback
378 rospy.logerr("SyncThread[%s] ERROR while hack subscriber[%s]: %s", self.name, h[1], traceback.format_exc())
379
380 self.timestamp = stamp
381 self.timestamp_local = stamp_local
382 rospy.logdebug("SyncThread[%s]: current timestamp %.9f, local %.9f", self.name, stamp, stamp_local)
383 if self.timestamp_remote > stamp_local:
384 rospy.logdebug("SyncThread[%s]: invoke next update, remote ts: %.9f", self.name, self.timestamp_remote)
385 self._update_timer = threading.Timer(random.random() * 2., self._request_remote_state, args=(self._apply_remote_state,))
386 self._update_timer.start()
387 except:
388 import traceback
389 rospy.logwarn("SyncThread[%s] ERROR: %s", self.name, traceback.format_exc())
390 finally:
391 socket.setdefaulttimeout(None)
392
394 with self.__lock_info:
395 self.__unregistered = True
396 try:
397 rospy.logdebug(" SyncThread[%s] clear all registrations", self.name)
398 socket.setdefaulttimeout(5)
399 own_master = xmlrpclib.ServerProxy(self.localMasteruri)
400 own_master_multi = xmlrpclib.MultiCall(own_master)
401
402 for topic, node, uri in self.__subscriber:
403 own_master_multi.unregisterSubscriber(node, topic, uri)
404 for topic, node, uri in self.__publisher:
405 own_master_multi.unregisterPublisher(node, topic, uri)
406 for service, serviceuri, node, uri in self.__services:
407 own_master_multi.unregisterService(node, service, serviceuri)
408 rospy.logdebug(" SyncThread[%s] execute a MultiCall", self.name)
409 r = own_master_multi()
410 rospy.logdebug(" SyncThread[%s] finished", self.name)
411 except:
412 import traceback
413 rospy.logwarn("SyncThread[%s] ERROR while ending: %s", self.name, traceback.format_exc())
414 socket.setdefaulttimeout(None)
415
417 return self._filter.is_ignored_topic(node, topic, topictype)
418
420 return self._filter.is_ignored_service(node, service)
421
423 for (topicname, type) in topicTypes:
424 if (topicname == topic):
425 return type.replace('None', '')
426 return None
427
429 for (nodename, uri, masteruri, pid, local) in nodes:
430 if (nodename == node) and ((self._filter.sync_remote_nodes() and masteruri == remote_masteruri) or local == 'local'):
431
432 if masteruri != self.localMasteruri:
433 return uri
434 return None
435
437 for (servicename, uri, masteruri, type, local) in nodes:
438 if (servicename == service) and ((self._filter.sync_remote_nodes() and masteruri == remote_masteruri) or local == 'local'):
439 if masteruri != self.localMasteruri:
440 return uri
441 return None
442