00001 import threading
00002 import time
00003 import urlparse
00004
00005 import rosgraph
00006 import rosgraph.names
00007 import rosgraph.network
00008
00009 import rospy
00010
00011 from rospy.core import global_name, is_topic
00012 from rospy.impl.validators import non_empty, ParameterInvalid
00013
00014 from rospy.impl.masterslave import apivalidate
00015
00016 from rosgraph.xmlrpc import XmlRpcNode, XmlRpcHandler
00017
00018 def is_publishers_list(paramName):
00019 return ('is_publishers_list', paramName)
00020
00021
00022 class TopicPubListenerHandler(XmlRpcHandler):
00023
00024 def __init__(self, cb):
00025 super(TopicPubListenerHandler, self).__init__()
00026 self.uri = None
00027 self.cb = cb
00028
00029 def _ready(self, uri):
00030 self.uri = uri
00031
00032 def _custom_validate(self, validation, param_name, param_value, caller_id):
00033 if validation == 'is_publishers_list':
00034 if not type(param_value) == list:
00035 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
00036 for v in param_value:
00037 if not isinstance(v, basestring):
00038 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
00039 parsed = urlparse.urlparse(v)
00040 if not parsed[0] or not parsed[1]:
00041 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
00042 return param_value
00043 else:
00044 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
00045
00046 @apivalidate([])
00047 def getBusStats(self, caller_id):
00048
00049 return 1, '', [[], [], []]
00050
00051 @apivalidate([])
00052 def getBusInfo(self, caller_id):
00053
00054 return 1, '', [[], [], []]
00055
00056 @apivalidate('')
00057 def getMasterUri(self, caller_id):
00058
00059 return 1, '', ''
00060
00061 @apivalidate(0, (None, ))
00062 def shutdown(self, caller_id, msg=''):
00063 return -1, "not authorized", 0
00064
00065 @apivalidate(-1)
00066 def getPid(self, caller_id):
00067 return -1, "not authorized", 0
00068
00069
00070
00071
00072 @apivalidate([])
00073 def getSubscriptions(self, caller_id):
00074 return 1, "subscriptions", [[], []]
00075
00076 @apivalidate([])
00077 def getPublications(self, caller_id):
00078 return 1, "publications", [[], []]
00079
00080 @apivalidate(-1, (global_name('parameter_key'), None))
00081 def paramUpdate(self, caller_id, parameter_key, parameter_value):
00082
00083 return -1, 'not authorized', 0
00084
00085 @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
00086 def publisherUpdate(self, caller_id, topic, publishers):
00087 self.cb(topic, publishers)
00088
00089 @apivalidate([], (is_topic('topic'), non_empty('protocols')))
00090 def requestTopic(self, caller_id, topic, protocols):
00091 return 0, "no supported protocol implementations", []
00092
00093
00094 class RemoteManager(object):
00095 def __init__(self, master_uri, cb):
00096 self.master_uri = master_uri
00097
00098 ns = rosgraph.names.get_ros_namespace()
00099 anon_name = rosgraph.names.anonymous_name('master_sync')
00100
00101 self.master = rosgraph.Master(rosgraph.names.ns_join(ns, anon_name), master_uri=self.master_uri)
00102
00103 self.cb = cb
00104
00105 self.type_cache = {}
00106
00107 self.subs = {}
00108 self.pubs = {}
00109 self.srvs = {}
00110
00111 rpc_handler = TopicPubListenerHandler(self.new_topics)
00112 self.external_node = XmlRpcNode(rpc_handler=rpc_handler)
00113 self.external_node.start()
00114
00115 timeout_t = time.time() + 5.
00116 while time.time() < timeout_t and self.external_node.uri is None:
00117 time.sleep(0.01)
00118
00119
00120 def get_topic_type(self, query_topic):
00121 query_topic = self.resolve(query_topic)
00122
00123 if query_topic in self.type_cache:
00124 return self.type_cache[query_topic]
00125 else:
00126 for topic, topic_type in self.master.getTopicTypes():
00127 self.type_cache[topic] = topic_type
00128 if query_topic in self.type_cache:
00129 return self.type_cache[query_topic]
00130 else:
00131 return "*"
00132
00133 def subscribe(self, topic):
00134 topic = self.resolve(topic)
00135 publishers = self.master.registerSubscriber(topic, '*', self.external_node.uri)
00136 self.subs[(topic, self.external_node.uri)] = self.master
00137 self.new_topics(topic, publishers)
00138
00139 def advertise(self, topic, topic_type, uri):
00140 topic = self.resolve(topic)
00141
00142
00143 if (topic, uri) in self.pubs:
00144 return
00145
00146
00147 anon_name = rosgraph.names.anonymous_name('master_sync')
00148 master = rosgraph.Master(anon_name, master_uri=self.master_uri)
00149
00150 rospy.loginfo("Registering (%s,%s) on master %s"%(topic,uri,master.master_uri))
00151
00152 master.registerPublisher(topic, topic_type, uri)
00153 self.pubs[(topic, uri)] = master
00154
00155
00156 def unadvertise(self, topic, uri):
00157 if (topic, uri) in self.pubs:
00158 m = self.pubs[(topic,uri)]
00159 rospy.loginfo("Unregistering (%s,%s) from master %s"%(topic,uri,m.master_uri))
00160 m.unregisterPublisher(topic,uri)
00161 del self.pubs[(topic,uri)]
00162
00163
00164 def advertise_list(self, topic, topic_type, uris):
00165 topic = self.resolve(topic)
00166
00167 unadv = set((t,u) for (t,u) in self.pubs.iterkeys() if t == topic) - set([(topic, u) for u in uris])
00168 for (t,u) in self.pubs.keys():
00169 if (t,u) in unadv:
00170 self.unadvertise(t,u)
00171
00172 for u in uris:
00173 self.advertise(topic, topic_type, u)
00174
00175 def lookup_service(self, service_name):
00176 service_name = self.resolve(service_name)
00177 try:
00178 return self.master.lookupService(service_name)
00179 except rosgraph.MasterError:
00180 return None
00181
00182 def advertise_service(self, service_name, uri):
00183
00184
00185 anon_name = rosgraph.names.anonymous_name('master_sync')
00186 master = rosgraph.Master(anon_name, master_uri=self.master_uri)
00187
00188 if (service_name) in self.srvs:
00189 if self.srvs[service_name][0] == uri:
00190 return
00191 else:
00192 self.unadvertise_service(service_name)
00193
00194 fake_api = 'http://%s:0'%rosgraph.network.get_host_name()
00195 rospy.loginfo("Registering service (%s,%s) on master %s"%(service_name, uri, master.master_uri))
00196 master.registerService(service_name, uri, fake_api)
00197
00198 self.srvs[service_name] = (uri, master)
00199
00200 def unadvertise_service(self, service_name):
00201 if service_name in self.srvs:
00202 uri,m = self.srvs[service_name]
00203 rospy.loginfo("Unregistering service (%s,%s) from master %s"%(service_name, uri, m.master_uri))
00204 m.unregisterService(service_name, uri)
00205 del self.srvs[service_name]
00206
00207
00208 def resolve(self, topic):
00209 ns = rosgraph.names.namespace(self.master.caller_id)
00210 return rosgraph.names.ns_join(ns, topic)
00211
00212 def unsubscribe_all(self):
00213 for (t,u),m in self.subs.iteritems():
00214 m.unregisterSubscriber(t,u)
00215 for t,u in self.pubs.keys():
00216 self.unadvertise(t,u)
00217 for s in self.srvs.keys():
00218 self.unadvertise_service(s)
00219
00220 def new_topics(self, topic, publishers):
00221 self.cb(topic, [p for p in publishers if (topic,p) not in self.pubs])
00222
00223
00224 def check_master(m):
00225 try:
00226 m.getUri()
00227 return True
00228 except Exception:
00229 return False
00230
00231 class MasterSync(object):
00232 def __init__(self, foreign_master, local_service_names = [], local_pub_names = [], foreign_service_names = [], foreign_pub_names = []):
00233
00234 self.local_service_names = local_service_names
00235 self.local_pub_names = local_pub_names
00236 self.foreign_service_names = foreign_service_names
00237 self.foreign_pub_names = foreign_pub_names
00238
00239 self.local_manager = None
00240 self.foreign_manager = None
00241 self.stopping = False
00242 self.thread = None
00243
00244
00245 local_master = rosgraph.get_master_uri()
00246
00247 m = rosgraph.Master(rospy.get_name(), master_uri=foreign_master)
00248 r = rospy.Rate(1)
00249 rospy.loginfo("App Manager: waiting for foreign master [%s] to come up..."%(foreign_master))
00250 while not check_master(m) and not rospy.is_shutdown():
00251 r.sleep()
00252
00253 if not rospy.is_shutdown():
00254 rospy.loginfo("App Manager: foreign master is available")
00255
00256 self.local_manager = RemoteManager(local_master, self.new_local_topics)
00257 self.foreign_manager = RemoteManager(foreign_master, self.new_foreign_topics)
00258
00259 for t in self.local_pub_names:
00260 self.local_manager.subscribe(t)
00261
00262 for t in self.foreign_pub_names:
00263 self.foreign_manager.subscribe(t)
00264
00265 self.thread = threading.Thread(target=self.spin)
00266 self.thread.start()
00267
00268 else:
00269 rospy.loginfo("shutdown flag raised, aborting...")
00270
00271
00272 def new_local_topics(self, topic, publishers):
00273 topic_type = self.local_manager.get_topic_type(topic)
00274 self.foreign_manager.advertise_list(topic, topic_type, publishers)
00275
00276
00277 def new_foreign_topics(self, topic, publishers):
00278 topic_type = self.foreign_manager.get_topic_type(topic)
00279 self.local_manager.advertise_list(topic, topic_type, publishers)
00280
00281
00282 def stop(self):
00283 self.stopping = True
00284 self.thread.join()
00285
00286 try:
00287 if self.local_manager:
00288 self.local_manager.unsubscribe_all()
00289 if self.foreign_manager:
00290 self.foreign_manager.unsubscribe_all()
00291 except Exception as e:
00292 rospy.loginfo("Error while unsubscribing topics : " + str(e))
00293
00294
00295 def spin(self):
00296 r = rospy.Rate(1.0)
00297 while not rospy.is_shutdown() and not self.stopping:
00298 for s in self.local_service_names:
00299 srv_uri = self.local_manager.lookup_service(s)
00300 if srv_uri is not None:
00301 self.foreign_manager.advertise_service(s, srv_uri)
00302 else:
00303 self.foreign_manager.unadvertise_service(s)
00304 for s in self.foreign_service_names:
00305 srv_uri = self.foreign_manager.lookup_service(s)
00306 if srv_uri is not None:
00307 self.local_manager.advertise_service(s, srv_uri)
00308 else:
00309 self.local_manager.unadvertise_service(s)
00310 r.sleep()