1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 """Internal use: Service-specific extensions for TCPROS support"""
34
35 import io
36 import socket
37 import struct
38 import sys
39 import logging
40
41 import time
42 import traceback
43
44 import genpy
45
46 import rosgraph
47 import rosgraph.names
48 import rosgraph.network
49
50 from rospy.exceptions import TransportInitError, TransportTerminated, ROSException, ROSInterruptException
51 from rospy.service import _Service, ServiceException
52
53 from rospy.impl.registration import get_service_manager
54 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \
55 get_tcpros_server_address, start_tcpros_server, recv_buff, \
56 DEFAULT_BUFF_SIZE
57
58 from rospy.core import logwarn, loginfo, logerr, logdebug
59 import rospy.core
60 import rospy.msg
61 import rospy.names
62
63 import rospy.impl.validators
64
65 import threading
66
67 if sys.hexversion > 0x03000000:
69 return isinstance(s, str)
70 else:
72 return isinstance(s, basestring)
73
74 logger = logging.getLogger('rospy.service')
75
76
77
78
80 """
81 Blocks until service is available. Use this in
82 initialization code if your program depends on a
83 service already running.
84 @param service: name of service
85 @type service: str
86 @param timeout: timeout time in seconds, None for no
87 timeout. NOTE: timeout=0 is invalid as wait_for_service actually
88 contacts the service, so non-blocking behavior is not
89 possible. For timeout=0 uses cases, just call the service without
90 waiting.
91 @type timeout: double
92 @raise ROSException: if specified timeout is exceeded
93 @raise ROSInterruptException: if shutdown interrupts wait
94 """
95 master = rosgraph.Master(rospy.names.get_caller_id())
96 def contact_service(resolved_name, timeout=10.0):
97 try:
98 uri = master.lookupService(resolved_name)
99 except rosgraph.MasterException:
100 return False
101
102 addr = rospy.core.parse_rosrpc_uri(uri)
103 if rosgraph.network.use_ipv6():
104 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
105 else:
106 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
107 try:
108
109
110 s.settimeout(timeout)
111 logdebug('connecting to ' + str(addr))
112 s.connect(addr)
113 h = { 'probe' : '1', 'md5sum' : '*',
114 'callerid' : rospy.core.get_caller_id(),
115 'service': resolved_name }
116 rosgraph.network.write_ros_handshake_header(s, h)
117 return True
118 finally:
119 if s is not None:
120 s.close()
121 if timeout == 0.:
122 raise ValueError("timeout must be non-zero")
123 resolved_name = rospy.names.resolve_name(service)
124 first = False
125 if timeout:
126 timeout_t = time.time() + timeout
127 while not rospy.core.is_shutdown() and time.time() < timeout_t:
128 try:
129 if contact_service(resolved_name, timeout_t-time.time()):
130 return
131 except KeyboardInterrupt:
132
133 rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising")
134 raise
135 except:
136 if first:
137 first = False
138 rospy.core.logerr("wait_for_service(%s): failed to contact, will keep trying"%(resolved_name))
139 time.sleep(0.3)
140 if rospy.core.is_shutdown():
141 raise ROSInterruptException("rospy shutdown")
142 else:
143 raise ROSException("timeout exceeded while waiting for service %s"%resolved_name)
144 else:
145 while not rospy.core.is_shutdown():
146 try:
147 if contact_service(resolved_name):
148 return
149 except KeyboardInterrupt:
150
151 rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising")
152 raise
153 except:
154 if first:
155 first = False
156 rospy.core.logerr("wait_for_service(%s): failed to contact, will keep trying"%(resolved_name))
157 time.sleep(0.3)
158 if rospy.core.is_shutdown():
159 raise ROSInterruptException("rospy shutdown")
160
161
163 """
164 Convert return value of function to response instance. The
165 rules/precedence for this are:
166
167 1. If the return type is the same as the response type, no conversion
168 is done.
169
170 2. If the return type is a dictionary, it is used as a keyword-style
171 initialization for a new response instance.
172
173 3. If the return type is *not* a list type, it is passed in as a single arg
174 to a new response instance.
175
176 4. If the return type is a list/tuple type, it is used as a args-style
177 initialization for a new response instance.
178 """
179
180
181
182
183 if isinstance(response, genpy.Message) and response._type == response_class._type:
184
185 return response
186 elif type(response) == dict:
187
188 try:
189 return response_class(**response)
190 except AttributeError as e:
191 raise ServiceException("handler returned invalid value: %s"%str(e))
192 elif response == None:
193 raise ServiceException("service handler returned None")
194 elif type(response) not in [list, tuple]:
195
196 try:
197 return response_class(response)
198 except TypeError as e:
199 raise ServiceException("handler returned invalid value: %s"%str(e))
200 else:
201
202
203 try:
204 return response_class(*response)
205 except TypeError as e:
206 raise ServiceException("handler returned wrong number of values: %s"%str(e))
207
209 """
210 Process incoming service connection. For use with
211 TCPROSServer. Reads in service name from handshake and creates the
212 appropriate service handler for the connection.
213 @param sock: socket connection
214 @type sock: socket
215 @param client_addr: client address
216 @type client_addr: (str, int)
217 @param header: key/value pairs from handshake header
218 @type header: dict
219 @return: error string or None
220 @rtype: str
221 """
222 for required in ['service', 'md5sum', 'callerid']:
223 if not required in header:
224 return "Missing required '%s' field"%required
225 else:
226 logger.debug("connection from %s:%s", client_addr[0], client_addr[1])
227 service_name = header['service']
228
229
230
231
232
233
234 sm = get_service_manager()
235 md5sum = header['md5sum']
236 service = sm.get_service(service_name)
237 if not service:
238 return "[%s] is not a provider of [%s]"%(rospy.names.get_caller_id(), service_name)
239 elif md5sum != rospy.names.SERVICE_ANYTYPE and md5sum != service.service_class._md5sum:
240 return "request from [%s]: md5sums do not match: [%s] vs. [%s]"%(header['callerid'], md5sum, service.service_class._md5sum)
241 else:
242 transport = TCPROSTransport(service.protocol, service_name, header=header)
243 transport.set_socket(sock, header['callerid'])
244 transport.write_header()
245
246
247 t = threading.Thread(target=service.handle, args=(transport, header))
248 t.setDaemon(True)
249 t.start()
250
251
253 """
254 Protocol implementation for Services over TCPROS
255 """
256
258 """
259 ctor.
260 @param resolved_name: name of service
261 @type resolved_name: str
262 @param service_class: Service data type class
263 @type service_class: Service
264 @param buff_size int: size of buffer (bytes) to use for reading incoming requests.
265 @type buff_size: int
266 """
267 super(TCPService, self).__init__(resolved_name, service_class._request_class, buff_size=buff_size)
268 self.service_class = service_class
269
271 """
272 Protocol API
273 @return: header fields
274 @rtype: dict
275 """
276 return {'service': self.resolved_name, 'type': self.service_class._type,
277 'md5sum': self.service_class._md5sum, 'callerid': rospy.names.get_caller_id() }
278
280 """Protocol Implementation for Service clients over TCPROS"""
281
283 """
284 ctor.
285 @param resolved_name: resolved service name
286 @type resolved_name: str
287 @param service_class: Service data type class
288 @type service_class: Service
289 @param headers: identifier for Service session
290 @type headers: dict
291 @param buff_size: size of buffer (bytes) for reading responses from Service.
292 @type buff_size: int
293 """
294 super(TCPROSServiceClient, self).__init__(resolved_name, service_class._response_class)
295 self.service_class = service_class
296 self.headers = headers or {}
297 self.buff_size = buff_size
298
300 """
301 TCPROSTransportProtocol API
302 """
303 headers = {'service': self.resolved_name, 'md5sum': self.service_class._md5sum,
304 'callerid': rospy.names.get_caller_id()}
305
306
307
308 for k, v in self.headers.items():
309 headers[k] = v
310 return headers
311
313 """
314 Utility for reading the OK-byte/error-message header preceding each message.
315 @param sock: socket connection. Will be read from if OK byte is
316 false and error message needs to be read
317 @type sock: socket.socket
318 @param b: buffer to read from
319 @type b: StringIO
320 """
321 if b.tell() == 0:
322 return
323 pos = b.tell()
324 b.seek(0)
325 ok = struct.unpack('<B', b.read(1))[0]
326 b.seek(pos)
327 if not ok:
328 str = self._read_service_error(sock, b)
329
330
331
332
333
334 b.seek(0)
335 b.truncate(0)
336 raise ServiceException("service [%s] responded with an error: %s"%(self.resolved_name, str))
337 else:
338
339 b.seek(pos)
340
342 """
343 In service implementation, reads in OK byte that preceeds each
344 response. The OK byte allows for the passing of error messages
345 instead of a response message
346 @param b: buffer
347 @type b: StringIO
348 @param msg_queue: Message queue to append to
349 @type msg_queue: [Message]
350 @param sock: socket to read from
351 @type sock: socket.socket
352 """
353 self._read_ok_byte(b, sock)
354 rospy.msg.deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size, max_msgs=1, start=1)
355
356
357
358 if b.tell() == 1:
359 b.seek(0)
360
362 """
363 Read service error from sock
364 @param sock: socket to read from
365 @type sock: socket
366 @param b: currently read data from sock
367 @type b: StringIO
368 """
369 buff_size = 256
370 while b.tell() < 5:
371 recv_buff(sock, b, buff_size)
372 bval = b.getvalue()
373 (length,) = struct.unpack('<I', bval[1:5])
374 while b.tell() < (5 + length):
375 recv_buff(sock, b, buff_size)
376 bval = b.getvalue()
377 return struct.unpack('<%ss'%length, bval[5:5+length])[0]
378
379
381 """
382 Create a handle to a ROS service for invoking calls.
383
384 Usage::
385 add_two_ints = ServiceProxy('add_two_ints', AddTwoInts)
386 resp = add_two_ints(1, 2)
387 """
388
389 - def __init__(self, name, service_class, persistent=False, headers=None):
390 """
391 ctor.
392 @param name: name of service to call
393 @type name: str
394 @param service_class: auto-generated service class
395 @type service_class: Service class
396 @param persistent: (optional) if True, proxy maintains a persistent
397 connection to service. While this results in better call
398 performance, persistent connections are discouraged as they are
399 less resistent to network issues and service restarts.
400 @type persistent: bool
401 @param headers: (optional) arbitrary headers
402 @type headers: dict
403 """
404 super(ServiceProxy, self).__init__(name, service_class)
405 self.uri = None
406 self.seq = 0
407 self.buff_size = DEFAULT_BUFF_SIZE
408 self.persistent = persistent
409 if persistent:
410 if not headers:
411 headers = {}
412 headers['persistent'] = '1'
413 self.protocol = TCPROSServiceClient(self.resolved_name,
414 self.service_class, headers=headers)
415 self.transport = None
416
419
420
422 """
423 Callable-style version of the service API. This accepts either a request message instance,
424 or you can call directly with arguments to create a new request instance. e.g.::
425
426 add_two_ints(AddTwoIntsRequest(1, 2))
427 add_two_ints(1, 2)
428 add_two_ints(a=1, b=2)
429
430 @param args: arguments to remote service
431 @param kwds: message keyword arguments
432 @raise ROSSerializationException: If unable to serialize
433 message. This is usually a type error with one of the fields.
434 """
435 return self.call(*args, **kwds)
436
438 """
439 private routine for getting URI of service to call
440 @param request: request message
441 @type request: L{rospy.Message}
442 """
443 if not isinstance(request, genpy.Message):
444 raise TypeError("request object is not a valid request message instance")
445
446
447
448
449 if not self.request_class._type == request._type:
450 raise TypeError("request object type [%s] does not match service type [%s]"%(request.__class__, self.request_class))
451
452
453
454 if 1:
455 try:
456 try:
457 master = rosgraph.Master(rospy.names.get_caller_id())
458 self.uri = master.lookupService(self.resolved_name)
459 except socket.error:
460 raise ServiceException("unable to contact master")
461 except rosgraph.MasterError as e:
462 logger.error("[%s]: lookup service failed with message [%s]", self.resolved_name, str(e))
463 raise ServiceException("service [%s] unavailable"%self.resolved_name)
464
465
466 try:
467 rospy.core.parse_rosrpc_uri(self.uri)
468 except rospy.impl.validators.ParameterInvalid:
469 raise ServiceException("master returned invalid ROSRPC URI: %s"%self.uri)
470 except socket.error as e:
471 logger.error("[%s]: socket error contacting service, master is probably unavailable",self.resolved_name)
472 return self.uri
473
474 - def call(self, *args, **kwds):
475 """
476 Call the service. This accepts either a request message instance,
477 or you can call directly with arguments to create a new request instance. e.g.::
478
479 add_two_ints(AddTwoIntsRequest(1, 2))
480 add_two_ints(1, 2)
481 add_two_ints(a=1, b=2)
482
483 @raise TypeError: if request is not of the valid type (Message)
484 @raise ServiceException: if communication with remote service fails
485 @raise ROSInterruptException: if node shutdown (e.g. ctrl-C) interrupts service call
486 @raise ROSSerializationException: If unable to serialize
487 message. This is usually a type error with one of the fields.
488 """
489
490
491 request = rospy.msg.args_kwds_to_message(self.request_class, args, kwds)
492
493
494 if self.transport is None:
495 service_uri = self._get_service_uri(request)
496 dest_addr, dest_port = rospy.core.parse_rosrpc_uri(service_uri)
497
498
499 transport = TCPROSTransport(self.protocol, self.resolved_name)
500 transport.buff_size = self.buff_size
501 try:
502 transport.connect(dest_addr, dest_port, service_uri)
503 except TransportInitError as e:
504
505 raise ServiceException("unable to connect to service: %s"%e)
506 self.transport = transport
507 else:
508 transport = self.transport
509
510
511 self.seq += 1
512 transport.send_message(request, self.seq)
513
514 try:
515 responses = transport.receive_once()
516 if len(responses) == 0:
517 raise ServiceException("service [%s] returned no response"%self.resolved_name)
518 elif len(responses) > 1:
519 raise ServiceException("service [%s] returned multiple responses: %s"%(self.resolved_name, len(responses)))
520 except rospy.exceptions.TransportException as e:
521
522 if rospy.core.is_shutdown():
523 raise rospy.exceptions.ROSInterruptException("node shutdown interrupted service call")
524 else:
525 raise ServiceException("transport error completing service call: %s"%(str(e)))
526 finally:
527 if not self.persistent:
528 transport.close()
529 self.transport = None
530 return responses[0]
531
532
534 """Close this ServiceProxy. This only has an effect on persistent ServiceProxy instances."""
535 if self.transport is not None:
536 self.transport.close()
537
539 """
540 Implementation of ROS Service. This intermediary class allows for more configuration of behavior than the Service class.
541 """
542
544 super(ServiceImpl, self).__init__(name, service_class)
545
546 if not name or not isstring(name):
547 raise ValueError("service name is not a non-empty string")
548
549 if not rosgraph.names.is_legal_name(name):
550 import warnings
551 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2)
552
553
554 self.handler = handler
555 if error_handler is not None:
556 self.error_handler = error_handler
557 self.registered = False
558 self.seq = 0
559 self.done = False
560 self.buff_size=buff_size
561
562 start_tcpros_server()
563 host, port = get_tcpros_server_address()
564 self.uri = '%s%s:%s'%(rospy.core.ROSRPC, host, port)
565 logdebug("... service URL is %s"%self.uri)
566
567 self.protocol = TCPService(self.resolved_name, service_class, self.buff_size)
568
569 logdebug("[%s]: new Service instance"%self.resolved_name)
570
571
572
574 logerr("Error processing request: %s\n%s" % (e, traceback.format_exception(exc_type, exc_value, tb)))
575
577 """
578 Stop this service
579 @param reason: human-readable shutdown reason
580 @type reason: str
581 """
582 self.done = True
583 logdebug('[%s].shutdown: reason [%s]'%(self.resolved_name, reason))
584 try:
585
586 get_service_manager().unregister(self.resolved_name, self)
587 except Exception as e:
588 logerr("Unable to unregister with master: "+traceback.format_exc())
589 raise ServiceException("Unable to connect to master: %s"%e)
590
592 """
593 Let service run and take over thread until service or node
594 shutdown. Use this method to keep your scripts from exiting
595 execution.
596 """
597 try:
598 while not rospy.core.is_shutdown() and not self.done:
599 time.sleep(0.5)
600 except KeyboardInterrupt:
601 logdebug("keyboard interrupt, shutting down")
602
604 """
605 Send error message to client
606 @param transport: transport connection to client
607 @type transport: Transport
608 @param err_msg: error message to send to client
609 @type err_msg: str
610 """
611 if sys.hexversion > 0x03000000:
612 err_msg = bytes(err_msg, 'utf-8')
613 transport.write_data(struct.pack('<BI%ss'%len(err_msg), 0, len(err_msg), err_msg))
614
616 """
617 Process a single incoming request.
618 @param transport: transport instance
619 @type transport: L{TCPROSTransport}
620 @param request: Message
621 @type request: genpy.Message
622 """
623 try:
624
625 response = convert_return_to_response(self.handler(request), self.response_class)
626 self.seq += 1
627
628 transport.write_buff.write(struct.pack('<B', 1))
629 transport.send_message(response, self.seq)
630 except ServiceException as e:
631 rospy.core.rospydebug("handler raised ServiceException: %s"%(e))
632 self._write_service_error(transport, "service cannot process request: %s"%e)
633 except Exception as e:
634 (exc_type, exc_value, tb) = sys.exc_info()
635 self.error_handler(e, exc_type, exc_value, tb)
636 self._write_service_error(transport, "error processing request: %s"%e)
637
638 - def handle(self, transport, header):
639 """
640 Process incoming request. This method should be run in its
641 own thread. If header['persistent'] is set to 1, method will
642 block until connection is broken.
643 @param transport: transport instance
644 @type transport: L{TCPROSTransport}
645 @param header: headers from client
646 @type header: dict
647 """
648 if 'persistent' in header and \
649 header['persistent'].lower() in ['1', 'true']:
650 persistent = True
651 else:
652 persistent = False
653 if header.get('probe', None) == '1':
654
655 transport.close()
656 return
657 handle_done = False
658 while not handle_done:
659 try:
660 requests = transport.receive_once()
661 for request in requests:
662 self._handle_request(transport, request)
663 if not persistent:
664 handle_done = True
665 except rospy.exceptions.TransportTerminated as e:
666 if not persistent:
667 logerr("incoming connection failed: %s"%e)
668 logdebug("service[%s]: transport terminated"%self.resolved_name)
669 handle_done = True
670 transport.close()
671
672
674 """
675 Declare a ROS service. Service requests are passed to the
676 specified handler.
677
678 Service Usage::
679 s = Service('getmapservice', GetMap, get_map_handler)
680 """
681
684 """
685 ctor.
686
687 @param name: service name, ``str``
688 @param service_class: Service definition class
689
690 @param handler: callback function for processing service
691 request. Function takes in a ServiceRequest and returns a
692 ServiceResponse of the appropriate type. Function may also
693 return a list, tuple, or dictionary with arguments to initialize
694 a ServiceResponse instance of the correct type.
695
696 If handler cannot process request, it may either return None,
697 to indicate failure, or it may raise a rospy.ServiceException
698 to send a specific error message to the client. Returning None
699 is always considered a failure.
700
701 @type handler: fn(req)->resp
702
703 @param buff_size: size of buffer for reading incoming requests. Should be at least size of request message
704 @type buff_size: int
705
706 @param error_handler: callback function for handling errors
707 raised in the service code.
708 @type error_handler: fn(exception, exception_type, exception_value, traceback)->None
709 """
710 super(Service, self).__init__(name, service_class, handler, buff_size,
711 error_handler)
712
713
714 get_service_manager().register(self.resolved_name, self)
715