$search
00001 #!/usr/bin/env python 00002 00003 # Software License Agreement (BSD License) 00004 # 00005 # Copyright (c) 2010, Willow Garage, Inc. 00006 # All rights reserved. 00007 # 00008 # Redistribution and use in source and binary forms, with or without 00009 # modification, are permitted provided that the following conditions 00010 # are met: 00011 # 00012 # * Redistributions of source code must retain the above copyright 00013 # notice, this list of conditions and the following disclaimer. 00014 # * Redistributions in binary form must reproduce the above 00015 # copyright notice, this list of conditions and the following 00016 # disclaimer in the documentation and/or other materials provided 00017 # with the distribution. 00018 # * Neither the name of Willow Garage, Inc. nor the names of its 00019 # contributors may be used to endorse or promote products derived 00020 # from this software without specific prior written permission. 00021 # 00022 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00023 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00024 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00025 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00026 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00027 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00028 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00029 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00030 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00031 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00032 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00033 # POSSIBILITY OF SUCH DAMAGE. 00034 # 00035 00036 NAME="master_sync.py" 00037 00038 import roslib; roslib.load_manifest('rosproxy') 00039 00040 import os 00041 import sys 00042 00043 import roslib.network 00044 import rosgraph.masterapi 00045 import rospy 00046 00047 00048 00049 00050 import os 00051 import sys 00052 import socket 00053 import threading 00054 import traceback 00055 import time 00056 import urlparse 00057 00058 from roslib.xmlrpc import XmlRpcHandler 00059 00060 import rospy.names 00061 00062 import rospy.impl.tcpros 00063 00064 from rospy.core import global_name, is_topic 00065 from rospy.impl.registration import get_topic_manager 00066 from rospy.impl.validators import non_empty, ParameterInvalid 00067 00068 from rospy.impl.masterslave import apivalidate 00069 00070 from roslib.xmlrpc import XmlRpcNode 00071 import roslib.names 00072 00073 00074 def is_publishers_list(paramName): 00075 return ('is_publishers_list', paramName) 00076 00077 00078 class TopicPubListenerHandler(XmlRpcHandler): 00079 00080 def __init__(self, cb): 00081 super(TopicPubListenerHandler, self).__init__() 00082 self.uri = None 00083 self.cb = cb 00084 00085 def _ready(self, uri): 00086 self.uri = uri 00087 00088 def _custom_validate(self, validation, param_name, param_value, caller_id): 00089 if validation == 'is_publishers_list': 00090 if not type(param_value) == list: 00091 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name) 00092 for v in param_value: 00093 if not isinstance(v, basestring): 00094 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name) 00095 parsed = urlparse.urlparse(v) 00096 if not parsed[0] or not parsed[1]: #protocol and host 00097 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v)) 00098 return param_value 00099 else: 00100 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation)) 00101 00102 @apivalidate([]) 00103 def getBusStats(self, caller_id): 00104 # not supported 00105 return 1, '', [[], [], []] 00106 00107 @apivalidate([]) 00108 def getBusInfo(self, caller_id): 00109 # not supported 00110 return 1, '', [[], [], []] 00111 00112 @apivalidate('') 00113 def getMasterUri(self, caller_id): 00114 # not supported 00115 return 1, '', '' 00116 00117 @apivalidate(0, (None, )) 00118 def shutdown(self, caller_id, msg=''): 00119 return -1, "not authorized", 0 00120 00121 @apivalidate(-1) 00122 def getPid(self, caller_id): 00123 return -1, "not authorized", 0 00124 00125 ############################################################################### 00126 # PUB/SUB APIS 00127 00128 @apivalidate([]) 00129 def getSubscriptions(self, caller_id): 00130 return 1, "subscriptions", [[], []] 00131 00132 @apivalidate([]) 00133 def getPublications(self, caller_id): 00134 return 1, "publications", [[], []] 00135 00136 @apivalidate(-1, (global_name('parameter_key'), None)) 00137 def paramUpdate(self, caller_id, parameter_key, parameter_value): 00138 # not supported 00139 return -1, 'not authorized', 0 00140 00141 @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers'))) 00142 def publisherUpdate(self, caller_id, topic, publishers): 00143 self.cb(topic, publishers) 00144 00145 @apivalidate([], (is_topic('topic'), non_empty('protocols'))) 00146 def requestTopic(self, caller_id, topic, protocols): 00147 return 0, "no supported protocol implementations", [] 00148 00149 00150 class RemoteManager(object): 00151 def __init__(self, master_uri, cb): 00152 self.master_uri = master_uri 00153 00154 ns = roslib.names.get_ros_namespace() 00155 anon_name = roslib.names.anonymous_name('master_sync') 00156 00157 self.master = rosgraph.masterapi.Master(roslib.names.ns_join(ns, anon_name), master_uri=self.master_uri) 00158 00159 self.cb = cb 00160 00161 self.type_cache = {} 00162 00163 self.subs = {} 00164 self.pubs = {} 00165 self.srvs = {} 00166 00167 rpc_handler = TopicPubListenerHandler(self.new_topics) 00168 self.external_node = XmlRpcNode(rpc_handler=rpc_handler) 00169 self.external_node.start() 00170 00171 timeout_t = time.time() + 5. 00172 while time.time() < timeout_t and self.external_node.uri is None: 00173 time.sleep(0.01) 00174 00175 00176 def get_topic_type(self, query_topic): 00177 query_topic = self.resolve(query_topic) 00178 00179 if query_topic in self.type_cache: 00180 return self.type_cache[query_topic] 00181 else: 00182 for topic, topic_type in self.master.getTopicTypes(): 00183 self.type_cache[topic] = topic_type 00184 if query_topic in self.type_cache: 00185 return self.type_cache[query_topic] 00186 else: 00187 return "*" 00188 00189 def subscribe(self, topic): 00190 topic = self.resolve(topic) 00191 publishers = self.master.registerSubscriber(topic, '*', self.external_node.uri) 00192 self.subs[(topic, self.external_node.uri)] = self.master 00193 self.new_topics(topic, publishers) 00194 00195 def advertise(self, topic, topic_type, uri): 00196 topic = self.resolve(topic) 00197 00198 # Prevent double-advertisements 00199 if (topic, uri) in self.pubs: 00200 return 00201 00202 # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name 00203 anon_name = roslib.names.anonymous_name('master_sync') 00204 master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri) 00205 00206 rospy.loginfo("Registering (%s,%s) on master %s"%(topic,uri,master.master_uri)) 00207 00208 master.registerPublisher(topic, topic_type, uri) 00209 self.pubs[(topic, uri)] = master 00210 00211 00212 def unadvertise(self, topic, uri): 00213 if (topic, uri) in self.pubs: 00214 m = self.pubs[(topic,uri)] 00215 rospy.loginfo("Unregistering (%s,%s) from master %s"%(topic,uri,m.master_uri)) 00216 m.unregisterPublisher(topic,uri) 00217 del self.pubs[(topic,uri)] 00218 00219 00220 def advertise_list(self, topic, topic_type, uris): 00221 topic = self.resolve(topic) 00222 00223 unadv = set((t,u) for (t,u) in self.pubs.iterkeys() if t == topic) - set([(topic, u) for u in uris]) 00224 for (t,u) in self.pubs.keys(): 00225 if (t,u) in unadv: 00226 self.unadvertise(t,u) 00227 00228 for u in uris: 00229 self.advertise(topic, topic_type, u) 00230 00231 def lookup_service(self, service_name): 00232 service_name = self.resolve(service_name) 00233 try: 00234 return self.master.lookupService(service_name) 00235 except rosgraph.masterapi.Error: 00236 return None 00237 00238 def advertise_service(self, service_name, uri): 00239 00240 # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name 00241 anon_name = roslib.names.anonymous_name('master_sync') 00242 master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri) 00243 00244 if (service_name) in self.srvs: 00245 if self.srvs[service_name][0] == uri: 00246 return 00247 else: 00248 self.unadvertise_service(service_name) 00249 00250 fake_api = 'http://%s:0'%roslib.network.get_host_name() 00251 rospy.loginfo("Registering service (%s,%s) on master %s"%(service_name, uri, master.master_uri)) 00252 master.registerService(service_name, uri, fake_api) 00253 00254 self.srvs[service_name] = (uri, master) 00255 00256 def unadvertise_service(self, service_name): 00257 if service_name in self.srvs: 00258 uri,m = self.srvs[service_name] 00259 rospy.loginfo("Unregistering service (%s,%s) from master %s"%(service_name, uri, m.master_uri)) 00260 m.unregisterService(service_name, uri) 00261 del self.srvs[service_name] 00262 00263 00264 def resolve(self, topic): 00265 ns = roslib.names.namespace(self.master.caller_id) 00266 return roslib.names.ns_join(ns, topic) 00267 00268 def unsubscribe_all(self): 00269 for (t,u),m in self.subs.iteritems(): 00270 m.unregisterSubscriber(t,u) 00271 for t,u in self.pubs.keys(): 00272 self.unadvertise(t,u) 00273 for s in self.srvs.keys(): 00274 self.unadvertise_service(s) 00275 00276 def new_topics(self, topic, publishers): 00277 self.cb(topic, [p for p in publishers if (topic,p) not in self.pubs]) 00278 00279 00280 def check_master(m): 00281 try: 00282 m.getUri() 00283 return True 00284 except Exception: 00285 return False 00286 00287 class MasterSync(object): 00288 def __init__(self): 00289 rospy.init_node('master_sync') 00290 00291 00292 self.local_service_names = rospy.get_param('~local_services', {}) 00293 self.local_pub_names = rospy.get_param('~local_pubs', []) 00294 self.foreign_service_names = rospy.get_param('~foreign_services', {}) 00295 self.foreign_pub_names = rospy.get_param('~foreign_pubs', []) 00296 00297 # Get master URIs 00298 lm = roslib.rosenv.get_master_uri() 00299 fm = rospy.get_param('~foreign_master', 'http://localhost:11312') 00300 00301 m = rosgraph.masterapi.Master(rospy.get_name(), master_uri=fm) 00302 r = rospy.Rate(1) 00303 while not check_master(m) and not rospy.is_shutdown(): 00304 rospy.loginfo("Waiting for foreign master to come up...") 00305 r.sleep() 00306 00307 self.local_manager = None 00308 self.foreign_manager = None 00309 00310 if not rospy.is_shutdown(): 00311 00312 self.local_manager = RemoteManager(lm, self.new_local_topics) 00313 self.foreign_manager = RemoteManager(fm, self.new_foreign_topics) 00314 00315 for t in self.local_pub_names: 00316 self.local_manager.subscribe(t) 00317 00318 for t in self.foreign_pub_names: 00319 self.foreign_manager.subscribe(t) 00320 00321 def new_local_topics(self, topic, publishers): 00322 topic_type = self.local_manager.get_topic_type(topic) 00323 self.foreign_manager.advertise_list(topic, topic_type, publishers) 00324 00325 00326 def new_foreign_topics(self, topic, publishers): 00327 topic_type = self.foreign_manager.get_topic_type(topic) 00328 self.local_manager.advertise_list(topic, topic_type, publishers) 00329 00330 00331 def spin(self): 00332 r = rospy.Rate(1.0) 00333 while not rospy.is_shutdown(): 00334 for s in self.local_service_names: 00335 srv_uri = self.local_manager.lookup_service(s) 00336 if srv_uri is not None: 00337 self.foreign_manager.advertise_service(s, srv_uri) 00338 else: 00339 self.foreign_manager.unadvertise_service(s) 00340 for s in self.foreign_service_names: 00341 srv_uri = self.foreign_manager.lookup_service(s) 00342 if srv_uri is not None: 00343 self.local_manager.advertise_service(s, srv_uri) 00344 else: 00345 self.local_manager.unadvertise_service(s) 00346 r.sleep() 00347 00348 if self.local_manager: 00349 self.local_manager.unsubscribe_all() 00350 if self.foreign_manager: 00351 self.foreign_manager.unsubscribe_all() 00352 00353 00354 def master_sync_main(): 00355 r = MasterSync() 00356 r.spin() 00357 00358 if __name__ == '__main__': 00359 master_sync_main()