1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 """Internal use: Service-specific extensions for TCPROS support"""
34
35 import io
36 import socket
37 import struct
38 import sys
39 import logging
40
41 import time
42 import traceback
43
44 import genpy
45
46 import rosgraph
47 import rosgraph.names
48 import rosgraph.network
49
50 from rospy.exceptions import TransportInitError, TransportTerminated, ROSException, ROSInterruptException
51 from rospy.service import _Service, ServiceException
52
53 from rospy.impl.registration import get_service_manager
54 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \
55 get_tcpros_server_address, start_tcpros_server, recv_buff, \
56 DEFAULT_BUFF_SIZE
57
58 from rospy.core import logwarn, loginfo, logerr, logdebug
59 import rospy.core
60 import rospy.msg
61 import rospy.names
62
63 import rospy.impl.validators
64
65 import threading
66
67 if sys.hexversion > 0x03000000:
69 return isinstance(s, str)
70 else:
72 return isinstance(s, basestring)
73
74 logger = logging.getLogger('rospy.service')
75
76
77
78
80 """
81 Blocks until service is available. Use this in
82 initialization code if your program depends on a
83 service already running.
84 @param service: name of service
85 @type service: str
86 @param timeout: timeout time in seconds, None for no
87 timeout. NOTE: timeout=0 is invalid as wait_for_service actually
88 contacts the service, so non-blocking behavior is not
89 possible. For timeout=0 uses cases, just call the service without
90 waiting.
91 @type timeout: double
92 @raise ROSException: if specified timeout is exceeded
93 @raise ROSInterruptException: if shutdown interrupts wait
94 """
95 master = rosgraph.Master(rospy.names.get_caller_id())
96 def contact_service(resolved_name, timeout=10.0):
97 try:
98 uri = master.lookupService(resolved_name)
99 except rosgraph.MasterException:
100 return False
101
102 addr = rospy.core.parse_rosrpc_uri(uri)
103 if rosgraph.network.use_ipv6():
104 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
105 else:
106 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
107 try:
108
109
110 s.settimeout(timeout)
111 logdebug('connecting to ' + str(addr))
112 s.connect(addr)
113 h = { 'probe' : '1', 'md5sum' : '*',
114 'callerid' : rospy.core.get_caller_id(),
115 'service': resolved_name }
116 rosgraph.network.write_ros_handshake_header(s, h)
117 return uri
118 finally:
119 if s is not None:
120 s.close()
121 if timeout == 0.:
122 raise ValueError("timeout must be non-zero")
123 resolved_name = rospy.names.resolve_name(service)
124 contact_failed = False
125 if timeout:
126 timeout_t = time.time() + timeout
127 while not rospy.core.is_shutdown() and time.time() < timeout_t:
128 try:
129 uri = contact_service(resolved_name, timeout_t - time.time())
130 if uri:
131 if contact_failed:
132 rospy.core.loginfo("wait_for_service(%s): finally were able to contact [%s]"%(resolved_name, uri))
133 return
134 except KeyboardInterrupt:
135
136 rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising")
137 raise
138 except:
139 contact_failed = True
140 rospy.core.logwarn_throttle(10, "wait_for_service(%s): failed to contact, will keep trying"%resolved_name)
141 time.sleep(0.3)
142 if rospy.core.is_shutdown():
143 raise ROSInterruptException("rospy shutdown")
144 else:
145 raise ROSException("timeout exceeded while waiting for service %s"%resolved_name)
146 else:
147 while not rospy.core.is_shutdown():
148 try:
149 uri = contact_service(resolved_name)
150 if uri:
151 if contact_failed:
152 rospy.core.loginfo("wait_for_service(%s): finally were able to contact [%s]"%(resolved_name, uri))
153 return
154 except KeyboardInterrupt:
155
156 rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising")
157 raise
158 except:
159 contact_failed = True
160 rospy.core.logwarn_throttle(10, "wait_for_service(%s): failed to contact, will keep trying"%resolved_name)
161 time.sleep(0.3)
162 if rospy.core.is_shutdown():
163 raise ROSInterruptException("rospy shutdown")
164
165
167 """
168 Convert return value of function to response instance. The
169 rules/precedence for this are:
170
171 1. If the return type is the same as the response type, no conversion
172 is done.
173
174 2. If the return type is a dictionary, it is used as a keyword-style
175 initialization for a new response instance.
176
177 3. If the return type is *not* a list type, it is passed in as a single arg
178 to a new response instance.
179
180 4. If the return type is a list/tuple type, it is used as a args-style
181 initialization for a new response instance.
182 """
183
184
185
186
187 if isinstance(response, genpy.Message) and response._type == response_class._type:
188
189 return response
190 elif type(response) == dict:
191
192 try:
193 return response_class(**response)
194 except AttributeError as e:
195 raise ServiceException("handler returned invalid value: %s"%str(e))
196 elif response == None:
197 raise ServiceException("service handler returned None")
198 elif type(response) not in [list, tuple]:
199
200 try:
201 return response_class(response)
202 except TypeError as e:
203 raise ServiceException("handler returned invalid value: %s"%str(e))
204 else:
205
206
207 try:
208 return response_class(*response)
209 except TypeError as e:
210 raise ServiceException("handler returned wrong number of values: %s"%str(e))
211
213 """
214 Process incoming service connection. For use with
215 TCPROSServer. Reads in service name from handshake and creates the
216 appropriate service handler for the connection.
217 @param sock: socket connection
218 @type sock: socket
219 @param client_addr: client address
220 @type client_addr: (str, int)
221 @param header: key/value pairs from handshake header
222 @type header: dict
223 @return: error string or None
224 @rtype: str
225 """
226 for required in ['service', 'md5sum', 'callerid']:
227 if not required in header:
228 return "Missing required '%s' field"%required
229 else:
230 logger.debug("connection from %s:%s", client_addr[0], client_addr[1])
231 service_name = header['service']
232
233
234
235
236
237
238 sm = get_service_manager()
239 md5sum = header['md5sum']
240 service = sm.get_service(service_name)
241 if not service:
242 return "[%s] is not a provider of [%s]"%(rospy.names.get_caller_id(), service_name)
243 elif md5sum != rospy.names.SERVICE_ANYTYPE and md5sum != service.service_class._md5sum:
244 return "request from [%s]: md5sums do not match: [%s] vs. [%s]"%(header['callerid'], md5sum, service.service_class._md5sum)
245 else:
246 transport = TCPROSTransport(service.protocol, service_name, header=header)
247 transport.set_socket(sock, header['callerid'])
248 transport.write_header()
249
250
251 t = threading.Thread(target=service.handle, args=(transport, header))
252 t.daemon = True
253 t.start()
254
255
257 """
258 Protocol implementation for Services over TCPROS
259 """
260
262 """
263 ctor.
264 @param resolved_name: name of service
265 @type resolved_name: str
266 @param service_class: Service data type class
267 @type service_class: Service
268 @param buff_size int: size of buffer (bytes) to use for reading incoming requests.
269 @type buff_size: int
270 """
271 super(TCPService, self).__init__(resolved_name, service_class._request_class, buff_size=buff_size)
272 self.service_class = service_class
273
275 """
276 Protocol API
277 @return: header fields
278 @rtype: dict
279 """
280 return {'service': self.resolved_name, 'type': self.service_class._type,
281 'md5sum': self.service_class._md5sum, 'callerid': rospy.names.get_caller_id() }
282
284 """Protocol Implementation for Service clients over TCPROS"""
285
287 """
288 ctor.
289 @param resolved_name: resolved service name
290 @type resolved_name: str
291 @param service_class: Service data type class
292 @type service_class: Service
293 @param headers: identifier for Service session
294 @type headers: dict
295 @param buff_size: size of buffer (bytes) for reading responses from Service.
296 @type buff_size: int
297 """
298 super(TCPROSServiceClient, self).__init__(resolved_name, service_class._response_class)
299 self.service_class = service_class
300 self.headers = headers or {}
301 self.buff_size = buff_size
302
304 """
305 TCPROSTransportProtocol API
306 """
307 headers = {'service': self.resolved_name, 'md5sum': self.service_class._md5sum,
308 'callerid': rospy.names.get_caller_id()}
309
310
311
312 for k, v in self.headers.items():
313 headers[k] = v
314 return headers
315
317 """
318 Utility for reading the OK-byte/error-message header preceding each message.
319 @param sock: socket connection. Will be read from if OK byte is
320 false and error message needs to be read
321 @type sock: socket.socket
322 @param b: buffer to read from
323 @type b: StringIO
324 """
325 if b.tell() == 0:
326 return
327 pos = b.tell()
328 b.seek(0)
329 ok = struct.unpack('<B', b.read(1))[0]
330 b.seek(pos)
331 if not ok:
332 str = self._read_service_error(sock, b)
333
334
335
336
337
338 b.seek(0)
339 b.truncate(0)
340 raise ServiceException("service [%s] responded with an error: %s"%(self.resolved_name, str))
341 else:
342
343 b.seek(pos)
344
346 """
347 In service implementation, reads in OK byte that preceeds each
348 response. The OK byte allows for the passing of error messages
349 instead of a response message
350 @param b: buffer
351 @type b: StringIO
352 @param msg_queue: Message queue to append to
353 @type msg_queue: [Message]
354 @param sock: socket to read from
355 @type sock: socket.socket
356 """
357 self._read_ok_byte(b, sock)
358 rospy.msg.deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size, max_msgs=1, start=1)
359
360
361
362 if b.tell() == 1:
363 b.seek(0)
364
366 """
367 Read service error from sock
368 @param sock: socket to read from
369 @type sock: socket
370 @param b: currently read data from sock
371 @type b: StringIO
372 """
373 buff_size = 256
374 while b.tell() < 5:
375 recv_buff(sock, b, buff_size)
376 bval = b.getvalue()
377 (length,) = struct.unpack('<I', bval[1:5])
378 while b.tell() < (5 + length):
379 recv_buff(sock, b, buff_size)
380 bval = b.getvalue()
381 return struct.unpack('<%ss'%length, bval[5:5+length])[0]
382
383
385 """
386 Create a handle to a ROS service for invoking calls.
387
388 Usage::
389 add_two_ints = ServiceProxy('add_two_ints', AddTwoInts)
390 resp = add_two_ints(1, 2)
391 """
392
393 - def __init__(self, name, service_class, persistent=False, headers=None):
394 """
395 ctor.
396 @param name: name of service to call
397 @type name: str
398 @param service_class: auto-generated service class
399 @type service_class: Service class
400 @param persistent: (optional) if True, proxy maintains a persistent
401 connection to service. While this results in better call
402 performance, persistent connections are discouraged as they are
403 less resistent to network issues and service restarts.
404 @type persistent: bool
405 @param headers: (optional) arbitrary headers
406 @type headers: dict
407 """
408 super(ServiceProxy, self).__init__(name, service_class)
409 self.uri = None
410 self.seq = 0
411 self.buff_size = DEFAULT_BUFF_SIZE
412 self.persistent = persistent
413 if persistent:
414 if not headers:
415 headers = {}
416 headers['persistent'] = '1'
417 self.protocol = TCPROSServiceClient(self.resolved_name,
418 self.service_class, headers=headers)
419 self.transport = None
420
423
424
426 """
427 Callable-style version of the service API. This accepts either a request message instance,
428 or you can call directly with arguments to create a new request instance. e.g.::
429
430 add_two_ints(AddTwoIntsRequest(1, 2))
431 add_two_ints(1, 2)
432 add_two_ints(a=1, b=2)
433
434 @param args: arguments to remote service
435 @param kwds: message keyword arguments
436 @raise ROSSerializationException: If unable to serialize
437 message. This is usually a type error with one of the fields.
438 """
439 return self.call(*args, **kwds)
440
442 """
443 private routine for getting URI of service to call
444 @param request: request message
445 @type request: L{rospy.Message}
446 """
447 if not isinstance(request, genpy.Message):
448 raise TypeError("request object is not a valid request message instance")
449
450
451
452
453 if not self.request_class._type == request._type:
454 raise TypeError("request object type [%s] does not match service type [%s]"%(request.__class__, self.request_class))
455
456
457
458 if 1:
459 try:
460 try:
461 master = rosgraph.Master(rospy.names.get_caller_id())
462 self.uri = master.lookupService(self.resolved_name)
463 except socket.error:
464 raise ServiceException("unable to contact master")
465 except rosgraph.MasterError as e:
466 logger.error("[%s]: lookup service failed with message [%s]", self.resolved_name, str(e))
467 raise ServiceException("service [%s] unavailable"%self.resolved_name)
468
469
470 try:
471 rospy.core.parse_rosrpc_uri(self.uri)
472 except rospy.impl.validators.ParameterInvalid:
473 raise ServiceException("master returned invalid ROSRPC URI: %s"%self.uri)
474 except socket.error as e:
475 logger.error("[%s]: socket error contacting service, master is probably unavailable",self.resolved_name)
476 return self.uri
477
478 - def call(self, *args, **kwds):
479 """
480 Call the service. This accepts either a request message instance,
481 or you can call directly with arguments to create a new request instance. e.g.::
482
483 add_two_ints(AddTwoIntsRequest(1, 2))
484 add_two_ints(1, 2)
485 add_two_ints(a=1, b=2)
486
487 @raise TypeError: if request is not of the valid type (Message)
488 @raise ServiceException: if communication with remote service fails
489 @raise ROSInterruptException: if node shutdown (e.g. ctrl-C) interrupts service call
490 @raise ROSSerializationException: If unable to serialize
491 message. This is usually a type error with one of the fields.
492 """
493
494
495 request = rospy.msg.args_kwds_to_message(self.request_class, args, kwds)
496
497
498 if self.transport is None:
499 service_uri = self._get_service_uri(request)
500 dest_addr, dest_port = rospy.core.parse_rosrpc_uri(service_uri)
501
502
503 transport = TCPROSTransport(self.protocol, self.resolved_name)
504 transport.buff_size = self.buff_size
505 try:
506 transport.connect(dest_addr, dest_port, service_uri)
507 except TransportInitError as e:
508
509 raise ServiceException("unable to connect to service: %s"%e)
510 self.transport = transport
511 else:
512 transport = self.transport
513
514
515 self.seq += 1
516 transport.send_message(request, self.seq)
517
518 try:
519 responses = transport.receive_once()
520 if len(responses) == 0:
521 raise ServiceException("service [%s] returned no response"%self.resolved_name)
522 elif len(responses) > 1:
523 raise ServiceException("service [%s] returned multiple responses: %s"%(self.resolved_name, len(responses)))
524 except rospy.exceptions.TransportException as e:
525
526 if rospy.core.is_shutdown():
527 raise rospy.exceptions.ROSInterruptException("node shutdown interrupted service call")
528 else:
529 raise ServiceException("transport error completing service call: %s"%(str(e)))
530 finally:
531 if not self.persistent:
532 transport.close()
533 self.transport = None
534 return responses[0]
535
536
538 """Close this ServiceProxy. This only has an effect on persistent ServiceProxy instances."""
539 if self.transport is not None:
540 self.transport.close()
541
543 """
544 Implementation of ROS Service. This intermediary class allows for more configuration of behavior than the Service class.
545 """
546
548 super(ServiceImpl, self).__init__(name, service_class)
549
550 if not name or not isstring(name):
551 raise ValueError("service name is not a non-empty string")
552
553 if not rosgraph.names.is_legal_name(name):
554 import warnings
555 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2)
556
557
558 self.handler = handler
559 if error_handler is not None:
560 self.error_handler = error_handler
561 self.registered = False
562 self.seq = 0
563 self.done = False
564 self.buff_size=buff_size
565
566 start_tcpros_server()
567 host, port = get_tcpros_server_address()
568 self.uri = '%s%s:%s'%(rospy.core.ROSRPC, host, port)
569 logdebug("... service URL is %s"%self.uri)
570
571 self.protocol = TCPService(self.resolved_name, service_class, self.buff_size)
572
573 logdebug("[%s]: new Service instance"%self.resolved_name)
574
575
576
578 logerr("Error processing request: %s\n%s" % (e, traceback.format_exception(exc_type, exc_value, tb)))
579
581 """
582 Stop this service
583 @param reason: human-readable shutdown reason
584 @type reason: str
585 """
586 self.done = True
587 logdebug('[%s].shutdown: reason [%s]'%(self.resolved_name, reason))
588 try:
589
590 get_service_manager().unregister(self.resolved_name, self)
591 except Exception as e:
592 logerr("Unable to unregister with master: "+traceback.format_exc())
593 raise ServiceException("Unable to connect to master: %s"%e)
594
596 """
597 Let service run and take over thread until service or node
598 shutdown. Use this method to keep your scripts from exiting
599 execution.
600 """
601 try:
602 while not rospy.core.is_shutdown() and not self.done:
603 time.sleep(0.5)
604 except KeyboardInterrupt:
605 logdebug("keyboard interrupt, shutting down")
606
608 """
609 Send error message to client
610 @param transport: transport connection to client
611 @type transport: Transport
612 @param err_msg: error message to send to client
613 @type err_msg: str
614 """
615 if sys.hexversion > 0x03000000:
616 err_msg = bytes(err_msg, 'utf-8')
617 transport.write_data(struct.pack('<BI%ss'%len(err_msg), 0, len(err_msg), err_msg))
618
620 """
621 Process a single incoming request.
622 @param transport: transport instance
623 @type transport: L{TCPROSTransport}
624 @param request: Message
625 @type request: genpy.Message
626 """
627 try:
628
629 response = convert_return_to_response(self.handler(request), self.response_class)
630 self.seq += 1
631
632 transport.write_buff.write(struct.pack('<B', 1))
633 transport.send_message(response, self.seq)
634 except ServiceException as e:
635 rospy.core.rospydebug("handler raised ServiceException: %s"%(e))
636 self._write_service_error(transport, "service cannot process request: %s"%e)
637 except Exception as e:
638 (exc_type, exc_value, tb) = sys.exc_info()
639 self.error_handler(e, exc_type, exc_value, tb)
640 self._write_service_error(transport, "error processing request: %s"%e)
641
642 - def handle(self, transport, header):
643 """
644 Process incoming request. This method should be run in its
645 own thread. If header['persistent'] is set to 1, method will
646 block until connection is broken.
647 @param transport: transport instance
648 @type transport: L{TCPROSTransport}
649 @param header: headers from client
650 @type header: dict
651 """
652 if 'persistent' in header and \
653 header['persistent'].lower() in ['1', 'true']:
654 persistent = True
655 else:
656 persistent = False
657 if header.get('probe', None) == '1':
658
659 transport.close()
660 return
661 handle_done = False
662 while not handle_done:
663 try:
664 requests = transport.receive_once()
665 for request in requests:
666 self._handle_request(transport, request)
667 if not persistent:
668 handle_done = True
669 except rospy.exceptions.TransportTerminated as e:
670 if not persistent:
671 logerr("incoming connection failed: %s"%e)
672 logdebug("service[%s]: transport terminated"%self.resolved_name)
673 handle_done = True
674 transport.close()
675
676
678 """
679 Declare a ROS service. Service requests are passed to the
680 specified handler.
681
682 Service Usage::
683 s = Service('getmapservice', GetMap, get_map_handler)
684 """
685
688 """
689 ctor.
690
691 @param name: service name, ``str``
692 @param service_class: Service definition class
693
694 @param handler: callback function for processing service
695 request. Function takes in a ServiceRequest and returns a
696 ServiceResponse of the appropriate type. Function may also
697 return a list, tuple, or dictionary with arguments to initialize
698 a ServiceResponse instance of the correct type.
699
700 If handler cannot process request, it may either return None,
701 to indicate failure, or it may raise a rospy.ServiceException
702 to send a specific error message to the client. Returning None
703 is always considered a failure.
704
705 @type handler: fn(req)->resp
706
707 @param buff_size: size of buffer for reading incoming requests. Should be at least size of request message
708 @type buff_size: int
709
710 @param error_handler: callback function for handling errors
711 raised in the service code.
712 @type error_handler: fn(exception, exception_type, exception_value, traceback)->None
713 """
714 super(Service, self).__init__(name, service_class, handler, buff_size,
715 error_handler)
716
717
718 get_service_manager().register(self.resolved_name, self)
719