3 if sys.version_info[0] == 3:
4 from urllib.parse
import urlparse
14 import rosgraph.network
18 from rospy.core
import global_name, is_topic
19 from rospy.impl.validators
import non_empty, ParameterInvalid
21 from rospy.impl.masterslave
import apivalidate
23 from rosgraph.xmlrpc
import XmlRpcNode, XmlRpcHandler
26 return (
'is_publishers_list', paramName)
32 super(TopicPubListenerHandler, self).
__init__()
40 if validation ==
'is_publishers_list':
41 if not type(param_value) == list:
42 raise ParameterInvalid(
"ERROR: param [%s] must be a list"%param_name)
44 if not isinstance(v, basestring):
45 raise ParameterInvalid(
"ERROR: param [%s] must be a list of strings"%param_name)
46 parsed = urlparse.urlparse(v)
47 if not parsed[0]
or not parsed[1]:
48 raise ParameterInvalid(
"ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
51 raise ParameterInvalid(
"ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
56 return 1,
'', [[], [], []]
61 return 1,
'', [[], [], []]
68 @apivalidate(0, (
None, ))
70 return -1,
"not authorized", 0
74 return -1,
"not authorized", 0
81 return 1,
"subscriptions", [[], []]
85 return 1,
"publications", [[], []]
87 @apivalidate(-1, (global_name(
'parameter_key'),
None))
88 def paramUpdate(self, caller_id, parameter_key, parameter_value):
90 return -1,
'not authorized', 0
94 self.
cb(topic, publishers)
96 @apivalidate([], (is_topic(
'topic'), non_empty(
'protocols')))
98 return 0,
"no supported protocol implementations", []
105 ns = rosgraph.names.get_ros_namespace()
106 anon_name = rosgraph.names.anonymous_name(
'master_sync')
108 self.
master = rosgraph.Master(rosgraph.names.ns_join(ns, anon_name), master_uri=self.
master_uri)
120 self.external_node.start()
122 timeout_t = time.time() + 5.
123 while time.time() < timeout_t
and self.external_node.uri
is None:
128 query_topic = self.
resolve(query_topic)
133 for topic, topic_type
in self.master.getTopicTypes():
142 publishers = self.master.registerSubscriber(topic,
'*', self.external_node.uri)
143 self.
subs[(topic, self.external_node.uri)] = self.
master 150 if (topic, uri)
in self.
pubs:
154 anon_name = rosgraph.names.anonymous_name(
'master_sync')
155 master = rosgraph.Master(anon_name, master_uri=self.
master_uri)
157 rospy.loginfo(
"Registering (%s,%s) on master %s"%(topic,uri,master.master_uri))
159 master.registerPublisher(topic, topic_type, uri)
160 self.
pubs[(topic, uri)] = master
164 if (topic, uri)
in self.
pubs:
165 m = self.
pubs[(topic,uri)]
166 rospy.loginfo(
"Unregistering (%s,%s) from master %s"%(topic,uri,m.master_uri))
167 m.unregisterPublisher(topic,uri)
168 del self.
pubs[(topic,uri)]
174 unadv = set((t,u)
for (t,u)
in self.pubs.iterkeys()
if t == topic) - set([(topic, u)
for u
in uris])
175 for (t,u)
in self.pubs.keys():
183 service_name = self.
resolve(service_name)
185 return self.master.lookupService(service_name)
186 except rosgraph.MasterError:
192 anon_name = rosgraph.names.anonymous_name(
'master_sync')
193 master = rosgraph.Master(anon_name, master_uri=self.
master_uri)
195 if (service_name)
in self.
srvs:
196 if self.
srvs[service_name][0] == uri:
201 fake_api =
'http://%s:0'%rosgraph.network.get_host_name()
202 rospy.loginfo(
"Registering service (%s,%s) on master %s"%(service_name, uri, master.master_uri))
203 master.registerService(service_name, uri, fake_api)
205 self.
srvs[service_name] = (uri, master)
208 if service_name
in self.
srvs:
209 uri,m = self.
srvs[service_name]
210 rospy.loginfo(
"Unregistering service (%s,%s) from master %s"%(service_name, uri, m.master_uri))
211 m.unregisterService(service_name, uri)
212 del self.
srvs[service_name]
216 ns = rosgraph.names.namespace(self.master.caller_id)
217 return rosgraph.names.ns_join(ns, topic)
220 for (t,u),m
in self.subs.iteritems():
221 m.unregisterSubscriber(t,u)
222 for t,u
in self.pubs.keys():
224 for s
in self.srvs.keys():
228 self.
cb(topic, [p
for p
in publishers
if (topic,p)
not in self.
pubs])
239 def __init__(self, foreign_master, local_service_names = [], local_pub_names = [], foreign_service_names = [], foreign_pub_names = []):
252 local_master = rosgraph.get_master_uri()
254 m = rosgraph.Master(rospy.get_name(), master_uri=foreign_master)
256 rospy.loginfo(
"Waiting for foreign master [%s] to come up..."%(foreign_master))
260 if not rospy.is_shutdown():
261 rospy.loginfo(
"Foreign master is available")
267 self.local_manager.subscribe(t)
270 self.foreign_manager.subscribe(t)
272 self.
thread = threading.Thread(target=self.
spin)
276 rospy.loginfo(
"shutdown flag raised, aborting...")
280 topic_type = self.local_manager.get_topic_type(topic)
281 self.foreign_manager.advertise_list(topic, topic_type, publishers)
285 topic_type = self.foreign_manager.get_topic_type(topic)
286 self.local_manager.advertise_list(topic, topic_type, publishers)
294 self.local_manager.unsubscribe_all()
296 self.foreign_manager.unsubscribe_all()
302 while not rospy.is_shutdown()
and not self.
stopping:
304 srv_uri = self.local_manager.lookup_service(s)
305 if srv_uri
is not None:
306 self.foreign_manager.advertise_service(s, srv_uri)
308 self.foreign_manager.unadvertise_service(s)
310 srv_uri = self.foreign_manager.lookup_service(s)
311 if srv_uri
is not None:
312 self.local_manager.advertise_service(s, srv_uri)
314 self.local_manager.unadvertise_service(s)
def advertise(self, topic, topic_type, uri)
def unadvertise(self, topic, uri)
def getBusStats(self, caller_id)
def lookup_service(self, service_name)
def subscribe(self, topic)
def new_local_topics(self, topic, publishers)
def _custom_validate(self, validation, param_name, param_value, caller_id)
def __init__(self, master_uri, cb)
def paramUpdate(self, caller_id, parameter_key, parameter_value)
def getMasterUri(self, caller_id)
def getBusInfo(self, caller_id)
def new_topics(self, topic, publishers)
def shutdown(self, caller_id, msg='')
def unsubscribe_all(self)
def getPublications(self, caller_id)
def unadvertise_service(self, service_name)
def requestTopic(self, caller_id, topic, protocols)
def advertise_list(self, topic, topic_type, uris)
def __init__(self, foreign_master, local_service_names=[], local_pub_names=[], foreign_service_names=[], foreign_pub_names=[])
def getSubscriptions(self, caller_id)
PUB/SUB APIS.
def is_publishers_list(paramName)
def advertise_service(self, service_name, uri)
def getPid(self, caller_id)
def new_foreign_topics(self, topic, publishers)
def get_topic_type(self, query_topic)
def publisherUpdate(self, caller_id, topic, publishers)