master_sync.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import roslib; roslib.load_manifest('multimaster')
00004 
00005 import threading
00006 import time
00007 import urlparse
00008 
00009 import rosgraph
00010 import rosgraph.names
00011 import rosgraph.network
00012 
00013 import rospy
00014 
00015 from rospy.core import global_name, is_topic
00016 from rospy.impl.validators import non_empty, ParameterInvalid
00017 
00018 from rospy.impl.masterslave import apivalidate
00019 
00020 from rosgraph.xmlrpc import XmlRpcNode, XmlRpcHandler
00021 
00022 import sys
00023 
00024 class MasterSync(object):
00025     def __init__(self):
00026         local_uri   = rosgraph.get_master_uri()
00027         foreign_uri = rospy.get_param('~foreign_master', '')
00028         if not foreign_uri:
00029             raise Exception('foreign_master URI not specified')
00030 
00031         local_pubs   = rospy.get_param('~local_pubs',   [])
00032         foreign_pubs = rospy.get_param('~foreign_pubs', [])
00033 
00034         self._local_services   = rospy.get_param('~local_services',   [])
00035         self._foreign_services = rospy.get_param('~foreign_services', [])
00036 
00037         foreign_master = rosgraph.Master(rospy.get_name(), master_uri=foreign_uri)
00038         r = rospy.Rate(1)
00039         while not _is_master_up(foreign_master) and not rospy.is_shutdown():
00040             rospy.logdebug('Waiting for foreign master to come up...')
00041             r.sleep()
00042 
00043         self._local_manager   = None
00044         self._foreign_manager = None
00045         if not rospy.is_shutdown():
00046             self._local_manager   = _RemoteManager(local_uri,   self._local_publisher_update)
00047             self._foreign_manager = _RemoteManager(foreign_uri, self._foreign_publisher_update)
00048             for t in local_pubs:
00049                 self._local_manager.subscribe(t)
00050             for t in foreign_pubs:
00051                 self._foreign_manager.subscribe(t)
00052 
00053     def _local_publisher_update(self, topic, publishers):
00054         topic_type = self._local_manager.get_topic_type(topic)
00055         
00056         self._foreign_manager.publishers_updated(topic, topic_type, publishers)
00057 
00058     def _foreign_publisher_update(self, topic, publishers):
00059         topic_type = self._foreign_manager.get_topic_type(topic)
00060         
00061         self._local_manager.publishers_updated(topic, topic_type, publishers)
00062 
00063     def spin(self):
00064         # @todo: is this excessively hitting the master?
00065 
00066         r = rospy.Rate(1.0)
00067 
00068         while not rospy.is_shutdown():
00069             for s in self._local_services:
00070                 srv_uri = self._local_manager.lookup_service(s)
00071                 if srv_uri:
00072                     self._foreign_manager.advertise_service(s, srv_uri)
00073                 else:
00074                     self._foreign_manager.unadvertise_service(s)
00075 
00076             for s in self._foreign_services:
00077                 srv_uri = self._foreign_manager.lookup_service(s)
00078                 if srv_uri:
00079                     self._local_manager.advertise_service(s, srv_uri)
00080                 else:
00081                     self._local_manager.unadvertise_service(s)
00082 
00083             r.sleep()
00084 
00085         if self._local_manager:
00086             self._local_manager.unsubscribe_all()
00087         if self._foreign_manager:
00088             self._foreign_manager.unsubscribe_all()
00089 
00090 def _is_master_up(m):
00091     try:
00092         m.getUri()
00093         return True
00094     except Exception:
00095         return False
00096 
00097 class _RemoteManager(object):
00098     def __init__(self, master_uri, new_topics_callback):
00099         self.master_uri          = master_uri
00100         self.new_topics_callback = new_topics_callback
00101 
00102         name = rosgraph.names.ns_join(rosgraph.names.get_ros_namespace(), rosgraph.names.anonymous_name('master_sync'))
00103         self.master = rosgraph.Master(name, master_uri=self.master_uri)
00104 
00105         self._lock = threading.RLock()
00106 
00107         self._type_cache = {}
00108 
00109         self._subs = {}
00110         self._pubs = {}
00111         self._srvs = {}
00112 
00113         self._external_node = XmlRpcNode(rpc_handler=_TopicPubListenerHandler(self._new_topics))
00114         self._external_node.start()
00115 
00116         timeout_t = time.time() + 5.0
00117         while time.time() < timeout_t and self._external_node.uri is None:
00118             time.sleep(0.01)
00119 
00120     def get_topic_type(self, query_topic):
00121         query_topic = self.resolve(query_topic)
00122 
00123         query_topic_type = self._type_cache.get(query_topic)
00124         if query_topic_type:
00125             return query_topic_type
00126 
00127         for topic, topic_type in self.master.getTopicTypes():
00128             self._type_cache[topic] = topic_type
00129 
00130         return self._type_cache.get(query_topic, '*')
00131 
00132     def subscribe(self, topic):
00133         topic = self.resolve(topic)
00134         publishers = self.master.registerSubscriber(topic, '*', self._external_node.uri)        
00135         self._subs[(topic, self._external_node.uri)] = self.master
00136         self._new_topics(topic, publishers)
00137 
00138     def publishers_updated(self, topic, topic_type, uris):
00139         resolved = self.resolve(topic)
00140 
00141         with self._lock:
00142             # Unregister any publishers that no longer exist
00143             uris_set = set(uris)
00144             for t, uri in self._pubs.keys():
00145                 if t == resolved and uri not in uris_set:
00146                     self.unadvertise(t, uri)
00147     
00148             # Register new publishers
00149             for uri in uris:
00150                 if (resolved, uri) not in self._pubs:
00151                     # Registrations need to be anonymous so master doesn't kill us if there's a duplicate name
00152                     rospy.loginfo('Registering (%s, %s) on master %s' % (resolved, uri, self.master_uri))
00153                     anon_name = rosgraph.names.anonymous_name('master_sync')
00154                     master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri)
00155                     master.registerPublisher(resolved, topic_type, uri)
00156     
00157                     self._pubs[(resolved, uri)] = master
00158 
00159     def unadvertise(self, topic, uri):
00160         with self._lock:
00161             if (topic, uri) in self._pubs:
00162                 master = self._pubs[(topic, uri)]
00163     
00164                 rospy.loginfo('Unregistering (%s, %s) from master %s' % (topic, uri, master.master_uri))
00165                 master.unregisterPublisher(topic, uri)
00166                 del self._pubs[(topic, uri)]
00167 
00168     def lookup_service(self, service_name):
00169         service_name = self.resolve(service_name)
00170         try:
00171             return self.master.lookupService(service_name)
00172         except:
00173             return None
00174 
00175     def advertise_service(self, service_name, uri):
00176         # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name
00177         anon_name = rosgraph.names.anonymous_name('master_sync')
00178         master = rosgraph.masterapi.Master(anon_name, master_uri=self.master_uri)
00179 
00180         if service_name in self._srvs:
00181             if self._srvs[service_name][0] == uri:
00182                 return
00183             self.unadvertise_service(service_name)
00184 
00185         rospy.loginfo('Registering service (%s, %s) on master %s' % (service_name, uri, master.master_uri))
00186 
00187         fake_api = 'http://%s:0' % roslib.network.get_host_name()
00188         master.registerService(service_name, uri, fake_api)
00189 
00190         self._srvs[service_name] = (uri, master)
00191 
00192     def unadvertise_service(self, service_name):
00193         if service_name in self._srvs:
00194             uri, master = self._srvs[service_name]
00195             
00196             rospy.loginfo('Unregistering service (%s, %s) from master %s' % (service_name, uri, master.master_uri))
00197             master.unregisterService(service_name, uri)
00198 
00199             del self._srvs[service_name]
00200 
00201     def resolve(self, topic):
00202         return rosgraph.names.ns_join(rosgraph.names.namespace(self.master.caller_id), topic)
00203 
00204     def unsubscribe_all(self):
00205         for (topic, uri), master in self._subs.iteritems():
00206             master.unregisterSubscriber(topic, uri)
00207         for topic, uri in self._pubs.keys():
00208             self.unadvertise(topic, uri)
00209         for service in self._srvs.keys():
00210             self.unadvertise_service(service)
00211 
00212     def _new_topics(self, topic, publishers):
00213         self.new_topics_callback(topic, [p for p in publishers if (topic, p) not in self._pubs])
00214 
00215 def is_publishers_list(paramName):
00216     return ('is_publishers_list', paramName)
00217 
00218 class _TopicPubListenerHandler(XmlRpcHandler):
00219     def __init__(self, publisher_update_callback):
00220         super(_TopicPubListenerHandler, self).__init__()
00221         
00222         self.uri                       = None
00223         self.publisher_update_callback = publisher_update_callback
00224 
00225     def _ready(self, uri):
00226         self.uri = uri
00227 
00228     def _custom_validate(self, validation, param_name, param_value, caller_id):
00229         if validation == 'is_publishers_list':
00230             if not type(param_value) == list:
00231                 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
00232             for v in param_value:
00233                 if not isinstance(v, basestring):
00234                     raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
00235                 parsed = urlparse.urlparse(v)
00236                 if not parsed[0] or not parsed[1]: #protocol and host
00237                     raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
00238             return param_value
00239         else:
00240             raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
00241 
00242     @apivalidate([])
00243     def getBusStats(self, caller_id):
00244         # not supported
00245         return 1, '', [[], [], []]
00246 
00247     @apivalidate([])
00248     def getBusInfo(self, caller_id):
00249         # not supported
00250         return 1, '', [[], [], []]
00251     
00252     @apivalidate('')
00253     def getMasterUri(self, caller_id):
00254         # not supported
00255         return 1, '', ''
00256         
00257     @apivalidate(0, (None, ))
00258     def shutdown(self, caller_id, msg=''):
00259         return -1, 'not authorized', 0
00260 
00261     @apivalidate(-1)
00262     def getPid(self, caller_id):
00263         return -1, 'not authorized', 0
00264 
00265     # pub/sub
00266 
00267     @apivalidate([])
00268     def getSubscriptions(self, caller_id):
00269         return 1, 'subscriptions', [[], []]
00270 
00271     @apivalidate([])
00272     def getPublications(self, caller_id):
00273         return 1, 'publications', [[], []]
00274     
00275     @apivalidate(-1, (global_name('parameter_key'), None))
00276     def paramUpdate(self, caller_id, parameter_key, parameter_value):
00277         # not supported
00278         return -1, 'not authorized', 0
00279 
00280     @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
00281     def publisherUpdate(self, caller_id, topic, publishers):
00282         self.publisher_update_callback(topic, publishers)
00283     
00284     @apivalidate([], (is_topic('topic'), non_empty('protocols')))
00285     def requestTopic(self, caller_id, topic, protocols):
00286         return 0, 'no supported protocol implementations', []
00287 
00288 if __name__ == '__main__':
00289     try:
00290         rospy.init_node('master_sync')
00291         
00292         sync = MasterSync()
00293         sync.spin()
00294     except KeyboardInterrupt:
00295         pass
00296     except Exception as ex:
00297         import traceback
00298         rospy.logfatal(traceback.format_exc()) 
00299         rospy.logfatal(str(ex))
00300         time.sleep(1.0)
00301         sys.exit(1)


multimaster
Author(s): Wim Meeussen
autogenerated on Mon Dec 2 2013 11:36:32