1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 """Internal use: Service-specific extensions for TCPROS support"""
34
35 import io
36 import socket
37 import struct
38 import sys
39 import logging
40
41 import time
42 import traceback
43
44 import genpy
45
46 import rosgraph
47 import rosgraph.names
48 import rosgraph.network
49
50 from rospy.exceptions import TransportInitError, TransportTerminated, ROSException, ROSInterruptException
51 from rospy.service import _Service, ServiceException
52
53 from rospy.impl.registration import get_service_manager
54 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \
55 get_tcpros_server_address, start_tcpros_server, recv_buff, \
56 DEFAULT_BUFF_SIZE
57
58 from rospy.core import logwarn, loginfo, logerr, logdebug
59 import rospy.core
60 import rospy.msg
61 import rospy.names
62
63 import rospy.impl.validators
64
65 try:
66 import _thread
67 except ImportError:
68 import thread as _thread
69
70 if sys.hexversion > 0x03000000:
72 return isinstance(s, str)
73 else:
75 return isinstance(s, basestring)
76
77 logger = logging.getLogger('rospy.service')
78
79
80
81
83 """
84 Blocks until service is available. Use this in
85 initialization code if your program depends on a
86 service already running.
87 @param service: name of service
88 @type service: str
89 @param timeout: timeout time in seconds, None for no
90 timeout. NOTE: timeout=0 is invalid as wait_for_service actually
91 contacts the service, so non-blocking behavior is not
92 possible. For timeout=0 uses cases, just call the service without
93 waiting.
94 @type timeout: double
95 @raise ROSException: if specified timeout is exceeded
96 @raise ROSInterruptException: if shutdown interrupts wait
97 """
98 master = rosgraph.Master(rospy.names.get_caller_id())
99 def contact_service(resolved_name, timeout=10.0):
100 try:
101 uri = master.lookupService(resolved_name)
102 except rosgraph.MasterException:
103 return False
104
105 addr = rospy.core.parse_rosrpc_uri(uri)
106 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
107 try:
108
109
110 s.settimeout(timeout)
111 s.connect(addr)
112 h = { 'probe' : '1', 'md5sum' : '*',
113 'callerid' : rospy.core.get_caller_id(),
114 'service': resolved_name }
115 rosgraph.network.write_ros_handshake_header(s, h)
116 return True
117 finally:
118 if s is not None:
119 s.close()
120 if timeout == 0.:
121 raise ValueError("timeout must be non-zero")
122 resolved_name = rospy.names.resolve_name(service)
123 first = False
124 if timeout:
125 timeout_t = time.time() + timeout
126 while not rospy.core.is_shutdown() and time.time() < timeout_t:
127 try:
128 if contact_service(resolved_name, timeout_t-time.time()):
129 return
130 time.sleep(0.3)
131 except KeyboardInterrupt:
132
133 rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising")
134 raise
135 except:
136 if first:
137 first = False
138 rospy.core.logerr("wait_for_service(%s): failed to contact [%s], will keep trying"%(resolved_name, uri))
139 if rospy.core.is_shutdown():
140 raise ROSInterruptException("rospy shutdown")
141 else:
142 raise ROSException("timeout exceeded while waiting for service %s"%resolved_name)
143 else:
144 while not rospy.core.is_shutdown():
145 try:
146 if contact_service(resolved_name):
147 return
148 time.sleep(0.3)
149 except KeyboardInterrupt:
150
151 rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising")
152 raise
153 except:
154 if first:
155 first = False
156 rospy.core.logerr("wait_for_service(%s): failed to contact [%s], will keep trying"%(resolved_name, uri))
157 if rospy.core.is_shutdown():
158 raise ROSInterruptException("rospy shutdown")
159
160
162 """
163 Convert return value of function to response instance. The
164 rules/precedence for this are:
165
166 1. If the return type is the same as the response type, no conversion
167 is done.
168
169 2. If the return type is a dictionary, it is used as a keyword-style
170 initialization for a new response instance.
171
172 3. If the return type is *not* a list type, it is passed in as a single arg
173 to a new response instance.
174
175 4. If the return type is a list/tuple type, it is used as a args-style
176 initialization for a new response instance.
177 """
178
179
180
181
182 if isinstance(response, genpy.Message) and response._type == response_class._type:
183
184 return response
185 elif type(response) == dict:
186
187 try:
188 return response_class(**response)
189 except AttributeError as e:
190 raise ServiceException("handler returned invalid value: %s"%str(e))
191 elif response == None:
192 raise ServiceException("service handler returned None")
193 elif type(response) not in [list, tuple]:
194
195 try:
196 return response_class(response)
197 except TypeError as e:
198 raise ServiceException("handler returned invalid value: %s"%str(e))
199 else:
200
201
202 try:
203 return response_class(*response)
204 except TypeError as e:
205 raise ServiceException("handler returned wrong number of values: %s"%str(e))
206
208 """
209 Process incoming service connection. For use with
210 TCPROSServer. Reads in service name from handshake and creates the
211 appropriate service handler for the connection.
212 @param sock: socket connection
213 @type sock: socket
214 @param client_addr: client address
215 @type client_addr: (str, int)
216 @param header: key/value pairs from handshake header
217 @type header: dict
218 @return: error string or None
219 @rtype: str
220 """
221 for required in ['service', 'md5sum', 'callerid']:
222 if not required in header:
223 return "Missing required '%s' field"%required
224 else:
225 logger.debug("connection from %s:%s", client_addr[0], client_addr[1])
226 service_name = header['service']
227
228
229
230
231
232
233 sm = get_service_manager()
234 md5sum = header['md5sum']
235 service = sm.get_service(service_name)
236 if not service:
237 return "[%s] is not a provider of [%s]"%(rospy.names.get_caller_id(), service_name)
238 elif md5sum != rospy.names.SERVICE_ANYTYPE and md5sum != service.service_class._md5sum:
239 return "request from [%s]: md5sums do not match: [%s] vs. [%s]"%(header['callerid'], md5sum, service.service_class._md5sum)
240 else:
241 transport = TCPROSTransport(service.protocol, service_name, header=header)
242 transport.set_socket(sock, header['callerid'])
243 transport.write_header()
244
245
246 _thread.start_new_thread(service.handle, (transport, header))
247
248
250 """
251 Protocol implementation for Services over TCPROS
252 """
253
255 """
256 ctor.
257 @param resolved_name: name of service
258 @type resolved_name: str
259 @param service_class: Service data type class
260 @type service_class: Service
261 @param buff_size int: size of buffer (bytes) to use for reading incoming requests.
262 @type buff_size: int
263 """
264 super(TCPService, self).__init__(resolved_name, service_class._request_class, buff_size=buff_size)
265 self.service_class = service_class
266
268 """
269 Protocol API
270 @return: header fields
271 @rtype: dict
272 """
273 return {'service': self.resolved_name, 'type': self.service_class._type,
274 'md5sum': self.service_class._md5sum, 'callerid': rospy.names.get_caller_id() }
275
277 """Protocol Implementation for Service clients over TCPROS"""
278
280 """
281 ctor.
282 @param resolved_name: resolved service name
283 @type resolved_name: str
284 @param service_class: Service data type class
285 @type service_class: Service
286 @param headers: identifier for Service session
287 @type headers: dict
288 @param buff_size: size of buffer (bytes) for reading responses from Service.
289 @type buff_size: int
290 """
291 super(TCPROSServiceClient, self).__init__(resolved_name, service_class._response_class)
292 self.service_class = service_class
293 self.headers = headers or {}
294 self.buff_size = buff_size
295
297 """
298 TCPROSTransportProtocol API
299 """
300 headers = {'service': self.resolved_name, 'md5sum': self.service_class._md5sum,
301 'callerid': rospy.names.get_caller_id()}
302
303
304
305 for k, v in self.headers.items():
306 headers[k] = v
307 return headers
308
310 """
311 Utility for reading the OK-byte/error-message header preceding each message.
312 @param sock: socket connection. Will be read from if OK byte is
313 false and error message needs to be read
314 @type sock: socket.socket
315 @param b: buffer to read from
316 @type b: StringIO
317 """
318 if b.tell() == 0:
319 return
320 pos = b.tell()
321 b.seek(0)
322 ok = struct.unpack('<B', b.read(1))[0]
323 b.seek(pos)
324 if not ok:
325 str = self._read_service_error(sock, b)
326
327
328
329
330
331 b.seek(0)
332 b.truncate(0)
333 raise ServiceException("service [%s] responded with an error: %s"%(self.resolved_name, str))
334 else:
335
336 b.seek(pos)
337
339 """
340 In service implementation, reads in OK byte that preceeds each
341 response. The OK byte allows for the passing of error messages
342 instead of a response message
343 @param b: buffer
344 @type b: StringIO
345 @param msg_queue: Message queue to append to
346 @type msg_queue: [Message]
347 @param sock: socket to read from
348 @type sock: socket.socket
349 """
350 self._read_ok_byte(b, sock)
351 rospy.msg.deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size, max_msgs=1, start=1)
352
353
354
355 if b.tell() == 1:
356 b.seek(0)
357
359 """
360 Read service error from sock
361 @param sock: socket to read from
362 @type sock: socket
363 @param b: currently read data from sock
364 @type b: StringIO
365 """
366 buff_size = 256
367 while b.tell() < 5:
368 recv_buff(sock, b, buff_size)
369 bval = b.getvalue()
370 (length,) = struct.unpack('<I', bval[1:5])
371 while b.tell() < (5 + length):
372 recv_buff(sock, b, buff_size)
373 bval = b.getvalue()
374 return struct.unpack('<%ss'%length, bval[5:5+length])[0]
375
376
378 """
379 Create a handle to a ROS service for invoking calls.
380
381 Usage::
382 add_two_ints = ServiceProxy('add_two_ints', AddTwoInts)
383 resp = add_two_ints(1, 2)
384 """
385
386 - def __init__(self, name, service_class, persistent=False, headers=None):
387 """
388 ctor.
389 @param name: name of service to call
390 @type name: str
391 @param service_class: auto-generated service class
392 @type service_class: Service class
393 @param persistent: (optional) if True, proxy maintains a persistent
394 connection to service. While this results in better call
395 performance, persistent connections are discouraged as they are
396 less resistent to network issues and service restarts.
397 @type persistent: bool
398 @param headers: (optional) arbitrary headers
399 @type headers: dict
400 """
401 super(ServiceProxy, self).__init__(name, service_class)
402 self.uri = None
403 self.seq = 0
404 self.buff_size = DEFAULT_BUFF_SIZE
405 self.persistent = persistent
406 if persistent:
407 if not headers:
408 headers = {}
409 headers['persistent'] = '1'
410 self.protocol = TCPROSServiceClient(self.resolved_name,
411 self.service_class, headers=headers)
412 self.transport = None
413
416
417
419 """
420 Callable-style version of the service API. This accepts either a request message instance,
421 or you can call directly with arguments to create a new request instance. e.g.::
422
423 add_two_ints(AddTwoIntsRequest(1, 2))
424 add_two_ints(1, 2)
425 add_two_ints(a=1, b=2)
426
427 @param args: arguments to remote service
428 @param kwds: message keyword arguments
429 @raise ROSSerializationException: If unable to serialize
430 message. This is usually a type error with one of the fields.
431 """
432 return self.call(*args, **kwds)
433
435 """
436 private routine for getting URI of service to call
437 @param request: request message
438 @type request: L{rospy.Message}
439 """
440 if not isinstance(request, genpy.Message):
441 raise TypeError("request object is not a valid request message instance")
442
443
444
445
446 if not self.request_class._type == request._type:
447 raise TypeError("request object type [%s] does not match service type [%s]"%(request.__class__, self.request_class))
448
449
450
451 if 1:
452 try:
453 try:
454 master = rosgraph.Master(rospy.names.get_caller_id())
455 self.uri = master.lookupService(self.resolved_name)
456 except socket.error:
457 raise ServiceException("unable to contact master")
458 except rosgraph.MasterError as e:
459 logger.error("[%s]: lookup service failed with message [%s]", self.resolved_name, str(e))
460 raise ServiceException("service [%s] unavailable"%self.resolved_name)
461
462
463 try:
464 rospy.core.parse_rosrpc_uri(self.uri)
465 except rospy.impl.validators.ParameterInvalid:
466 raise ServiceException("master returned invalid ROSRPC URI: %s"%self.uri)
467 except socket.error as e:
468 logger.error("[%s]: socket error contacting service, master is probably unavailable",self.resolved_name)
469 return self.uri
470
471 - def call(self, *args, **kwds):
472 """
473 Call the service. This accepts either a request message instance,
474 or you can call directly with arguments to create a new request instance. e.g.::
475
476 add_two_ints(AddTwoIntsRequest(1, 2))
477 add_two_ints(1, 2)
478 add_two_ints(a=1, b=2)
479
480 @raise TypeError: if request is not of the valid type (Message)
481 @raise ServiceException: if communication with remote service fails
482 @raise ROSInterruptException: if node shutdown (e.g. ctrl-C) interrupts service call
483 @raise ROSSerializationException: If unable to serialize
484 message. This is usually a type error with one of the fields.
485 """
486
487
488 request = rospy.msg.args_kwds_to_message(self.request_class, args, kwds)
489
490
491 if self.transport is None:
492 service_uri = self._get_service_uri(request)
493 dest_addr, dest_port = rospy.core.parse_rosrpc_uri(service_uri)
494
495
496 transport = TCPROSTransport(self.protocol, self.resolved_name)
497 transport.buff_size = self.buff_size
498 try:
499 transport.connect(dest_addr, dest_port, service_uri)
500 except TransportInitError as e:
501
502 raise ServiceException("unable to connect to service: %s"%e)
503 self.transport = transport
504 else:
505 transport = self.transport
506
507
508 self.seq += 1
509 transport.send_message(request, self.seq)
510
511 try:
512 responses = transport.receive_once()
513 if len(responses) == 0:
514 raise ServiceException("service [%s] returned no response"%self.resolved_name)
515 elif len(responses) > 1:
516 raise ServiceException("service [%s] returned multiple responses: %s"%(self.resolved_name, len(responses)))
517 except rospy.exceptions.TransportException as e:
518
519 if rospy.core.is_shutdown():
520 raise rospy.exceptions.ROSInterruptException("node shutdown interrupted service call")
521 else:
522 raise ServiceException("transport error completing service call: %s"%(str(e)))
523 finally:
524 if not self.persistent:
525 transport.close()
526 self.transport = None
527 return responses[0]
528
529
531 """Close this ServiceProxy. This only has an effect on persistent ServiceProxy instances."""
532 if self.transport is not None:
533 self.transport.close()
534
536 """
537 Implementation of ROS Service. This intermediary class allows for more configuration of behavior than the Service class.
538 """
539
541 super(ServiceImpl, self).__init__(name, service_class)
542
543 if not name or not isstring(name):
544 raise ValueError("service name is not a non-empty string")
545
546 if not rosgraph.names.is_legal_name(name):
547 import warnings
548 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2)
549
550
551 self.handler = handler
552 self.registered = False
553 self.seq = 0
554 self.done = False
555 self.buff_size=buff_size
556
557 start_tcpros_server()
558 host, port = get_tcpros_server_address()
559 self.uri = '%s%s:%s'%(rospy.core.ROSRPC, host, port)
560 logdebug("... service URL is %s"%self.uri)
561
562 self.protocol = TCPService(self.resolved_name, service_class, self.buff_size)
563
564 logdebug("[%s]: new Service instance"%self.resolved_name)
565
566
567
569 """
570 Stop this service
571 @param reason: human-readable shutdown reason
572 @type reason: str
573 """
574 self.done = True
575 logdebug('[%s].shutdown: reason [%s]'%(self.resolved_name, reason))
576 try:
577
578 get_service_manager().unregister(self.resolved_name, self)
579 except Exception as e:
580 logerr("Unable to unregister with master: "+traceback.format_exc())
581 raise ServiceException("Unable to connect to master: %s"%e)
582
584 """
585 Let service run and take over thread until service or node
586 shutdown. Use this method to keep your scripts from exiting
587 execution.
588 """
589 try:
590 while not rospy.core.is_shutdown() and not self.done:
591 time.sleep(0.5)
592 except KeyboardInterrupt:
593 logdebug("keyboard interrupt, shutting down")
594
596 """
597 Send error message to client
598 @param transport: transport connection to client
599 @type transport: Transport
600 @param err_msg: error message to send to client
601 @type err_msg: str
602 """
603 transport.write_data(struct.pack('<BI%ss'%len(err_msg), 0, len(err_msg), err_msg))
604
606 """
607 Process a single incoming request.
608 @param transport: transport instance
609 @type transport: L{TCPROSTransport}
610 @param request: Message
611 @type request: genpy.Message
612 """
613 try:
614
615 response = convert_return_to_response(self.handler(request), self.response_class)
616 self.seq += 1
617
618 transport.write_buff.write(struct.pack('<B', 1))
619 transport.send_message(response, self.seq)
620 except ServiceException as e:
621 rospy.core.rospydebug("handler raised ServiceException: %s"%(e))
622 self._write_service_error(transport, "service cannot process request: %s"%e)
623 except Exception as e:
624 logerr("Error processing request: %s\n%s"%(e,traceback.print_exc()))
625 self._write_service_error(transport, "error processing request: %s"%e)
626
627 - def handle(self, transport, header):
628 """
629 Process incoming request. This method should be run in its
630 own thread. If header['persistent'] is set to 1, method will
631 block until connection is broken.
632 @param transport: transport instance
633 @type transport: L{TCPROSTransport}
634 @param header: headers from client
635 @type header: dict
636 """
637 if 'persistent' in header and \
638 header['persistent'].lower() in ['1', 'true']:
639 persistent = True
640 else:
641 persistent = False
642 if header.get('probe', None) == '1':
643
644 transport.close()
645 return
646 handle_done = False
647 while not handle_done:
648 try:
649 requests = transport.receive_once()
650 for request in requests:
651 self._handle_request(transport, request)
652 if not persistent:
653 handle_done = True
654 except rospy.exceptions.TransportTerminated as e:
655 if not persistent:
656 logerr("incoming connection failed: %s"%e)
657 logdebug("service[%s]: transport terminated"%self.resolved_name)
658 handle_done = True
659 transport.close()
660
661
663 """
664 Declare a ROS service. Service requests are passed to the
665 specified handler.
666
667 Service Usage::
668 s = Service('getmapservice', GetMap, get_map_handler)
669 """
670
672 """
673 ctor.
674
675 @param name: service name, ``str``
676 @param service_class: Service definition class
677
678 @param handler: callback function for processing service
679 request. Function takes in a ServiceRequest and returns a
680 ServiceResponse of the appropriate type. Function may also
681 return a list, tuple, or dictionary with arguments to initialize
682 a ServiceResponse instance of the correct type.
683
684 If handler cannot process request, it may either return None,
685 to indicate failure, or it may raise a rospy.ServiceException
686 to send a specific error message to the client. Returning None
687 is always considered a failure.
688
689 @type handler: fn(req)->resp
690 @param buff_size: size of buffer for reading incoming requests. Should be at least size of request message
691 @type buff_size: int
692 """
693 super(Service, self).__init__(name, service_class, handler, buff_size)
694
695
696 get_service_manager().register(self.resolved_name, self)
697