1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 """
35 Internal use: ROS Node (Slave) API.
36
37 The Node API is implemented by the L{ROSHandler}.
38
39 API return convention: (statusCode, statusMessage, returnValue)
40
41 - statusCode: an integer indicating the completion condition of the method.
42 - statusMessage: a human-readable string message for debugging
43 - returnValue: the return value of the method; method-specific.
44
45 Current status codes:
46
47 - -1: ERROR: Error on the part of the caller, e.g. an invalid parameter
48 - 0: FAILURE: Method was attempted but failed to complete correctly.
49 - 1: SUCCESS: Method completed successfully.
50
51 Individual methods may assign additional meaning/semantics to statusCode.
52 """
53
54 import os
55 import sys
56 import itertools
57 import logging
58 import socket
59 import threading
60 import traceback
61 import time
62 import urlparse
63
64 from roslib.xmlrpc import XmlRpcHandler
65
66 import rospy.names
67 import rospy.rostime
68
69 import rospy.impl.tcpros
70
71 from rospy.core import *
72 from rospy.impl.paramserver import get_param_server_cache
73 from rospy.impl.registration import RegManager, get_topic_manager
74 from rospy.impl.validators import non_empty, ParameterInvalid
75
76
77 STATUS = 0
78 MSG = 1
79 VAL = 2
85 return ('is_publishers_list', paramName)
86
87 _logger = logging.getLogger("rospy.impl.masterslave")
88
89 LOG_API = True
92 """
93 ROS master/slave arg-checking decorator. Applies the specified
94 validator to the corresponding argument and also remaps each
95 argument to be the value returned by the validator. Thus,
96 arguments can be simultaneously validated and canonicalized prior
97 to actual function call.
98 @param error_return_value: API value to return if call unexpectedly fails
99 @param validators: sequence of validators to apply to each
100 arg. None means no validation for the parameter is required. As all
101 api methods take caller_id as the first parameter, the validators
102 start with the second param.
103 @type validators: sequence
104 """
105 def check_validates(f):
106 assert len(validators) == f.func_code.co_argcount - 2, "%s failed arg check"%f
107 def validated_f(*args, **kwds):
108 if LOG_API:
109 _logger.debug("%s%s", f.func_name, str(args[1:]))
110
111 if len(args) == 1:
112 _logger.error("%s invoked without caller_id paramter"%f.func_name)
113 return -1, "missing required caller_id parameter", error_return_value
114 elif len(args) != f.func_code.co_argcount:
115 return -1, "Error: bad call arity", error_return_value
116
117 instance = args[0]
118 caller_id = args[1]
119 if not isinstance(caller_id, basestring):
120 _logger.error("%s: invalid caller_id param type", f.func_name)
121 return -1, "caller_id must be a string", error_return_value
122
123 newArgs = [instance, caller_id]
124 try:
125 for (v, a) in itertools.izip(validators, args[2:]):
126 if v:
127 try:
128
129 if type(v) == list or type(v) == tuple:
130 newArgs.append(instance._custom_validate(v[0], v[1], a, caller_id))
131 else:
132 newArgs.append(v(a, caller_id))
133 except ParameterInvalid, e:
134 _logger.error("%s: invalid parameter: %s", f.func_name, str(e) or 'error')
135 return -1, str(e) or 'error', error_return_value
136 else:
137 newArgs.append(a)
138
139 if LOG_API:
140 retval = f(*newArgs, **kwds)
141 _logger.debug("%s%s returns %s", f.func_name, args[1:], retval)
142 return retval
143 else:
144 code, msg, val = f(*newArgs, **kwds)
145 if val is None:
146 return -1, "Internal error (None value returned)", error_return_value
147 return code, msg, val
148 except TypeError, te:
149 _logger.error(traceback.format_exc())
150 return -1, "Error: invalid arguments: %s"%te, error_return_value
151 except Exception, e:
152 _logger.error(traceback.format_exc())
153 return 0, "Internal failure: %s"%e, error_return_value
154 validated_f.func_name = f.func_name
155 validated_f.__doc__ = f.__doc__
156 return validated_f
157 return check_validates
158
161 """
162 Base handler for both slave and master nodes. API methods
163 generally provide the capability for establishing point-to-point
164 connections with other nodes.
165
166 Instance methods are XML-RPC API methods, so care must be taken as
167 to what is added here.
168 """
169
171 """
172 Base constructor for ROS nodes/masters
173 @param name: ROS name of this node
174 @type name: str
175 @param master_uri: URI of master node, or None if this node is the master
176 @type master_uri: str
177 """
178 super(ROSHandler, self).__init__()
179 self.masterUri = master_uri
180 self.name = name
181 self.uri = None
182 self.done = False
183
184
185 self.protocol_handlers = []
186 handler = rospy.impl.tcpros.get_tcpros_handler()
187 if handler is not None:
188 self.protocol_handlers.append(handler)
189
190 self.reg_man = RegManager(self)
191
192
193
194
196 """
197 @return: True if slave API is registered with master.
198 @rtype: bool
199 """
200 if self.reg_man is None:
201 return False
202 else:
203 return self.reg_man.is_registered()
204
205
207 """
208 @param uri: XML-RPC URI
209 @type uri: str
210 callback from ROSNode to inform handler of correct i/o information
211 """
212 _logger.info("_ready: %s", uri)
213 self.uri = uri
214
215 if self.reg_man:
216 t = threading.Thread(target=self.reg_man.start, args=(uri, self.masterUri))
217 rospy.core._add_shutdown_thread(t)
218 t.start()
219
221 """
222 Implements validation rules that require access to internal ROSHandler state.
223 @param validation: name of validation rule to use
224 @type validation: str
225 @param param_name: name of parameter being validated
226 @type param_name: str
227 @param param_value str: value of parameter
228 @type param_value: str
229 @param caller_id: value of caller_id parameter to API method
230 @type caller_id: str
231 @raise ParameterInvalid: if the parameter does not meet validation
232 @return: new value for parameter, after validation
233 """
234 if validation == 'is_publishers_list':
235 if not type(param_value) == list:
236 raise ParameterInvalid("ERROR: param [%s] must be a list"%param_name)
237 for v in param_value:
238 if not isinstance(v, basestring):
239 raise ParameterInvalid("ERROR: param [%s] must be a list of strings"%param_name)
240 parsed = urlparse.urlparse(v)
241 if not parsed[0] or not parsed[1]:
242 raise ParameterInvalid("ERROR: param [%s] does not contain valid URLs [%s]"%(param_name, v))
243 return param_value
244 else:
245 raise ParameterInvalid("ERROR: param [%s] has an unknown validation type [%s]"%(param_name, validation))
246
247
248
249 _remap_table = { }
250
251 @classmethod
253 """
254 @internal
255 @param cls: class to register remappings on
256 @type cls: Class: class to register remappings on
257 @return: parameters (by pos) that should be remapped because they are names
258 @rtype: list
259 """
260 if methodName in cls._remap_table:
261 return cls._remap_table[methodName]
262 else:
263 return []
264
265
266
267
268 @apivalidate('')
269
270
271
272
274 return 1, "", self.uri
275
276 @apivalidate('')
277
278
279
280
282 return 1, "", self.name
283
284
285
286
287
288 @apivalidate([])
290 """
291 Retrieve transport/topic statistics
292 @param caller_id: ROS caller id
293 @type caller_id: str
294 @return: [publishStats, subscribeStats, serviceStats]::
295 publishStats: [[topicName, messageDataSent, pubConnectionData]...[topicNameN, messageDataSentN, pubConnectionDataN]]
296 pubConnectionData: [connectionId, bytesSent, numSent, connected]* .
297 subscribeStats: [[topicName, subConnectionData]...[topicNameN, subConnectionDataN]]
298 subConnectionData: [connectionId, bytesReceived, dropEstimate, connected]* . dropEstimate is -1 if no estimate.
299 serviceStats: not sure yet, probably akin to [numRequests, bytesReceived, bytesSent]
300 """
301 pub_stats, sub_stats = get_topic_manager().get_pub_sub_stats()
302
303 return 1, '', [pub_stats, sub_stats, []]
304
305 @apivalidate([])
307 """
308 Retrieve transport/topic connection information
309 @param caller_id: ROS caller id
310 @type caller_id: str
311 """
312 return 1, "bus info", get_topic_manager().get_pub_sub_info()
313
314 @apivalidate('')
316 """
317 Get the URI of the master node.
318 @param caller_id: ROS caller id
319 @type caller_id: str
320 @return: [code, msg, masterUri]
321 @rtype: [int, str, str]
322 """
323 if self.masterUri:
324 return 1, self.masterUri, self.masterUri
325 else:
326 return 0, "master URI not set", ""
327
329 """
330 @param reason: human-readable debug string
331 @type reason: str
332 """
333 if not self.done:
334 self.done = True
335 if reason:
336 _logger.info(reason)
337 if self.protocol_handlers:
338 for handler in self.protocol_handlers:
339 handler.shutdown()
340 del self.protocol_handlers[:]
341 self.protocol_handlers = None
342 return True
343
344 @apivalidate(0, (None, ))
346 """
347 Stop this server
348 @param caller_id: ROS caller id
349 @type caller_id: str
350 @param msg: a message describing why the node is being shutdown.
351 @type msg: str
352 @return: [code, msg, 0]
353 @rtype: [int, str, int]
354 """
355 if msg:
356 print >> sys.stdout, "shutdown request: %s"%msg
357 else:
358 print >> sys.stdout, "shutdown requst"
359 if self._shutdown('external shutdown request from [%s]: %s'%(caller_id, msg)):
360 signal_shutdown('external shutdown request from [%s]: [%s]'%(caller_id, msg))
361 return 1, "shutdown", 0
362
363 @apivalidate(-1)
365 """
366 Get the PID of this server
367 @param caller_id: ROS caller id
368 @type caller_id: str
369 @return: [1, "", serverProcessPID]
370 @rtype: [int, str, int]
371 """
372 return 1, "", os.getpid()
373
374
375
376
377 @apivalidate([])
379 """Retrieve a list of topics that this node subscribes to
380 @param caller_id: ROS caller id
381 @type caller_id: str
382 @return [int, str, [ [topic1, topicType1]...[topicN, topicTypeN]]]: list of topics this node subscribes to
383 """
384 return 1, "subscriptions", get_topic_manager().get_subscriptions()
385
386 @apivalidate([])
388 """
389 Retrieve a list of topics that this node publishes.
390 @param caller_id: ROS caller id
391 @type caller_id: str
392 @return: list of topics published by this node
393 @rtype: [int, str, [ [topic1, topicType1]...[topicN, topicTypeN]]]
394 """
395 return 1, "publications", get_topic_manager().get_publications()
396
398 """
399 Connect subscriber to topic
400 @param topic: topic name to connect
401 @type topic: str
402 @param pub_uri: API URI of topic publisher
403 @type pub_uri: str
404 @return: [code, msg, numConnects]. numConnects is the number
405 of subscribers connected to the topic
406 @rtype: [int, str, int]
407 """
408 caller_id = rospy.names.get_caller_id()
409 sub = get_topic_manager().get_subscriber_impl(topic)
410 if not sub:
411 return -1, "No subscriber for topic [%s]"%topic, 0
412 elif sub.has_connection(pub_uri):
413 return 1, "_connect_topic[%s]: subscriber already connected to publisher [%s]"%(topic, pub_uri), 0
414
415
416
417 protocols = []
418 for h in self.protocol_handlers:
419 protocols.extend(h.get_supported())
420 if not protocols:
421 return 0, "ERROR: no available protocol handlers", 0
422
423 _logger.debug("connect[%s]: calling requestTopic(%s, %s, %s)", topic, caller_id, topic, str(protocols))
424
425
426
427
428
429
430
431
432 socket.setdefaulttimeout(60.)
433 tries = 0
434 max_num_tries = 3
435 success = False
436 while not success:
437 tries += 1
438 try:
439 code, msg, result = \
440 xmlrpcapi(pub_uri).requestTopic(caller_id, topic, protocols)
441 success = True
442 except Exception, e:
443 if tries >= max_num_tries:
444 return 0, "unable to requestTopic: %s"%str(e), 0
445 else:
446 _logger.debug("Retrying for %s" % topic)
447
448
449 if code <= 0:
450 _logger.debug("connect[%s]: requestTopic did not succeed %s, %s", pub_uri, code, msg)
451 return code, msg, 0
452 elif not result or type(protocols) != list:
453 return 0, "ERROR: publisher returned invalid protocol choice: %s"%(str(result)), 0
454 _logger.debug("connect[%s]: requestTopic returned protocol list %s", topic, result)
455 protocol = result[0]
456 for h in self.protocol_handlers:
457 if h.supports(protocol):
458 return h.create_transport(topic, pub_uri, result)
459 return 0, "ERROR: publisher returned unsupported protocol choice: %s"%result, 0
460
461 @apivalidate(-1, (global_name('parameter_key'), None))
462 - def paramUpdate(self, caller_id, parameter_key, parameter_value):
463 """
464 Callback from master of current publisher list for specified topic.
465 @param caller_id: ROS caller id
466 @type caller_id: str
467 @param parameter_key str: parameter name, globally resolved
468 @type parameter_key: str
469 @param parameter_value New parameter value
470 @type parameter_value: XMLRPC-legal value
471 @return: [code, status, ignore]. If code is -1 ERROR, the node
472 is not subscribed to parameter_key
473 @rtype: [int, str, int]
474 """
475 try:
476 get_param_server_cache().update(parameter_key, parameter_value)
477 return 1, '', 0
478 except KeyError:
479 return -1, 'not subscribed', 0
480
481 @apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
483 """
484 Callback from master of current publisher list for specified topic.
485 @param caller_id: ROS caller id
486 @type caller_id: str
487 @param topic str: topic name
488 @type topic: str
489 @param publishers: list of current publishers for topic in the form of XMLRPC URIs
490 @type publishers: [str]
491 @return: [code, status, ignore]
492 @rtype: [int, str, int]
493 """
494 if self.reg_man:
495 for uri in publishers:
496 self.reg_man.publisher_update(topic, publishers)
497 return 1, "", 0
498
499 _remap_table['requestTopic'] = [0]
500 @apivalidate([], (is_topic('topic'), non_empty('protocols')))
502 """
503 Publisher node API method called by a subscriber node.
504
505 Request that source allocate a channel for communication. Subscriber provides
506 a list of desired protocols for communication. Publisher returns the
507 selected protocol along with any additional params required for
508 establishing connection. For example, for a TCP/IP-based connection,
509 the source node may return a port number of TCP/IP server.
510 @param caller_id str: ROS caller id
511 @type caller_id: str
512 @param topic: topic name
513 @type topic: str
514 @param protocols: list of desired
515 protocols for communication in order of preference. Each
516 protocol is a list of the form [ProtocolName,
517 ProtocolParam1, ProtocolParam2...N]
518 @type protocols: [[str, XmlRpcLegalValue*]]
519 @return: [code, msg, protocolParams]. protocolParams may be an
520 empty list if there are no compatible protocols.
521 @rtype: [int, str, [str, XmlRpcLegalValue*]]
522 """
523 if not get_topic_manager().has_publication(topic):
524 return -1, "Not a publisher of [%s]"%topic, []
525 for protocol in protocols:
526 protocol_id = protocol[0]
527 for h in self.protocol_handlers:
528 if h.supports(protocol_id):
529 _logger.debug("requestTopic[%s]: choosing protocol %s", topic, protocol_id)
530 return h.init_publisher(topic, protocol)
531 return 0, "no supported protocol implementations", []
532