7 import rosgraph.network
11 from rospy.core
import global_name, is_topic
12 from rospy.impl.validators
import non_empty, ParameterInvalid
14 from rospy.impl.masterslave
import apivalidate
16 from rosgraph.xmlrpc
import XmlRpcNode, XmlRpcHandler
19 return (
'is_publishers_list', paramName)
25 super(TopicPubListenerHandler, self).
__init__()
33 if validation ==
'is_publishers_list':
34 if not type(param_value) == list:
35 raise ParameterInvalid(
"ERROR: param [%s] must be a list"%param_name)
37 if not isinstance(v, basestring):
38 raise ParameterInvalid(
"ERROR: param [%s] must be a list of strings"%param_name)
39 parsed = urlparse.urlparse(v)
40 if not parsed[0]
or not parsed[1]:
41 raise ParameterInvalid(
"ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
44 raise ParameterInvalid(
"ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
49 return 1,
'', [[], [], []]
54 return 1,
'', [[], [], []]
61 @apivalidate(0, (
None, ))
63 return -1,
"not authorized", 0
67 return -1,
"not authorized", 0
74 return 1,
"subscriptions", [[], []]
78 return 1,
"publications", [[], []]
80 @apivalidate(-1, (global_name(
'parameter_key'),
None))
81 def paramUpdate(self, caller_id, parameter_key, parameter_value):
83 return -1,
'not authorized', 0
87 self.
cb(topic, publishers)
89 @apivalidate([], (is_topic(
'topic'), non_empty(
'protocols')))
91 return 0,
"no supported protocol implementations", []
98 ns = rosgraph.names.get_ros_namespace()
99 anon_name = rosgraph.names.anonymous_name(
'master_sync')
101 self.
master = rosgraph.Master(rosgraph.names.ns_join(ns, anon_name), master_uri=self.
master_uri)
113 self.external_node.start()
115 timeout_t = time.time() + 5.
116 while time.time() < timeout_t
and self.external_node.uri
is None:
121 query_topic = self.
resolve(query_topic)
126 for topic, topic_type
in self.master.getTopicTypes():
135 publishers = self.master.registerSubscriber(topic,
'*', self.external_node.uri)
136 self.
subs[(topic, self.external_node.uri)] = self.
master 143 if (topic, uri)
in self.
pubs:
147 anon_name = rosgraph.names.anonymous_name(
'master_sync')
148 master = rosgraph.Master(anon_name, master_uri=self.
master_uri)
150 rospy.loginfo(
"Registering (%s,%s) on master %s"%(topic,uri,master.master_uri))
152 master.registerPublisher(topic, topic_type, uri)
153 self.
pubs[(topic, uri)] = master
157 if (topic, uri)
in self.
pubs:
158 m = self.
pubs[(topic,uri)]
159 rospy.loginfo(
"Unregistering (%s,%s) from master %s"%(topic,uri,m.master_uri))
160 m.unregisterPublisher(topic,uri)
161 del self.
pubs[(topic,uri)]
167 unadv = set((t,u)
for (t,u)
in self.pubs.iterkeys()
if t == topic) - set([(topic, u)
for u
in uris])
168 for (t,u)
in self.pubs.keys():
176 service_name = self.
resolve(service_name)
178 return self.master.lookupService(service_name)
179 except rosgraph.MasterError:
185 anon_name = rosgraph.names.anonymous_name(
'master_sync')
186 master = rosgraph.Master(anon_name, master_uri=self.
master_uri)
188 if (service_name)
in self.
srvs:
189 if self.
srvs[service_name][0] == uri:
194 fake_api =
'http://%s:0'%rosgraph.network.get_host_name()
195 rospy.loginfo(
"Registering service (%s,%s) on master %s"%(service_name, uri, master.master_uri))
196 master.registerService(service_name, uri, fake_api)
198 self.
srvs[service_name] = (uri, master)
201 if service_name
in self.
srvs:
202 uri,m = self.
srvs[service_name]
203 rospy.loginfo(
"Unregistering service (%s,%s) from master %s"%(service_name, uri, m.master_uri))
204 m.unregisterService(service_name, uri)
205 del self.
srvs[service_name]
209 ns = rosgraph.names.namespace(self.master.caller_id)
210 return rosgraph.names.ns_join(ns, topic)
213 for (t,u),m
in self.subs.iteritems():
214 m.unregisterSubscriber(t,u)
215 for t,u
in self.pubs.keys():
217 for s
in self.srvs.keys():
221 self.
cb(topic, [p
for p
in publishers
if (topic,p)
not in self.
pubs])
232 def __init__(self, foreign_master, local_service_names = [], local_pub_names = [], foreign_service_names = [], foreign_pub_names = []):
245 local_master = rosgraph.get_master_uri()
247 m = rosgraph.Master(rospy.get_name(), master_uri=foreign_master)
249 rospy.loginfo(
"Waiting for foreign master [%s] to come up..."%(foreign_master))
253 if not rospy.is_shutdown():
254 rospy.loginfo(
"Foreign master is available")
260 self.local_manager.subscribe(t)
263 self.foreign_manager.subscribe(t)
265 self.
thread = threading.Thread(target=self.
spin)
269 rospy.loginfo(
"shutdown flag raised, aborting...")
273 topic_type = self.local_manager.get_topic_type(topic)
274 self.foreign_manager.advertise_list(topic, topic_type, publishers)
278 topic_type = self.foreign_manager.get_topic_type(topic)
279 self.local_manager.advertise_list(topic, topic_type, publishers)
287 self.local_manager.unsubscribe_all()
289 self.foreign_manager.unsubscribe_all()
295 while not rospy.is_shutdown()
and not self.
stopping:
297 srv_uri = self.local_manager.lookup_service(s)
298 if srv_uri
is not None:
299 self.foreign_manager.advertise_service(s, srv_uri)
301 self.foreign_manager.unadvertise_service(s)
303 srv_uri = self.foreign_manager.lookup_service(s)
304 if srv_uri
is not None:
305 self.local_manager.advertise_service(s, srv_uri)
307 self.local_manager.unadvertise_service(s)
def advertise(self, topic, topic_type, uri)
def unadvertise(self, topic, uri)
def getBusStats(self, caller_id)
def lookup_service(self, service_name)
def subscribe(self, topic)
def new_local_topics(self, topic, publishers)
def _custom_validate(self, validation, param_name, param_value, caller_id)
def __init__(self, master_uri, cb)
def paramUpdate(self, caller_id, parameter_key, parameter_value)
def getMasterUri(self, caller_id)
def getBusInfo(self, caller_id)
def new_topics(self, topic, publishers)
def shutdown(self, caller_id, msg='')
def unsubscribe_all(self)
def getPublications(self, caller_id)
def unadvertise_service(self, service_name)
def requestTopic(self, caller_id, topic, protocols)
def advertise_list(self, topic, topic_type, uris)
def __init__(self, foreign_master, local_service_names=[], local_pub_names=[], foreign_service_names=[], foreign_pub_names=[])
def getSubscriptions(self, caller_id)
PUB/SUB APIS.
def is_publishers_list(paramName)
def advertise_service(self, service_name, uri)
def getPid(self, caller_id)
def new_foreign_topics(self, topic, publishers)
def get_topic_type(self, query_topic)
def publisherUpdate(self, caller_id, topic, publishers)