1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 """
35 Internal use: ROS Node (Slave) API.
36
37 The Node API is implemented by the L{ROSHandler}.
38
39 API return convention: (statusCode, statusMessage, returnValue)
40
41 - statusCode: an integer indicating the completion condition of the method.
42 - statusMessage: a human-readable string message for debugging
43 - returnValue: the return value of the method; method-specific.
44
45 Current status codes:
46
47 - -1: ERROR: Error on the part of the caller, e.g. an invalid parameter
48 - 0: FAILURE: Method was attempted but failed to complete correctly.
49 - 1: SUCCESS: Method completed successfully.
50
51 Individual methods may assign additional meaning/semantics to statusCode.
52 """
53
54 import os
55 import sys
56 import itertools
57 import logging
58 import socket
59 import threading
60 import traceback
61 import time
62
63 try:
64
65 import urllib.parse as urlparse
66 except ImportError:
67 import urlparse
68
69 from rosgraph.xmlrpc import XmlRpcHandler
70
71 import rospy.names
72 import rospy.rostime
73
74 import rospy.impl.tcpros
75
76 from rospy.core import *
77 from rospy.impl.paramserver import get_param_server_cache
78 from rospy.impl.registration import RegManager, get_topic_manager
79 from rospy.impl.validators import non_empty, ParameterInvalid
80
81
82 STATUS = 0
83 MSG = 1
84 VAL = 2
def is_publishers_list(paramName):
    """
    Build the validator spec for a parameter that must be a list of
    publisher URIs.

    The result is a tuple rather than a callable: tuple/list validators
    are dispatched by the arg-checking decorator to the handler's
    _custom_validate(rule, param_name, value, caller_id), because the
    rule needs to run as a method of the handler instance.
    @param paramName: name of the parameter being validated
    @type  paramName: str
    @return: ('is_publishers_list', paramName)
    @rtype: (str, str)
    """
    return ('is_publishers_list', paramName)
91
92 _logger = logging.getLogger("rospy.impl.masterslave")
93
94 LOG_API = True
def apivalidate(error_return_value, validators=()):
    """
    ROS master/slave arg-checking decorator. Applies the specified
    validator to the corresponding argument and also remaps each
    argument to be the value returned by the validator.  Thus,
    arguments can be simultaneously validated and canonicalized prior
    to actual function call.
    @param error_return_value: API value to return if call unexpectedly fails
    @param validators: sequence of validators to apply to each
      arg. None means no validation for the parameter is required. As all
      api methods take caller_id as the first parameter, the validators
      start with the second param. A validator is either a callable
      invoked as v(value, caller_id), or a list/tuple (rule, param_name)
      that is forwarded to instance._custom_validate().
    @type  validators: sequence
    @return: decorator producing a wrapper that always answers with a
      (statusCode, statusMessage, returnValue) triple
    """
    def check_validates(f):
        # definition-time sanity check: one validator slot per API
        # argument after (self, caller_id)
        assert len(validators) == f.__code__.co_argcount - 2, "%s failed arg check"%f

        def validated_f(*args, **kwds):
            if LOG_API:
                _logger.debug("%s%s", f.__name__, str(args[1:]))

            # args[0] is the handler instance, args[1] must be caller_id
            if len(args) == 1:
                _logger.error("%s invoked without caller_id paramter", f.__name__)
                return -1, "missing required caller_id parameter", error_return_value
            elif len(args) != f.__code__.co_argcount:
                return -1, "Error: bad call arity", error_return_value

            instance = args[0]
            caller_id = args[1]
            if not isinstance(caller_id, str):
                _logger.error("%s: invalid caller_id param type", f.__name__)
                return -1, "caller_id must be a string", error_return_value

            newArgs = [instance, caller_id]  # canonicalized args
            try:
                for (v, a) in zip(validators, args[2:]):
                    if v:
                        try:
                            # simultaneously validate + canonicalize
                            if isinstance(v, (list, tuple)):
                                newArgs.append(instance._custom_validate(v[0], v[1], a, caller_id))
                            else:
                                newArgs.append(v(a, caller_id))
                        except ParameterInvalid as e:
                            _logger.error("%s: invalid parameter: %s", f.__name__, str(e) or 'error')
                            return -1, str(e) or 'error', error_return_value
                    else:
                        newArgs.append(a)

                retval = f(*newArgs, **kwds)
                if LOG_API:
                    _logger.debug("%s%s returns %s", f.__name__, args[1:], retval)
                # The None guard now applies whether or not LOG_API is
                # set; previously it was skipped when logging was on.
                code, msg, val = retval
                if val is None:
                    return -1, "Internal error (None value returned)", error_return_value
                return retval
            except TypeError as te:  # most likely wrong arg number
                _logger.error(traceback.format_exc())
                return -1, "Error: invalid arguments: %s"%te, error_return_value
            except Exception as e:  # internal failure
                _logger.error(traceback.format_exc())
                return 0, "Internal failure: %s"%e, error_return_value
        validated_f.__name__ = f.__name__
        validated_f.__doc__ = f.__doc__  # preserve doc
        return validated_f
    return check_validates
