1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 import uuid
34 import threading
35 import time
36 import xmlrpclib
37 import socket
38
39 import roslib; roslib.load_manifest('master_sync_fkie')
40 import rospy
41
42 from master_discovery_fkie.common import masteruri_from_ros, resolve_url, read_interface, create_pattern, is_empty_pattern
43 from sync_thread import SyncThread
44 from multimaster_msgs_fkie.msg import MasterState
45 from multimaster_msgs_fkie.srv import DiscoverMasters, GetSyncInfo, GetSyncInfoResponse
46 import master_discovery_fkie.interface_finder as interface_finder
47 from master_discovery_fkie.master_info import MasterInfo
48
49
50
52 '''
53 '''
54
55 UPDATE_INTERVALL = 30
56
58 '''
59 Creates a new instance. Find the topic of the master_discovery node using
60 L{master_discovery_fkie.interface_finder.get_changes_topic()}. Also the
61 parameter C{~ignore_hosts} will be analyzed to exclude hosts from sync.
62 '''
63 self.masters = {}
64
65 self.materuri = self.getMasteruri()
66 '''@ivar: the ROS master URI of the C{local} ROS master. '''
67 self.__lock = threading.RLock()
68
69 self._loadInterface()
70
71 topic_names = interface_finder.get_changes_topic(self.getMasteruri())
72 self.sub_changes = dict()
73 '''@ivar: {dict} with topics C{(name: L{rospy.Subscriber})} publishes the changes of the discovered ROS masters.'''
74 for topic_name in topic_names:
75 rospy.loginfo("listen for updates on %s", topic_name)
76 self.sub_changes[topic_name] = rospy.Subscriber(topic_name, MasterState, self.handlerMasterStateMsg)
77 self.__timestamp_local = None
78 self.__own_state = None
79 self.update_timer = None
80 self.own_state_getter = None
81 self._join_threads = dict()
82
83 rospy.Service('~get_sync_info', GetSyncInfo, self.rosservice_get_sync_info)
84 rospy.on_shutdown(self.finish)
85 self.retrieveMasters()
86
88 '''
89 The method to handle the received MasterState messages. Based on this message
90 new threads to synchronize with remote ROS master will be created, updated or
91 removed.
92 @param data: the received message
93 @type data: L{master_discovery_fkie.MasterState}
94 '''
95 with self.__lock:
96 if not rospy.is_shutdown():
97 if data.state in [MasterState.STATE_REMOVED]:
98 self.removeMaster(data.master.name)
99 elif data.state in [MasterState.STATE_NEW, MasterState.STATE_CHANGED]:
100 m = data.master
101 self.updateMaster(m.name, m.uri, m.timestamp, m.timestamp_local, m.discoverer_name, m.monitoruri)
102
103 - def getMasteruri(self):
104 '''
105 Requests the ROS master URI from the ROS master through the RPC interface and
106 returns it. The 'materuri' attribute will be set to the requested value.
107 @return: ROS master URI
108 @rtype: C{str} or C{None}
109 '''
110 if not hasattr(self, 'materuri') or self.materuri is None:
111 masteruri = masteruri_from_ros()
112 master = xmlrpclib.ServerProxy(masteruri)
113 code, message, self.materuri = master.getUri(rospy.get_name())
114 return self.materuri
115
116 - def retrieveMasters(self):
117 '''
118 This method use the service 'list_masters' of the master_discoverer to get
119 the list of discovered ROS master. Based on this list the L{SyncThread} for
120 synchronization will be created.
121 @see: L{master_discovery_fkie.interface_finder.get_listmaster_service()}
122 '''
123 if not rospy.is_shutdown():
124 rospy.loginfo("Update ROS master list...")
125 service_names = interface_finder.get_listmaster_service(self.getMasteruri(), False)
126 for service_name in service_names:
127 rospy.loginfo("service 'list_masters' found on %s", service_name)
128 try:
129 with self.__lock:
130
131 try:
132 socket.setdefaulttimeout(5)
133 discoverMasters = rospy.ServiceProxy(service_name, DiscoverMasters)
134 resp = discoverMasters()
135 masters = []
136 for m in resp.masters:
137 if not self._re_ignore_hosts.match(m.name) or self._re_sync_hosts.match(m.name):
138 masters.append(m.name)
139 self.updateMaster(m.name, m.uri, m.timestamp, m.timestamp_local, m.discoverer_name, m.monitoruri)
140 for key in set(self.masters.keys()) - set(masters):
141 self.removeMaster(self.masters[key].name)
142 except rospy.ServiceException, e:
143 rospy.logwarn("ERROR Service call 'list_masters' failed: %s", str(e))
144 except:
145 import traceback
146 rospy.logwarn("ERROR while initial list masters: %s", traceback.format_exc())
147 finally:
148 socket.setdefaulttimeout(None)
149 self.update_timer = threading.Timer(self.UPDATE_INTERVALL, self.retrieveMasters)
150 self.update_timer.start()
151
152
153 - def updateMaster(self, mastername, masteruri, timestamp, timestamp_local, discoverer_name, monitoruri):
154 '''
155 Updates the timestamp of the given ROS master, or creates a new SyncThread to
156 synchronize the local master with given ROS master.
157 @param mastername: the name of the remote ROS master to update or synchronize.
158 @type mastername: C{str}
159 @param masteruri: the URI of the remote ROS master.
160 @type masteruri: C{str}
161 @param timestamp: the timestamp of the remote ROS master.
162 @type timestamp: L{float64}
163 @param timestamp_local: the timestamp of the remote ROS master. (only local changes)
164 @type timestamp_local: L{float64}
165 @param discoverer_name: the name of the remote master_discoverer node
166 @type discoverer_name: C{str}
167 @param monitoruri: the URI of the RPC interface of the remote master_discoverer node.
168 @type monitoruri: C{str}
169 '''
170 try:
171 with self.__lock:
172 if (masteruri != self.materuri):
173 if (is_empty_pattern(self._re_ignore_hosts) or not self._re_ignore_hosts.match(mastername)
174 or (not is_empty_pattern(self._re_sync_hosts) and not self._re_sync_hosts.match(mastername) is None)):
175 if (mastername in self.masters):
176
177 self.masters[mastername].update(mastername, masteruri, discoverer_name, monitoruri, timestamp_local)
178 else:
179 self.masters[mastername] = SyncThread(mastername, masteruri, discoverer_name, monitoruri, 0.0, self.__sync_topics_on_demand)
180 if not self.__own_state is None:
181 self.masters[mastername].setOwnMasterState(MasterInfo.from_list(self.__own_state))
182
183
184 elif self.__timestamp_local != timestamp_local:
185
186 self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,))
187 self.own_state_getter.start()
188 except:
189 import traceback
190 rospy.logwarn("ERROR while update master[%s]: %s", str(mastername), traceback.format_exc())
191
192 - def get_own_state(self, monitoruri):
193
194
195 try:
196 socket.setdefaulttimeout(3)
197 own_monitor = xmlrpclib.ServerProxy(monitoruri)
198 self.__own_state = own_monitor.masterInfo()
199 own_state = MasterInfo.from_list(self.__own_state)
200 socket.setdefaulttimeout(None)
201 with self.__lock:
202
203 for (mastername, s) in self.masters.iteritems():
204 s.setOwnMasterState(own_state, self.__sync_topics_on_demand)
205 self.__timestamp_local = own_state.timestamp_local
206 except:
207 import traceback
208 print traceback.print_exc()
209 socket.setdefaulttimeout(None)
210 time.sleep(3)
211 if not self.own_state_getter is None and not rospy.is_shutdown():
212 self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,))
213 self.own_state_getter.start()
214
215 - def removeMaster(self, ros_master_name):
216 '''
217 Removes the master with given name from the synchronization list.
218 @param ros_master_name: the name of the ROS master to remove.
219 @type ros_master_name: C{str}
220 '''
221 try:
222 with self.__lock:
223 if (ros_master_name in self.masters):
224 m = self.masters.pop(ros_master_name)
225 id = uuid.uuid4()
226 thread = threading.Thread(target=self._threading_stop_sync, args=(m, id))
227 self._join_threads[id] = thread
228 thread.start()
229 except Exception:
230 import traceback
231 rospy.logwarn("ERROR while removing master[%s]: %s", ros_master_name, traceback.format_exc())
232
233 - def _threading_stop_sync(self, sync_thread, id):
234 if isinstance(sync_thread, SyncThread):
235 rospy.loginfo(" Stop synchronization to `%s`"%sync_thread.name)
236 sync_thread.stop()
237 with self.__lock:
238 del self._join_threads[id]
239 rospy.loginfo(" Finished synchronization to `%s`"%sync_thread.name)
240 del sync_thread
241
242 - def finish(self, msg=''):
243 '''
244 Removes all remote masters and unregister their topics and services.
245 '''
246 rospy.loginfo("Stop synchronization...")
247 with self.__lock:
248
249 rospy.loginfo(" Stop timers...")
250 if not self.update_timer is None:
251 self.update_timer.cancel()
252
253 rospy.loginfo(" Unregister from master discovery...")
254 for (k, v) in self.sub_changes.iteritems():
255 v.unregister()
256 self.own_state_getter = None
257
258 for key in self.masters.keys():
259 rospy.loginfo(" Remove master: %s", key)
260 self.removeMaster(key)
261
262 while len(self._join_threads) > 0:
263 rospy.loginfo(" Wait for ending of %s threads ...", str(len(self._join_threads)))
264 time.sleep(1)
265 rospy.loginfo("Synchronization is now off")
266
268 '''
269 Callback for the ROS service to get the info to synchronized nodes.
270 '''
271 masters = list()
272 try:
273 with self.__lock:
274 for (mastername, s) in self.masters.iteritems():
275 masters.append(s.getSyncInfo())
276 except:
277 import traceback
278 traceback.print_exc()
279 finally:
280 return GetSyncInfoResponse(masters)
281
282 - def _loadInterface(self):
283 interface_file = resolve_url(rospy.get_param('~interface_url', ''))
284 if interface_file:
285 rospy.loginfo("interface_url: %s", interface_file)
286 try:
287 data = read_interface(interface_file) if interface_file else {}
288
289 self._re_ignore_hosts = create_pattern('ignore_hosts', data, interface_file, [])
290
291 self._re_sync_hosts = create_pattern('sync_hosts', data, interface_file, [])
292 self.__sync_topics_on_demand = False
293 if interface_file:
294 if data.has_key('sync_topics_on_demand'):
295 self.__sync_topics_on_demand = data['sync_topics_on_demand']
296 elif rospy.has_param('~sync_topics_on_demand'):
297 self.__sync_topics_on_demand = rospy.get_param('~sync_topics_on_demand')
298 rospy.loginfo("sync_topics_on_demand: %s", self.__sync_topics_on_demand)
299 except:
300 import traceback
301
302 rospy.logerr("Error on load interface: %s", traceback.format_exc())
303 import os, signal
304 os.kill(os.getpid(), signal.SIGKILL)
305