1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 """Internal use: Service-specific extensions for TCPROS support"""
34
35 import io
36 import socket
37 import struct
38 import sys
39 import logging
40
41 import time
42 import traceback
43
44 import genpy
45
46 import rosgraph
47 import rosgraph.names
48 import rosgraph.network
49
50 from rospy.exceptions import TransportInitError, TransportTerminated, ROSException, ROSInterruptException
51 from rospy.service import _Service, ServiceException
52
53 from rospy.impl.registration import get_service_manager
54 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \
55 get_tcpros_server_address, start_tcpros_server, recv_buff, \
56 DEFAULT_BUFF_SIZE
57
58 from rospy.core import logwarn, loginfo, logerr, logdebug
59 import rospy.core
60 import rospy.msg
61 import rospy.names
62
63 import rospy.impl.validators
64
65 import threading
66
67 if sys.hexversion > 0x03000000:
69 return isinstance(s, str)
70 else:
72 return isinstance(s, basestring)
73
74 logger = logging.getLogger('rospy.service')
75
76
77
78
80 """
81 Blocks until service is available. Use this in
82 initialization code if your program depends on a
83 service already running.
84 @param service: name of service
85 @type service: str
86 @param timeout: timeout time in seconds or Duration, None for no
87 timeout. NOTE: timeout=0 is invalid as wait_for_service actually
88 contacts the service, so non-blocking behavior is not
89 possible. For timeout=0 uses cases, just call the service without
90 waiting.
91 @type timeout: double|rospy.Duration
92 @note roscpp waitForService() has timeout specified in millisecs.
93 @raise ROSException: if specified timeout is exceeded
94 @raise ROSInterruptException: if shutdown interrupts wait
95 """
96 master = rosgraph.Master(rospy.names.get_caller_id())
97 def contact_service(resolved_name, timeout=10.0):
98 try:
99 uri = master.lookupService(resolved_name)
100 except rosgraph.MasterException:
101 return False
102
103 addr = rospy.core.parse_rosrpc_uri(uri)
104 if rosgraph.network.use_ipv6():
105 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
106 else:
107 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
108 try:
109
110
111 s.settimeout(timeout)
112 logdebug('connecting to ' + str(addr))
113 s.connect(addr)
114 h = { 'probe' : '1', 'md5sum' : '*',
115 'callerid' : rospy.core.get_caller_id(),
116 'service': resolved_name }
117 rosgraph.network.write_ros_handshake_header(s, h)
118 return uri
119 finally:
120 if s is not None:
121 s.close()
122 if timeout is not None and isinstance(timeout, rospy.Duration):
123 timeout = timeout.to_sec()
124 if timeout == 0.:
125 raise ValueError("timeout must be non-zero")
126 resolved_name = rospy.names.resolve_name(service)
127 contact_failed = False
128 if timeout:
129 timeout_t = time.time() + timeout
130 while not rospy.core.is_shutdown() and time.time() < timeout_t:
131 try:
132 uri = contact_service(resolved_name, timeout_t - time.time())
133 if uri:
134 if contact_failed:
135 rospy.core.loginfo("wait_for_service(%s): finally were able to contact [%s]"%(resolved_name, uri))
136 return
137 except KeyboardInterrupt:
138
139 rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising")
140 raise
141 except:
142 contact_failed = True
143 rospy.core.logwarn_throttle(10, "wait_for_service(%s): failed to contact, will keep trying"%resolved_name)
144 time.sleep(0.3)
145 if rospy.core.is_shutdown():
146 raise ROSInterruptException("rospy shutdown")
147 else:
148 raise ROSException("timeout exceeded while waiting for service %s"%resolved_name)
149 else:
150 while not rospy.core.is_shutdown():
151 try:
152 uri = contact_service(resolved_name)
153 if uri:
154 if contact_failed:
155 rospy.core.loginfo("wait_for_service(%s): finally were able to contact [%s]"%(resolved_name, uri))
156 return
157 except KeyboardInterrupt:
158
159 rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising")
160 raise
161 except:
162 contact_failed = True
163 rospy.core.logwarn_throttle(10, "wait_for_service(%s): failed to contact, will keep trying"%resolved_name)
164 time.sleep(0.3)
165 if rospy.core.is_shutdown():
166 raise ROSInterruptException("rospy shutdown")
167
168
170 """
171 Convert return value of function to response instance. The
172 rules/precedence for this are:
173
174 1. If the return type is the same as the response type, no conversion
175 is done.
176
177 2. If the return type is a dictionary, it is used as a keyword-style
178 initialization for a new response instance.
179
180 3. If the return type is *not* a list type, it is passed in as a single arg
181 to a new response instance.
182
183 4. If the return type is a list/tuple type, it is used as a args-style
184 initialization for a new response instance.
185 """
186
187
188
189
190 if isinstance(response, genpy.Message) and response._type == response_class._type:
191
192 return response
193 elif type(response) == dict:
194
195 try:
196 return response_class(**response)
197 except AttributeError as e:
198 raise ServiceException("handler returned invalid value: %s"%str(e))
199 elif response == None:
200 raise ServiceException("service handler returned None")
201 elif type(response) not in [list, tuple]:
202
203 try:
204 return response_class(response)
205 except TypeError as e:
206 raise ServiceException("handler returned invalid value: %s"%str(e))
207 else:
208
209
210 try:
211 return response_class(*response)
212 except TypeError as e:
213 raise ServiceException("handler returned wrong number of values: %s"%str(e))
214
216 """
217 Process incoming service connection. For use with
218 TCPROSServer. Reads in service name from handshake and creates the
219 appropriate service handler for the connection.
220 @param sock: socket connection
221 @type sock: socket
222 @param client_addr: client address
223 @type client_addr: (str, int)
224 @param header: key/value pairs from handshake header
225 @type header: dict
226 @return: error string or None
227 @rtype: str
228 """
229 for required in ['service', 'md5sum', 'callerid']:
230 if not required in header:
231 return "Missing required '%s' field"%required
232 else:
233 logger.debug("connection from %s:%s", client_addr[0], client_addr[1])
234 service_name = header['service']
235
236
237
238
239
240
241 sm = get_service_manager()
242 md5sum = header['md5sum']
243 service = sm.get_service(service_name)
244 if not service:
245 return "[%s] is not a provider of [%s]"%(rospy.names.get_caller_id(), service_name)
246 elif md5sum != rospy.names.SERVICE_ANYTYPE and md5sum != service.service_class._md5sum:
247 return "request from [%s]: md5sums do not match: [%s] vs. [%s]"%(header['callerid'], md5sum, service.service_class._md5sum)
248 else:
249 transport = TCPROSTransport(service.protocol, service_name, header=header)
250 transport.set_socket(sock, header['callerid'])
251 transport.write_header()
252
253
254 t = threading.Thread(target=service.handle, args=(transport, header))
255 t.daemon = True
256 t.start()
257
258
260 """
261 Protocol implementation for Services over TCPROS
262 """
263
265 """
266 ctor.
267 @param resolved_name: name of service
268 @type resolved_name: str
269 @param service_class: Service data type class
270 @type service_class: Service
271 @param buff_size int: size of buffer (bytes) to use for reading incoming requests.
272 @type buff_size: int
273 """
274 super(TCPService, self).__init__(resolved_name, service_class._request_class, buff_size=buff_size)
275 self.service_class = service_class
276
278 """
279 Protocol API
280 @return: header fields
281 @rtype: dict
282 """
283 return {'service': self.resolved_name, 'type': self.service_class._type,
284 'md5sum': self.service_class._md5sum, 'callerid': rospy.names.get_caller_id() }
285
287 """Protocol Implementation for Service clients over TCPROS"""
288
290 """
291 ctor.
292 @param resolved_name: resolved service name
293 @type resolved_name: str
294 @param service_class: Service data type class
295 @type service_class: Service
296 @param headers: identifier for Service session
297 @type headers: dict
298 @param buff_size: size of buffer (bytes) for reading responses from Service.
299 @type buff_size: int
300 """
301 super(TCPROSServiceClient, self).__init__(resolved_name, service_class._response_class)
302 self.service_class = service_class
303 self.headers = headers or {}
304 self.buff_size = buff_size
305
307 """
308 TCPROSTransportProtocol API
309 """
310 headers = {'service': self.resolved_name, 'md5sum': self.service_class._md5sum,
311 'callerid': rospy.names.get_caller_id()}
312
313
314
315 for k, v in self.headers.items():
316 headers[k] = v
317 return headers
318
320 """
321 Utility for reading the OK-byte/error-message header preceding each message.
322 @param sock: socket connection. Will be read from if OK byte is
323 false and error message needs to be read
324 @type sock: socket.socket
325 @param b: buffer to read from
326 @type b: StringIO
327 """
328 if b.tell() == 0:
329 return
330 pos = b.tell()
331 b.seek(0)
332 ok = struct.unpack('<B', b.read(1))[0]
333 b.seek(pos)
334 if not ok:
335 str = self._read_service_error(sock, b)
336
337
338
339
340
341 b.seek(0)
342 b.truncate(0)
343 raise ServiceException("service [%s] responded with an error: %s"%(self.resolved_name, str))
344 else:
345
346 b.seek(pos)
347
349 """
350 In service implementation, reads in OK byte that precedes each
351 response. The OK byte allows for the passing of error messages
352 instead of a response message
353 @param b: buffer
354 @type b: StringIO
355 @param msg_queue: Message queue to append to
356 @type msg_queue: [Message]
357 @param sock: socket to read from
358 @type sock: socket.socket
359 """
360 self._read_ok_byte(b, sock)
361 rospy.msg.deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size, max_msgs=1, start=1)
362
363
364
365 if b.tell() == 1:
366 b.seek(0)
367
369 """
370 Read service error from sock
371 @param sock: socket to read from
372 @type sock: socket
373 @param b: currently read data from sock
374 @type b: StringIO
375 """
376 buff_size = 256
377 while b.tell() < 5:
378 recv_buff(sock, b, buff_size)
379 bval = b.getvalue()
380 (length,) = struct.unpack('<I', bval[1:5])
381 while b.tell() < (5 + length):
382 recv_buff(sock, b, buff_size)
383 bval = b.getvalue()
384 return struct.unpack('<%ss'%length, bval[5:5+length])[0]
385
386
388 """
389 Create a handle to a ROS service for invoking calls.
390
391 Usage::
392 add_two_ints = ServiceProxy('add_two_ints', AddTwoInts)
393 resp = add_two_ints(1, 2)
394 """
395
396 - def __init__(self, name, service_class, persistent=False, headers=None):
397 """
398 ctor.
399 @param name: name of service to call
400 @type name: str
401 @param service_class: auto-generated service class
402 @type service_class: Service class
403 @param persistent: (optional) if True, proxy maintains a persistent
404 connection to service. While this results in better call
405 performance, persistent connections are discouraged as they are
406 less resistant to network issues and service restarts.
407 @type persistent: bool
408 @param headers: (optional) arbitrary headers
409 @type headers: dict
410 """
411 super(ServiceProxy, self).__init__(name, service_class)
412 self.uri = None
413 self.seq = 0
414 self.buff_size = DEFAULT_BUFF_SIZE
415 self.persistent = persistent
416 if persistent:
417 if not headers:
418 headers = {}
419 headers['persistent'] = '1'
420 self.protocol = TCPROSServiceClient(self.resolved_name,
421 self.service_class, headers=headers)
422 self.transport = None
423
426
427
429 """
430 Callable-style version of the service API. This accepts either a request message instance,
431 or you can call directly with arguments to create a new request instance. e.g.::
432
433 add_two_ints(AddTwoIntsRequest(1, 2))
434 add_two_ints(1, 2)
435 add_two_ints(a=1, b=2)
436
437 @param args: arguments to remote service
438 @param kwds: message keyword arguments
439 @raise ROSSerializationException: If unable to serialize
440 message. This is usually a type error with one of the fields.
441 """
442 return self.call(*args, **kwds)
443
445 """
446 private routine for getting URI of service to call
447 @param request: request message
448 @type request: L{rospy.Message}
449 """
450 if not isinstance(request, genpy.Message):
451 raise TypeError("request object is not a valid request message instance")
452
453
454
455
456 if not self.request_class._type == request._type:
457 raise TypeError("request object type [%s] does not match service type [%s]"%(request.__class__, self.request_class))
458
459
460
461 if 1:
462 try:
463 try:
464 master = rosgraph.Master(rospy.names.get_caller_id())
465 self.uri = master.lookupService(self.resolved_name)
466 except socket.error:
467 raise ServiceException("unable to contact master")
468 except rosgraph.MasterError as e:
469 logger.error("[%s]: lookup service failed with message [%s]", self.resolved_name, str(e))
470 raise ServiceException("service [%s] unavailable"%self.resolved_name)
471
472
473 try:
474 rospy.core.parse_rosrpc_uri(self.uri)
475 except rospy.impl.validators.ParameterInvalid:
476 raise ServiceException("master returned invalid ROSRPC URI: %s"%self.uri)
477 except socket.error as e:
478 logger.error("[%s]: socket error contacting service, master is probably unavailable",self.resolved_name)
479 return self.uri
480
481 - def call(self, *args, **kwds):
482 """
483 Call the service. This accepts either a request message instance,
484 or you can call directly with arguments to create a new request instance. e.g.::
485
486 add_two_ints(AddTwoIntsRequest(1, 2))
487 add_two_ints(1, 2)
488 add_two_ints(a=1, b=2)
489
490 @raise TypeError: if request is not of the valid type (Message)
491 @raise ServiceException: if communication with remote service fails
492 @raise ROSInterruptException: if node shutdown (e.g. ctrl-C) interrupts service call
493 @raise ROSSerializationException: If unable to serialize
494 message. This is usually a type error with one of the fields.
495 """
496
497
498 request = rospy.msg.args_kwds_to_message(self.request_class, args, kwds)
499
500
501 if self.transport is None:
502 service_uri = self._get_service_uri(request)
503 dest_addr, dest_port = rospy.core.parse_rosrpc_uri(service_uri)
504
505
506 transport = TCPROSTransport(self.protocol, self.resolved_name)
507 transport.buff_size = self.buff_size
508 try:
509 transport.connect(dest_addr, dest_port, service_uri)
510 except TransportInitError as e:
511
512 raise ServiceException("unable to connect to service: %s"%e)
513 if self.persistent:
514 self.transport = transport
515 else:
516 transport = self.transport
517
518
519 self.seq += 1
520 transport.send_message(request, self.seq)
521
522 try:
523 responses = transport.receive_once()
524 if len(responses) == 0:
525 raise ServiceException("service [%s] returned no response"%self.resolved_name)
526 elif len(responses) > 1:
527 raise ServiceException("service [%s] returned multiple responses: %s"%(self.resolved_name, len(responses)))
528 except rospy.exceptions.TransportException as e:
529
530 if rospy.core.is_shutdown():
531 raise rospy.exceptions.ROSInterruptException("node shutdown interrupted service call")
532 else:
533 raise ServiceException("transport error completing service call: %s"%(str(e)))
534 finally:
535 if not self.persistent:
536 transport.close()
537 self.transport = None
538 return responses[0]
539
540
542 """Close this ServiceProxy. This only has an effect on persistent ServiceProxy instances."""
543 if self.transport is not None:
544 self.transport.close()
545
547 """
548 Implementation of ROS Service. This intermediary class allows for more configuration of behavior than the Service class.
549 """
550
552 super(ServiceImpl, self).__init__(name, service_class)
553
554 if not name or not isstring(name):
555 raise ValueError("service name is not a non-empty string")
556
557 if not rosgraph.names.is_legal_name(name):
558 import warnings
559 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2)
560
561
562 self.handler = handler
563 if error_handler is not None:
564 self.error_handler = error_handler
565 self.registered = False
566 self.seq = 0
567 self.done = False
568 self.buff_size=buff_size
569
570 start_tcpros_server()
571 host, port = get_tcpros_server_address()
572 self.uri = '%s%s:%s'%(rospy.core.ROSRPC, host, port)
573 logdebug("... service URL is %s"%self.uri)
574
575 self.protocol = TCPService(self.resolved_name, service_class, self.buff_size)
576
577 logdebug("[%s]: new Service instance"%self.resolved_name)
578
579
580
582 logerr("Error processing request: %s\n%s" % (e, traceback.format_exception(exc_type, exc_value, tb)))
583
585 """
586 Stop this service
587 @param reason: human-readable shutdown reason
588 @type reason: str
589 """
590 self.done = True
591 logdebug('[%s].shutdown: reason [%s]'%(self.resolved_name, reason))
592 try:
593
594 get_service_manager().unregister(self.resolved_name, self)
595 except Exception as e:
596 logerr("Unable to unregister with master: "+traceback.format_exc())
597 raise ServiceException("Unable to connect to master: %s"%e)
598
600 """
601 Let service run and take over thread until service or node
602 shutdown. Use this method to keep your scripts from exiting
603 execution.
604 """
605 try:
606 while not rospy.core.is_shutdown() and not self.done:
607 time.sleep(0.5)
608 except KeyboardInterrupt:
609 logdebug("keyboard interrupt, shutting down")
610
612 """
613 Send error message to client
614 @param transport: transport connection to client
615 @type transport: Transport
616 @param err_msg: error message to send to client
617 @type err_msg: str
618 """
619 if sys.hexversion > 0x03000000:
620 err_msg = bytes(err_msg, 'utf-8')
621 transport.write_data(struct.pack('<BI%ss'%len(err_msg), 0, len(err_msg), err_msg))
622
624 """
625 Process a single incoming request.
626 @param transport: transport instance
627 @type transport: L{TCPROSTransport}
628 @param request: Message
629 @type request: genpy.Message
630 """
631 try:
632
633 response = convert_return_to_response(self.handler(request), self.response_class)
634 self.seq += 1
635
636 transport.write_buff.write(struct.pack('<B', 1))
637 transport.send_message(response, self.seq)
638 except ServiceException as e:
639 rospy.core.rospydebug("handler raised ServiceException: %s"%(e))
640 self._write_service_error(transport, "service cannot process request: %s"%e)
641 except Exception as e:
642 (exc_type, exc_value, tb) = sys.exc_info()
643 self.error_handler(e, exc_type, exc_value, tb)
644 self._write_service_error(transport, "error processing request: %s"%e)
645
646 - def handle(self, transport, header):
647 """
648 Process incoming request. This method should be run in its
649 own thread. If header['persistent'] is set to 1, method will
650 block until connection is broken.
651 @param transport: transport instance
652 @type transport: L{TCPROSTransport}
653 @param header: headers from client
654 @type header: dict
655 """
656 if 'persistent' in header and \
657 header['persistent'].lower() in ['1', 'true']:
658 persistent = True
659 else:
660 persistent = False
661 if header.get('probe', None) == '1':
662
663 transport.close()
664 return
665 handle_done = False
666 while not handle_done:
667 try:
668 requests = transport.receive_once()
669 for request in requests:
670 self._handle_request(transport, request)
671 if not persistent:
672 handle_done = True
673 except rospy.exceptions.TransportTerminated as e:
674 if not persistent:
675 logerr("incoming connection failed: %s"%e)
676 logdebug("service[%s]: transport terminated"%self.resolved_name)
677 handle_done = True
678 transport.close()
679
680
682 """
683 Declare a ROS service. Service requests are passed to the
684 specified handler.
685
686 Service Usage::
687 s = Service('getmapservice', GetMap, get_map_handler)
688 """
689
692 """
693 ctor.
694
695 @param name: service name, ``str``
696 @param service_class: Service definition class
697
698 @param handler: callback function for processing service
699 request. Function takes in a ServiceRequest and returns a
700 ServiceResponse of the appropriate type. Function may also
701 return a list, tuple, or dictionary with arguments to initialize
702 a ServiceResponse instance of the correct type.
703
704 If handler cannot process request, it may either return None,
705 to indicate failure, or it may raise a rospy.ServiceException
706 to send a specific error message to the client. Returning None
707 is always considered a failure.
708
709 @type handler: fn(req)->resp
710
711 @param buff_size: size of buffer for reading incoming requests. Should be at least size of request message
712 @type buff_size: int
713
714 @param error_handler: callback function for handling errors
715 raised in the service code.
716 @type error_handler: fn(exception, exception_type, exception_value, traceback)->None
717 """
718 super(Service, self).__init__(name, service_class, handler, buff_size,
719 error_handler)
720
721
722 get_service_manager().register(self.resolved_name, self)
723