00001 import roslib.network
00002 import rosgraph.masterapi
00003 import rospy
00004
00005 import time
00006 import urlparse
00007
00008 from roslib.xmlrpc import XmlRpcHandler
00009
00010 from rospy.core import global_name, is_topic
00011 from rospy.impl.validators import non_empty, ParameterInvalid
00012
00013 from rospy.impl.masterslave import apivalidate
00014
00015 from roslib.xmlrpc import XmlRpcNode
00016 import roslib.names
00017 import threading
00018
00019 def is_publishers_list(paramName):
00020 return ('is_publishers_list', paramName)
00021
00022
00023 class TopicPubListenerHandler(XmlRpcHandler):
00024
00025 def __init__(self, cb):
00026 super(TopicPubListenerHandler, self).__init__()
00027 self.uri = None
00028 self.cb = cb
00029
00030 def _ready(self, uri):
00031 self.uri = uri
00032
00033 def _custom_validate(self, validation, param_name, param_value, caller_id):
00034 if validation == 'is_publishers_list':
00035 if not type(param_value) == list:
00036 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
00037 for v in param_value:
00038 if not isinstance(v, basestring):
00039 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
00040 parsed = urlparse.urlparse(v)
00041 if not parsed[0] or not parsed[1]:
00042 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
00043 return param_value
00044 else:
00045 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
00046
00047 @apivalidate([])
00048 def getBusStats(self, caller_id):
00049
00050 return 1, '', [[], [], []]
00051
00052 @apivalidate([])
00053 def getBusInfo(self, caller_id):
00054
00055 return 1, '', [[], [], []]
00056
00057 @apivalidate('')
00058 def getMasterUri(self, caller_id):
00059
00060 return 1, '', ''
00061
00062 @apivalidate(0, (None, ))
00063 def shutdown(self, caller_id, msg=''):
00064 return -1, "not authorized", 0
00065
00066 @apivalidate(-1)
00067 def getPid(self, caller_id):
00068 return -1, "not authorized", 0
00069
00070
00071
00072
00073 @apivalidate([])
00074 def getSubscriptions(self, caller_id):
00075 return 1, "subscriptions", [[], []]
00076
00077 @apivalidate([])
00078 def getPublications(self, caller_id):
00079 return 1, "publications", [[], []]
00080
00081 @apivalidate(-1, (global_name('parameter_key'), None))
00082 def paramUpdate(self, caller_id, parameter_key, parameter_value):
00083
00084 return -1, 'not authorized', 0
00085
00086 @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
00087 def publisherUpdate(self, caller_id, topic, publishers):
00088 self.cb(topic, publishers)
00089
00090 @apivalidate([], (is_topic('topic'), non_empty('protocols')))
00091 def requestTopic(self, caller_id, topic, protocols):
00092 return 0, "no supported protocol implementations", []
00093
00094
00095 class RemoteManager(object):
00096 def __init__(self, master_uri, cb):
00097 self.master_uri = master_uri
00098
00099 ns = roslib.names.get_ros_namespace()
00100 anon_name = roslib.names.anonymous_name('master_sync')
00101
00102 self.master = rosgraph.masterapi.Master(roslib.names.ns_join(ns, anon_name), master_uri=self.master_uri)
00103
00104 self.cb = cb
00105
00106 self.type_cache = {}
00107
00108 self.subs = {}
00109 self.pubs = {}
00110 self.srvs = {}
00111
00112 rpc_handler = TopicPubListenerHandler(self.new_topics)
00113 self.external_node = XmlRpcNode(rpc_handler=rpc_handler)
00114 self.external_node.start()
00115
00116 timeout_t = time.time() + 5.
00117 while time.time() < timeout_t and self.external_node.uri is None:
00118 time.sleep(0.01)
00119
00120
00121 def get_topic_type(self, query_topic):
00122 query_topic = self.resolve(query_topic)
00123
00124 if query_topic in self.type_cache:
00125 return self.type_cache[query_topic]
00126 else:
00127 for topic, topic_type in self.master.getTopicTypes():
00128 self.type_cache[topic] = topic_type
00129 if query_topic in self.type_cache:
00130 return self.type_cache[query_topic]
00131 else:
00132 return "*"
00133
00134 def subscribe(self, topic):
00135 topic = self.resolve(topic)
00136 publishers = self.master.registerSubscriber(topic, '*', self.external_node.uri)
00137 self.subs[(topic, self.external_node.uri)] = self.master
00138 self.new_topics(topic, publishers)
00139
00140 def advertise(self, topic, topic_type, uri):
00141 topic = self.resolve(topic)
00142
00143
00144 if (topic, uri) in self.pubs:
00145 return
00146
00147
00148 anon_name = roslib.names.anonymous_name('master_sync')
00149 master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri)
00150
00151 rospy.loginfo("Registering (%s,%s) on master %s"%(topic,uri,master.master_uri))
00152
00153 master.registerPublisher(topic, topic_type, uri)
00154 self.pubs[(topic, uri)] = master
00155
00156
00157 def unadvertise(self, topic, uri):
00158 if (topic, uri) in self.pubs:
00159 m = self.pubs[(topic,uri)]
00160 rospy.loginfo("Unregistering (%s,%s) from master %s"%(topic,uri,m.master_uri))
00161 m.unregisterPublisher(topic,uri)
00162 del self.pubs[(topic,uri)]
00163
00164
00165 def advertise_list(self, topic, topic_type, uris):
00166 topic = self.resolve(topic)
00167
00168 unadv = set((t,u) for (t,u) in self.pubs.iterkeys() if t == topic) - set([(topic, u) for u in uris])
00169 for (t,u) in self.pubs.keys():
00170 if (t,u) in unadv:
00171 self.unadvertise(t,u)
00172
00173 for u in uris:
00174 self.advertise(topic, topic_type, u)
00175
00176 def lookup_service(self, service_name):
00177 service_name = self.resolve(service_name)
00178 try:
00179 return self.master.lookupService(service_name)
00180 except rosgraph.masterapi.Error:
00181 return None
00182
00183 def advertise_service(self, service_name, uri):
00184
00185
00186 anon_name = roslib.names.anonymous_name('master_sync')
00187 master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri)
00188
00189 if (service_name) in self.srvs:
00190 if self.srvs[service_name][0] == uri:
00191 return
00192 else:
00193 self.unadvertise_service(service_name)
00194
00195 fake_api = 'http://%s:0'%roslib.network.get_host_name()
00196 rospy.loginfo("Registering service (%s,%s) on master %s"%(service_name, uri, master.master_uri))
00197 master.registerService(service_name, uri, fake_api)
00198
00199 self.srvs[service_name] = (uri, master)
00200
00201 def unadvertise_service(self, service_name):
00202 if service_name in self.srvs:
00203 uri,m = self.srvs[service_name]
00204 rospy.loginfo("Unregistering service (%s,%s) from master %s"%(service_name, uri, m.master_uri))
00205 m.unregisterService(service_name, uri)
00206 del self.srvs[service_name]
00207
00208
00209 def resolve(self, topic):
00210 ns = roslib.names.namespace(self.master.caller_id)
00211 return roslib.names.ns_join(ns, topic)
00212
00213 def unsubscribe_all(self):
00214 for (t,u),m in self.subs.iteritems():
00215 m.unregisterSubscriber(t,u)
00216 for t,u in self.pubs.keys():
00217 self.unadvertise(t,u)
00218 for s in self.srvs.keys():
00219 self.unadvertise_service(s)
00220
00221 def new_topics(self, topic, publishers):
00222 self.cb(topic, [p for p in publishers if (topic,p) not in self.pubs])
00223
00224
00225 def check_master(m):
00226 try:
00227 m.getUri()
00228 return True
00229 except Exception:
00230 return False
00231
00232 class MasterSync(object):
00233 def __init__(self, foreign_master, local_service_names = [], local_pub_names = [], foreign_service_names = [], foreign_pub_names = []):
00234
00235 self.local_service_names = local_service_names
00236 self.local_pub_names = local_pub_names
00237 self.foreign_service_names = foreign_service_names
00238 self.foreign_pub_names = foreign_pub_names
00239
00240 self.local_manager = None
00241 self.foreign_manager = None
00242 self.stopping = False
00243 self.thread = None
00244
00245
00246 local_master = roslib.rosenv.get_master_uri()
00247
00248 m = rosgraph.masterapi.Master(rospy.get_name(), master_uri=foreign_master)
00249 r = rospy.Rate(1)
00250 rospy.loginfo("Waiting for foreign master [%s] to come up..."%(foreign_master))
00251 while not check_master(m) and not rospy.is_shutdown():
00252 r.sleep()
00253
00254 if not rospy.is_shutdown():
00255 rospy.loginfo("Foreign master is available")
00256
00257 self.local_manager = RemoteManager(local_master, self.new_local_topics)
00258 self.foreign_manager = RemoteManager(foreign_master, self.new_foreign_topics)
00259
00260 for t in self.local_pub_names:
00261 self.local_manager.subscribe(t)
00262
00263 for t in self.foreign_pub_names:
00264 self.foreign_manager.subscribe(t)
00265
00266 self.thread = threading.Thread(target=self.spin)
00267 self.thread.start()
00268
00269 else:
00270 rospy.loginfo("shutdown flag raised, aborting...")
00271
00272
00273 def new_local_topics(self, topic, publishers):
00274 topic_type = self.local_manager.get_topic_type(topic)
00275 self.foreign_manager.advertise_list(topic, topic_type, publishers)
00276
00277
00278 def new_foreign_topics(self, topic, publishers):
00279 topic_type = self.foreign_manager.get_topic_type(topic)
00280 self.local_manager.advertise_list(topic, topic_type, publishers)
00281
00282
00283 def stop(self):
00284 self.stopping = True
00285 self.thread.join()
00286
00287 if self.local_manager:
00288 self.local_manager.unsubscribe_all()
00289 if self.foreign_manager:
00290 self.foreign_manager.unsubscribe_all()
00291
00292
00293
00294 def spin(self):
00295 r = rospy.Rate(1.0)
00296 while not rospy.is_shutdown() and not self.stopping:
00297 for s in self.local_service_names:
00298 srv_uri = self.local_manager.lookup_service(s)
00299 if srv_uri is not None:
00300 self.foreign_manager.advertise_service(s, srv_uri)
00301 else:
00302 self.foreign_manager.unadvertise_service(s)
00303 for s in self.foreign_service_names:
00304 srv_uri = self.foreign_manager.lookup_service(s)
00305 if srv_uri is not None:
00306 self.local_manager.advertise_service(s, srv_uri)
00307 else:
00308 self.local_manager.unadvertise_service(s)
00309 r.sleep()