00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 NAME="master_sync.py"
00037
00038 import roslib; roslib.load_manifest('rosproxy')
00039
00040 import os
00041 import sys
00042
00043 import roslib.network
00044 import rosgraph.masterapi
00045 import rospy
00046
00047
00048
00049
00050 import os
00051 import sys
00052 import socket
00053 import threading
00054 import traceback
00055 import time
00056 import urlparse
00057
00058 from roslib.xmlrpc import XmlRpcHandler
00059
00060 import rospy.names
00061
00062 import rospy.impl.tcpros
00063
00064 from rospy.core import global_name, is_topic
00065 from rospy.impl.registration import get_topic_manager
00066 from rospy.impl.validators import non_empty, ParameterInvalid
00067
00068 from rospy.impl.masterslave import apivalidate
00069
00070 from roslib.xmlrpc import XmlRpcNode
00071 import roslib.names
00072
00073
00074 def is_publishers_list(paramName):
00075 return ('is_publishers_list', paramName)
00076
00077
00078 class TopicPubListenerHandler(XmlRpcHandler):
00079
00080 def __init__(self, cb):
00081 super(TopicPubListenerHandler, self).__init__()
00082 self.uri = None
00083 self.cb = cb
00084
00085 def _ready(self, uri):
00086 self.uri = uri
00087
00088 def _custom_validate(self, validation, param_name, param_value, caller_id):
00089 if validation == 'is_publishers_list':
00090 if not type(param_value) == list:
00091 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
00092 for v in param_value:
00093 if not isinstance(v, basestring):
00094 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
00095 parsed = urlparse.urlparse(v)
00096 if not parsed[0] or not parsed[1]:
00097 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
00098 return param_value
00099 else:
00100 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
00101
00102 @apivalidate([])
00103 def getBusStats(self, caller_id):
00104
00105 return 1, '', [[], [], []]
00106
00107 @apivalidate([])
00108 def getBusInfo(self, caller_id):
00109
00110 return 1, '', [[], [], []]
00111
00112 @apivalidate('')
00113 def getMasterUri(self, caller_id):
00114
00115 return 1, '', ''
00116
00117 @apivalidate(0, (None, ))
00118 def shutdown(self, caller_id, msg=''):
00119 return -1, "not authorized", 0
00120
00121 @apivalidate(-1)
00122 def getPid(self, caller_id):
00123 return -1, "not authorized", 0
00124
00125
00126
00127
00128 @apivalidate([])
00129 def getSubscriptions(self, caller_id):
00130 return 1, "subscriptions", [[], []]
00131
00132 @apivalidate([])
00133 def getPublications(self, caller_id):
00134 return 1, "publications", [[], []]
00135
00136 @apivalidate(-1, (global_name('parameter_key'), None))
00137 def paramUpdate(self, caller_id, parameter_key, parameter_value):
00138
00139 return -1, 'not authorized', 0
00140
00141 @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
00142 def publisherUpdate(self, caller_id, topic, publishers):
00143 self.cb(topic, publishers)
00144
00145 @apivalidate([], (is_topic('topic'), non_empty('protocols')))
00146 def requestTopic(self, caller_id, topic, protocols):
00147 return 0, "no supported protocol implementations", []
00148
00149
00150 class RemoteManager(object):
00151 def __init__(self, master_uri, cb):
00152 self.master_uri = master_uri
00153
00154 ns = roslib.names.get_ros_namespace()
00155 anon_name = roslib.names.anonymous_name('master_sync')
00156
00157 self.master = rosgraph.masterapi.Master(roslib.names.ns_join(ns, anon_name), master_uri=self.master_uri)
00158
00159 self.cb = cb
00160
00161 self.type_cache = {}
00162
00163 self.subs = {}
00164 self.pubs = {}
00165 self.srvs = {}
00166
00167 rpc_handler = TopicPubListenerHandler(self.new_topics)
00168 self.external_node = XmlRpcNode(rpc_handler=rpc_handler)
00169 self.external_node.start()
00170
00171 timeout_t = time.time() + 5.
00172 while time.time() < timeout_t and self.external_node.uri is None:
00173 time.sleep(0.01)
00174
00175
00176 def get_topic_type(self, query_topic):
00177 query_topic = self.resolve(query_topic)
00178
00179 if query_topic in self.type_cache:
00180 return self.type_cache[query_topic]
00181 else:
00182 for topic, topic_type in self.master.getTopicTypes():
00183 self.type_cache[topic] = topic_type
00184 if query_topic in self.type_cache:
00185 return self.type_cache[query_topic]
00186 else:
00187 return "*"
00188
00189 def subscribe(self, topic):
00190 topic = self.resolve(topic)
00191 publishers = self.master.registerSubscriber(topic, '*', self.external_node.uri)
00192 self.subs[(topic, self.external_node.uri)] = self.master
00193 self.new_topics(topic, publishers)
00194
00195 def advertise(self, topic, topic_type, uri):
00196 topic = self.resolve(topic)
00197
00198
00199 if (topic, uri) in self.pubs:
00200 return
00201
00202
00203 anon_name = roslib.names.anonymous_name('master_sync')
00204 master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri)
00205
00206 rospy.loginfo("Registering (%s,%s) on master %s"%(topic,uri,master.master_uri))
00207
00208 master.registerPublisher(topic, topic_type, uri)
00209 self.pubs[(topic, uri)] = master
00210
00211
00212 def unadvertise(self, topic, uri):
00213 if (topic, uri) in self.pubs:
00214 m = self.pubs[(topic,uri)]
00215 rospy.loginfo("Unregistering (%s,%s) from master %s"%(topic,uri,m.master_uri))
00216 m.unregisterPublisher(topic,uri)
00217 del self.pubs[(topic,uri)]
00218
00219
00220 def advertise_list(self, topic, topic_type, uris):
00221 topic = self.resolve(topic)
00222
00223 unadv = set((t,u) for (t,u) in self.pubs.iterkeys() if t == topic) - set([(topic, u) for u in uris])
00224 for (t,u) in self.pubs.keys():
00225 if (t,u) in unadv:
00226 self.unadvertise(t,u)
00227
00228 for u in uris:
00229 self.advertise(topic, topic_type, u)
00230
00231 def lookup_service(self, service_name):
00232 service_name = self.resolve(service_name)
00233 try:
00234 return self.master.lookupService(service_name)
00235 except rosgraph.masterapi.Error:
00236 return None
00237
00238 def advertise_service(self, service_name, uri):
00239
00240
00241 anon_name = roslib.names.anonymous_name('master_sync')
00242 master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri)
00243
00244 if (service_name) in self.srvs:
00245 if self.srvs[service_name][0] == uri:
00246 return
00247 else:
00248 self.unadvertise_service(service_name)
00249
00250 fake_api = 'http://%s:0'%roslib.network.get_host_name()
00251 rospy.loginfo("Registering service (%s,%s) on master %s"%(service_name, uri, master.master_uri))
00252 master.registerService(service_name, uri, fake_api)
00253
00254 self.srvs[service_name] = (uri, master)
00255
00256 def unadvertise_service(self, service_name):
00257 if service_name in self.srvs:
00258 uri,m = self.srvs[service_name]
00259 rospy.loginfo("Unregistering service (%s,%s) from master %s"%(service_name, uri, m.master_uri))
00260 m.unregisterService(service_name, uri)
00261 del self.srvs[service_name]
00262
00263
00264 def resolve(self, topic):
00265 ns = roslib.names.namespace(self.master.caller_id)
00266 return roslib.names.ns_join(ns, topic)
00267
00268 def unsubscribe_all(self):
00269 for (t,u),m in self.subs.iteritems():
00270 m.unregisterSubscriber(t,u)
00271 for t,u in self.pubs.keys():
00272 self.unadvertise(t,u)
00273 for s in self.srvs.keys():
00274 self.unadvertise_service(s)
00275
00276 def new_topics(self, topic, publishers):
00277 self.cb(topic, [p for p in publishers if (topic,p) not in self.pubs])
00278
00279
00280 def check_master(m):
00281 try:
00282 m.getUri()
00283 return True
00284 except Exception:
00285 return False
00286
00287 class MasterSync(object):
00288 def __init__(self):
00289 rospy.init_node('master_sync')
00290
00291
00292 self.local_service_names = rospy.get_param('~local_services', {})
00293 self.local_pub_names = rospy.get_param('~local_pubs', [])
00294 self.foreign_service_names = rospy.get_param('~foreign_services', {})
00295 self.foreign_pub_names = rospy.get_param('~foreign_pubs', [])
00296
00297
00298 lm = roslib.rosenv.get_master_uri()
00299 fm = rospy.get_param('~foreign_master', 'http://localhost:11312')
00300
00301 m = rosgraph.masterapi.Master(rospy.get_name(), master_uri=fm)
00302 r = rospy.Rate(1)
00303 while not check_master(m) and not rospy.is_shutdown():
00304 rospy.loginfo("Waiting for foreign master to come up...")
00305 r.sleep()
00306
00307 self.local_manager = None
00308 self.foreign_manager = None
00309
00310 if not rospy.is_shutdown():
00311
00312 self.local_manager = RemoteManager(lm, self.new_local_topics)
00313 self.foreign_manager = RemoteManager(fm, self.new_foreign_topics)
00314
00315 for t in self.local_pub_names:
00316 self.local_manager.subscribe(t)
00317
00318 for t in self.foreign_pub_names:
00319 self.foreign_manager.subscribe(t)
00320
00321 def new_local_topics(self, topic, publishers):
00322 topic_type = self.local_manager.get_topic_type(topic)
00323 self.foreign_manager.advertise_list(topic, topic_type, publishers)
00324
00325
00326 def new_foreign_topics(self, topic, publishers):
00327 topic_type = self.foreign_manager.get_topic_type(topic)
00328 self.local_manager.advertise_list(topic, topic_type, publishers)
00329
00330
00331 def spin(self):
00332 r = rospy.Rate(1.0)
00333 while not rospy.is_shutdown():
00334 for s in self.local_service_names:
00335 srv_uri = self.local_manager.lookup_service(s)
00336 if srv_uri is not None:
00337 self.foreign_manager.advertise_service(s, srv_uri)
00338 else:
00339 self.foreign_manager.unadvertise_service(s)
00340 for s in self.foreign_service_names:
00341 srv_uri = self.foreign_manager.lookup_service(s)
00342 if srv_uri is not None:
00343 self.local_manager.advertise_service(s, srv_uri)
00344 else:
00345 self.local_manager.unadvertise_service(s)
00346 r.sleep()
00347
00348 if self.local_manager:
00349 self.local_manager.unsubscribe_all()
00350 if self.foreign_manager:
00351 self.foreign_manager.unsubscribe_all()
00352
00353
00354 def master_sync_main():
00355 r = MasterSync()
00356 r.spin()
00357
00358 if __name__ == '__main__':
00359 master_sync_main()