00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 NAME="master_sync.py"
00037
00038 import os
00039 import sys
00040 import socket
00041 import threading
00042 import traceback
00043 import time
00044 import urlparse
00045
00046 import rosgraph
00047 import rosgraph.network
00048 import rosgraph.names
00049 from rosgraph.xmlrpc import XmlRpcNode, XmlRpcHandler
00050
00051 import rospy
00052
00053 from rospy.core import global_name, is_topic
00054 from rospy.impl.validators import non_empty, ParameterInvalid
00055 from rospy.impl.masterslave import apivalidate
00056
00057 def is_publishers_list(paramName):
00058 return ('is_publishers_list', paramName)
00059
00060
00061 class TopicPubListenerHandler(XmlRpcHandler):
00062
00063 def __init__(self, cb):
00064 super(TopicPubListenerHandler, self).__init__()
00065 self.uri = None
00066 self.cb = cb
00067
00068 def _ready(self, uri):
00069 self.uri = uri
00070
00071 def _custom_validate(self, validation, param_name, param_value, caller_id):
00072 if validation == 'is_publishers_list':
00073 if not type(param_value) == list:
00074 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
00075 for v in param_value:
00076 if not isinstance(v, basestring):
00077 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
00078 parsed = urlparse.urlparse(v)
00079 if not parsed[0] or not parsed[1]:
00080 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
00081 return param_value
00082 else:
00083 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
00084
00085 @apivalidate([])
00086 def getBusStats(self, caller_id):
00087
00088 return 1, '', [[], [], []]
00089
00090 @apivalidate([])
00091 def getBusInfo(self, caller_id):
00092
00093 return 1, '', [[], [], []]
00094
00095 @apivalidate('')
00096 def getMasterUri(self, caller_id):
00097
00098 return 1, '', ''
00099
00100 @apivalidate(0, (None, ))
00101 def shutdown(self, caller_id, msg=''):
00102 return -1, "not authorized", 0
00103
00104 @apivalidate(-1)
00105 def getPid(self, caller_id):
00106 return -1, "not authorized", 0
00107
00108
00109
00110
00111 @apivalidate([])
00112 def getSubscriptions(self, caller_id):
00113 return 1, "subscriptions", [[], []]
00114
00115 @apivalidate([])
00116 def getPublications(self, caller_id):
00117 return 1, "publications", [[], []]
00118
00119 @apivalidate(-1, (global_name('parameter_key'), None))
00120 def paramUpdate(self, caller_id, parameter_key, parameter_value):
00121
00122 return -1, 'not authorized', 0
00123
00124 @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
00125 def publisherUpdate(self, caller_id, topic, publishers):
00126 self.cb(topic, publishers)
00127
00128 @apivalidate([], (is_topic('topic'), non_empty('protocols')))
00129 def requestTopic(self, caller_id, topic, protocols):
00130 return 0, "no supported protocol implementations", []
00131
00132
00133 class RemoteManager(object):
00134 def __init__(self, master_uri, cb):
00135 self.master_uri = master_uri
00136
00137 ns = rosgraph.names.get_ros_namespace()
00138 anon_name = rosgraph.names.anonymous_name('master_sync')
00139
00140 self.master = rosgraph.Master(rosgraph.names.ns_join(ns, anon_name), master_uri=self.master_uri)
00141
00142 self.cb = cb
00143
00144 self.type_cache = {}
00145
00146 self.subs = {}
00147 self.pubs = {}
00148 self.srvs = {}
00149
00150 rpc_handler = TopicPubListenerHandler(self.new_topics)
00151 self.external_node = XmlRpcNode(rpc_handler=rpc_handler)
00152 self.external_node.start()
00153
00154 timeout_t = time.time() + 5.
00155 while time.time() < timeout_t and self.external_node.uri is None:
00156 time.sleep(0.01)
00157
00158
00159 def get_topic_type(self, query_topic):
00160 query_topic = self.resolve(query_topic)
00161
00162 if query_topic in self.type_cache:
00163 return self.type_cache[query_topic]
00164 else:
00165 for topic, topic_type in self.master.getTopicTypes():
00166 self.type_cache[topic] = topic_type
00167 if query_topic in self.type_cache:
00168 return self.type_cache[query_topic]
00169 else:
00170 return "*"
00171
00172 def subscribe(self, topic):
00173 topic = self.resolve(topic)
00174 publishers = self.master.registerSubscriber(topic, '*', self.external_node.uri)
00175 self.subs[(topic, self.external_node.uri)] = self.master
00176 self.new_topics(topic, publishers)
00177
00178 def advertise(self, topic, topic_type, uri):
00179 topic = self.resolve(topic)
00180
00181
00182 if (topic, uri) in self.pubs:
00183 return
00184
00185
00186 anon_name = rosgraph.names.anonymous_name('master_sync')
00187 master = rosgraph.Master(anon_name, master_uri=self.master_uri)
00188
00189 rospy.loginfo("Registering (%s,%s) on master %s"%(topic,uri,master.master_uri))
00190
00191 master.registerPublisher(topic, topic_type, uri)
00192 self.pubs[(topic, uri)] = master
00193
00194
00195 def unadvertise(self, topic, uri):
00196 if (topic, uri) in self.pubs:
00197 m = self.pubs[(topic,uri)]
00198 rospy.loginfo("Unregistering (%s,%s) from master %s"%(topic,uri,m.master_uri))
00199 m.unregisterPublisher(topic,uri)
00200 del self.pubs[(topic,uri)]
00201
00202
00203 def advertise_list(self, topic, topic_type, uris):
00204 topic = self.resolve(topic)
00205
00206 unadv = set((t,u) for (t,u) in self.pubs.iterkeys() if t == topic) - set([(topic, u) for u in uris])
00207 for (t,u) in self.pubs.keys():
00208 if (t,u) in unadv:
00209 self.unadvertise(t,u)
00210
00211 for u in uris:
00212 self.advertise(topic, topic_type, u)
00213
00214 def lookup_service(self, service_name):
00215 service_name = self.resolve(service_name)
00216 try:
00217 return self.master.lookupService(service_name)
00218 except rosgraph.MasterError:
00219 return None
00220
00221 def advertise_service(self, service_name, uri):
00222
00223
00224 anon_name = rosgraph.names.anonymous_name('master_sync')
00225 master = rosgraph.Master(anon_name, master_uri=self.master_uri)
00226
00227 if (service_name) in self.srvs:
00228 if self.srvs[service_name][0] == uri:
00229 return
00230 else:
00231 self.unadvertise_service(service_name)
00232
00233 fake_api = 'http://%s:0'%rosgraph.network.get_host_name()
00234 rospy.loginfo("Registering service (%s,%s) on master %s"%(service_name, uri, master.master_uri))
00235 master.registerService(service_name, uri, fake_api)
00236
00237 self.srvs[service_name] = (uri, master)
00238
00239 def unadvertise_service(self, service_name):
00240 if service_name in self.srvs:
00241 uri,m = self.srvs[service_name]
00242 rospy.loginfo("Unregistering service (%s,%s) from master %s"%(service_name, uri, m.master_uri))
00243 m.unregisterService(service_name, uri)
00244 del self.srvs[service_name]
00245
00246
00247 def resolve(self, topic):
00248 ns = rosgraph.names.namespace(self.master.caller_id)
00249 return rosgraph.names.ns_join(ns, topic)
00250
00251 def unsubscribe_all(self):
00252 for (t,u),m in self.subs.iteritems():
00253 m.unregisterSubscriber(t,u)
00254 for t,u in self.pubs.keys():
00255 self.unadvertise(t,u)
00256 for s in self.srvs.keys():
00257 self.unadvertise_service(s)
00258
00259 def new_topics(self, topic, publishers):
00260 self.cb(topic, [p for p in publishers if (topic,p) not in self.pubs])
00261
00262
00263 def check_master(m):
00264 try:
00265 m.getUri()
00266 return True
00267 except Exception:
00268 return False
00269
00270 class MasterSync(object):
00271 def __init__(self):
00272 rospy.init_node('master_sync')
00273
00274
00275 self.local_service_names = rospy.get_param('~local_services', {})
00276 self.local_pub_names = rospy.get_param('~local_pubs', [])
00277 self.foreign_service_names = rospy.get_param('~foreign_services', {})
00278 self.foreign_pub_names = rospy.get_param('~foreign_pubs', [])
00279
00280
00281 lm = rosgraph.get_master_uri()
00282 fm = rospy.get_param('~foreign_master', 'http://localhost:11312')
00283
00284 m = rosgraph.Master(rospy.get_name(), master_uri=fm)
00285 r = rospy.Rate(1)
00286 while not check_master(m) and not rospy.is_shutdown():
00287 rospy.loginfo("Waiting for foreign master to come up...")
00288 r.sleep()
00289
00290 self.local_manager = None
00291 self.foreign_manager = None
00292
00293 if not rospy.is_shutdown():
00294
00295 self.local_manager = RemoteManager(lm, self.new_local_topics)
00296 self.foreign_manager = RemoteManager(fm, self.new_foreign_topics)
00297
00298 for t in self.local_pub_names:
00299 self.local_manager.subscribe(t)
00300
00301 for t in self.foreign_pub_names:
00302 self.foreign_manager.subscribe(t)
00303
00304 def new_local_topics(self, topic, publishers):
00305 topic_type = self.local_manager.get_topic_type(topic)
00306 self.foreign_manager.advertise_list(topic, topic_type, publishers)
00307
00308
00309 def new_foreign_topics(self, topic, publishers):
00310 topic_type = self.foreign_manager.get_topic_type(topic)
00311 self.local_manager.advertise_list(topic, topic_type, publishers)
00312
00313
00314 def spin(self):
00315 r = rospy.Rate(1.0)
00316 while not rospy.is_shutdown():
00317 for s in self.local_service_names:
00318 srv_uri = self.local_manager.lookup_service(s)
00319 if srv_uri is not None:
00320 self.foreign_manager.advertise_service(s, srv_uri)
00321 else:
00322 self.foreign_manager.unadvertise_service(s)
00323 for s in self.foreign_service_names:
00324 srv_uri = self.foreign_manager.lookup_service(s)
00325 if srv_uri is not None:
00326 self.local_manager.advertise_service(s, srv_uri)
00327 else:
00328 self.local_manager.unadvertise_service(s)
00329 r.sleep()
00330
00331 if self.local_manager:
00332 self.local_manager.unsubscribe_all()
00333 if self.foreign_manager:
00334 self.foreign_manager.unsubscribe_all()
00335
00336
00337 def master_sync_main():
00338 r = MasterSync()
00339 r.spin()
00340
00341 if __name__ == '__main__':
00342 master_sync_main()