00001
00002
00003 import roslib; roslib.load_manifest('multimaster')
00004
00005 import threading
00006 import time
00007 import urlparse
00008
00009 import rosgraph
00010 import rosgraph.names
00011 import rosgraph.network
00012
00013 import rospy
00014
00015 from rospy.core import global_name, is_topic
00016 from rospy.impl.validators import non_empty, ParameterInvalid
00017
00018 from rospy.impl.masterslave import apivalidate
00019
00020 from rosgraph.xmlrpc import XmlRpcNode, XmlRpcHandler
00021
00022 import sys
00023
00024 class MasterSync(object):
00025 def __init__(self):
00026 local_uri = rosgraph.get_master_uri()
00027 foreign_uri = rospy.get_param('~foreign_master', '')
00028 if not foreign_uri:
00029 raise Exception('foreign_master URI not specified')
00030
00031 local_pubs = rospy.get_param('~local_pubs', [])
00032 foreign_pubs = rospy.get_param('~foreign_pubs', [])
00033
00034 self._local_services = rospy.get_param('~local_services', [])
00035 self._foreign_services = rospy.get_param('~foreign_services', [])
00036
00037 foreign_master = rosgraph.Master(rospy.get_name(), master_uri=foreign_uri)
00038 r = rospy.Rate(1)
00039 while not _is_master_up(foreign_master) and not rospy.is_shutdown():
00040 rospy.logdebug('Waiting for foreign master to come up...')
00041 r.sleep()
00042
00043 self._local_manager = None
00044 self._foreign_manager = None
00045 if not rospy.is_shutdown():
00046 self._local_manager = _RemoteManager(local_uri, self._local_publisher_update)
00047 self._foreign_manager = _RemoteManager(foreign_uri, self._foreign_publisher_update)
00048 for t in local_pubs:
00049 self._local_manager.subscribe(t)
00050 for t in foreign_pubs:
00051 self._foreign_manager.subscribe(t)
00052
00053 def _local_publisher_update(self, topic, publishers):
00054 topic_type = self._local_manager.get_topic_type(topic)
00055
00056 self._foreign_manager.publishers_updated(topic, topic_type, publishers)
00057
00058 def _foreign_publisher_update(self, topic, publishers):
00059 topic_type = self._foreign_manager.get_topic_type(topic)
00060
00061 self._local_manager.publishers_updated(topic, topic_type, publishers)
00062
00063 def spin(self):
00064
00065
00066 r = rospy.Rate(1.0)
00067
00068 while not rospy.is_shutdown():
00069 for s in self._local_services:
00070 srv_uri = self._local_manager.lookup_service(s)
00071 if srv_uri:
00072 self._foreign_manager.advertise_service(s, srv_uri)
00073 else:
00074 self._foreign_manager.unadvertise_service(s)
00075
00076 for s in self._foreign_services:
00077 srv_uri = self._foreign_manager.lookup_service(s)
00078 if srv_uri:
00079 self._local_manager.advertise_service(s, srv_uri)
00080 else:
00081 self._local_manager.unadvertise_service(s)
00082
00083 r.sleep()
00084
00085 if self._local_manager:
00086 self._local_manager.unsubscribe_all()
00087 if self._foreign_manager:
00088 self._foreign_manager.unsubscribe_all()
00089
00090 def _is_master_up(m):
00091 try:
00092 m.getUri()
00093 return True
00094 except Exception:
00095 return False
00096
00097 class _RemoteManager(object):
00098 def __init__(self, master_uri, new_topics_callback):
00099 self.master_uri = master_uri
00100 self.new_topics_callback = new_topics_callback
00101
00102 name = rosgraph.names.ns_join(rosgraph.names.get_ros_namespace(), rosgraph.names.anonymous_name('master_sync'))
00103 self.master = rosgraph.Master(name, master_uri=self.master_uri)
00104
00105 self._lock = threading.RLock()
00106
00107 self._type_cache = {}
00108
00109 self._subs = {}
00110 self._pubs = {}
00111 self._srvs = {}
00112
00113 self._external_node = XmlRpcNode(rpc_handler=_TopicPubListenerHandler(self._new_topics))
00114 self._external_node.start()
00115
00116 timeout_t = time.time() + 5.0
00117 while time.time() < timeout_t and self._external_node.uri is None:
00118 time.sleep(0.01)
00119
00120 def get_topic_type(self, query_topic):
00121 query_topic = self.resolve(query_topic)
00122
00123 query_topic_type = self._type_cache.get(query_topic)
00124 if query_topic_type:
00125 return query_topic_type
00126
00127 for topic, topic_type in self.master.getTopicTypes():
00128 self._type_cache[topic] = topic_type
00129
00130 return self._type_cache.get(query_topic, '*')
00131
00132 def subscribe(self, topic):
00133 topic = self.resolve(topic)
00134 publishers = self.master.registerSubscriber(topic, '*', self._external_node.uri)
00135 self._subs[(topic, self._external_node.uri)] = self.master
00136 self._new_topics(topic, publishers)
00137
00138 def publishers_updated(self, topic, topic_type, uris):
00139 resolved = self.resolve(topic)
00140
00141 with self._lock:
00142
00143 uris_set = set(uris)
00144 for t, uri in self._pubs.keys():
00145 if t == resolved and uri not in uris_set:
00146 self.unadvertise(t, uri)
00147
00148
00149 for uri in uris:
00150 if (resolved, uri) not in self._pubs:
00151
00152 rospy.loginfo('Registering (%s, %s) on master %s' % (resolved, uri, self.master_uri))
00153 anon_name = rosgraph.names.anonymous_name('master_sync')
00154 master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri)
00155 master.registerPublisher(resolved, topic_type, uri)
00156
00157 self._pubs[(resolved, uri)] = master
00158
00159 def unadvertise(self, topic, uri):
00160 with self._lock:
00161 if (topic, uri) in self._pubs:
00162 master = self._pubs[(topic, uri)]
00163
00164 rospy.loginfo('Unregistering (%s, %s) from master %s' % (topic, uri, master.master_uri))
00165 master.unregisterPublisher(topic, uri)
00166 del self._pubs[(topic, uri)]
00167
00168 def lookup_service(self, service_name):
00169 service_name = self.resolve(service_name)
00170 try:
00171 return self.master.lookupService(service_name)
00172 except:
00173 return None
00174
00175 def advertise_service(self, service_name, uri):
00176
00177 anon_name = rosgraph.names.anonymous_name('master_sync')
00178 master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri)
00179
00180 if service_name in self._srvs:
00181 if self._srvs[service_name][0] == uri:
00182 return
00183 self.unadvertise_service(service_name)
00184
00185 rospy.loginfo('Registering service (%s, %s) on master %s' % (service_name, uri, master.master_uri))
00186
00187 fake_api = 'http://%s:0' % roslib.network.get_host_name()
00188 master.registerService(service_name, uri, fake_api)
00189
00190 self._srvs[service_name] = (uri, master)
00191
00192 def unadvertise_service(self, service_name):
00193 if service_name in self._srvs:
00194 uri, master = self._srvs[service_name]
00195
00196 rospy.loginfo('Unregistering service (%s, %s) from master %s' % (service_name, uri, master.master_uri))
00197 master.unregisterService(service_name, uri)
00198
00199 del self._srvs[service_name]
00200
00201 def resolve(self, topic):
00202 return rosgraph.names.ns_join(rosgraph.names.namespace(self.master.caller_id), topic)
00203
00204 def unsubscribe_all(self):
00205 for (topic, uri), master in self._subs.iteritems():
00206 master.unregisterSubscriber(topic, uri)
00207 for topic, uri in self._pubs.keys():
00208 self.unadvertise(topic, uri)
00209 for service in self._srvs.keys():
00210 self.unadvertise_service(service)
00211
00212 def _new_topics(self, topic, publishers):
00213 self.new_topics_callback(topic, [p for p in publishers if (topic, p) not in self._pubs])
00214
00215 def is_publishers_list(paramName):
00216 return ('is_publishers_list', paramName)
00217
00218 class _TopicPubListenerHandler(XmlRpcHandler):
00219 def __init__(self, publisher_update_callback):
00220 super(_TopicPubListenerHandler, self).__init__()
00221
00222 self.uri = None
00223 self.publisher_update_callback = publisher_update_callback
00224
00225 def _ready(self, uri):
00226 self.uri = uri
00227
00228 def _custom_validate(self, validation, param_name, param_value, caller_id):
00229 if validation == 'is_publishers_list':
00230 if not type(param_value) == list:
00231 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
00232 for v in param_value:
00233 if not isinstance(v, basestring):
00234 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
00235 parsed = urlparse.urlparse(v)
00236 if not parsed[0] or not parsed[1]:
00237 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
00238 return param_value
00239 else:
00240 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
00241
00242 @apivalidate([])
00243 def getBusStats(self, caller_id):
00244
00245 return 1, '', [[], [], []]
00246
00247 @apivalidate([])
00248 def getBusInfo(self, caller_id):
00249
00250 return 1, '', [[], [], []]
00251
00252 @apivalidate('')
00253 def getMasterUri(self, caller_id):
00254
00255 return 1, '', ''
00256
00257 @apivalidate(0, (None, ))
00258 def shutdown(self, caller_id, msg=''):
00259 return -1, 'not authorized', 0
00260
00261 @apivalidate(-1)
00262 def getPid(self, caller_id):
00263 return -1, 'not authorized', 0
00264
00265
00266
00267 @apivalidate([])
00268 def getSubscriptions(self, caller_id):
00269 return 1, 'subscriptions', [[], []]
00270
00271 @apivalidate([])
00272 def getPublications(self, caller_id):
00273 return 1, 'publications', [[], []]
00274
00275 @apivalidate(-1, (global_name('parameter_key'), None))
00276 def paramUpdate(self, caller_id, parameter_key, parameter_value):
00277
00278 return -1, 'not authorized', 0
00279
00280 @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
00281 def publisherUpdate(self, caller_id, topic, publishers):
00282 self.publisher_update_callback(topic, publishers)
00283
00284 @apivalidate([], (is_topic('topic'), non_empty('protocols')))
00285 def requestTopic(self, caller_id, topic, protocols):
00286 return 0, 'no supported protocol implementations', []
00287
00288 if __name__ == '__main__':
00289 try:
00290 rospy.init_node('master_sync')
00291
00292 sync = MasterSync()
00293 sync.spin()
00294 except KeyboardInterrupt:
00295 pass
00296 except Exception as ex:
00297 import traceback
00298 rospy.logfatal(traceback.format_exc())
00299 rospy.logfatal(str(ex))
00300 time.sleep(1.0)
00301 sys.exit(1)