master_sync.py
Go to the documentation of this file.
00001 import threading
00002 import time
00003 import urlparse
00004 
00005 import rosgraph
00006 import rosgraph.names
00007 import rosgraph.network
00008 
00009 import rospy
00010 
00011 from rospy.core import global_name, is_topic
00012 from rospy.impl.validators import non_empty, ParameterInvalid
00013 
00014 from rospy.impl.masterslave import apivalidate
00015 
00016 from rosgraph.xmlrpc import XmlRpcNode, XmlRpcHandler
00017 
00018 def is_publishers_list(paramName):
00019     return ('is_publishers_list', paramName)
00020 
00021 
00022 class TopicPubListenerHandler(XmlRpcHandler):
00023 
00024     def __init__(self, cb):
00025         super(TopicPubListenerHandler, self).__init__()
00026         self.uri = None
00027         self.cb = cb
00028 
00029     def _ready(self, uri):
00030         self.uri = uri
00031 
00032     def _custom_validate(self, validation, param_name, param_value, caller_id):
00033         if validation == 'is_publishers_list':
00034             if not type(param_value) == list:
00035                 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
00036             for v in param_value:
00037                 if not isinstance(v, basestring):
00038                     raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
00039                 parsed = urlparse.urlparse(v)
00040                 if not parsed[0] or not parsed[1]: #protocol and host
00041                     raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
00042             return param_value
00043         else:
00044             raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
00045 
00046     @apivalidate([])
00047     def getBusStats(self, caller_id):
00048         # not supported
00049         return 1, '', [[], [], []]
00050 
00051     @apivalidate([])
00052     def getBusInfo(self, caller_id):
00053         # not supported
00054         return 1, '', [[], [], []]
00055     
00056     @apivalidate('')
00057     def getMasterUri(self, caller_id):
00058         # not supported
00059         return 1, '', ''
00060         
00061     @apivalidate(0, (None, ))
00062     def shutdown(self, caller_id, msg=''):
00063         return -1, "not authorized", 0
00064 
00065     @apivalidate(-1)
00066     def getPid(self, caller_id):
00067         return -1, "not authorized", 0
00068 
00069     ###############################################################################
00070     # PUB/SUB APIS
00071 
00072     @apivalidate([])
00073     def getSubscriptions(self, caller_id):
00074         return 1, "subscriptions", [[], []]
00075 
00076     @apivalidate([])
00077     def getPublications(self, caller_id):
00078         return 1, "publications", [[], []]
00079     
00080     @apivalidate(-1, (global_name('parameter_key'), None))
00081     def paramUpdate(self, caller_id, parameter_key, parameter_value):
00082         # not supported
00083         return -1, 'not authorized', 0
00084 
00085     @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
00086     def publisherUpdate(self, caller_id, topic, publishers):
00087         self.cb(topic, publishers)
00088     
00089     @apivalidate([], (is_topic('topic'), non_empty('protocols')))
00090     def requestTopic(self, caller_id, topic, protocols):
00091         return 0, "no supported protocol implementations", []
00092 
00093 
00094 class RemoteManager(object):
00095     def __init__(self, master_uri, cb):
00096         self.master_uri = master_uri
00097 
00098         ns = rosgraph.names.get_ros_namespace()
00099         anon_name = rosgraph.names.anonymous_name('master_sync')
00100 
00101         self.master = rosgraph.Master(rosgraph.names.ns_join(ns, anon_name), master_uri=self.master_uri)
00102 
00103         self.cb = cb
00104 
00105         self.type_cache = {}
00106 
00107         self.subs = {}
00108         self.pubs = {}
00109         self.srvs = {}
00110 
00111         rpc_handler = TopicPubListenerHandler(self.new_topics)
00112         self.external_node = XmlRpcNode(rpc_handler=rpc_handler)
00113         self.external_node.start()
00114 
00115         timeout_t = time.time() + 5.
00116         while time.time() < timeout_t and self.external_node.uri is None:
00117             time.sleep(0.01)
00118 
00119 
00120     def get_topic_type(self, query_topic):
00121         query_topic = self.resolve(query_topic)
00122 
00123         if query_topic in self.type_cache:
00124             return self.type_cache[query_topic]
00125         else:
00126             for topic, topic_type in self.master.getTopicTypes():
00127                 self.type_cache[topic] = topic_type
00128             if query_topic in self.type_cache:
00129                 return self.type_cache[query_topic]
00130             else:
00131                 return "*"
00132 
00133     def subscribe(self, topic):
00134         topic = self.resolve(topic)
00135         publishers = self.master.registerSubscriber(topic, '*', self.external_node.uri)        
00136         self.subs[(topic, self.external_node.uri)] = self.master
00137         self.new_topics(topic, publishers)
00138 
00139     def advertise(self, topic, topic_type, uri):
00140         topic = self.resolve(topic)
00141 
00142         # Prevent double-advertisements
00143         if (topic, uri) in self.pubs:
00144             return
00145 
00146         # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name
00147         anon_name = rosgraph.names.anonymous_name('master_sync')
00148         master = rosgraph.Master(anon_name, master_uri=self.master_uri)
00149 
00150         rospy.loginfo("Registering (%s,%s) on master %s"%(topic,uri,master.master_uri))
00151 
00152         master.registerPublisher(topic, topic_type, uri)
00153         self.pubs[(topic, uri)] = master
00154 
00155 
00156     def unadvertise(self, topic, uri):
00157         if (topic, uri) in self.pubs:
00158             m = self.pubs[(topic,uri)]
00159             rospy.loginfo("Unregistering (%s,%s) from master %s"%(topic,uri,m.master_uri))
00160             m.unregisterPublisher(topic,uri)
00161             del self.pubs[(topic,uri)]
00162 
00163 
00164     def advertise_list(self, topic, topic_type, uris):
00165         topic = self.resolve(topic)
00166 
00167         unadv = set((t,u) for (t,u) in self.pubs.iterkeys() if t == topic) - set([(topic, u) for u in uris])
00168         for (t,u) in self.pubs.keys():
00169             if (t,u) in unadv:
00170                 self.unadvertise(t,u)
00171 
00172         for u in uris:
00173             self.advertise(topic, topic_type, u)
00174 
00175     def lookup_service(self, service_name):
00176         service_name = self.resolve(service_name)
00177         try:
00178             return self.master.lookupService(service_name)
00179         except rosgraph.MasterError:
00180             return None
00181 
00182     def advertise_service(self, service_name, uri):
00183 
00184         # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name
00185         anon_name = rosgraph.names.anonymous_name('master_sync')
00186         master = rosgraph.Master(anon_name, master_uri=self.master_uri)
00187 
00188         if (service_name) in self.srvs:
00189             if self.srvs[service_name][0] == uri:
00190                 return
00191             else:
00192                 self.unadvertise_service(service_name)
00193 
00194         fake_api = 'http://%s:0'%rosgraph.network.get_host_name()
00195         rospy.loginfo("Registering service (%s,%s) on master %s"%(service_name, uri, master.master_uri))
00196         master.registerService(service_name, uri, fake_api)
00197 
00198         self.srvs[service_name] = (uri, master)
00199 
00200     def unadvertise_service(self, service_name):
00201         if service_name in self.srvs:
00202             uri,m = self.srvs[service_name]
00203             rospy.loginfo("Unregistering service (%s,%s) from master %s"%(service_name, uri, m.master_uri))
00204             m.unregisterService(service_name, uri)
00205             del self.srvs[service_name]
00206 
00207 
00208     def resolve(self, topic):
00209         ns = rosgraph.names.namespace(self.master.caller_id)
00210         return rosgraph.names.ns_join(ns, topic)
00211 
00212     def unsubscribe_all(self):
00213         for (t,u),m in self.subs.iteritems():
00214             m.unregisterSubscriber(t,u)
00215         for t,u in self.pubs.keys():
00216             self.unadvertise(t,u)
00217         for s in self.srvs.keys():
00218             self.unadvertise_service(s)
00219         
00220     def new_topics(self, topic, publishers):
00221         self.cb(topic, [p for p in publishers if (topic,p) not in self.pubs])
00222 
00223 
00224 def check_master(m):
00225     try:
00226         m.getUri()
00227         return True
00228     except Exception:
00229         return False
00230 
00231 class MasterSync(object):
00232     def __init__(self, foreign_master, local_service_names = [], local_pub_names = [], foreign_service_names = [], foreign_pub_names = []):
00233 
00234         self.local_service_names   = local_service_names
00235         self.local_pub_names       = local_pub_names
00236         self.foreign_service_names = foreign_service_names
00237         self.foreign_pub_names     = foreign_pub_names
00238 
00239         self.local_manager = None
00240         self.foreign_manager = None
00241         self.stopping = False
00242         self.thread = None
00243             
00244         # Get master URIs
00245         local_master = rosgraph.get_master_uri()
00246 
00247         m = rosgraph.Master(rospy.get_name(), master_uri=foreign_master)
00248         r = rospy.Rate(1)
00249         rospy.loginfo("Waiting for foreign master [%s] to come up..."%(foreign_master))
00250         while not check_master(m) and not rospy.is_shutdown():
00251             r.sleep()
00252 
00253         if not rospy.is_shutdown():
00254             rospy.loginfo("Foreign master is available")
00255             
00256             self.local_manager = RemoteManager(local_master, self.new_local_topics)
00257             self.foreign_manager = RemoteManager(foreign_master, self.new_foreign_topics)
00258 
00259             for t in self.local_pub_names:
00260                 self.local_manager.subscribe(t)
00261 
00262             for t in self.foreign_pub_names:
00263                 self.foreign_manager.subscribe(t)
00264 
00265             self.thread = threading.Thread(target=self.spin)
00266             self.thread.start()
00267 
00268         else:
00269             rospy.loginfo("shutdown flag raised, aborting...")
00270 
00271         
00272     def new_local_topics(self, topic, publishers):
00273         topic_type = self.local_manager.get_topic_type(topic)
00274         self.foreign_manager.advertise_list(topic, topic_type, publishers)
00275 
00276 
00277     def new_foreign_topics(self, topic, publishers):
00278         topic_type = self.foreign_manager.get_topic_type(topic)
00279         self.local_manager.advertise_list(topic, topic_type, publishers)
00280 
00281 
00282     def stop(self):
00283         self.stopping = True
00284         self.thread.join()
00285 
00286         if self.local_manager:
00287             self.local_manager.unsubscribe_all()
00288         if self.foreign_manager:
00289             self.foreign_manager.unsubscribe_all()
00290 
00291 
00292     # Spin is necessary to synchronize SERVICES.  Topics work entirely on a callback-driven basis
00293     def spin(self):
00294         r = rospy.Rate(1.0)
00295         while not rospy.is_shutdown() and not self.stopping:
00296             for s in self.local_service_names:
00297                 srv_uri = self.local_manager.lookup_service(s)
00298                 if srv_uri is not None:
00299                     self.foreign_manager.advertise_service(s, srv_uri)
00300                 else:
00301                     self.foreign_manager.unadvertise_service(s)
00302             for s in self.foreign_service_names:
00303                 srv_uri = self.foreign_manager.lookup_service(s)
00304                 if srv_uri is not None:
00305                     self.local_manager.advertise_service(s, srv_uri)
00306                 else:
00307                     self.local_manager.unadvertise_service(s)
00308             r.sleep()


app_manager
Author(s): Jeremy Leibs, Ken Conley
autogenerated on Thu Sep 3 2015 10:15:41