master_sync.py
Go to the documentation of this file.
1 import sys
2 
3 if sys.version_info[0] == 3:
4  from urllib.parse import urlparse # python3 move urlparse to urllib.parse
5 else:
6  import urlparse
7 
8 
9 import threading
10 import time
11 
12 import rosgraph
13 import rosgraph.names
14 import rosgraph.network
15 
16 import rospy
17 
18 from rospy.core import global_name, is_topic
19 from rospy.impl.validators import non_empty, ParameterInvalid
20 
21 from rospy.impl.masterslave import apivalidate
22 
23 from rosgraph.xmlrpc import XmlRpcNode, XmlRpcHandler
24 
25 def is_publishers_list(paramName):
26  return ('is_publishers_list', paramName)
27 
28 
29 class TopicPubListenerHandler(XmlRpcHandler):
30 
31  def __init__(self, cb):
32  super(TopicPubListenerHandler, self).__init__()
33  self.uri = None
34  self.cb = cb
35 
36  def _ready(self, uri):
37  self.uri = uri
38 
39  def _custom_validate(self, validation, param_name, param_value, caller_id):
40  if validation == 'is_publishers_list':
41  if not type(param_value) == list:
42  raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
43  for v in param_value:
44  if not isinstance(v, basestring):
45  raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
46  parsed = urlparse.urlparse(v)
47  if not parsed[0] or not parsed[1]: #protocol and host
48  raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
49  return param_value
50  else:
51  raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
52 
53  @apivalidate([])
54  def getBusStats(self, caller_id):
55  # not supported
56  return 1, '', [[], [], []]
57 
58  @apivalidate([])
59  def getBusInfo(self, caller_id):
60  # not supported
61  return 1, '', [[], [], []]
62 
63  @apivalidate('')
64  def getMasterUri(self, caller_id):
65  # not supported
66  return 1, '', ''
67 
68  @apivalidate(0, (None, ))
69  def shutdown(self, caller_id, msg=''):
70  return -1, "not authorized", 0
71 
72  @apivalidate(-1)
73  def getPid(self, caller_id):
74  return -1, "not authorized", 0
75 
76  ###############################################################################
77  # PUB/SUB APIS
78 
79  @apivalidate([])
80  def getSubscriptions(self, caller_id):
81  return 1, "subscriptions", [[], []]
82 
83  @apivalidate([])
84  def getPublications(self, caller_id):
85  return 1, "publications", [[], []]
86 
87  @apivalidate(-1, (global_name('parameter_key'), None))
88  def paramUpdate(self, caller_id, parameter_key, parameter_value):
89  # not supported
90  return -1, 'not authorized', 0
91 
92  @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
93  def publisherUpdate(self, caller_id, topic, publishers):
94  self.cb(topic, publishers)
95 
96  @apivalidate([], (is_topic('topic'), non_empty('protocols')))
97  def requestTopic(self, caller_id, topic, protocols):
98  return 0, "no supported protocol implementations", []
99 
100 
101 class RemoteManager(object):
102  def __init__(self, master_uri, cb):
103  self.master_uri = master_uri
104 
105  ns = rosgraph.names.get_ros_namespace()
106  anon_name = rosgraph.names.anonymous_name('master_sync')
107 
108  self.master = rosgraph.Master(rosgraph.names.ns_join(ns, anon_name), master_uri=self.master_uri)
109 
110  self.cb = cb
111 
112  self.type_cache = {}
113 
114  self.subs = {}
115  self.pubs = {}
116  self.srvs = {}
117 
118  rpc_handler = TopicPubListenerHandler(self.new_topics)
119  self.external_node = XmlRpcNode(rpc_handler=rpc_handler)
120  self.external_node.start()
121 
122  timeout_t = time.time() + 5.
123  while time.time() < timeout_t and self.external_node.uri is None:
124  time.sleep(0.01)
125 
126 
127  def get_topic_type(self, query_topic):
128  query_topic = self.resolve(query_topic)
129 
130  if query_topic in self.type_cache:
131  return self.type_cache[query_topic]
132  else:
133  for topic, topic_type in self.master.getTopicTypes():
134  self.type_cache[topic] = topic_type
135  if query_topic in self.type_cache:
136  return self.type_cache[query_topic]
137  else:
138  return "*"
139 
140  def subscribe(self, topic):
141  topic = self.resolve(topic)
142  publishers = self.master.registerSubscriber(topic, '*', self.external_node.uri)
143  self.subs[(topic, self.external_node.uri)] = self.master
144  self.new_topics(topic, publishers)
145 
146  def advertise(self, topic, topic_type, uri):
147  topic = self.resolve(topic)
148 
149  # Prevent double-advertisements
150  if (topic, uri) in self.pubs:
151  return
152 
153  # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name
154  anon_name = rosgraph.names.anonymous_name('master_sync')
155  master = rosgraph.Master(anon_name, master_uri=self.master_uri)
156 
157  rospy.loginfo("Registering (%s,%s) on master %s"%(topic,uri,master.master_uri))
158 
159  master.registerPublisher(topic, topic_type, uri)
160  self.pubs[(topic, uri)] = master
161 
162 
163  def unadvertise(self, topic, uri):
164  if (topic, uri) in self.pubs:
165  m = self.pubs[(topic,uri)]
166  rospy.loginfo("Unregistering (%s,%s) from master %s"%(topic,uri,m.master_uri))
167  m.unregisterPublisher(topic,uri)
168  del self.pubs[(topic,uri)]
169 
170 
171  def advertise_list(self, topic, topic_type, uris):
172  topic = self.resolve(topic)
173 
174  unadv = set((t,u) for (t,u) in self.pubs.iterkeys() if t == topic) - set([(topic, u) for u in uris])
175  for (t,u) in self.pubs.keys():
176  if (t,u) in unadv:
177  self.unadvertise(t,u)
178 
179  for u in uris:
180  self.advertise(topic, topic_type, u)
181 
182  def lookup_service(self, service_name):
183  service_name = self.resolve(service_name)
184  try:
185  return self.master.lookupService(service_name)
186  except rosgraph.MasterError:
187  return None
188 
189  def advertise_service(self, service_name, uri):
190 
191  # These registrations need to be anonymous so the master doesn't kill us if there is a duplicate name
192  anon_name = rosgraph.names.anonymous_name('master_sync')
193  master = rosgraph.Master(anon_name, master_uri=self.master_uri)
194 
195  if (service_name) in self.srvs:
196  if self.srvs[service_name][0] == uri:
197  return
198  else:
199  self.unadvertise_service(service_name)
200 
201  fake_api = 'http://%s:0'%rosgraph.network.get_host_name()
202  rospy.loginfo("Registering service (%s,%s) on master %s"%(service_name, uri, master.master_uri))
203  master.registerService(service_name, uri, fake_api)
204 
205  self.srvs[service_name] = (uri, master)
206 
207  def unadvertise_service(self, service_name):
208  if service_name in self.srvs:
209  uri,m = self.srvs[service_name]
210  rospy.loginfo("Unregistering service (%s,%s) from master %s"%(service_name, uri, m.master_uri))
211  m.unregisterService(service_name, uri)
212  del self.srvs[service_name]
213 
214 
215  def resolve(self, topic):
216  ns = rosgraph.names.namespace(self.master.caller_id)
217  return rosgraph.names.ns_join(ns, topic)
218 
219  def unsubscribe_all(self):
220  for (t,u),m in self.subs.iteritems():
221  m.unregisterSubscriber(t,u)
222  for t,u in self.pubs.keys():
223  self.unadvertise(t,u)
224  for s in self.srvs.keys():
225  self.unadvertise_service(s)
226 
227  def new_topics(self, topic, publishers):
228  self.cb(topic, [p for p in publishers if (topic,p) not in self.pubs])
229 
230 
232  try:
233  m.getUri()
234  return True
235  except Exception:
236  return False
237 
238 class MasterSync(object):
239  def __init__(self, foreign_master, local_service_names = [], local_pub_names = [], foreign_service_names = [], foreign_pub_names = []):
240 
241  self.local_service_names = local_service_names
242  self.local_pub_names = local_pub_names
243  self.foreign_service_names = foreign_service_names
244  self.foreign_pub_names = foreign_pub_names
245 
246  self.local_manager = None
247  self.foreign_manager = None
248  self.stopping = False
249  self.thread = None
250 
251  # Get master URIs
252  local_master = rosgraph.get_master_uri()
253 
254  m = rosgraph.Master(rospy.get_name(), master_uri=foreign_master)
255  r = rospy.Rate(1)
256  rospy.loginfo("Waiting for foreign master [%s] to come up..."%(foreign_master))
257  while not check_master(m) and not rospy.is_shutdown():
258  r.sleep()
259 
260  if not rospy.is_shutdown():
261  rospy.loginfo("Foreign master is available")
262 
263  self.local_manager = RemoteManager(local_master, self.new_local_topics)
264  self.foreign_manager = RemoteManager(foreign_master, self.new_foreign_topics)
265 
266  for t in self.local_pub_names:
267  self.local_manager.subscribe(t)
268 
269  for t in self.foreign_pub_names:
270  self.foreign_manager.subscribe(t)
271 
272  self.thread = threading.Thread(target=self.spin)
273  self.thread.start()
274 
275  else:
276  rospy.loginfo("shutdown flag raised, aborting...")
277 
278 
279  def new_local_topics(self, topic, publishers):
280  topic_type = self.local_manager.get_topic_type(topic)
281  self.foreign_manager.advertise_list(topic, topic_type, publishers)
282 
283 
284  def new_foreign_topics(self, topic, publishers):
285  topic_type = self.foreign_manager.get_topic_type(topic)
286  self.local_manager.advertise_list(topic, topic_type, publishers)
287 
288 
289  def stop(self):
290  self.stopping = True
291  self.thread.join()
292 
293  if self.local_manager:
294  self.local_manager.unsubscribe_all()
295  if self.foreign_manager:
296  self.foreign_manager.unsubscribe_all()
297 
298 
299  # Spin is necessary to synchronize SERVICES. Topics work entirely on a callback-driven basis
300  def spin(self):
301  r = rospy.Rate(1.0)
302  while not rospy.is_shutdown() and not self.stopping:
303  for s in self.local_service_names:
304  srv_uri = self.local_manager.lookup_service(s)
305  if srv_uri is not None:
306  self.foreign_manager.advertise_service(s, srv_uri)
307  else:
308  self.foreign_manager.unadvertise_service(s)
309  for s in self.foreign_service_names:
310  srv_uri = self.foreign_manager.lookup_service(s)
311  if srv_uri is not None:
312  self.local_manager.advertise_service(s, srv_uri)
313  else:
314  self.local_manager.unadvertise_service(s)
315  r.sleep()
def advertise(self, topic, topic_type, uri)
Definition: master_sync.py:146
def unadvertise(self, topic, uri)
Definition: master_sync.py:163
def lookup_service(self, service_name)
Definition: master_sync.py:182
def new_local_topics(self, topic, publishers)
Definition: master_sync.py:279
def _custom_validate(self, validation, param_name, param_value, caller_id)
Definition: master_sync.py:39
def __init__(self, master_uri, cb)
Definition: master_sync.py:102
def paramUpdate(self, caller_id, parameter_key, parameter_value)
Definition: master_sync.py:88
def new_topics(self, topic, publishers)
Definition: master_sync.py:227
def shutdown(self, caller_id, msg='')
Definition: master_sync.py:69
def unadvertise_service(self, service_name)
Definition: master_sync.py:207
def requestTopic(self, caller_id, topic, protocols)
Definition: master_sync.py:97
def advertise_list(self, topic, topic_type, uris)
Definition: master_sync.py:171
def __init__(self, foreign_master, local_service_names=[], local_pub_names=[], foreign_service_names=[], foreign_pub_names=[])
Definition: master_sync.py:239
def getSubscriptions(self, caller_id)
PUB/SUB APIS.
Definition: master_sync.py:80
def is_publishers_list(paramName)
Definition: master_sync.py:25
def advertise_service(self, service_name, uri)
Definition: master_sync.py:189
def new_foreign_topics(self, topic, publishers)
Definition: master_sync.py:284
def get_topic_type(self, query_topic)
Definition: master_sync.py:127
def publisherUpdate(self, caller_id, topic, publishers)
Definition: master_sync.py:93


app_manager
Author(s): Jeremy Leibs, Ken Conley, Yuki Furuta
autogenerated on Fri Mar 5 2021 03:07:47