master_sync.py
Go to the documentation of this file.
00001 import roslib.network
00002 import rosgraph.masterapi
00003 import rospy
00004 
00005 import time
00006 import urlparse
00007 
00008 
00009 from rospy.core import global_name, is_topic
00010 from rospy.impl.validators import non_empty, ParameterInvalid
00011 
00012 from rospy.impl.masterslave import apivalidate
00013 
00014 import rosgraph
00015 from rosgraph.xmlrpc import XmlRpcHandler
00016 from rosgraph.xmlrpc import XmlRpcNode
00017 import roslib.names
00018 import threading
00019 
00020 def is_publishers_list(paramName):
00021     return ('is_publishers_list', paramName)
00022 
00023 
00024 class TopicPubListenerHandler(XmlRpcHandler):
00025 
00026     def __init__(self, cb):
00027         super(TopicPubListenerHandler, self).__init__()
00028         self.uri = None
00029         self.cb = cb
00030 
00031     def _ready(self, uri):
00032         self.uri = uri
00033 
00034     def _custom_validate(self, validation, param_name, param_value, caller_id):
00035         if validation == 'is_publishers_list':
00036             if not type(param_value) == list:
00037                 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
00038             for v in param_value:
00039                 if not isinstance(v, basestring):
00040                     raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
00041                 parsed = urlparse.urlparse(v)
00042                 if not parsed[0] or not parsed[1]: #protocol and host
00043                     raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
00044             return param_value
00045         else:
00046             raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
00047 
00048     @apivalidate([])
00049     def getBusStats(self, caller_id):
00050         # not supported
00051         return 1, '', [[], [], []]
00052 
00053     @apivalidate([])
00054     def getBusInfo(self, caller_id):
00055         # not supported
00056         return 1, '', [[], [], []]
00057     
00058     @apivalidate('')
00059     def getMasterUri(self, caller_id):
00060         # not supported
00061         return 1, '', ''
00062         
00063     @apivalidate(0, (None, ))
00064     def shutdown(self, caller_id, msg=''):
00065         return -1, "not authorized", 0
00066 
00067     @apivalidate(-1)
00068     def getPid(self, caller_id):
00069         return -1, "not authorized", 0
00070 
00071     ###############################################################################
00072     # PUB/SUB APIS
00073 
00074     @apivalidate([])
00075     def getSubscriptions(self, caller_id):
00076         return 1, "subscriptions", [[], []]
00077 
00078     @apivalidate([])
00079     def getPublications(self, caller_id):
00080         return 1, "publications", [[], []]
00081     
00082     @apivalidate(-1, (global_name('parameter_key'), None))
00083     def paramUpdate(self, caller_id, parameter_key, parameter_value):
00084         # not supported
00085         return -1, 'not authorized', 0
00086 
00087     @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
00088     def publisherUpdate(self, caller_id, topic, publishers):
00089         self.cb(topic, publishers)
00090     
00091     @apivalidate([], (is_topic('topic'), non_empty('protocols')))
00092     def requestTopic(self, caller_id, topic, protocols):
00093         return 0, "no supported protocol implementations", []
00094 
00095 
00096 class RemoteManager(object):
00097     def __init__(self, master_uri, cb):
00098         self.master_uri = master_uri
00099 
00100         ns = roslib.names.get_ros_namespace()
00101         anon_name = roslib.names.anonymous_name('master_sync')
00102 
00103         self.master = rosgraph.Master(roslib.names.ns_join(ns, anon_name), master_uri=self.master_uri)
00104 
00105         self.cb = cb
00106 
00107         self.type_cache = {}
00108 
00109         self.subs = {}
00110         self.pubs = {}
00111         self.srvs = {}
00112 
00113         rpc_handler = TopicPubListenerHandler(self.new_topics)
00114         self.external_node = XmlRpcNode(rpc_handler=rpc_handler)
00115         self.external_node.start()
00116 
00117         timeout_t = time.time() + 5.
00118         while time.time() < timeout_t and self.external_node.uri is None:
00119             time.sleep(0.01)
00120 
00121 
00122     def get_topic_type(self, query_topic):
00123         query_topic = self.resolve(query_topic)
00124 
00125         if query_topic in self.type_cache:
00126             return self.type_cache[query_topic]
00127         else:
00128             for topic, topic_type in self.master.getTopicTypes():
00129                 self.type_cache[topic] = topic_type
00130             if query_topic in self.type_cache:
00131                 return self.type_cache[query_topic]
00132             else:
00133                 return "*"
00134 
00135     def subscribe(self, topic):
00136         topic = self.resolve(topic)
00137         publishers = self.master.registerSubscriber(topic, '*', self.external_node.uri)        
00138         self.subs[(topic, self.external_node.uri)] = self.master
00139         self.new_topics(topic, publishers)
00140 
00141     def advertise(self, topic, topic_type, uri):
00142         topic = self.resolve(topic)
00143 
00144         # Prevent double-advertisements
00145         if (topic, uri) in self.pubs:
00146             return
00147 
00148         # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name
00149         anon_name = roslib.names.anonymous_name('master_sync')
00150         master = rosgraph.Master(anon_name, master_uri=self.master_uri)
00151 
00152         rospy.loginfo("Registering (%s,%s) on master %s"%(topic,uri,master.master_uri))
00153 
00154         master.registerPublisher(topic, topic_type, uri)
00155         self.pubs[(topic, uri)] = master
00156 
00157 
00158     def unadvertise(self, topic, uri):
00159         if (topic, uri) in self.pubs:
00160             m = self.pubs[(topic,uri)]
00161             rospy.loginfo("Unregistering (%s,%s) from master %s"%(topic,uri,m.master_uri))
00162             m.unregisterPublisher(topic,uri)
00163             del self.pubs[(topic,uri)]
00164 
00165 
00166     def advertise_list(self, topic, topic_type, uris):
00167         topic = self.resolve(topic)
00168 
00169         unadv = set((t,u) for (t,u) in self.pubs.iterkeys() if t == topic) - set([(topic, u) for u in uris])
00170         for (t,u) in self.pubs.keys():
00171             if (t,u) in unadv:
00172                 self.unadvertise(t,u)
00173 
00174         for u in uris:
00175             self.advertise(topic, topic_type, u)
00176 
00177     def lookup_service(self, service_name):
00178         service_name = self.resolve(service_name)
00179         try:
00180             return self.master.lookupService(service_name)
00181         except rosgraph.masterapi.Error:
00182             return None
00183 
00184     def advertise_service(self, service_name, uri):
00185 
00186         # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name
00187         anon_name = roslib.names.anonymous_name('master_sync')
00188         master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri)
00189 
00190         if (service_name) in self.srvs:
00191             if self.srvs[service_name][0] == uri:
00192                 return
00193             else:
00194                 self.unadvertise_service(service_name)
00195 
00196         fake_api = 'http://%s:0'%roslib.network.get_host_name()
00197         rospy.loginfo("Registering service (%s,%s) on master %s"%(service_name, uri, master.master_uri))
00198         master.registerService(service_name, uri, fake_api)
00199 
00200         self.srvs[service_name] = (uri, master)
00201 
00202     def unadvertise_service(self, service_name):
00203         if service_name in self.srvs:
00204             uri,m = self.srvs[service_name]
00205             rospy.loginfo("Unregistering service (%s,%s) from master %s"%(service_name, uri, m.master_uri))
00206             m.unregisterService(service_name, uri)
00207             del self.srvs[service_name]
00208 
00209 
00210     def resolve(self, topic):
00211         ns = roslib.names.namespace(self.master.caller_id)
00212         return roslib.names.ns_join(ns, topic)
00213 
00214     def unsubscribe_all(self):
00215         for (t,u),m in self.subs.iteritems():
00216             m.unregisterSubscriber(t,u)
00217         for t,u in self.pubs.keys():
00218             self.unadvertise(t,u)
00219         for s in self.srvs.keys():
00220             self.unadvertise_service(s)
00221         
00222     def new_topics(self, topic, publishers):
00223         self.cb(topic, [p for p in publishers if (topic,p) not in self.pubs])
00224 
00225 
00226 def check_master(m):
00227     try:
00228         m.getUri()
00229         return True
00230     except Exception:
00231         return False
00232 
00233 class MasterSync(object):
00234     def __init__(self, foreign_master, local_service_names = [], local_pub_names = [], foreign_service_names = [], foreign_pub_names = []):
00235 
00236         self.local_service_names   = local_service_names
00237         self.local_pub_names       = local_pub_names
00238         self.foreign_service_names = foreign_service_names
00239         self.foreign_pub_names     = foreign_pub_names
00240 
00241         self.local_manager = None
00242         self.foreign_manager = None
00243         self.stopping = False
00244         self.thread = None
00245             
00246         # Get master URIs
00247         local_master = rosgraph.get_master_uri()
00248 
00249         m = rosgraph.masterapi.Master(rospy.get_name(), master_uri=foreign_master)
00250         r = rospy.Rate(1)
00251         rospy.loginfo("Waiting for foreign master [%s] to come up..."%(foreign_master))
00252         while not check_master(m) and not rospy.is_shutdown():
00253             r.sleep()
00254 
00255         if not rospy.is_shutdown():
00256             rospy.loginfo("Foreign master is available")
00257             
00258             self.local_manager = RemoteManager(local_master, self.new_local_topics)
00259             self.foreign_manager = RemoteManager(foreign_master, self.new_foreign_topics)
00260 
00261             for t in self.local_pub_names:
00262                 self.local_manager.subscribe(t)
00263 
00264             for t in self.foreign_pub_names:
00265                 self.foreign_manager.subscribe(t)
00266 
00267             self.thread = threading.Thread(target=self.spin)
00268             self.thread.start()
00269 
00270         else:
00271             rospy.loginfo("shutdown flag raised, aborting...")
00272 
00273         
00274     def new_local_topics(self, topic, publishers):
00275         topic_type = self.local_manager.get_topic_type(topic)
00276         self.foreign_manager.advertise_list(topic, topic_type, publishers)
00277 
00278 
00279     def new_foreign_topics(self, topic, publishers):
00280         topic_type = self.foreign_manager.get_topic_type(topic)
00281         self.local_manager.advertise_list(topic, topic_type, publishers)
00282 
00283 
00284     def stop(self):
00285         self.stopping = True
00286         self.thread.join()
00287 
00288         if self.local_manager:
00289             self.local_manager.unsubscribe_all()
00290         if self.foreign_manager:
00291             self.foreign_manager.unsubscribe_all()
00292 
00293 
00294     # Spin is necessary to synchronize SERVICES.  Topics work entirely on a callback-driven basis
00295     def spin(self):
00296         r = rospy.Rate(1.0)
00297         while not rospy.is_shutdown() and not self.stopping:
00298             for s in self.local_service_names:
00299                 srv_uri = self.local_manager.lookup_service(s)
00300                 if srv_uri is not None:
00301                     self.foreign_manager.advertise_service(s, srv_uri)
00302                 else:
00303                     self.foreign_manager.unadvertise_service(s)
00304             for s in self.foreign_service_names:
00305                 srv_uri = self.foreign_manager.lookup_service(s)
00306                 if srv_uri is not None:
00307                     self.local_manager.advertise_service(s, srv_uri)
00308                 else:
00309                     self.local_manager.unadvertise_service(s)
00310             r.sleep()
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Friends


graveyard_rocon_master_sync
Author(s): Daniel Stonier
autogenerated on Wed Jan 23 2013 13:42:00