00001 import roslib.network
00002 import rosgraph.masterapi
00003 import rospy
00004
00005 import time
00006 import urlparse
00007
00008
00009 from rospy.core import global_name, is_topic
00010 from rospy.impl.validators import non_empty, ParameterInvalid
00011
00012 from rospy.impl.masterslave import apivalidate
00013
00014 import rosgraph
00015 from rosgraph.xmlrpc import XmlRpcHandler
00016 from rosgraph.xmlrpc import XmlRpcNode
00017 import roslib.names
00018 import threading
00019
00020 def is_publishers_list(paramName):
00021 return ('is_publishers_list', paramName)
00022
00023
00024 class TopicPubListenerHandler(XmlRpcHandler):
00025
00026 def __init__(self, cb):
00027 super(TopicPubListenerHandler, self).__init__()
00028 self.uri = None
00029 self.cb = cb
00030
00031 def _ready(self, uri):
00032 self.uri = uri
00033
00034 def _custom_validate(self, validation, param_name, param_value, caller_id):
00035 if validation == 'is_publishers_list':
00036 if not type(param_value) == list:
00037 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
00038 for v in param_value:
00039 if not isinstance(v, basestring):
00040 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
00041 parsed = urlparse.urlparse(v)
00042 if not parsed[0] or not parsed[1]:
00043 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
00044 return param_value
00045 else:
00046 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
00047
00048 @apivalidate([])
00049 def getBusStats(self, caller_id):
00050
00051 return 1, '', [[], [], []]
00052
00053 @apivalidate([])
00054 def getBusInfo(self, caller_id):
00055
00056 return 1, '', [[], [], []]
00057
00058 @apivalidate('')
00059 def getMasterUri(self, caller_id):
00060
00061 return 1, '', ''
00062
00063 @apivalidate(0, (None, ))
00064 def shutdown(self, caller_id, msg=''):
00065 return -1, "not authorized", 0
00066
00067 @apivalidate(-1)
00068 def getPid(self, caller_id):
00069 return -1, "not authorized", 0
00070
00071
00072
00073
00074 @apivalidate([])
00075 def getSubscriptions(self, caller_id):
00076 return 1, "subscriptions", [[], []]
00077
00078 @apivalidate([])
00079 def getPublications(self, caller_id):
00080 return 1, "publications", [[], []]
00081
00082 @apivalidate(-1, (global_name('parameter_key'), None))
00083 def paramUpdate(self, caller_id, parameter_key, parameter_value):
00084
00085 return -1, 'not authorized', 0
00086
00087 @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
00088 def publisherUpdate(self, caller_id, topic, publishers):
00089 self.cb(topic, publishers)
00090
00091 @apivalidate([], (is_topic('topic'), non_empty('protocols')))
00092 def requestTopic(self, caller_id, topic, protocols):
00093 return 0, "no supported protocol implementations", []
00094
00095
00096 class RemoteManager(object):
00097 def __init__(self, master_uri, cb):
00098 self.master_uri = master_uri
00099
00100 ns = roslib.names.get_ros_namespace()
00101 anon_name = roslib.names.anonymous_name('master_sync')
00102
00103 self.master = rosgraph.Master(roslib.names.ns_join(ns, anon_name), master_uri=self.master_uri)
00104
00105 self.cb = cb
00106
00107 self.type_cache = {}
00108
00109 self.subs = {}
00110 self.pubs = {}
00111 self.srvs = {}
00112
00113 rpc_handler = TopicPubListenerHandler(self.new_topics)
00114 self.external_node = XmlRpcNode(rpc_handler=rpc_handler)
00115 self.external_node.start()
00116
00117 timeout_t = time.time() + 5.
00118 while time.time() < timeout_t and self.external_node.uri is None:
00119 time.sleep(0.01)
00120
00121
00122 def get_topic_type(self, query_topic):
00123 query_topic = self.resolve(query_topic)
00124
00125 if query_topic in self.type_cache:
00126 return self.type_cache[query_topic]
00127 else:
00128 for topic, topic_type in self.master.getTopicTypes():
00129 self.type_cache[topic] = topic_type
00130 if query_topic in self.type_cache:
00131 return self.type_cache[query_topic]
00132 else:
00133 return "*"
00134
00135 def subscribe(self, topic):
00136 topic = self.resolve(topic)
00137 publishers = self.master.registerSubscriber(topic, '*', self.external_node.uri)
00138 self.subs[(topic, self.external_node.uri)] = self.master
00139 self.new_topics(topic, publishers)
00140
00141 def advertise(self, topic, topic_type, uri):
00142 topic = self.resolve(topic)
00143
00144
00145 if (topic, uri) in self.pubs:
00146 return
00147
00148
00149 anon_name = roslib.names.anonymous_name('master_sync')
00150 master = rosgraph.Master(anon_name, master_uri=self.master_uri)
00151
00152 rospy.loginfo("Registering (%s,%s) on master %s"%(topic,uri,master.master_uri))
00153
00154 master.registerPublisher(topic, topic_type, uri)
00155 self.pubs[(topic, uri)] = master
00156
00157
00158 def unadvertise(self, topic, uri):
00159 if (topic, uri) in self.pubs:
00160 m = self.pubs[(topic,uri)]
00161 rospy.loginfo("Unregistering (%s,%s) from master %s"%(topic,uri,m.master_uri))
00162 m.unregisterPublisher(topic,uri)
00163 del self.pubs[(topic,uri)]
00164
00165
00166 def advertise_list(self, topic, topic_type, uris):
00167 topic = self.resolve(topic)
00168
00169 unadv = set((t,u) for (t,u) in self.pubs.iterkeys() if t == topic) - set([(topic, u) for u in uris])
00170 for (t,u) in self.pubs.keys():
00171 if (t,u) in unadv:
00172 self.unadvertise(t,u)
00173
00174 for u in uris:
00175 self.advertise(topic, topic_type, u)
00176
00177 def lookup_service(self, service_name):
00178 service_name = self.resolve(service_name)
00179 try:
00180 return self.master.lookupService(service_name)
00181 except rosgraph.masterapi.Error:
00182 return None
00183
00184 def advertise_service(self, service_name, uri):
00185
00186
00187 anon_name = roslib.names.anonymous_name('master_sync')
00188 master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri)
00189
00190 if (service_name) in self.srvs:
00191 if self.srvs[service_name][0] == uri:
00192 return
00193 else:
00194 self.unadvertise_service(service_name)
00195
00196 fake_api = 'http://%s:0'%roslib.network.get_host_name()
00197 rospy.loginfo("Registering service (%s,%s) on master %s"%(service_name, uri, master.master_uri))
00198 master.registerService(service_name, uri, fake_api)
00199
00200 self.srvs[service_name] = (uri, master)
00201
00202 def unadvertise_service(self, service_name):
00203 if service_name in self.srvs:
00204 uri,m = self.srvs[service_name]
00205 rospy.loginfo("Unregistering service (%s,%s) from master %s"%(service_name, uri, m.master_uri))
00206 m.unregisterService(service_name, uri)
00207 del self.srvs[service_name]
00208
00209
00210 def resolve(self, topic):
00211 ns = roslib.names.namespace(self.master.caller_id)
00212 return roslib.names.ns_join(ns, topic)
00213
00214 def unsubscribe_all(self):
00215 for (t,u),m in self.subs.iteritems():
00216 m.unregisterSubscriber(t,u)
00217 for t,u in self.pubs.keys():
00218 self.unadvertise(t,u)
00219 for s in self.srvs.keys():
00220 self.unadvertise_service(s)
00221
00222 def new_topics(self, topic, publishers):
00223 self.cb(topic, [p for p in publishers if (topic,p) not in self.pubs])
00224
00225
00226 def check_master(m):
00227 try:
00228 m.getUri()
00229 return True
00230 except Exception:
00231 return False
00232
00233 class MasterSync(object):
00234 def __init__(self, foreign_master, local_service_names = [], local_pub_names = [], foreign_service_names = [], foreign_pub_names = []):
00235
00236 self.local_service_names = local_service_names
00237 self.local_pub_names = local_pub_names
00238 self.foreign_service_names = foreign_service_names
00239 self.foreign_pub_names = foreign_pub_names
00240
00241 self.local_manager = None
00242 self.foreign_manager = None
00243 self.stopping = False
00244 self.thread = None
00245
00246
00247 local_master = rosgraph.get_master_uri()
00248
00249 m = rosgraph.masterapi.Master(rospy.get_name(), master_uri=foreign_master)
00250 r = rospy.Rate(1)
00251 rospy.loginfo("Waiting for foreign master [%s] to come up..."%(foreign_master))
00252 while not check_master(m) and not rospy.is_shutdown():
00253 r.sleep()
00254
00255 if not rospy.is_shutdown():
00256 rospy.loginfo("Foreign master is available")
00257
00258 self.local_manager = RemoteManager(local_master, self.new_local_topics)
00259 self.foreign_manager = RemoteManager(foreign_master, self.new_foreign_topics)
00260
00261 for t in self.local_pub_names:
00262 self.local_manager.subscribe(t)
00263
00264 for t in self.foreign_pub_names:
00265 self.foreign_manager.subscribe(t)
00266
00267 self.thread = threading.Thread(target=self.spin)
00268 self.thread.start()
00269
00270 else:
00271 rospy.loginfo("shutdown flag raised, aborting...")
00272
00273
00274 def new_local_topics(self, topic, publishers):
00275 topic_type = self.local_manager.get_topic_type(topic)
00276 self.foreign_manager.advertise_list(topic, topic_type, publishers)
00277
00278
00279 def new_foreign_topics(self, topic, publishers):
00280 topic_type = self.foreign_manager.get_topic_type(topic)
00281 self.local_manager.advertise_list(topic, topic_type, publishers)
00282
00283
00284 def stop(self):
00285 self.stopping = True
00286 self.thread.join()
00287
00288 if self.local_manager:
00289 self.local_manager.unsubscribe_all()
00290 if self.foreign_manager:
00291 self.foreign_manager.unsubscribe_all()
00292
00293
00294
00295 def spin(self):
00296 r = rospy.Rate(1.0)
00297 while not rospy.is_shutdown() and not self.stopping:
00298 for s in self.local_service_names:
00299 srv_uri = self.local_manager.lookup_service(s)
00300 if srv_uri is not None:
00301 self.foreign_manager.advertise_service(s, srv_uri)
00302 else:
00303 self.foreign_manager.unadvertise_service(s)
00304 for s in self.foreign_service_names:
00305 srv_uri = self.foreign_manager.lookup_service(s)
00306 if srv_uri is not None:
00307 self.local_manager.advertise_service(s, srv_uri)
00308 else:
00309 self.local_manager.unadvertise_service(s)
00310 r.sleep()