163
166 """
167 Base handler for both slave and master nodes. API methods
168 generally provide the capability for establishing point-to-point
169 connections with other nodes.
170
171 Instance methods are XML-RPC API methods, so care must be taken as
172 to what is added here.
173 """
174
def __init__(self, name, master_uri):
    """
    Base constructor for ROS nodes/masters
    @param name: ROS name of this node
    @type  name: str
    @param master_uri: URI of master node, or None if this node is the master
    @type  master_uri: str
    """
    super(ROSHandler, self).__init__()
    self.masterUri = master_uri
    self.name = name
    self.uri = None   # filled in later by the _ready() callback
    self.done = False  # flipped to True by the first _shutdown()

    # collect the protocol handlers that are actually available
    self.protocol_handlers = []
    handler = rospy.impl.tcpros.get_tcpros_handler()
    if handler is not None:
        self.protocol_handlers.append(handler)

    self.reg_man = RegManager(self)
196
197
198
199
201 """
202 @return: True if slave API is registered with master.
203 @rtype: bool
204 """
205 if self.reg_man is None:
206 return False
207 else:
208 return self.reg_man.is_registered()
209
210
def _ready(self, uri):
    """
    Callback from ROSNode to inform handler of correct i/o information.

    Records the URI and, when a registration manager is present, runs
    its start(uri, masterUri) on a new thread that is also registered
    with rospy.core as a shutdown thread.
    @param uri: XML-RPC URI
    @type  uri: str
    """
    _logger.info("_ready: %s", uri)
    self.uri = uri
    if self.reg_man:
        t = threading.Thread(target=self.reg_man.start, args=(uri, self.masterUri))
        # register before starting, same order as the original
        rospy.core._add_shutdown_thread(t)
        t.start()
224
226 """
227 Implements validation rules that require access to internal ROSHandler state.
228 @param validation: name of validation rule to use
229 @type validation: str
230 @param param_name: name of parameter being validated
231 @type param_name: str
232 @param param_value str: value of parameter
233 @type param_value: str
234 @param caller_id: value of caller_id parameter to API method
235 @type caller_id: str
236 @raise ParameterInvalid: if the parameter does not meet validation
237 @return: new value for parameter, after validation
238 """
239 if validation == 'is_publishers_list':
240 if not type(param_value) == list:
241 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
242 for v in param_value:
243 if not isinstance(v, str):
244 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
245 parsed = urlparse.urlparse(v)
246 if not parsed[0] or not parsed[1]:
247 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
248 return param_value
249 else:
250 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
251
252
253
254 _remap_table = { }
255
256 @classmethod
258 """
259 @internal
260 @param cls: class to register remappings on
261 @type cls: Class: class to register remappings on
262 @return: parameters (by pos) that should be remapped because they are names
263 @rtype: list
264 """
265 if methodName in cls._remap_table:
266 return cls._remap_table[methodName]
267 else:
268 return []
269
270
271
272
273 @apivalidate('')
274
275
276
277
279 return 1, "", self.uri
280
281 @apivalidate('')
282
283
284
285
287 return 1, "", self.name
288
289
290
291
292
@apivalidate([])
def getBusStats(self, caller_id):
    """
    Retrieve transport/topic statistics
    @param caller_id: ROS caller id
    @type  caller_id: str
    @return: [publishStats, subscribeStats, serviceStats]::
       publishStats: [[topicName, messageDataSent, pubConnectionData]...[topicNameN, messageDataSentN, pubConnectionDataN]]
           pubConnectionData: [connectionId, bytesSent, numSent, connected]* .
       subscribeStats: [[topicName, subConnectionData]...[topicNameN, subConnectionDataN]]
           subConnectionData: [connectionId, bytesReceived, dropEstimate, connected]* . dropEstimate is -1 if no estimate.
       serviceStats: not sure yet, probably akin to [numRequests, bytesReceived, bytesSent]
    """
    pub_stats, sub_stats = get_topic_manager().get_pub_sub_stats()
    # service statistics are not collected here; always an empty list
    return 1, '', [pub_stats, sub_stats, []]
