1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 import socket
34 import threading
35 import time
36 import uuid
37 try:
38 import xmlrpclib
39 except ImportError:
40 import xmlrpc.client as xmlrpclib
41
42 from multimaster_msgs_fkie.msg import MasterState
43 from multimaster_msgs_fkie.srv import DiscoverMasters, GetSyncInfo, GetSyncInfoResponse
44 import rospy
45
46 from master_discovery_fkie.common import masteruri_from_master, resolve_url, read_interface, create_pattern, is_empty_pattern
47 from master_discovery_fkie.master_info import MasterInfo
48 import master_discovery_fkie.interface_finder as interface_finder
49
50 from .sync_thread import SyncThread
51
52
54 '''
55 '''
56
57 UPDATE_INTERVALL = 30
58
60 '''
61 Creates a new instance. Find the topic of the master_discovery node using
62 U{master_discovery_fkie.interface_finder.get_changes_topic()
63 <http://docs.ros.org/api/master_discovery_fkie/html/modules.html#interface-finder-module>}.
64 Also the parameter C{~ignore_hosts} will be analyzed to exclude hosts from sync.
65 '''
66 self.masters = {}
67
68 self.materuri = masteruri_from_master()
69 '''@ivar: the ROS master URI of the C{local} ROS master. '''
70 self.__lock = threading.RLock()
71
72 self._load_interface()
73
74 topic_names = interface_finder.get_changes_topic(masteruri_from_master())
75 self.sub_changes = dict()
76 '''@ivar: `dict` with topics {name: U{rospy.Subscriber<http://docs.ros.org/api/rospy/html/rospy.topics.Subscriber-class.html>}} publishes the changes of the discovered ROS masters.'''
77 for topic_name in topic_names:
78 rospy.loginfo("listen for updates on %s", topic_name)
79 self.sub_changes[topic_name] = rospy.Subscriber(topic_name, MasterState, self._rosmsg_callback_master_state)
80 self.__timestamp_local = None
81 self.__own_state = None
82 self.update_timer = None
83 self.own_state_getter = None
84 self._join_threads = dict()
85
86 rospy.Service('~get_sync_info', GetSyncInfo, self._rosservice_get_sync_info)
87 rospy.on_shutdown(self.finish)
88 self.obtain_masters()
89
91 '''
92 The method to handle the received MasterState messages. Based on this message
93 new threads to synchronize with remote ROS master will be created, updated or
94 removed.
95 @param data: the received message
96 @type data: U{master_discovery_fkie.MasterState
97 <http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>}
98 '''
99 with self.__lock:
100 if not rospy.is_shutdown():
101 if data.state in [MasterState.STATE_REMOVED]:
102 self.remove_master(data.master.name)
103 elif data.state in [MasterState.STATE_NEW, MasterState.STATE_CHANGED]:
104 m = data.master
105 self.update_master(m.name, m.uri, m.timestamp, m.timestamp_local, m.discoverer_name, m.monitoruri, m.online)
106
107 - def obtain_masters(self):
108 '''
109 This method use the service 'list_masters' of the master_discoverer to get
110 the list of discovered ROS master. Based on this list the L{SyncThread} for
111 synchronization will be created.
112 @see: U{master_discovery_fkie.interface_finder.get_listmaster_service()
113 <http://docs.ros.org/api/master_discovery_fkie/html/modules.html#interface-finder-module>}
114 '''
115 if not rospy.is_shutdown():
116 service_names = interface_finder.get_listmaster_service(masteruri_from_master(), False)
117 for service_name in service_names:
118 try:
119 with self.__lock:
120 try:
121 socket.setdefaulttimeout(5)
122 discoverMasters = rospy.ServiceProxy(service_name, DiscoverMasters)
123 resp = discoverMasters()
124 masters = []
125 master_names = [m.name for m in resp.masters]
126 rospy.loginfo("ROS masters obtained from '%s': %s", service_name, master_names)
127 for m in resp.masters:
128 if self._can_sync(m.name):
129 masters.append(m.name)
130 self.update_master(m.name, m.uri, m.timestamp, m.timestamp_local, m.discoverer_name, m.monitoruri, m.online)
131 for key in set(self.masters.keys()) - set(masters):
132 self.remove_master(self.masters[key].name)
133 except rospy.ServiceException as e:
134 rospy.logwarn("ERROR Service call 'list_masters' failed: %s", str(e))
135 except:
136 import traceback
137 rospy.logwarn("ERROR while initial list masters: %s", traceback.format_exc())
138 finally:
139 socket.setdefaulttimeout(None)
140 self.update_timer = threading.Timer(self.UPDATE_INTERVALL, self.obtain_masters)
141 self.update_timer.start()
142
143 - def update_master(self, mastername, masteruri, timestamp, timestamp_local, discoverer_name, monitoruri, online):
144 '''
145 Updates the timestamp of the given ROS master, or creates a new L{SyncThread} to
146 synchronize the local master with given ROS master.
147 @param mastername: the name of the remote ROS master to update or synchronize.
148 @type mastername: C{str}
149 @param masteruri: the URI of the remote ROS master.
150 @type masteruri: C{str}
151 @param timestamp: the timestamp of the remote ROS master.
152 @type timestamp: C{float64}
153 @param timestamp_local: the timestamp of the remote ROS master. (only local changes)
154 @type timestamp_local: C{float64}
155 @param discoverer_name: the name of the remote master_discoverer node
156 @type discoverer_name: C{str}
157 @param monitoruri: the URI of the RPC interface of the remote master_discoverer node.
158 @type monitoruri: C{str}
159 @param online: the current state on the master.
160 @type online: C{bool}
161 '''
162 try:
163 with self.__lock:
164 if (masteruri != self.materuri):
165 if self._can_sync(mastername):
166
167 if self.__resync_on_reconnect and mastername in self.masters:
168 self.masters[mastername].set_online(online, self.__resync_on_reconnect_timeout)
169 if online:
170 if mastername in self.masters:
171
172 self.masters[mastername].update(mastername, masteruri, discoverer_name, monitoruri, timestamp_local)
173 else:
174 self.masters[mastername] = SyncThread(mastername, masteruri, discoverer_name, monitoruri, 0.0, self.__sync_topics_on_demand)
175 if self.__own_state is not None:
176 self.masters[mastername].set_own_masterstate(MasterInfo.from_list(self.__own_state))
177 self.masters[mastername].update(mastername, masteruri, discoverer_name, monitoruri, timestamp_local)
178 elif self.__timestamp_local != timestamp_local and self.__sync_topics_on_demand:
179
180 self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,))
181 self.own_state_getter.start()
182 except:
183 import traceback
184 rospy.logwarn("ERROR while update master[%s]: %s", str(mastername), traceback.format_exc())
185
186 - def get_own_state(self, monitoruri):
187 '''
188 Gets the master info from local master discovery and set it to all sync threads.
189 This function is running in a thread!!!
190 '''
191 try:
192 socket.setdefaulttimeout(3)
193 own_monitor = xmlrpclib.ServerProxy(monitoruri)
194 self.__own_state = own_monitor.masterInfo()
195 own_state = MasterInfo.from_list(self.__own_state)
196 socket.setdefaulttimeout(None)
197 with self.__lock:
198
199 for (_, s) in self.masters.items():
200 s.set_own_masterstate(own_state, self.__sync_topics_on_demand)
201 self.__timestamp_local = own_state.timestamp_local
202 except:
203 import traceback
204 rospy.logwarn("ERROR while getting own state from '%s': %s", monitoruri, traceback.format_exc())
205 socket.setdefaulttimeout(None)
206 time.sleep(3)
207 if self.own_state_getter is not None and not rospy.is_shutdown():
208 self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,))
209 self.own_state_getter.start()
210
211 - def remove_master(self, ros_master_name):
212 '''
213 Removes the master with given name from the synchronization list.
214 @param ros_master_name: the name of the ROS master to remove.
215 @type ros_master_name: C{str}
216 '''
217 try:
218 with self.__lock:
219 if ros_master_name in self.masters:
220 m = self.masters.pop(ros_master_name)
221 ident = uuid.uuid4()
222 thread = threading.Thread(target=self._threading_stop_sync, args=(m, ident))
223 self._join_threads[ident] = thread
224 thread.start()
225 except Exception:
226 import traceback
227 rospy.logwarn("ERROR while removing master[%s]: %s", ros_master_name, traceback.format_exc())
228
229 - def _threading_stop_sync(self, sync_thread, ident):
230 if isinstance(sync_thread, SyncThread):
231 rospy.loginfo(" Stop synchronization to `%s`" % sync_thread.name)
232 sync_thread.stop()
233 with self.__lock:
234 del self._join_threads[ident]
235 rospy.loginfo(" Finished synchronization to `%s`" % sync_thread.name)
236 del sync_thread
237
238 - def finish(self, msg=''):
239 '''
240 Removes all remote masters and unregister their topics and services.
241 '''
242 rospy.loginfo("Stop synchronization...")
243 with self.__lock:
244
245 rospy.loginfo(" Stop timers...")
246 if self.update_timer is not None:
247 self.update_timer.cancel()
248
249 rospy.loginfo(" Unregister from master discovery...")
250 for (_, v) in self.sub_changes.items():
251 v.unregister()
252 self.own_state_getter = None
253
254 for key in self.masters.keys():
255 rospy.loginfo(" Remove master: %s", key)
256 self.remove_master(key)
257
258 while len(self._join_threads) > 0:
259 rospy.loginfo(" Wait for ending of %s threads ...", str(len(self._join_threads)))
260 time.sleep(1)
261 rospy.loginfo("Synchronization is now off")
262
264 '''
265 Callback for the ROS service to get the info to synchronized nodes.
266 '''
267 masters = list()
268 try:
269 with self.__lock:
270 for (_, s) in self.masters.items():
271 masters.append(s.get_sync_info())
272 except:
273 import traceback
274 traceback.print_exc()
275 finally:
276 return GetSyncInfoResponse(masters)
277
278 - def _load_interface(self):
279 interface_file = resolve_url(rospy.get_param('~interface_url', ''))
280 if interface_file:
281 rospy.loginfo("interface_url: %s", interface_file)
282 try:
283 data = read_interface(interface_file) if interface_file else {}
284
285 self._re_ignore_hosts = create_pattern('ignore_hosts', data, interface_file, [])
286
287 self._re_sync_hosts = create_pattern('sync_hosts', data, interface_file, [])
288 self.__sync_topics_on_demand = False
289 if interface_file:
290 if 'sync_topics_on_demand' in data:
291 self.__sync_topics_on_demand = data['sync_topics_on_demand']
292 elif rospy.has_param('~sync_topics_on_demand'):
293 self.__sync_topics_on_demand = rospy.get_param('~sync_topics_on_demand')
294 rospy.loginfo("sync_topics_on_demand: %s", self.__sync_topics_on_demand)
295 self.__resync_on_reconnect = rospy.get_param('~resync_on_reconnect', True)
296 rospy.loginfo("resync_on_reconnect: %s", self.__resync_on_reconnect)
297 self.__resync_on_reconnect_timeout = rospy.get_param('~resync_on_reconnect_timeout', 0)
298 rospy.loginfo("resync_on_reconnect_timeout: %s", self.__resync_on_reconnect_timeout)
299 except:
300 import traceback
301
302 rospy.logerr("Error on load interface: %s", traceback.format_exc())
303 import os
304 import signal
305 os.kill(os.getpid(), signal.SIGKILL)
306
307 - def _can_sync(self, mastername):
308 result = False
309 if is_empty_pattern(self._re_ignore_hosts):
310 if is_empty_pattern(self._re_sync_hosts):
311 result = True
312 elif self._re_sync_hosts.match(mastername) is not None:
313 result = True
314 elif self._re_ignore_hosts.match(mastername) is None:
315 result = True
316 elif not is_empty_pattern(self._re_sync_hosts):
317 if self._re_sync_hosts.match(mastername) is not None:
318 result = True
319 return result
320