$search
00001 import roslib.network 00002 import rosgraph.masterapi 00003 import rospy 00004 00005 import time 00006 import urlparse 00007 00008 from roslib.xmlrpc import XmlRpcHandler 00009 00010 from rospy.core import global_name, is_topic 00011 from rospy.impl.validators import non_empty, ParameterInvalid 00012 00013 from rospy.impl.masterslave import apivalidate 00014 00015 from roslib.xmlrpc import XmlRpcNode 00016 import roslib.names 00017 import threading 00018 00019 def is_publishers_list(paramName): 00020 return ('is_publishers_list', paramName) 00021 00022 00023 class TopicPubListenerHandler(XmlRpcHandler): 00024 00025 def __init__(self, cb): 00026 super(TopicPubListenerHandler, self).__init__() 00027 self.uri = None 00028 self.cb = cb 00029 00030 def _ready(self, uri): 00031 self.uri = uri 00032 00033 def _custom_validate(self, validation, param_name, param_value, caller_id): 00034 if validation == 'is_publishers_list': 00035 if not type(param_value) == list: 00036 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name) 00037 for v in param_value: 00038 if not isinstance(v, basestring): 00039 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name) 00040 parsed = urlparse.urlparse(v) 00041 if not parsed[0] or not parsed[1]: #protocol and host 00042 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v)) 00043 return param_value 00044 else: 00045 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation)) 00046 00047 @apivalidate([]) 00048 def getBusStats(self, caller_id): 00049 # not supported 00050 return 1, '', [[], [], []] 00051 00052 @apivalidate([]) 00053 def getBusInfo(self, caller_id): 00054 # not supported 00055 return 1, '', [[], [], []] 00056 00057 @apivalidate('') 00058 def getMasterUri(self, caller_id): 00059 # not supported 00060 return 1, '', '' 00061 00062 @apivalidate(0, (None, )) 00063 def shutdown(self, caller_id, msg=''): 00064 return -1, "not authorized", 0 00065 00066 @apivalidate(-1) 00067 def getPid(self, caller_id): 00068 return -1, "not authorized", 0 00069 00070 ############################################################################### 00071 # PUB/SUB APIS 00072 00073 @apivalidate([]) 00074 def getSubscriptions(self, caller_id): 00075 return 1, "subscriptions", [[], []] 00076 00077 @apivalidate([]) 00078 def getPublications(self, caller_id): 00079 return 1, "publications", [[], []] 00080 00081 @apivalidate(-1, (global_name('parameter_key'), None)) 00082 def paramUpdate(self, caller_id, parameter_key, parameter_value): 00083 # not supported 00084 return -1, 'not authorized', 0 00085 00086 @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers'))) 00087 def publisherUpdate(self, caller_id, topic, publishers): 00088 self.cb(topic, publishers) 00089 00090 @apivalidate([], (is_topic('topic'), non_empty('protocols'))) 00091 def requestTopic(self, caller_id, topic, protocols): 00092 return 0, "no supported protocol implementations", [] 00093 00094 00095 class RemoteManager(object): 00096 def __init__(self, master_uri, cb): 00097 self.master_uri = master_uri 00098 00099 ns = roslib.names.get_ros_namespace() 00100 anon_name = roslib.names.anonymous_name('master_sync') 00101 00102 self.master = rosgraph.masterapi.Master(roslib.names.ns_join(ns, anon_name), master_uri=self.master_uri) 00103 00104 self.cb = cb 00105 00106 self.type_cache = {} 00107 00108 self.subs = {} 00109 self.pubs = {} 00110 self.srvs = {} 00111 00112 rpc_handler = TopicPubListenerHandler(self.new_topics) 00113 self.external_node = XmlRpcNode(rpc_handler=rpc_handler) 00114 self.external_node.start() 00115 00116 timeout_t = time.time() + 5. 00117 while time.time() < timeout_t and self.external_node.uri is None: 00118 time.sleep(0.01) 00119 00120 00121 def get_topic_type(self, query_topic): 00122 query_topic = self.resolve(query_topic) 00123 00124 if query_topic in self.type_cache: 00125 return self.type_cache[query_topic] 00126 else: 00127 for topic, topic_type in self.master.getTopicTypes(): 00128 self.type_cache[topic] = topic_type 00129 if query_topic in self.type_cache: 00130 return self.type_cache[query_topic] 00131 else: 00132 return "*" 00133 00134 def subscribe(self, topic): 00135 topic = self.resolve(topic) 00136 publishers = self.master.registerSubscriber(topic, '*', self.external_node.uri) 00137 self.subs[(topic, self.external_node.uri)] = self.master 00138 self.new_topics(topic, publishers) 00139 00140 def advertise(self, topic, topic_type, uri): 00141 topic = self.resolve(topic) 00142 00143 # Prevent double-advertisements 00144 if (topic, uri) in self.pubs: 00145 return 00146 00147 # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name 00148 anon_name = roslib.names.anonymous_name('master_sync') 00149 master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri) 00150 00151 rospy.loginfo("Registering (%s,%s) on master %s"%(topic,uri,master.master_uri)) 00152 00153 master.registerPublisher(topic, topic_type, uri) 00154 self.pubs[(topic, uri)] = master 00155 00156 00157 def unadvertise(self, topic, uri): 00158 if (topic, uri) in self.pubs: 00159 m = self.pubs[(topic,uri)] 00160 rospy.loginfo("Unregistering (%s,%s) from master %s"%(topic,uri,m.master_uri)) 00161 m.unregisterPublisher(topic,uri) 00162 del self.pubs[(topic,uri)] 00163 00164 00165 def advertise_list(self, topic, topic_type, uris): 00166 topic = self.resolve(topic) 00167 00168 unadv = set((t,u) for (t,u) in self.pubs.iterkeys() if t == topic) - set([(topic, u) for u in uris]) 00169 for (t,u) in self.pubs.keys(): 00170 if (t,u) in unadv: 00171 self.unadvertise(t,u) 00172 00173 for u in uris: 00174 self.advertise(topic, topic_type, u) 00175 00176 def lookup_service(self, service_name): 00177 service_name = self.resolve(service_name) 00178 try: 00179 return self.master.lookupService(service_name) 00180 except rosgraph.masterapi.Error: 00181 return None 00182 00183 def advertise_service(self, service_name, uri): 00184 00185 # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name 00186 anon_name = roslib.names.anonymous_name('master_sync') 00187 master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri) 00188 00189 if (service_name) in self.srvs: 00190 if self.srvs[service_name][0] == uri: 00191 return 00192 else: 00193 self.unadvertise_service(service_name) 00194 00195 fake_api = 'http://%s:0'%roslib.network.get_host_name() 00196 rospy.loginfo("Registering service (%s,%s) on master %s"%(service_name, uri, master.master_uri)) 00197 master.registerService(service_name, uri, fake_api) 00198 00199 self.srvs[service_name] = (uri, master) 00200 00201 def unadvertise_service(self, service_name): 00202 if service_name in self.srvs: 00203 uri,m = self.srvs[service_name] 00204 rospy.loginfo("Unregistering service (%s,%s) from master %s"%(service_name, uri, m.master_uri)) 00205 m.unregisterService(service_name, uri) 00206 del self.srvs[service_name] 00207 00208 00209 def resolve(self, topic): 00210 ns = roslib.names.namespace(self.master.caller_id) 00211 return roslib.names.ns_join(ns, topic) 00212 00213 def unsubscribe_all(self): 00214 for (t,u),m in self.subs.iteritems(): 00215 m.unregisterSubscriber(t,u) 00216 for t,u in self.pubs.keys(): 00217 self.unadvertise(t,u) 00218 for s in self.srvs.keys(): 00219 self.unadvertise_service(s) 00220 00221 def new_topics(self, topic, publishers): 00222 self.cb(topic, [p for p in publishers if (topic,p) not in self.pubs]) 00223 00224 00225 def check_master(m): 00226 try: 00227 m.getUri() 00228 return True 00229 except Exception: 00230 return False 00231 00232 class MasterSync(object): 00233 def __init__(self, foreign_master, local_service_names = [], local_pub_names = [], foreign_service_names = [], foreign_pub_names = []): 00234 00235 self.local_service_names = local_service_names 00236 self.local_pub_names = local_pub_names 00237 self.foreign_service_names = foreign_service_names 00238 self.foreign_pub_names = foreign_pub_names 00239 00240 self.local_manager = None 00241 self.foreign_manager = None 00242 self.stopping = False 00243 self.thread = None 00244 00245 # Get master URIs 00246 local_master = roslib.rosenv.get_master_uri() 00247 00248 m = rosgraph.masterapi.Master(rospy.get_name(), master_uri=foreign_master) 00249 r = rospy.Rate(1) 00250 rospy.loginfo("Waiting for foreign master [%s] to come up..."%(foreign_master)) 00251 while not check_master(m) and not rospy.is_shutdown(): 00252 r.sleep() 00253 00254 if not rospy.is_shutdown(): 00255 rospy.loginfo("Foreign master is available") 00256 00257 self.local_manager = RemoteManager(local_master, self.new_local_topics) 00258 self.foreign_manager = RemoteManager(foreign_master, self.new_foreign_topics) 00259 00260 for t in self.local_pub_names: 00261 self.local_manager.subscribe(t) 00262 00263 for t in self.foreign_pub_names: 00264 self.foreign_manager.subscribe(t) 00265 00266 self.thread = threading.Thread(target=self.spin) 00267 self.thread.start() 00268 00269 else: 00270 rospy.loginfo("shutdown flag raised, aborting...") 00271 00272 00273 def new_local_topics(self, topic, publishers): 00274 topic_type = self.local_manager.get_topic_type(topic) 00275 self.foreign_manager.advertise_list(topic, topic_type, publishers) 00276 00277 00278 def new_foreign_topics(self, topic, publishers): 00279 topic_type = self.foreign_manager.get_topic_type(topic) 00280 self.local_manager.advertise_list(topic, topic_type, publishers) 00281 00282 00283 def stop(self): 00284 self.stopping = True 00285 self.thread.join() 00286 00287 if self.local_manager: 00288 self.local_manager.unsubscribe_all() 00289 if self.foreign_manager: 00290 self.foreign_manager.unsubscribe_all() 00291 00292 00293 # Spin is necessary to synchronize SERVICES. Topics work entirely on a callback-driven basis 00294 def spin(self): 00295 r = rospy.Rate(1.0) 00296 while not rospy.is_shutdown() and not self.stopping: 00297 for s in self.local_service_names: 00298 srv_uri = self.local_manager.lookup_service(s) 00299 if srv_uri is not None: 00300 self.foreign_manager.advertise_service(s, srv_uri) 00301 else: 00302 self.foreign_manager.unadvertise_service(s) 00303 for s in self.foreign_service_names: 00304 srv_uri = self.foreign_manager.lookup_service(s) 00305 if srv_uri is not None: 00306 self.local_manager.advertise_service(s, srv_uri) 00307 else: 00308 self.local_manager.unadvertise_service(s) 00309 r.sleep()