309
@apivalidate([])
def getBusInfo(self, caller_id):
    """
    Retrieve transport/topic connection information
    @param caller_id: ROS caller id
    @type  caller_id: str
    @return: [1, "bus info", busInfo], where busInfo is whatever the
      topic manager's get_pub_sub_info() reports
    """
    return 1, "bus info", get_topic_manager().get_pub_sub_info()
318
@apivalidate('')
def getMasterUri(self, caller_id):
    """
    Get the URI of the master node.
    @param caller_id: ROS caller id
    @type  caller_id: str
    @return: [code, msg, masterUri]. On success the URI is also used
      as the status message; if no master URI is set the code is 0
      and the value is the empty string.
    @rtype: [int, str, str]
    """
    if not self.masterUri:
        return 0, "master URI not set", ""
    return 1, self.masterUri, self.masterUri
332
334 """
335 @param reason: human-readable debug string
336 @type reason: str
337 """
338 if not self.done:
339 self.done = True
340 if reason:
341 _logger.info(reason)
342 if self.protocol_handlers:
343 for handler in self.protocol_handlers:
344 handler.shutdown()
345 del self.protocol_handlers[:]
346 self.protocol_handlers = None
347 return True
348
@apivalidate(0, (None, ))
def shutdown(self, caller_id, msg=''):
    """
    Stop this server
    @param caller_id: ROS caller id
    @type  caller_id: str
    @param msg: a message describing why the node is being shutdown.
    @type  msg: str
    @return: [code, msg, 0]
    @rtype: [int, str, int]
    """
    if msg:
        print("shutdown request: %s"%msg)
    else:
        print("shutdown requst")
    # signal_shutdown is only raised by the call that actually performed
    # the handler shutdown, so repeated requests do not re-signal
    if self._shutdown('external shutdown request from [%s]: %s'%(caller_id, msg)):
        signal_shutdown('external shutdown request from [%s]: [%s]'%(caller_id, msg))
    return 1, "shutdown", 0
367
@apivalidate(-1)
def getPid(self, caller_id):
    """
    Get the PID of this server
    @param caller_id: ROS caller id
    @type  caller_id: str
    @return: [1, "", serverProcessPID]
    @rtype: [int, str, int]
    """
    return 1, "", os.getpid()
378
379
380
381
@apivalidate([])
def getSubscriptions(self, caller_id):
    """
    Retrieve a list of topics that this node subscribes to.
    @param caller_id: ROS caller id
    @type  caller_id: str
    @return: list of topics this node subscribes to.
    @rtype: [int, str, [ [topic1, topicType1]...[topicN, topicTypeN]]]
    """
    return 1, "subscriptions", get_topic_manager().get_subscriptions()
392
@apivalidate([])
def getPublications(self, caller_id):
    """
    Retrieve a list of topics that this node publishes.
    @param caller_id: ROS caller id
    @type  caller_id: str
    @return: list of topics published by this node.
    @rtype: [int, str, [ [topic1, topicType1]...[topicN, topicTypeN]]]
    """
    return 1, "publications", get_topic_manager().get_publications()
403
def _connect_topic(self, topic, pub_uri):
    """
    Connect subscriber to topic.

    Asks the publisher (via its requestTopic XML-RPC method) which
    protocol to use, then hands the answer to the first local protocol
    handler that supports it.
    @param topic: Topic name to connect.
    @type  topic: str
    @param pub_uri: API URI of topic publisher.
    @type  pub_uri: str
    @return: [code, msg, numConnects]. Every early-exit path in this
      method reports 0 as the third element; on success the triple is
      whatever the protocol handler's create_transport() returns.
    @rtype: [int, str, int]
    """
    caller_id = rospy.names.get_caller_id()
    sub = get_topic_manager().get_subscriber_impl(topic)
    if not sub:
        return -1, "No subscriber for topic [%s]"%topic, 0
    elif sub.has_connection(pub_uri):
        return 1, "_connect_topic[%s]: subscriber already connected to publisher [%s]"%(topic, pub_uri), 0

    # protocol_handlers is None once _shutdown() has run with handlers
    # present; treat that the same as "no handlers" instead of raising
    handlers = self.protocol_handlers or ()

    # collect every protocol our handlers can speak
    protocols = []
    for h in handlers:
        protocols.extend(h.get_supported())
    if not protocols:
        return 0, "ERROR: no available protocol handlers", 0

    _logger.debug("connect[%s]: calling requestTopic(%s, %s, %s)", topic, caller_id, topic, str(protocols))

    # NOTE(review): this sets the process-wide default socket timeout,
    # not just the timeout of the call below; kept as in the original
    socket.setdefaulttimeout(60.)
    max_num_tries = 3
    for attempt in range(1, max_num_tries + 1):
        try:
            code, msg, result = \
                xmlrpcapi(pub_uri).requestTopic(caller_id, topic, protocols)
            break
        except Exception as e:
            if attempt >= max_num_tries:
                return 0, "unable to requestTopic: %s"%str(e), 0
            _logger.debug("Retrying for %s" % topic)

    if code <= 0:
        _logger.debug("connect[%s]: requestTopic did not succeed %s, %s", pub_uri, code, msg)
        return code, msg, 0
    elif not result or not isinstance(result, list):
        # the publisher's answer is what must be a list; the original
        # re-checked the locally built `protocols` by mistake
        return 0, "ERROR: publisher returned invalid protocol choice: %s"%(str(result)), 0
    _logger.debug("connect[%s]: requestTopic returned protocol list %s", topic, result)
    protocol = result[0]
    for h in handlers:
        if h.supports(protocol):
            return h.create_transport(topic, pub_uri, result)
    return 0, "ERROR: publisher returned unsupported protocol choice: %s"%result, 0
