1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: Service-specific extensions for TCPROS support"""
36
37 import io
38 import socket
39 import struct
40 import sys
41 import logging
42
43 import time
44 import traceback
45
46 import roslib.scriptutil
47
48 from rospy.exceptions import TransportInitError, TransportTerminated, ROSException, ROSInterruptException
49 from rospy.service import _Service, ServiceException
50
51 from rospy.impl.registration import get_service_manager
52 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \
53 get_tcpros_server_address, start_tcpros_server, recv_buff, \
54 DEFAULT_BUFF_SIZE
55
56 from rospy.core import logwarn, loginfo, logerr, logdebug
57 import rospy.core
58 import rospy.msg
59 import rospy.names
60
61 import rospy.impl.validators
62
63 try:
64 import _thread
65 except ImportError:
66 import thread as _thread
67
68 if sys.hexversion > 0x03000000:
70 return isinstance(s, str)
71 else:
73 return isinstance(s, basestring)
74
75 logger = logging.getLogger('rospy.service')
76
77
78
79
81 """
82 Blocks until service is available. Use this in
83 initialization code if your program depends on a
84 service already running.
85 @param service: name of service
86 @type service: str
87 @param timeout: timeout time in seconds, None for no
88 timeout. NOTE: timeout=0 is invalid as wait_for_service actually
89 contacts the service, so non-blocking behavior is not
90 possible. For timeout=0 uses cases, just call the service without
91 waiting.
92 @type timeout: double
93 @raise ROSException: if specified timeout is exceeded
94 @raise ROSInterruptException: if shutdown interrupts wait
95 """
96 def contact_service(resolved_name, timeout=10.0):
97 code, _, uri = master.lookupService(rospy.names.get_caller_id(), resolved_name)
98 if False and code == 1:
99 return True
100 elif True and code == 1:
101
102 addr = rospy.core.parse_rosrpc_uri(uri)
103 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
104 try:
105
106
107 s.settimeout(timeout)
108 s.connect(addr)
109 h = { 'probe' : '1', 'md5sum' : '*',
110 'callerid' : rospy.core.get_caller_id(),
111 'service': resolved_name }
112 roslib.network.write_ros_handshake_header(s, h)
113 return True
114 finally:
115 if s is not None:
116 s.close()
117 if timeout == 0.:
118 raise ValueError("timeout must be non-zero")
119 resolved_name = rospy.names.resolve_name(service)
120 master = roslib.scriptutil.get_master()
121 first = False
122 if timeout:
123 timeout_t = time.time() + timeout
124 while not rospy.core.is_shutdown() and time.time() < timeout_t:
125 try:
126 if contact_service(resolved_name, timeout_t-time.time()):
127 return
128 time.sleep(0.3)
129 except KeyboardInterrupt:
130
131 rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising")
132 raise
133 except:
134 if first:
135 first = False
136 rospy.core.logerr("wait_for_service(%s): failed to contact [%s], will keep trying"%(resolved_name, uri))
137 if rospy.core.is_shutdown():
138 raise ROSInterruptException("rospy shutdown")
139 else:
140 raise ROSException("timeout exceeded while waiting for service %s"%resolved_name)
141 else:
142 while not rospy.core.is_shutdown():
143 try:
144 if contact_service(resolved_name):
145 return
146 time.sleep(0.3)
147 except KeyboardInterrupt:
148
149 rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising")
150 raise
151 except:
152 if first:
153 first = False
154 rospy.core.logerr("wait_for_service(%s): failed to contact [%s], will keep trying"%(resolved_name, uri))
155 if rospy.core.is_shutdown():
156 raise ROSInterruptException("rospy shutdown")
157
158
160 """
161 Convert return value of function to response instance. The
162 rules/precedence for this are:
163
164 1. If the return type is the same as the response type, no conversion
165 is done.
166
167 2. If the return type is a dictionary, it is used as a keyword-style
168 initialization for a new response instance.
169
170 3. If the return type is *not* a list type, it is passed in as a single arg
171 to a new response instance.
172
173 4. If the return type is a list/tuple type, it is used as a args-style
174 initialization for a new response instance.
175 """
176
177
178
179
180 if isinstance(response, roslib.message.Message) and response._type == response_class._type:
181
182 return response
183 elif type(response) == dict:
184
185 try:
186 return response_class(**response)
187 except AttributeError as e:
188 raise ServiceException("handler returned invalid value: %s"%str(e))
189 elif response == None:
190 raise ServiceException("service handler returned None")
191 elif type(response) not in [list, tuple]:
192
193 try:
194 return response_class(response)
195 except TypeError as e:
196 raise ServiceException("handler returned invalid value: %s"%str(e))
197 else:
198
199
200 try:
201 return response_class(*response)
202 except TypeError as e:
203 raise ServiceException("handler returned wrong number of values: %s"%str(e))
204
206 """
207 Process incoming service connection. For use with
208 TCPROSServer. Reads in service name from handshake and creates the
209 appropriate service handler for the connection.
210 @param sock: socket connection
211 @type sock: socket
212 @param client_addr: client address
213 @type client_addr: (str, int)
214 @param header: key/value pairs from handshake header
215 @type header: dict
216 @return: error string or None
217 @rtype: str
218 """
219 for required in ['service', 'md5sum', 'callerid']:
220 if not required in header:
221 return "Missing required '%s' field"%required
222 else:
223 logger.debug("connection from %s:%s", client_addr[0], client_addr[1])
224 service_name = header['service']
225
226
227
228
229
230
231 sm = get_service_manager()
232 md5sum = header['md5sum']
233 service = sm.get_service(service_name)
234 if not service:
235 return "[%s] is not a provider of [%s]"%(rospy.names.get_caller_id(), service_name)
236 elif md5sum != rospy.names.SERVICE_ANYTYPE and md5sum != service.service_class._md5sum:
237 return "request from [%s]: md5sums do not match: [%s] vs. [%s]"%(header['callerid'], md5sum, service.service_class._md5sum)
238 else:
239 transport = TCPROSTransport(service.protocol, service_name, header=header)
240 transport.set_socket(sock, header['callerid'])
241 transport.write_header()
242
243
244 _thread.start_new_thread(service.handle, (transport, header))
245
246
248 """
249 Protocol implementation for Services over TCPROS
250 """
251
253 """
254 ctor.
255 @param resolved_name: name of service
256 @type resolved_name: str
257 @param service_class: Service data type class
258 @type service_class: Service
259 @param buff_size int: size of buffer (bytes) to use for reading incoming requests.
260 @type buff_size: int
261 """
262 super(TCPService, self).__init__(resolved_name, service_class._request_class, buff_size=buff_size)
263 self.service_class = service_class
264
266 """
267 Protocol API
268 @return: header fields
269 @rtype: dict
270 """
271 return {'service': self.resolved_name, 'type': self.service_class._type,
272 'md5sum': self.service_class._md5sum, 'callerid': rospy.names.get_caller_id() }
273
275 """Protocol Implementation for Service clients over TCPROS"""
276
278 """
279 ctor.
280 @param resolved_name: resolved service name
281 @type resolved_name: str
282 @param service_class: Service data type class
283 @type service_class: Service
284 @param headers: identifier for Service session
285 @type headers: dict
286 @param buff_size: size of buffer (bytes) for reading responses from Service.
287 @type buff_size: int
288 """
289 super(TCPROSServiceClient, self).__init__(resolved_name, service_class._response_class)
290 self.service_class = service_class
291 self.headers = headers or {}
292 self.buff_size = buff_size
293
295 """
296 TCPROSTransportProtocol API
297 """
298 headers = {'service': self.resolved_name, 'md5sum': self.service_class._md5sum,
299 'callerid': rospy.names.get_caller_id()}
300
301
302
303 for k, v in self.headers.items():
304 headers[k] = v
305 return headers
306
308 """
309 Utility for reading the OK-byte/error-message header preceding each message.
310 @param sock: socket connection. Will be read from if OK byte is
311 false and error message needs to be read
312 @type sock: socket.socket
313 @param b: buffer to read from
314 @type b: StringIO
315 """
316 if b.tell() == 0:
317 return
318 pos = b.tell()
319 b.seek(0)
320 ok = struct.unpack('<B', b.read(1))[0]
321 b.seek(pos)
322 if not ok:
323 str = self._read_service_error(sock, b)
324
325
326
327
328
329 b.seek(0)
330 b.truncate(0)
331 raise ServiceException("service [%s] responded with an error: %s"%(self.resolved_name, str))
332 else:
333
334 b.seek(pos)
335
337 """
338 In service implementation, reads in OK byte that preceeds each
339 response. The OK byte allows for the passing of error messages
340 instead of a response message
341 @param b: buffer
342 @type b: StringIO
343 @param msg_queue: Message queue to append to
344 @type msg_queue: [Message]
345 @param sock: socket to read from
346 @type sock: socket.socket
347 """
348 self._read_ok_byte(b, sock)
349 rospy.msg.deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size, max_msgs=1, start=1)
350
351
352
353 if b.tell() == 1:
354 b.seek(0)
355
357 """
358 Read service error from sock
359 @param sock: socket to read from
360 @type sock: socket
361 @param b: currently read data from sock
362 @type b: StringIO
363 """
364 buff_size = 256
365 while b.tell() < 5:
366 recv_buff(sock, b, buff_size)
367 bval = b.getvalue()
368 (length,) = struct.unpack('<I', bval[1:5])
369 while b.tell() < (5 + length):
370 recv_buff(sock, b, buff_size)
371 bval = b.getvalue()
372 return struct.unpack('<%ss'%length, bval[5:5+length])[0]
373
374
376 """
377 Create a handle to a ROS service for invoking calls.
378
379 Usage::
380 add_two_ints = ServiceProxy('add_two_ints', AddTwoInts)
381 resp = add_two_ints(1, 2)
382 """
383
384 - def __init__(self, name, service_class, persistent=False, headers=None):
385 """
386 ctor.
387 @param name: name of service to call
388 @type name: str
389 @param service_class: auto-generated service class
390 @type service_class: Service class
391 @param persistent: (optional) if True, proxy maintains a persistent
392 connection to service. While this results in better call
393 performance, persistent connections are discouraged as they are
394 less resistent to network issues and service restarts.
395 @type persistent: bool
396 @param headers: (optional) arbitrary headers
397 @type headers: dict
398 """
399 super(ServiceProxy, self).__init__(name, service_class)
400 self.uri = None
401 self.seq = 0
402 self.buff_size = DEFAULT_BUFF_SIZE
403 self.persistent = persistent
404 if persistent:
405 if not headers:
406 headers = {}
407 headers['persistent'] = '1'
408 self.protocol = TCPROSServiceClient(self.resolved_name,
409 self.service_class, headers=headers)
410 self.transport = None
411
414
415
417 """
418 Callable-style version of the service API. This accepts either a request message instance,
419 or you can call directly with arguments to create a new request instance. e.g.::
420
421 add_two_ints(AddTwoIntsRequest(1, 2))
422 add_two_ints(1, 2)
423 add_two_ints(a=1, b=2)
424
425 @param args: arguments to remote service
426 @param kwds: message keyword arguments
427 @raise ROSSerializationException: If unable to serialize
428 message. This is usually a type error with one of the fields.
429 """
430 return self.call(*args, **kwds)
431
433 """
434 private routine for getting URI of service to call
435 @param request: request message
436 @type request: L{rospy.Message}
437 """
438 if not isinstance(request, roslib.message.Message):
439 raise TypeError("request object is not a valid request message instance")
440
441
442
443
444 if not self.request_class._type == request._type:
445 raise TypeError("request object type [%s] does not match service type [%s]"%(request.__class__, self.request_class))
446
447
448
449 if 1:
450 try:
451 try:
452 code, msg, self.uri = roslib.scriptutil.get_master().lookupService(rospy.names.get_caller_id(), self.resolved_name)
453 except:
454 raise ServiceException("unable to contact master")
455 if code != 1:
456 logger.error("[%s]: lookup service failed with message [%s]", self.resolved_name, msg)
457 raise ServiceException("service [%s] unavailable"%self.resolved_name)
458
459
460 try:
461 rospy.core.parse_rosrpc_uri(self.uri)
462 except rospy.impl.validators.ParameterInvalid:
463 raise ServiceException("master returned invalid ROSRPC URI: %s"%self.uri)
464 except socket.error as e:
465 logger.error("[%s]: socket error contacting service, master is probably unavailable",self.resolved_name)
466 return self.uri
467
468 - def call(self, *args, **kwds):
469 """
470 Call the service. This accepts either a request message instance,
471 or you can call directly with arguments to create a new request instance. e.g.::
472
473 add_two_ints(AddTwoIntsRequest(1, 2))
474 add_two_ints(1, 2)
475 add_two_ints(a=1, b=2)
476
477 @raise TypeError: if request is not of the valid type (Message)
478 @raise ServiceException: if communication with remote service fails
479 @raise ROSInterruptException: if node shutdown (e.g. ctrl-C) interrupts service call
480 @raise ROSSerializationException: If unable to serialize
481 message. This is usually a type error with one of the fields.
482 """
483
484
485 request = rospy.msg.args_kwds_to_message(self.request_class, args, kwds)
486
487
488 if self.transport is None:
489 service_uri = self._get_service_uri(request)
490 dest_addr, dest_port = rospy.core.parse_rosrpc_uri(service_uri)
491
492
493 transport = TCPROSTransport(self.protocol, self.resolved_name)
494 transport.buff_size = self.buff_size
495 try:
496 transport.connect(dest_addr, dest_port, service_uri)
497 except TransportInitError as e:
498
499 raise ServiceException("unable to connect to service: %s"%e)
500 self.transport = transport
501 else:
502 transport = self.transport
503
504
505 self.seq += 1
506 transport.send_message(request, self.seq)
507
508 try:
509 responses = transport.receive_once()
510 if len(responses) == 0:
511 raise ServiceException("service [%s] returned no response"%self.resolved_name)
512 elif len(responses) > 1:
513 raise ServiceException("service [%s] returned multiple responses: %s"%(self.resolved_name, len(responses)))
514 except rospy.exceptions.TransportException as e:
515
516 if rospy.core.is_shutdown():
517 raise rospy.exceptions.ROSInterruptException("node shutdown interrupted service call")
518 else:
519 raise ServiceException("transport error completing service call: %s"%(str(e)))
520 finally:
521 if not self.persistent:
522 transport.close()
523 self.transport = None
524 return responses[0]
525
526
528 """Close this ServiceProxy. This only has an effect on persistent ServiceProxy instances."""
529 if self.transport is not None:
530 self.transport.close()
531
533 """
534 Implementation of ROS Service. This intermediary class allows for more configuration of behavior than the Service class.
535 """
536
538 super(ServiceImpl, self).__init__(name, service_class)
539
540 if not name or not isstring(name):
541 raise ValueError("service name is not a non-empty string")
542
543 if not roslib.names.is_legal_name(name):
544 import warnings
545 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2)
546
547
548 self.handler = handler
549 self.registered = False
550 self.seq = 0
551 self.done = False
552 self.buff_size=buff_size
553
554 start_tcpros_server()
555 host, port = get_tcpros_server_address()
556 self.uri = '%s%s:%s'%(rospy.core.ROSRPC, host, port)
557 logdebug("... service URL is %s"%self.uri)
558
559 self.protocol = TCPService(self.resolved_name, service_class, self.buff_size)
560
561 logdebug("[%s]: new Service instance"%self.resolved_name)
562
563
564
566 """
567 Stop this service
568 @param reason: human-readable shutdown reason
569 @type reason: str
570 """
571 self.done = True
572 logdebug('[%s].shutdown: reason [%s]'%(self.resolved_name, reason))
573 try:
574
575 get_service_manager().unregister(self.resolved_name, self)
576 except Exception as e:
577 logerr("Unable to unregister with master: "+traceback.format_exc())
578 raise ServiceException("Unable to connect to master: %s"%e)
579
581 """
582 Let service run and take over thread until service or node
583 shutdown. Use this method to keep your scripts from exiting
584 execution.
585 """
586 try:
587 while not rospy.core.is_shutdown() and not self.done:
588 time.sleep(0.5)
589 except KeyboardInterrupt:
590 logdebug("keyboard interrupt, shutting down")
591
593 """
594 Send error message to client
595 @param transport: transport connection to client
596 @type transport: Transport
597 @param err_msg: error message to send to client
598 @type err_msg: str
599 """
600 transport.write_data(struct.pack('<BI%ss'%len(err_msg), 0, len(err_msg), err_msg))
601
603 """
604 Process a single incoming request.
605 @param transport: transport instance
606 @type transport: L{TCPROSTransport}
607 @param request: Message
608 @type request: roslib.message.Message
609 """
610 try:
611
612 response = convert_return_to_response(self.handler(request), self.response_class)
613 self.seq += 1
614
615 transport.write_buff.write(struct.pack('<B', 1))
616 transport.send_message(response, self.seq)
617 except ServiceException as e:
618 rospy.core.rospydebug("handler raised ServiceException: %s"%(e))
619 self._write_service_error(transport, "service cannot process request: %s"%e)
620 except Exception as e:
621 logerr("Error processing request: %s\n%s"%(e,traceback.print_exc()))
622 self._write_service_error(transport, "error processing request: %s"%e)
623
624 - def handle(self, transport, header):
625 """
626 Process incoming request. This method should be run in its
627 own thread. If header['persistent'] is set to 1, method will
628 block until connection is broken.
629 @param transport: transport instance
630 @type transport: L{TCPROSTransport}
631 @param header: headers from client
632 @type header: dict
633 """
634 if 'persistent' in header and \
635 header['persistent'].lower() in ['1', 'true']:
636 persistent = True
637 else:
638 persistent = False
639 if header.get('probe', None) == '1':
640
641 transport.close()
642 return
643 handle_done = False
644 while not handle_done:
645 try:
646 requests = transport.receive_once()
647 for request in requests:
648 self._handle_request(transport, request)
649 if not persistent:
650 handle_done = True
651 except rospy.exceptions.TransportTerminated as e:
652 if not persistent:
653 logerr("incoming connection failed: %s"%e)
654 logdebug("service[%s]: transport terminated"%self.resolved_name)
655 handle_done = True
656 transport.close()
657
658
660 """
661 Declare a ROS service. Service requests are passed to the
662 specified handler.
663
664 Service Usage::
665 s = Service('getmapservice', GetMap, get_map_handler)
666 """
667
669 """
670 ctor.
671 @param name: service name
672 @type name: str
673 @param service_class: ServiceDefinition class
674 @type service_class: ServiceDefinition class
675
676 @param handler: callback function for processing service
677 request. Function takes in a ServiceRequest and returns a
678 ServiceResponse of the appropriate type. Function may also
679 return a list, tuple, or dictionary with arguments to initialize
680 a ServiceResponse instance of the correct type.
681
682 If handler cannot process request, it may either return None,
683 to indicate failure, or it may raise a rospy.ServiceException
684 to send a specific error message to the client. Returning None
685 is always considered a failure.
686
687 @type handler: fn(req)->resp
688 @param buff_size: size of buffer for reading incoming requests. Should be at least size of request message
689 @type buff_size: int
690 """
691 super(Service, self).__init__(name, service_class, handler, buff_size)
692
693
694 get_service_manager().register(self.resolved_name, self)
695