master_sync.py
Go to the documentation of this file.
1 import threading
2 import time
3 import urlparse
4 
5 import rosgraph
6 import rosgraph.names
7 import rosgraph.network
8 
9 import rospy
10 
11 from rospy.core import global_name, is_topic
12 from rospy.impl.validators import non_empty, ParameterInvalid
13 
14 from rospy.impl.masterslave import apivalidate
15 
16 from rosgraph.xmlrpc import XmlRpcNode, XmlRpcHandler
17 
18 def is_publishers_list(paramName):
19  return ('is_publishers_list', paramName)
20 
21 
22 class TopicPubListenerHandler(XmlRpcHandler):
23 
24  def __init__(self, cb):
25  super(TopicPubListenerHandler, self).__init__()
26  self.uri = None
27  self.cb = cb
28 
29  def _ready(self, uri):
30  self.uri = uri
31 
32  def _custom_validate(self, validation, param_name, param_value, caller_id):
33  if validation == 'is_publishers_list':
34  if not type(param_value) == list:
35  raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
36  for v in param_value:
37  if not isinstance(v, basestring):
38  raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
39  parsed = urlparse.urlparse(v)
40  if not parsed[0] or not parsed[1]: #protocol and host
41  raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
42  return param_value
43  else:
44  raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
45 
46  @apivalidate([])
47  def getBusStats(self, caller_id):
48  # not supported
49  return 1, '', [[], [], []]
50 
51  @apivalidate([])
52  def getBusInfo(self, caller_id):
53  # not supported
54  return 1, '', [[], [], []]
55 
56  @apivalidate('')
57  def getMasterUri(self, caller_id):
58  # not supported
59  return 1, '', ''
60 
61  @apivalidate(0, (None, ))
62  def shutdown(self, caller_id, msg=''):
63  return -1, "not authorized", 0
64 
65  @apivalidate(-1)
66  def getPid(self, caller_id):
67  return -1, "not authorized", 0
68 
69  ###############################################################################
70  # PUB/SUB APIS
71 
72  @apivalidate([])
73  def getSubscriptions(self, caller_id):
74  return 1, "subscriptions", [[], []]
75 
76  @apivalidate([])
77  def getPublications(self, caller_id):
78  return 1, "publications", [[], []]
79 
80  @apivalidate(-1, (global_name('parameter_key'), None))
81  def paramUpdate(self, caller_id, parameter_key, parameter_value):
82  # not supported
83  return -1, 'not authorized', 0
84 
85  @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
86  def publisherUpdate(self, caller_id, topic, publishers):
87  self.cb(topic, publishers)
88 
89  @apivalidate([], (is_topic('topic'), non_empty('protocols')))
90  def requestTopic(self, caller_id, topic, protocols):
91  return 0, "no supported protocol implementations", []
92 
93 
94 class RemoteManager(object):
95  def __init__(self, master_uri, cb):
96  self.master_uri = master_uri
97 
98  ns = rosgraph.names.get_ros_namespace()
99  anon_name = rosgraph.names.anonymous_name('master_sync')
100 
101  self.master = rosgraph.Master(rosgraph.names.ns_join(ns, anon_name), master_uri=self.master_uri)
102 
103  self.cb = cb
104 
105  self.type_cache = {}
106 
107  self.subs = {}
108  self.pubs = {}
109  self.srvs = {}
110 
111  rpc_handler = TopicPubListenerHandler(self.new_topics)
112  self.external_node = XmlRpcNode(rpc_handler=rpc_handler)
113  self.external_node.start()
114 
115  timeout_t = time.time() + 5.
116  while time.time() < timeout_t and self.external_node.uri is None:
117  time.sleep(0.01)
118 
119 
120  def get_topic_type(self, query_topic):
121  query_topic = self.resolve(query_topic)
122 
123  if query_topic in self.type_cache:
124  return self.type_cache[query_topic]
125  else:
126  for topic, topic_type in self.master.getTopicTypes():
127  self.type_cache[topic] = topic_type
128  if query_topic in self.type_cache:
129  return self.type_cache[query_topic]
130  else:
131  return "*"
132 
133  def subscribe(self, topic):
134  topic = self.resolve(topic)
135  publishers = self.master.registerSubscriber(topic, '*', self.external_node.uri)
136  self.subs[(topic, self.external_node.uri)] = self.master
137  self.new_topics(topic, publishers)
138 
139  def advertise(self, topic, topic_type, uri):
140  topic = self.resolve(topic)
141 
142  # Prevent double-advertisements
143  if (topic, uri) in self.pubs:
144  return
145 
146  # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name
147  anon_name = rosgraph.names.anonymous_name('master_sync')
148  master = rosgraph.Master(anon_name, master_uri=self.master_uri)
149 
150  rospy.loginfo("Registering (%s,%s) on master %s"%(topic,uri,master.master_uri))
151 
152  master.registerPublisher(topic, topic_type, uri)
153  self.pubs[(topic, uri)] = master
154 
155 
156  def unadvertise(self, topic, uri):
157  if (topic, uri) in self.pubs:
158  m = self.pubs[(topic,uri)]
159  rospy.loginfo("Unregistering (%s,%s) from master %s"%(topic,uri,m.master_uri))
160  m.unregisterPublisher(topic,uri)
161  del self.pubs[(topic,uri)]
162 
163 
164  def advertise_list(self, topic, topic_type, uris):
165  topic = self.resolve(topic)
166 
167  unadv = set((t,u) for (t,u) in self.pubs.iterkeys() if t == topic) - set([(topic, u) for u in uris])
168  for (t,u) in self.pubs.keys():
169  if (t,u) in unadv:
170  self.unadvertise(t,u)
171 
172  for u in uris:
173  self.advertise(topic, topic_type, u)
174 
175  def lookup_service(self, service_name):
176  service_name = self.resolve(service_name)
177  try:
178  return self.master.lookupService(service_name)
179  except rosgraph.MasterError:
180  return None
181 
182  def advertise_service(self, service_name, uri):
183 
184  # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name
185  anon_name = rosgraph.names.anonymous_name('master_sync')
186  master = rosgraph.Master(anon_name, master_uri=self.master_uri)
187 
188  if (service_name) in self.srvs:
189  if self.srvs[service_name][0] == uri:
190  return
191  else:
192  self.unadvertise_service(service_name)
193 
194  fake_api = 'http://%s:0'%rosgraph.network.get_host_name()
195  rospy.loginfo("Registering service (%s,%s) on master %s"%(service_name, uri, master.master_uri))
196  master.registerService(service_name, uri, fake_api)
197 
198  self.srvs[service_name] = (uri, master)
199 
200  def unadvertise_service(self, service_name):
201  if service_name in self.srvs:
202  uri,m = self.srvs[service_name]
203  rospy.loginfo("Unregistering service (%s,%s) from master %s"%(service_name, uri, m.master_uri))
204  m.unregisterService(service_name, uri)
205  del self.srvs[service_name]
206 
207 
208  def resolve(self, topic):
209  ns = rosgraph.names.namespace(self.master.caller_id)
210  return rosgraph.names.ns_join(ns, topic)
211 
212  def unsubscribe_all(self):
213  for (t,u),m in self.subs.iteritems():
214  m.unregisterSubscriber(t,u)
215  for t,u in self.pubs.keys():
216  self.unadvertise(t,u)
217  for s in self.srvs.keys():
218  self.unadvertise_service(s)
219 
220  def new_topics(self, topic, publishers):
221  self.cb(topic, [p for p in publishers if (topic,p) not in self.pubs])
222 
223 
225  try:
226  m.getUri()
227  return True
228  except Exception:
229  return False
230 
231 class MasterSync(object):
232  def __init__(self, foreign_master, local_service_names = [], local_pub_names = [], foreign_service_names = [], foreign_pub_names = []):
233 
234  self.local_service_names = local_service_names
235  self.local_pub_names = local_pub_names
236  self.foreign_service_names = foreign_service_names
237  self.foreign_pub_names = foreign_pub_names
238 
239  self.local_manager = None
240  self.foreign_manager = None
241  self.stopping = False
242  self.thread = None
243 
244  # Get master URIs
245  local_master = rosgraph.get_master_uri()
246 
247  m = rosgraph.Master(rospy.get_name(), master_uri=foreign_master)
248  r = rospy.Rate(1)
249  rospy.loginfo("Waiting for foreign master [%s] to come up..."%(foreign_master))
250  while not check_master(m) and not rospy.is_shutdown():
251  r.sleep()
252 
253  if not rospy.is_shutdown():
254  rospy.loginfo("Foreign master is available")
255 
256  self.local_manager = RemoteManager(local_master, self.new_local_topics)
257  self.foreign_manager = RemoteManager(foreign_master, self.new_foreign_topics)
258 
259  for t in self.local_pub_names:
260  self.local_manager.subscribe(t)
261 
262  for t in self.foreign_pub_names:
263  self.foreign_manager.subscribe(t)
264 
265  self.thread = threading.Thread(target=self.spin)
266  self.thread.start()
267 
268  else:
269  rospy.loginfo("shutdown flag raised, aborting...")
270 
271 
272  def new_local_topics(self, topic, publishers):
273  topic_type = self.local_manager.get_topic_type(topic)
274  self.foreign_manager.advertise_list(topic, topic_type, publishers)
275 
276 
277  def new_foreign_topics(self, topic, publishers):
278  topic_type = self.foreign_manager.get_topic_type(topic)
279  self.local_manager.advertise_list(topic, topic_type, publishers)
280 
281 
282  def stop(self):
283  self.stopping = True
284  self.thread.join()
285 
286  if self.local_manager:
287  self.local_manager.unsubscribe_all()
288  if self.foreign_manager:
289  self.foreign_manager.unsubscribe_all()
290 
291 
292  # Spin is necessary to synchronize SERVICES. Topics work entirely on a callback-driven basis
293  def spin(self):
294  r = rospy.Rate(1.0)
295  while not rospy.is_shutdown() and not self.stopping:
296  for s in self.local_service_names:
297  srv_uri = self.local_manager.lookup_service(s)
298  if srv_uri is not None:
299  self.foreign_manager.advertise_service(s, srv_uri)
300  else:
301  self.foreign_manager.unadvertise_service(s)
302  for s in self.foreign_service_names:
303  srv_uri = self.foreign_manager.lookup_service(s)
304  if srv_uri is not None:
305  self.local_manager.advertise_service(s, srv_uri)
306  else:
307  self.local_manager.unadvertise_service(s)
308  r.sleep()
def advertise(self, topic, topic_type, uri)
Definition: master_sync.py:139
def unadvertise(self, topic, uri)
Definition: master_sync.py:156
def lookup_service(self, service_name)
Definition: master_sync.py:175
def new_local_topics(self, topic, publishers)
Definition: master_sync.py:272
def _custom_validate(self, validation, param_name, param_value, caller_id)
Definition: master_sync.py:32
def __init__(self, master_uri, cb)
Definition: master_sync.py:95
def paramUpdate(self, caller_id, parameter_key, parameter_value)
Definition: master_sync.py:81
def new_topics(self, topic, publishers)
Definition: master_sync.py:220
def shutdown(self, caller_id, msg='')
Definition: master_sync.py:62
def unadvertise_service(self, service_name)
Definition: master_sync.py:200
def requestTopic(self, caller_id, topic, protocols)
Definition: master_sync.py:90
def advertise_list(self, topic, topic_type, uris)
Definition: master_sync.py:164
def __init__(self, foreign_master, local_service_names=[], local_pub_names=[], foreign_service_names=[], foreign_pub_names=[])
Definition: master_sync.py:232
def getSubscriptions(self, caller_id)
PUB/SUB APIS.
Definition: master_sync.py:73
def is_publishers_list(paramName)
Definition: master_sync.py:18
def advertise_service(self, service_name, uri)
Definition: master_sync.py:182
def new_foreign_topics(self, topic, publishers)
Definition: master_sync.py:277
def get_topic_type(self, query_topic)
Definition: master_sync.py:120
def publisherUpdate(self, caller_id, topic, publishers)
Definition: master_sync.py:86


app_manager
Author(s): Jeremy Leibs, Ken Conley, Yuki Furuta
autogenerated on Tue Apr 2 2019 02:58:24