467
@apivalidate(-1, (global_name('parameter_key'), None))
def paramUpdate(self, caller_id, parameter_key, parameter_value):
    """
    Callback from master reporting a new value for a parameter; the
    value is written into the local parameter server cache.
    @param caller_id: ROS caller id
    @type  caller_id: str
    @param parameter_key: parameter name, globally resolved
    @type  parameter_key: str
    @param parameter_value: New parameter value
    @type  parameter_value: XMLRPC-legal value
    @return: [code, status, ignore]. If code is -1 ERROR, the node
      is not subscribed to parameter_key
    @rtype: [int, str, int]
    """
    try:
        get_param_server_cache().update(parameter_key, parameter_value)
    except KeyError:
        # the cache signals "unknown key" with KeyError
        return -1, 'not subscribed', 0
    return 1, '', 0
487
@apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
def publisherUpdate(self, caller_id, topic, publishers):
    """
    Callback from master of current publisher list for specified topic.
    @param caller_id: ROS caller id
    @type  caller_id: str
    @param topic: topic name
    @type  topic: str
    @param publishers: list of current publishers for topic in the form of XMLRPC URIs
    @type  publishers: [str]
    @return: [code, status, ignore]
    @rtype: [int, str, int]
    """
    # Each notification already carries the complete publisher list, so
    # one call is enough; the original repeated the identical call once
    # per URI.
    # NOTE(review): an empty list is still not forwarded, exactly as
    # before -- confirm whether the registration manager should be told
    # when a topic loses all of its publishers.
    if self.reg_man and publishers:
        self.reg_man.publisher_update(topic, publishers)
    return 1, "", 0
505
506 _remap_table['requestTopic'] = [0]
@apivalidate([], (is_topic('topic'), non_empty('protocols')))
def requestTopic(self, caller_id, topic, protocols):
    """
    Publisher node API method called by a subscriber node.

    Request that source allocate a channel for communication. Subscriber provides
    a list of desired protocols for communication. Publisher returns the
    selected protocol along with any additional params required for
    establishing connection. For example, for a TCP/IP-based connection,
    the source node may return a port number of TCP/IP server.
    @param caller_id: ROS caller id
    @type  caller_id: str
    @param topic: topic name
    @type  topic: str
    @param protocols: list of desired
      protocols for communication in order of preference. Each
      protocol is a list of the form [ProtocolName,
      ProtocolParam1, ProtocolParam2...N]
    @type  protocols: [[str, XmlRpcLegalValue*]]
    @return: [code, msg, protocolParams]. protocolParams may be an
      empty list if there are no compatible protocols.
    @rtype: [int, str, [str, XmlRpcLegalValue*]]
    """
    if not get_topic_manager().has_publication(topic):
        return -1, "Not a publisher of [%s]"%topic, []
    # protocol_handlers is None once _shutdown() has run with handlers
    # present; answer "no supported protocol" instead of raising
    handlers = self.protocol_handlers or ()
    for protocol in protocols:  # simple for now: select first implementation
        # skip malformed entries rather than failing on protocol[0]
        if not protocol or not isinstance(protocol, (list, tuple)):
            continue
        protocol_id = protocol[0]
        for h in handlers:
            if h.supports(protocol_id):
                _logger.debug("requestTopic[%s]: choosing protocol %s", topic, protocol_id)
                return h.init_publisher(topic, protocol)
    return 0, "no supported protocol implementations", []
539