master_sync.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 # Software License Agreement (BSD License)
00004 #
00005 # Copyright (c) 2010, Willow Garage, Inc.
00006 # All rights reserved.
00007 #
00008 # Redistribution and use in source and binary forms, with or without
00009 # modification, are permitted provided that the following conditions
00010 # are met:
00011 #
00012 #  * Redistributions of source code must retain the above copyright
00013 #    notice, this list of conditions and the following disclaimer.
00014 #  * Redistributions in binary form must reproduce the above
00015 #    copyright notice, this list of conditions and the following
00016 #    disclaimer in the documentation and/or other materials provided
00017 #    with the distribution.
00018 #  * Neither the name of Willow Garage, Inc. nor the names of its
00019 #    contributors may be used to endorse or promote products derived
00020 #    from this software without specific prior written permission.
00021 #
00022 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00025 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00026 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00027 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00028 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00029 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00032 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033 # POSSIBILITY OF SUCH DAMAGE.
00034 #
00035 
00036 NAME="master_sync.py"
00037 
00038 import os
00039 import sys
00040 import socket
00041 import threading
00042 import traceback
00043 import time
00044 import urlparse
00045 
00046 import rosgraph
00047 import rosgraph.network
00048 import rosgraph.names
00049 from rosgraph.xmlrpc import XmlRpcNode, XmlRpcHandler
00050 
00051 import rospy
00052 
00053 from rospy.core import global_name, is_topic
00054 from rospy.impl.validators import non_empty, ParameterInvalid
00055 from rospy.impl.masterslave import apivalidate
00056 
00057 def is_publishers_list(paramName):
00058     return ('is_publishers_list', paramName)
00059 
00060 
00061 class TopicPubListenerHandler(XmlRpcHandler):
00062 
00063     def __init__(self, cb):
00064         super(TopicPubListenerHandler, self).__init__()
00065         self.uri = None
00066         self.cb = cb
00067 
00068     def _ready(self, uri):
00069         self.uri = uri
00070 
00071     def _custom_validate(self, validation, param_name, param_value, caller_id):
00072         if validation == 'is_publishers_list':
00073             if not type(param_value) == list:
00074                 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
00075             for v in param_value:
00076                 if not isinstance(v, basestring):
00077                     raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
00078                 parsed = urlparse.urlparse(v)
00079                 if not parsed[0] or not parsed[1]: #protocol and host
00080                     raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
00081             return param_value
00082         else:
00083             raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
00084 
00085     @apivalidate([])
00086     def getBusStats(self, caller_id):
00087         # not supported
00088         return 1, '', [[], [], []]
00089 
00090     @apivalidate([])
00091     def getBusInfo(self, caller_id):
00092         # not supported
00093         return 1, '', [[], [], []]
00094     
00095     @apivalidate('')
00096     def getMasterUri(self, caller_id):
00097         # not supported
00098         return 1, '', ''
00099         
00100     @apivalidate(0, (None, ))
00101     def shutdown(self, caller_id, msg=''):
00102         return -1, "not authorized", 0
00103 
00104     @apivalidate(-1)
00105     def getPid(self, caller_id):
00106         return -1, "not authorized", 0
00107 
00108     ###############################################################################
00109     # PUB/SUB APIS
00110 
00111     @apivalidate([])
00112     def getSubscriptions(self, caller_id):
00113         return 1, "subscriptions", [[], []]
00114 
00115     @apivalidate([])
00116     def getPublications(self, caller_id):
00117         return 1, "publications", [[], []]
00118     
00119     @apivalidate(-1, (global_name('parameter_key'), None))
00120     def paramUpdate(self, caller_id, parameter_key, parameter_value):
00121         # not supported
00122         return -1, 'not authorized', 0
00123 
00124     @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
00125     def publisherUpdate(self, caller_id, topic, publishers):
00126         self.cb(topic, publishers)
00127     
00128     @apivalidate([], (is_topic('topic'), non_empty('protocols')))
00129     def requestTopic(self, caller_id, topic, protocols):
00130         return 0, "no supported protocol implementations", []
00131 
00132 
00133 class RemoteManager(object):
00134     def __init__(self, master_uri, cb):
00135         self.master_uri = master_uri
00136 
00137         ns = rosgraph.names.get_ros_namespace()
00138         anon_name = rosgraph.names.anonymous_name('master_sync')
00139 
00140         self.master = rosgraph.Master(rosgraph.names.ns_join(ns, anon_name), master_uri=self.master_uri)
00141 
00142         self.cb = cb
00143 
00144         self.type_cache = {}
00145 
00146         self.subs = {}
00147         self.pubs = {}
00148         self.srvs = {}
00149 
00150         rpc_handler = TopicPubListenerHandler(self.new_topics)
00151         self.external_node = XmlRpcNode(rpc_handler=rpc_handler)
00152         self.external_node.start()
00153 
00154         timeout_t = time.time() + 5.
00155         while time.time() < timeout_t and self.external_node.uri is None:
00156             time.sleep(0.01)
00157 
00158 
00159     def get_topic_type(self, query_topic):
00160         query_topic = self.resolve(query_topic)
00161 
00162         if query_topic in self.type_cache:
00163             return self.type_cache[query_topic]
00164         else:
00165             for topic, topic_type in self.master.getTopicTypes():
00166                 self.type_cache[topic] = topic_type
00167             if query_topic in self.type_cache:
00168                 return self.type_cache[query_topic]
00169             else:
00170                 return "*"
00171 
00172     def subscribe(self, topic):
00173         topic = self.resolve(topic)
00174         publishers = self.master.registerSubscriber(topic, '*', self.external_node.uri)        
00175         self.subs[(topic, self.external_node.uri)] = self.master
00176         self.new_topics(topic, publishers)
00177 
00178     def advertise(self, topic, topic_type, uri):
00179         topic = self.resolve(topic)
00180 
00181         # Prevent double-advertisements
00182         if (topic, uri) in self.pubs:
00183             return
00184 
00185         # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name
00186         anon_name = rosgraph.names.anonymous_name('master_sync')
00187         master = rosgraph.Master(anon_name, master_uri=self.master_uri)
00188 
00189         rospy.loginfo("Registering (%s,%s) on master %s"%(topic,uri,master.master_uri))
00190 
00191         master.registerPublisher(topic, topic_type, uri)
00192         self.pubs[(topic, uri)] = master
00193 
00194 
00195     def unadvertise(self, topic, uri):
00196         if (topic, uri) in self.pubs:
00197             m = self.pubs[(topic,uri)]
00198             rospy.loginfo("Unregistering (%s,%s) from master %s"%(topic,uri,m.master_uri))
00199             m.unregisterPublisher(topic,uri)
00200             del self.pubs[(topic,uri)]
00201 
00202 
00203     def advertise_list(self, topic, topic_type, uris):
00204         topic = self.resolve(topic)
00205 
00206         unadv = set((t,u) for (t,u) in self.pubs.iterkeys() if t == topic) - set([(topic, u) for u in uris])
00207         for (t,u) in self.pubs.keys():
00208             if (t,u) in unadv:
00209                 self.unadvertise(t,u)
00210 
00211         for u in uris:
00212             self.advertise(topic, topic_type, u)
00213 
00214     def lookup_service(self, service_name):
00215         service_name = self.resolve(service_name)
00216         try:
00217             return self.master.lookupService(service_name)
00218         except rosgraph.MasterError:
00219             return None
00220 
00221     def advertise_service(self, service_name, uri):
00222 
00223         # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name
00224         anon_name = rosgraph.names.anonymous_name('master_sync')
00225         master = rosgraph.Master(anon_name, master_uri=self.master_uri)
00226 
00227         if (service_name) in self.srvs:
00228             if self.srvs[service_name][0] == uri:
00229                 return
00230             else:
00231                 self.unadvertise_service(service_name)
00232 
00233         fake_api = 'http://%s:0'%rosgraph.network.get_host_name()
00234         rospy.loginfo("Registering service (%s,%s) on master %s"%(service_name, uri, master.master_uri))
00235         master.registerService(service_name, uri, fake_api)
00236 
00237         self.srvs[service_name] = (uri, master)
00238 
00239     def unadvertise_service(self, service_name):
00240         if service_name in self.srvs:
00241             uri,m = self.srvs[service_name]
00242             rospy.loginfo("Unregistering service (%s,%s) from master %s"%(service_name, uri, m.master_uri))
00243             m.unregisterService(service_name, uri)
00244             del self.srvs[service_name]
00245 
00246 
00247     def resolve(self, topic):
00248         ns = rosgraph.names.namespace(self.master.caller_id)
00249         return rosgraph.names.ns_join(ns, topic)
00250 
00251     def unsubscribe_all(self):
00252         for (t,u),m in self.subs.iteritems():
00253             m.unregisterSubscriber(t,u)
00254         for t,u in self.pubs.keys():
00255             self.unadvertise(t,u)
00256         for s in self.srvs.keys():
00257             self.unadvertise_service(s)
00258         
00259     def new_topics(self, topic, publishers):
00260         self.cb(topic, [p for p in publishers if (topic,p) not in self.pubs])
00261 
00262 
00263 def check_master(m):
00264     try:
00265         m.getUri()
00266         return True
00267     except Exception:
00268         return False
00269 
00270 class MasterSync(object):
00271     def __init__(self):
00272         rospy.init_node('master_sync')
00273 
00274 
00275         self.local_service_names = rospy.get_param('~local_services', {})
00276         self.local_pub_names = rospy.get_param('~local_pubs', [])
00277         self.foreign_service_names = rospy.get_param('~foreign_services', {})
00278         self.foreign_pub_names = rospy.get_param('~foreign_pubs', [])
00279 
00280         # Get master URIs
00281         lm = rosgraph.get_master_uri()
00282         fm = rospy.get_param('~foreign_master', 'http://localhost:11312')
00283 
00284         m = rosgraph.Master(rospy.get_name(), master_uri=fm)
00285         r = rospy.Rate(1)
00286         while not check_master(m) and not rospy.is_shutdown():
00287             rospy.loginfo("Waiting for foreign master to come up...")
00288             r.sleep()
00289 
00290         self.local_manager = None
00291         self.foreign_manager = None
00292 
00293         if not rospy.is_shutdown():
00294 
00295             self.local_manager = RemoteManager(lm, self.new_local_topics)
00296             self.foreign_manager = RemoteManager(fm, self.new_foreign_topics)
00297 
00298             for t in self.local_pub_names:
00299                 self.local_manager.subscribe(t)
00300 
00301             for t in self.foreign_pub_names:
00302                 self.foreign_manager.subscribe(t)
00303         
00304     def new_local_topics(self, topic, publishers):
00305         topic_type = self.local_manager.get_topic_type(topic)
00306         self.foreign_manager.advertise_list(topic, topic_type, publishers)
00307 
00308 
00309     def new_foreign_topics(self, topic, publishers):
00310         topic_type = self.foreign_manager.get_topic_type(topic)
00311         self.local_manager.advertise_list(topic, topic_type, publishers)
00312 
00313 
00314     def spin(self):
00315         r = rospy.Rate(1.0)
00316         while not rospy.is_shutdown():
00317             for s in self.local_service_names:
00318                 srv_uri = self.local_manager.lookup_service(s)
00319                 if srv_uri is not None:
00320                     self.foreign_manager.advertise_service(s, srv_uri)
00321                 else:
00322                     self.foreign_manager.unadvertise_service(s)
00323             for s in self.foreign_service_names:
00324                 srv_uri = self.foreign_manager.lookup_service(s)
00325                 if srv_uri is not None:
00326                     self.local_manager.advertise_service(s, srv_uri)
00327                 else:
00328                     self.local_manager.unadvertise_service(s)
00329             r.sleep()
00330 
00331         if self.local_manager:
00332             self.local_manager.unsubscribe_all()
00333         if self.foreign_manager:
00334             self.foreign_manager.unsubscribe_all()
00335 
00336 
00337 def master_sync_main():
00338     r = MasterSync()
00339     r.spin()
00340     
00341 if __name__ == '__main__':
00342     master_sync_main()


rosproxy
Author(s): Ken Conley
autogenerated on Mon Dec 2 2013 13:03:48