$search
00001 #!/usr/bin/env python 00002 00003 import roslib; roslib.load_manifest('multimaster') 00004 00005 import threading 00006 import time 00007 import urlparse 00008 00009 import rosgraph 00010 import rosgraph.names 00011 import rosgraph.network 00012 00013 import rospy 00014 00015 from rospy.core import global_name, is_topic 00016 from rospy.impl.validators import non_empty, ParameterInvalid 00017 00018 from rospy.impl.masterslave import apivalidate 00019 00020 from rosgraph.xmlrpc import XmlRpcNode, XmlRpcHandler 00021 00022 import sys 00023 00024 class MasterSync(object): 00025 def __init__(self): 00026 local_uri = rosgraph.get_master_uri() 00027 foreign_uri = rospy.get_param('~foreign_master', '') 00028 if not foreign_uri: 00029 raise Exception('foreign_master URI not specified') 00030 00031 local_pubs = rospy.get_param('~local_pubs', []) 00032 foreign_pubs = rospy.get_param('~foreign_pubs', []) 00033 00034 self._local_services = rospy.get_param('~local_services', []) 00035 self._foreign_services = rospy.get_param('~foreign_services', []) 00036 00037 foreign_master = rosgraph.Master(rospy.get_name(), master_uri=foreign_uri) 00038 r = rospy.Rate(1) 00039 while not _is_master_up(foreign_master) and not rospy.is_shutdown(): 00040 rospy.logdebug('Waiting for foreign master to come up...') 00041 r.sleep() 00042 00043 self._local_manager = None 00044 self._foreign_manager = None 00045 if not rospy.is_shutdown(): 00046 self._local_manager = _RemoteManager(local_uri, self._local_publisher_update) 00047 self._foreign_manager = _RemoteManager(foreign_uri, self._foreign_publisher_update) 00048 for t in local_pubs: 00049 self._local_manager.subscribe(t) 00050 for t in foreign_pubs: 00051 self._foreign_manager.subscribe(t) 00052 00053 def _local_publisher_update(self, topic, publishers): 00054 topic_type = self._local_manager.get_topic_type(topic) 00055 00056 self._foreign_manager.publishers_updated(topic, topic_type, publishers) 00057 00058 def _foreign_publisher_update(self, topic, publishers): 00059 topic_type = self._foreign_manager.get_topic_type(topic) 00060 00061 self._local_manager.publishers_updated(topic, topic_type, publishers) 00062 00063 def spin(self): 00064 # @todo: is this excessively hitting the master? 00065 00066 r = rospy.Rate(1.0) 00067 00068 while not rospy.is_shutdown(): 00069 for s in self._local_services: 00070 srv_uri = self._local_manager.lookup_service(s) 00071 if srv_uri: 00072 self._foreign_manager.advertise_service(s, srv_uri) 00073 else: 00074 self._foreign_manager.unadvertise_service(s) 00075 00076 for s in self._foreign_services: 00077 srv_uri = self._foreign_manager.lookup_service(s) 00078 if srv_uri: 00079 self._local_manager.advertise_service(s, srv_uri) 00080 else: 00081 self._local_manager.unadvertise_service(s) 00082 00083 r.sleep() 00084 00085 if self._local_manager: 00086 self._local_manager.unsubscribe_all() 00087 if self._foreign_manager: 00088 self._foreign_manager.unsubscribe_all() 00089 00090 def _is_master_up(m): 00091 try: 00092 m.getUri() 00093 return True 00094 except Exception: 00095 return False 00096 00097 class _RemoteManager(object): 00098 def __init__(self, master_uri, new_topics_callback): 00099 self.master_uri = master_uri 00100 self.new_topics_callback = new_topics_callback 00101 00102 name = rosgraph.names.ns_join(rosgraph.names.get_ros_namespace(), rosgraph.names.anonymous_name('master_sync')) 00103 self.master = rosgraph.Master(name, master_uri=self.master_uri) 00104 00105 self._lock = threading.RLock() 00106 00107 self._type_cache = {} 00108 00109 self._subs = {} 00110 self._pubs = {} 00111 self._srvs = {} 00112 00113 self._external_node = XmlRpcNode(rpc_handler=_TopicPubListenerHandler(self._new_topics)) 00114 self._external_node.start() 00115 00116 timeout_t = time.time() + 5.0 00117 while time.time() < timeout_t and self._external_node.uri is None: 00118 time.sleep(0.01) 00119 00120 def get_topic_type(self, query_topic): 00121 query_topic = self.resolve(query_topic) 00122 00123 query_topic_type = self._type_cache.get(query_topic) 00124 if query_topic_type: 00125 return query_topic_type 00126 00127 for topic, topic_type in self.master.getTopicTypes(): 00128 self._type_cache[topic] = topic_type 00129 00130 return self._type_cache.get(query_topic, '*') 00131 00132 def subscribe(self, topic): 00133 topic = self.resolve(topic) 00134 publishers = self.master.registerSubscriber(topic, '*', self._external_node.uri) 00135 self._subs[(topic, self._external_node.uri)] = self.master 00136 self._new_topics(topic, publishers) 00137 00138 def publishers_updated(self, topic, topic_type, uris): 00139 resolved = self.resolve(topic) 00140 00141 with self._lock: 00142 # Unregister any publishers that no longer exist 00143 uris_set = set(uris) 00144 for t, uri in self._pubs.keys(): 00145 if t == resolved and uri not in uris_set: 00146 self.unadvertise(t, uri) 00147 00148 # Register new publishers 00149 for uri in uris: 00150 if (resolved, uri) not in self._pubs: 00151 # Registrations need to be anonymous so master doesn't kill us if there's a duplicate name 00152 rospy.loginfo('Registering (%s, %s) on master %s' % (resolved, uri, self.master_uri)) 00153 anon_name = rosgraph.names.anonymous_name('master_sync') 00154 master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri) 00155 master.registerPublisher(resolved, topic_type, uri) 00156 00157 self._pubs[(resolved, uri)] = master 00158 00159 def unadvertise(self, topic, uri): 00160 with self._lock: 00161 if (topic, uri) in self._pubs: 00162 master = self._pubs[(topic, uri)] 00163 00164 rospy.loginfo('Unregistering (%s, %s) from master %s' % (topic, uri, master.master_uri)) 00165 master.unregisterPublisher(topic, uri) 00166 del self._pubs[(topic, uri)] 00167 00168 def lookup_service(self, service_name): 00169 service_name = self.resolve(service_name) 00170 try: 00171 return self.master.lookupService(service_name) 00172 except: 00173 return None 00174 00175 def advertise_service(self, service_name, uri): 00176 # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name 00177 anon_name = rosgraph.names.anonymous_name('master_sync') 00178 master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri) 00179 00180 if service_name in self._srvs: 00181 if self._srvs[service_name][0] == uri: 00182 return 00183 self.unadvertise_service(service_name) 00184 00185 rospy.loginfo('Registering service (%s, %s) on master %s' % (service_name, uri, master.master_uri)) 00186 00187 fake_api = 'http://%s:0' % roslib.network.get_host_name() 00188 master.registerService(service_name, uri, fake_api) 00189 00190 self._srvs[service_name] = (uri, master) 00191 00192 def unadvertise_service(self, service_name): 00193 if service_name in self._srvs: 00194 uri, master = self._srvs[service_name] 00195 00196 rospy.loginfo('Unregistering service (%s, %s) from master %s' % (service_name, uri, master.master_uri)) 00197 master.unregisterService(service_name, uri) 00198 00199 del self._srvs[service_name] 00200 00201 def resolve(self, topic): 00202 return rosgraph.names.ns_join(rosgraph.names.namespace(self.master.caller_id), topic) 00203 00204 def unsubscribe_all(self): 00205 for (topic, uri), master in self._subs.iteritems(): 00206 master.unregisterSubscriber(topic, uri) 00207 for topic, uri in self._pubs.keys(): 00208 self.unadvertise(topic, uri) 00209 for service in self._srvs.keys(): 00210 self.unadvertise_service(service) 00211 00212 def _new_topics(self, topic, publishers): 00213 self.new_topics_callback(topic, [p for p in publishers if (topic, p) not in self._pubs]) 00214 00215 def is_publishers_list(paramName): 00216 return ('is_publishers_list', paramName) 00217 00218 class _TopicPubListenerHandler(XmlRpcHandler): 00219 def __init__(self, publisher_update_callback): 00220 super(_TopicPubListenerHandler, self).__init__() 00221 00222 self.uri = None 00223 self.publisher_update_callback = publisher_update_callback 00224 00225 def _ready(self, uri): 00226 self.uri = uri 00227 00228 def _custom_validate(self, validation, param_name, param_value, caller_id): 00229 if validation == 'is_publishers_list': 00230 if not type(param_value) == list: 00231 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name) 00232 for v in param_value: 00233 if not isinstance(v, basestring): 00234 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name) 00235 parsed = urlparse.urlparse(v) 00236 if not parsed[0] or not parsed[1]: #protocol and host 00237 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v)) 00238 return param_value 00239 else: 00240 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation)) 00241 00242 @apivalidate([]) 00243 def getBusStats(self, caller_id): 00244 # not supported 00245 return 1, '', [[], [], []] 00246 00247 @apivalidate([]) 00248 def getBusInfo(self, caller_id): 00249 # not supported 00250 return 1, '', [[], [], []] 00251 00252 @apivalidate('') 00253 def getMasterUri(self, caller_id): 00254 # not supported 00255 return 1, '', '' 00256 00257 @apivalidate(0, (None, )) 00258 def shutdown(self, caller_id, msg=''): 00259 return -1, 'not authorized', 0 00260 00261 @apivalidate(-1) 00262 def getPid(self, caller_id): 00263 return -1, 'not authorized', 0 00264 00265 # pub/sub 00266 00267 @apivalidate([]) 00268 def getSubscriptions(self, caller_id): 00269 return 1, 'subscriptions', [[], []] 00270 00271 @apivalidate([]) 00272 def getPublications(self, caller_id): 00273 return 1, 'publications', [[], []] 00274 00275 @apivalidate(-1, (global_name('parameter_key'), None)) 00276 def paramUpdate(self, caller_id, parameter_key, parameter_value): 00277 # not supported 00278 return -1, 'not authorized', 0 00279 00280 @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers'))) 00281 def publisherUpdate(self, caller_id, topic, publishers): 00282 self.publisher_update_callback(topic, publishers) 00283 00284 @apivalidate([], (is_topic('topic'), non_empty('protocols'))) 00285 def requestTopic(self, caller_id, topic, protocols): 00286 return 0, 'no supported protocol implementations', [] 00287 00288 if __name__ == '__main__': 00289 try: 00290 rospy.init_node('master_sync') 00291 00292 sync = MasterSync() 00293 sync.spin() 00294 except KeyboardInterrupt: 00295 pass 00296 except Exception as ex: 00297 import traceback 00298 rospy.logfatal(traceback.format_exc()) 00299 rospy.logfatal(str(ex)) 00300 time.sleep(1.0) 00301 sys.exit(1)