1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 import socket
34 import threading
35 import time
36 import uuid
37 import xmlrpclib
38
39 from multimaster_msgs_fkie.msg import MasterState
40 from multimaster_msgs_fkie.srv import DiscoverMasters, GetSyncInfo, GetSyncInfoResponse
41 import rospy
42
43 from master_discovery_fkie.common import masteruri_from_master, resolve_url, read_interface, create_pattern, is_empty_pattern
44 from master_discovery_fkie.master_info import MasterInfo
45 import master_discovery_fkie.interface_finder as interface_finder
46
47 from .sync_thread import SyncThread
48
49
51 '''
52 '''
53
54 UPDATE_INTERVALL = 30
55
57 '''
58 Creates a new instance. Find the topic of the master_discovery node using
59 U{master_discovery_fkie.interface_finder.get_changes_topic()
60 <http://docs.ros.org/api/master_discovery_fkie/html/modules.html#interface-finder-module>}.
61 Also the parameter C{~ignore_hosts} will be analyzed to exclude hosts from sync.
62 '''
63 self.masters = {}
64
65 self.materuri = masteruri_from_master()
66 '''@ivar: the ROS master URI of the C{local} ROS master. '''
67 self.__lock = threading.RLock()
68
69 self._load_interface()
70
71 topic_names = interface_finder.get_changes_topic(masteruri_from_master())
72 self.sub_changes = dict()
73 '''@ivar: `dict` with topics {name: U{rospy.Subscriber<http://docs.ros.org/api/rospy/html/rospy.topics.Subscriber-class.html>}} publishes the changes of the discovered ROS masters.'''
74 for topic_name in topic_names:
75 rospy.loginfo("listen for updates on %s", topic_name)
76 self.sub_changes[topic_name] = rospy.Subscriber(topic_name, MasterState, self._rosmsg_callback_master_state)
77 self.__timestamp_local = None
78 self.__own_state = None
79 self.update_timer = None
80 self.own_state_getter = None
81 self._join_threads = dict()
82
83 rospy.Service('~get_sync_info', GetSyncInfo, self._rosservice_get_sync_info)
84 rospy.on_shutdown(self.finish)
85 self.obtain_masters()
86
88 '''
89 The method to handle the received MasterState messages. Based on this message
90 new threads to synchronize with remote ROS master will be created, updated or
91 removed.
92 @param data: the received message
93 @type data: U{master_discovery_fkie.MasterState
94 <http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>}
95 '''
96 with self.__lock:
97 if not rospy.is_shutdown():
98 if data.state in [MasterState.STATE_REMOVED]:
99 self.remove_master(data.master.name)
100 elif data.state in [MasterState.STATE_NEW, MasterState.STATE_CHANGED]:
101 m = data.master
102 self.update_master(m.name, m.uri, m.timestamp, m.timestamp_local, m.discoverer_name, m.monitoruri, m.online)
103
104 - def obtain_masters(self):
105 '''
106 This method use the service 'list_masters' of the master_discoverer to get
107 the list of discovered ROS master. Based on this list the L{SyncThread} for
108 synchronization will be created.
109 @see: U{master_discovery_fkie.interface_finder.get_listmaster_service()
110 <http://docs.ros.org/api/master_discovery_fkie/html/modules.html#interface-finder-module>}
111 '''
112 if not rospy.is_shutdown():
113 service_names = interface_finder.get_listmaster_service(masteruri_from_master(), False)
114 for service_name in service_names:
115 try:
116 with self.__lock:
117 try:
118 socket.setdefaulttimeout(5)
119 discoverMasters = rospy.ServiceProxy(service_name, DiscoverMasters)
120 resp = discoverMasters()
121 masters = []
122 master_names = [m.name for m in resp.masters]
123 rospy.loginfo("ROS masters obtained from '%s': %s", service_name, master_names)
124 for m in resp.masters:
125 if self._can_sync(m.name):
126 masters.append(m.name)
127 self.update_master(m.name, m.uri, m.timestamp, m.timestamp_local, m.discoverer_name, m.monitoruri, m.online)
128 for key in set(self.masters.keys()) - set(masters):
129 self.remove_master(self.masters[key].name)
130 except rospy.ServiceException, e:
131 rospy.logwarn("ERROR Service call 'list_masters' failed: %s", str(e))
132 except:
133 import traceback
134 rospy.logwarn("ERROR while initial list masters: %s", traceback.format_exc())
135 finally:
136 socket.setdefaulttimeout(None)
137 self.update_timer = threading.Timer(self.UPDATE_INTERVALL, self.obtain_masters)
138 self.update_timer.start()
139
140 - def update_master(self, mastername, masteruri, timestamp, timestamp_local, discoverer_name, monitoruri, online):
141 '''
142 Updates the timestamp of the given ROS master, or creates a new L{SyncThread} to
143 synchronize the local master with given ROS master.
144 @param mastername: the name of the remote ROS master to update or synchronize.
145 @type mastername: C{str}
146 @param masteruri: the URI of the remote ROS master.
147 @type masteruri: C{str}
148 @param timestamp: the timestamp of the remote ROS master.
149 @type timestamp: C{float64}
150 @param timestamp_local: the timestamp of the remote ROS master. (only local changes)
151 @type timestamp_local: C{float64}
152 @param discoverer_name: the name of the remote master_discoverer node
153 @type discoverer_name: C{str}
154 @param monitoruri: the URI of the RPC interface of the remote master_discoverer node.
155 @type monitoruri: C{str}
156 @param online: the current state on the master.
157 @type online: C{bool}
158 '''
159 try:
160 with self.__lock:
161 if (masteruri != self.materuri):
162 if self._can_sync(mastername):
163
164 if self.__resync_on_reconnect and mastername in self.masters:
165 self.masters[mastername].set_online(online, self.__resync_on_reconnect_timeout)
166 if online:
167 if mastername in self.masters:
168
169 self.masters[mastername].update(mastername, masteruri, discoverer_name, monitoruri, timestamp_local)
170 else:
171 self.masters[mastername] = SyncThread(mastername, masteruri, discoverer_name, monitoruri, 0.0, self.__sync_topics_on_demand)
172 if self.__own_state is not None:
173 self.masters[mastername].set_own_masterstate(MasterInfo.from_list(self.__own_state))
174 self.masters[mastername].update(mastername, masteruri, discoverer_name, monitoruri, timestamp_local)
175 elif self.__timestamp_local != timestamp_local and self.__sync_topics_on_demand:
176
177 self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,))
178 self.own_state_getter.start()
179 except:
180 import traceback
181 rospy.logwarn("ERROR while update master[%s]: %s", str(mastername), traceback.format_exc())
182
183 - def get_own_state(self, monitoruri):
184 '''
185 Gets the master info from local master discovery and set it to all sync threads.
186 This function is running in a thread!!!
187 '''
188 try:
189 socket.setdefaulttimeout(3)
190 own_monitor = xmlrpclib.ServerProxy(monitoruri)
191 self.__own_state = own_monitor.masterInfo()
192 own_state = MasterInfo.from_list(self.__own_state)
193 socket.setdefaulttimeout(None)
194 with self.__lock:
195
196 for (_, s) in self.masters.iteritems():
197 s.set_own_masterstate(own_state, self.__sync_topics_on_demand)
198 self.__timestamp_local = own_state.timestamp_local
199 except:
200 import traceback
201 rospy.logwarn("ERROR while getting own state from '%s': %s", monitoruri, traceback.format_exc())
202 socket.setdefaulttimeout(None)
203 time.sleep(3)
204 if self.own_state_getter is not None and not rospy.is_shutdown():
205 self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,))
206 self.own_state_getter.start()
207
208 - def remove_master(self, ros_master_name):
209 '''
210 Removes the master with given name from the synchronization list.
211 @param ros_master_name: the name of the ROS master to remove.
212 @type ros_master_name: C{str}
213 '''
214 try:
215 with self.__lock:
216 if ros_master_name in self.masters:
217 m = self.masters.pop(ros_master_name)
218 ident = uuid.uuid4()
219 thread = threading.Thread(target=self._threading_stop_sync, args=(m, ident))
220 self._join_threads[ident] = thread
221 thread.start()
222 except Exception:
223 import traceback
224 rospy.logwarn("ERROR while removing master[%s]: %s", ros_master_name, traceback.format_exc())
225
226 - def _threading_stop_sync(self, sync_thread, ident):
227 if isinstance(sync_thread, SyncThread):
228 rospy.loginfo(" Stop synchronization to `%s`" % sync_thread.name)
229 sync_thread.stop()
230 with self.__lock:
231 del self._join_threads[ident]
232 rospy.loginfo(" Finished synchronization to `%s`" % sync_thread.name)
233 del sync_thread
234
235 - def finish(self, msg=''):
236 '''
237 Removes all remote masters and unregister their topics and services.
238 '''
239 rospy.loginfo("Stop synchronization...")
240 with self.__lock:
241
242 rospy.loginfo(" Stop timers...")
243 if self.update_timer is not None:
244 self.update_timer.cancel()
245
246 rospy.loginfo(" Unregister from master discovery...")
247 for (_, v) in self.sub_changes.iteritems():
248 v.unregister()
249 self.own_state_getter = None
250
251 for key in self.masters.keys():
252 rospy.loginfo(" Remove master: %s", key)
253 self.remove_master(key)
254
255 while len(self._join_threads) > 0:
256 rospy.loginfo(" Wait for ending of %s threads ...", str(len(self._join_threads)))
257 time.sleep(1)
258 rospy.loginfo("Synchronization is now off")
259
261 '''
262 Callback for the ROS service to get the info to synchronized nodes.
263 '''
264 masters = list()
265 try:
266 with self.__lock:
267 for (_, s) in self.masters.iteritems():
268 masters.append(s.get_sync_info())
269 except:
270 import traceback
271 traceback.print_exc()
272 finally:
273 return GetSyncInfoResponse(masters)
274
275 - def _load_interface(self):
276 interface_file = resolve_url(rospy.get_param('~interface_url', ''))
277 if interface_file:
278 rospy.loginfo("interface_url: %s", interface_file)
279 try:
280 data = read_interface(interface_file) if interface_file else {}
281
282 self._re_ignore_hosts = create_pattern('ignore_hosts', data, interface_file, [])
283
284 self._re_sync_hosts = create_pattern('sync_hosts', data, interface_file, [])
285 self.__sync_topics_on_demand = False
286 if interface_file:
287 if 'sync_topics_on_demand' in data:
288 self.__sync_topics_on_demand = data['sync_topics_on_demand']
289 elif rospy.has_param('~sync_topics_on_demand'):
290 self.__sync_topics_on_demand = rospy.get_param('~sync_topics_on_demand')
291 rospy.loginfo("sync_topics_on_demand: %s", self.__sync_topics_on_demand)
292 self.__resync_on_reconnect = rospy.get_param('~resync_on_reconnect', True)
293 rospy.loginfo("resync_on_reconnect: %s", self.__resync_on_reconnect)
294 self.__resync_on_reconnect_timeout = rospy.get_param('~resync_on_reconnect_timeout', 0)
295 rospy.loginfo("resync_on_reconnect_timeout: %s", self.__resync_on_reconnect_timeout)
296 except:
297 import traceback
298
299 rospy.logerr("Error on load interface: %s", traceback.format_exc())
300 import os
301 import signal
302 os.kill(os.getpid(), signal.SIGKILL)
303
304 - def _can_sync(self, mastername):
305 result = False
306 if is_empty_pattern(self._re_ignore_hosts):
307 if is_empty_pattern(self._re_sync_hosts):
308 result = True
309 elif self._re_sync_hosts.match(mastername) is not None:
310 result = True
311 elif self._re_ignore_hosts.match(mastername) is None:
312 result = True
313 elif not is_empty_pattern(self._re_sync_hosts):
314 if self._re_sync_hosts.match(mastername) is not None:
315 result = True
316 return result
317