1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 """
35 Internal use: ROS Node (Slave) API.
36
37 The Node API is implemented by the L{ROSHandler}.
38
39 API return convention: (statusCode, statusMessage, returnValue)
40
41 - statusCode: an integer indicating the completion condition of the method.
42 - statusMessage: a human-readable string message for debugging
43 - returnValue: the return value of the method; method-specific.
44
45 Current status codes:
46
47 - -1: ERROR: Error on the part of the caller, e.g. an invalid parameter
48 - 0: FAILURE: Method was attempted but failed to complete correctly.
49 - 1: SUCCESS: Method completed successfully.
50
51 Individual methods may assign additional meaning/semantics to statusCode.
52 """
53
54 import os
55 import sys
56 import itertools
57 import logging
58 import socket
59 import threading
60 import traceback
61 import time
62 import errno
63
64 try:
65
66 import urllib.parse as urlparse
67 except ImportError:
68 import urlparse
69
70 from rosgraph.xmlrpc import XmlRpcHandler
71
72 import rospy.names
73 import rospy.rostime
74
75 import rospy.impl.tcpros
76
77 from rospy.core import *
78 from rospy.impl.paramserver import get_param_server_cache
79 from rospy.impl.registration import RegManager, get_topic_manager
80 from rospy.impl.validators import non_empty, ParameterInvalid
81
82
83 STATUS = 0
84 MSG = 1
85 VAL = 2
91 return ('is_publishers_list', paramName)
92
93 _logger = logging.getLogger("rospy.impl.masterslave")
94
95 LOG_API = True
98 """
99 ROS master/slave arg-checking decorator. Applies the specified
100 validator to the corresponding argument and also remaps each
101 argument to be the value returned by the validator. Thus,
102 arguments can be simultaneously validated and canonicalized prior
103 to actual function call.
104 @param error_return_value: API value to return if call unexpectedly fails
105 @param validators: sequence of validators to apply to each
106 arg. None means no validation for the parameter is required. As all
107 api methods take caller_id as the first parameter, the validators
108 start with the second param.
109 @type validators: sequence
110 """
111 def check_validates(f):
112 assert len(validators) == f.__code__.co_argcount - 2, "%s failed arg check"%f
113 def validated_f(*args, **kwds):
114 if LOG_API:
115 _logger.debug("%s%s", f.__name__, str(args[1:]))
116
117 if len(args) == 1:
118 _logger.error("%s invoked without caller_id parameter"%f.__name__)
119 return -1, "missing required caller_id parameter", error_return_value
120 elif len(args) != f.__code__.co_argcount:
121 return -1, "Error: bad call arity", error_return_value
122
123 instance = args[0]
124 caller_id = args[1]
125 if not isinstance(caller_id, str):
126 _logger.error("%s: invalid caller_id param type", f.__name__)
127 return -1, "caller_id must be a string", error_return_value
128
129 newArgs = [instance, caller_id]
130 try:
131 for (v, a) in zip(validators, args[2:]):
132 if v:
133 try:
134
135 if type(v) == list or type(v) == tuple:
136 newArgs.append(instance._custom_validate(v[0], v[1], a, caller_id))
137 else:
138 newArgs.append(v(a, caller_id))
139 except ParameterInvalid as e:
140 _logger.error("%s: invalid parameter: %s", f.__name__, str(e) or 'error')
141 return -1, str(e) or 'error', error_return_value
142 else:
143 newArgs.append(a)
144
145 if LOG_API:
146 retval = f(*newArgs, **kwds)
147 _logger.debug("%s%s returns %s", f.__name__, args[1:], retval)
148 return retval
149 else:
150 code, msg, val = f(*newArgs, **kwds)
151 if val is None:
152 return -1, "Internal error (None value returned)", error_return_value
153 return code, msg, val
154 except TypeError as te:
155 _logger.error(traceback.format_exc())
156 return -1, "Error: invalid arguments: %s"%te, error_return_value
157 except Exception as e:
158 _logger.error(traceback.format_exc())
159 return 0, "Internal failure: %s"%e, error_return_value
160 validated_f.__name__ = f.__name__
161 validated_f.__doc__ = f.__doc__
162 return validated_f
163 return check_validates
164
167 """
168 Base handler for both slave and master nodes. API methods
169 generally provide the capability for establishing point-to-point
170 connections with other nodes.
171
172 Instance methods are XML-RPC API methods, so care must be taken as
173 to what is added here.
174 """
175
177 """
178 Base constructor for ROS nodes/masters
179 @param name: ROS name of this node
180 @type name: str
181 @param master_uri: URI of master node, or None if this node is the master
182 @type master_uri: str
183 """
184 super(ROSHandler, self).__init__()
185 self.masterUri = master_uri
186 self.name = name
187 self.uri = None
188 self.done = False
189
190
191 self.protocol_handlers = []
192 handler = rospy.impl.tcpros.get_tcpros_handler()
193 if handler is not None:
194 self.protocol_handlers.append(handler)
195
196 self.reg_man = RegManager(self)
197
198
199
200
202 """
203 @return: True if slave API is registered with master.
204 @rtype: bool
205 """
206 if self.reg_man is None:
207 return False
208 else:
209 return self.reg_man.is_registered()
210
211
213 """
214 @param uri: XML-RPC URI
215 @type uri: str
216 callback from ROSNode to inform handler of correct i/o information
217 """
218 _logger.info("_ready: %s", uri)
219 self.uri = uri
220
221 if self.reg_man:
222 t = threading.Thread(target=self.reg_man.start, args=(uri, self.masterUri))
223 rospy.core._add_shutdown_thread(t)
224 t.start()
225
227 """
228 Implements validation rules that require access to internal ROSHandler state.
229 @param validation: name of validation rule to use
230 @type validation: str
231 @param param_name: name of parameter being validated
232 @type param_name: str
233 @param param_value str: value of parameter
234 @type param_value: str
235 @param caller_id: value of caller_id parameter to API method
236 @type caller_id: str
237 @raise ParameterInvalid: if the parameter does not meet validation
238 @return: new value for parameter, after validation
239 """
240 if validation == 'is_publishers_list':
241 if not type(param_value) == list:
242 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
243 for v in param_value:
244 if not isinstance(v, str):
245 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
246 parsed = urlparse.urlparse(v)
247 if not parsed[0] or not parsed[1]:
248 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
249 return param_value
250 else:
251 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
252
253
254
255 _remap_table = { }
256
257 @classmethod
259 """
260 @internal
261 @param cls: class to register remappings on
262 @type cls: Class: class to register remappings on
263 @return: parameters (by pos) that should be remapped because they are names
264 @rtype: list
265 """
266 if methodName in cls._remap_table:
267 return cls._remap_table[methodName]
268 else:
269 return []
270
271
272
273
274 @apivalidate('')
275
276
277
278
280 return 1, "", self.uri
281
282 @apivalidate('')
283
284
285
286
288 return 1, "", self.name
289
290
291
292
293
294 @apivalidate([])
296 """
297 Retrieve transport/topic statistics
298 @param caller_id: ROS caller id
299 @type caller_id: str
300 @return: [publishStats, subscribeStats, serviceStats]::
301 publishStats: [[topicName, messageDataSent, pubConnectionData]...[topicNameN, messageDataSentN, pubConnectionDataN]]
302 pubConnectionData: [connectionId, bytesSent, numSent, connected]* .
303 subscribeStats: [[topicName, subConnectionData]...[topicNameN, subConnectionDataN]]
304 subConnectionData: [connectionId, bytesReceived, dropEstimate, connected]* . dropEstimate is -1 if no estimate.
305 serviceStats: not sure yet, probably akin to [numRequests, bytesReceived, bytesSent]
306 """
307 pub_stats, sub_stats = get_topic_manager().get_pub_sub_stats()
308
309 return 1, '', [pub_stats, sub_stats, []]
310
311 @apivalidate([])
313 """
314 Retrieve transport/topic connection information
315 @param caller_id: ROS caller id
316 @type caller_id: str
317 """
318 return 1, "bus info", get_topic_manager().get_pub_sub_info()
319
320 @apivalidate('')
322 """
323 Get the URI of the master node.
324 @param caller_id: ROS caller id
325 @type caller_id: str
326 @return: [code, msg, masterUri]
327 @rtype: [int, str, str]
328 """
329 if self.masterUri:
330 return 1, self.masterUri, self.masterUri
331 else:
332 return 0, "master URI not set", ""
333
335 """
336 @param reason: human-readable debug string
337 @type reason: str
338 """
339 if not self.done:
340 self.done = True
341 if reason:
342 _logger.info(reason)
343 if self.protocol_handlers:
344 for handler in self.protocol_handlers:
345 handler.shutdown()
346 del self.protocol_handlers[:]
347 self.protocol_handlers = None
348 return True
349
350 @apivalidate(0, (None, ))
352 """
353 Stop this server
354 @param caller_id: ROS caller id
355 @type caller_id: str
356 @param msg: a message describing why the node is being shutdown.
357 @type msg: str
358 @return: [code, msg, 0]
359 @rtype: [int, str, int]
360 """
361 if msg:
362 print("shutdown request: %s"%msg)
363 else:
364 print("shutdown requst")
365 if self._shutdown('external shutdown request from [%s]: %s'%(caller_id, msg)):
366 signal_shutdown('external shutdown request from [%s]: [%s]'%(caller_id, msg))
367 return 1, "shutdown", 0
368
369 @apivalidate(-1)
371 """
372 Get the PID of this server
373 @param caller_id: ROS caller id
374 @type caller_id: str
375 @return: [1, "", serverProcessPID]
376 @rtype: [int, str, int]
377 """
378 return 1, "", os.getpid()
379
380
381
382
383 @apivalidate([])
385 """
386 Retrieve a list of topics that this node subscribes to.
387 @param caller_id: ROS caller id
388 @type caller_id: str
389 @return: list of topics this node subscribes to.
390 @rtype: [int, str, [ [topic1, topicType1]...[topicN, topicTypeN]]]
391 """
392 return 1, "subscriptions", get_topic_manager().get_subscriptions()
393
394 @apivalidate([])
396 """
397 Retrieve a list of topics that this node publishes.
398 @param caller_id: ROS caller id
399 @type caller_id: str
400 @return: list of topics published by this node.
401 @rtype: [int, str, [ [topic1, topicType1]...[topicN, topicTypeN]]]
402 """
403 return 1, "publications", get_topic_manager().get_publications()
404
406 """
407 Connect subscriber to topic.
408 @param topic: Topic name to connect.
409 @type topic: str
410 @param pub_uri: API URI of topic publisher.
411 @type pub_uri: str
412 @return: [code, msg, numConnects]. numConnects is the number
413 of subscribers connected to the topic.
414 @rtype: [int, str, int]
415 """
416 caller_id = rospy.names.get_caller_id()
417 sub = get_topic_manager().get_subscriber_impl(topic)
418 if not sub:
419 return -1, "No subscriber for topic [%s]"%topic, 0
420 elif sub.has_connection(pub_uri):
421 return 1, "_connect_topic[%s]: subscriber already connected to publisher [%s]"%(topic, pub_uri), 0
422
423
424
425 protocols = []
426 for h in self.protocol_handlers:
427 protocols.extend(h.get_supported())
428 if not protocols:
429 return 0, "ERROR: no available protocol handlers", 0
430
431 _logger.debug("connect[%s]: calling requestTopic(%s, %s, %s)", topic, caller_id, topic, str(protocols))
432
433
434
435
436
437
438
439
440 socket.setdefaulttimeout(60.)
441 success = False
442 interval = 0.5
443
444
445
446
447 while not success and not is_shutdown():
448 try:
449 code, msg, result = \
450 xmlrpcapi(pub_uri, cache=False).requestTopic(caller_id, topic, protocols)
451 success = True
452 except Exception as e:
453 if getattr(e, 'errno', None) == errno.ECONNREFUSED:
454 code = -errno.ECONNREFUSED
455 msg = str(e)
456 break
457 elif not is_shutdown():
458 _logger.debug("Retrying for %s" % topic)
459 if interval < 30.0:
460
461 interval = interval * 2
462 time.sleep(interval)
463
464
465 if code <= 0:
466 _logger.debug("connect[%s]: requestTopic did not succeed %s, %s", pub_uri, code, msg)
467 return code, msg, 0
468 elif not result or type(protocols) != list:
469 return 0, "ERROR: publisher returned invalid protocol choice: %s"%(str(result)), 0
470 _logger.debug("connect[%s]: requestTopic returned protocol list %s", topic, result)
471 protocol = result[0]
472 for h in self.protocol_handlers:
473 if h.supports(protocol):
474 return h.create_transport(topic, pub_uri, result)
475 return 0, "ERROR: publisher returned unsupported protocol choice: %s"%result, 0
476
477 @apivalidate(-1, (global_name('parameter_key'), None))
478 - def paramUpdate(self, caller_id, parameter_key, parameter_value):
479 """
480 Callback from master of current publisher list for specified topic.
481 @param caller_id: ROS caller id
482 @type caller_id: str
483 @param parameter_key str: parameter name, globally resolved
484 @type parameter_key: str
485 @param parameter_value New parameter value
486 @type parameter_value: XMLRPC-legal value
487 @return: [code, status, ignore]. If code is -1 ERROR, the node
488 is not subscribed to parameter_key
489 @rtype: [int, str, int]
490 """
491 try:
492 get_param_server_cache().update(parameter_key, parameter_value)
493 return 1, '', 0
494 except KeyError:
495 return -1, 'not subscribed', 0
496
497 @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
499 """
500 Callback from master of current publisher list for specified topic.
501 @param caller_id: ROS caller id
502 @type caller_id: str
503 @param topic str: topic name
504 @type topic: str
505 @param publishers: list of current publishers for topic in the form of XMLRPC URIs
506 @type publishers: [str]
507 @return: [code, status, ignore]
508 @rtype: [int, str, int]
509 """
510 if self.reg_man:
511 for uri in publishers:
512 self.reg_man.publisher_update(topic, publishers)
513 return 1, "", 0
514
515 _remap_table['requestTopic'] = [0]
516 @apivalidate([], (is_topic('topic'), non_empty('protocols')))
518 """
519 Publisher node API method called by a subscriber node.
520
521 Request that source allocate a channel for communication. Subscriber provides
522 a list of desired protocols for communication. Publisher returns the
523 selected protocol along with any additional params required for
524 establishing connection. For example, for a TCP/IP-based connection,
525 the source node may return a port number of TCP/IP server.
526 @param caller_id str: ROS caller id
527 @type caller_id: str
528 @param topic: topic name
529 @type topic: str
530 @param protocols: list of desired
531 protocols for communication in order of preference. Each
532 protocol is a list of the form [ProtocolName,
533 ProtocolParam1, ProtocolParam2...N]
534 @type protocols: [[str, XmlRpcLegalValue*]]
535 @return: [code, msg, protocolParams]. protocolParams may be an
536 empty list if there are no compatible protocols.
537 @rtype: [int, str, [str, XmlRpcLegalValue*]]
538 """
539 if not get_topic_manager().has_publication(topic):
540 return -1, "Not a publisher of [%s]"%topic, []
541 for protocol in protocols:
542 protocol_id = protocol[0]
543 for h in self.protocol_handlers:
544 if h.supports(protocol_id):
545 _logger.debug("requestTopic[%s]: choosing protocol %s", topic, protocol_id)
546 return h.init_publisher(topic, protocol)
547 return 0, "no supported protocol implementations", []
548