1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: Service-specific extensions for TCPROS support"""
36
37 import cStringIO
38 import socket
39 import struct
40 import sys
41 import logging
42 import thread
43 import time
44 import traceback
45
46 import roslib.scriptutil
47
48 from rospy.exceptions import TransportInitError, TransportTerminated, ROSException, ROSInterruptException
49 from rospy.service import _Service, ServiceException
50
51 from rospy.impl.registration import get_service_manager
52 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \
53 get_tcpros_server_address, start_tcpros_server, recv_buff, \
54 DEFAULT_BUFF_SIZE
55
56 from rospy.core import logwarn, loginfo, logerr, logdebug
57 import rospy.core
58 import rospy.msg
59 import rospy.names
60
61 import rospy.impl.validators
62
63 logger = logging.getLogger('rospy.service')
64
65
66
67
69 """
70 Blocks until service is available. Use this in
71 initialization code if your program depends on a
72 service already running.
73 @param service: name of service
74 @type service: str
75 @param timeout: timeout time in seconds, None for no
76 timeout. NOTE: timeout=0 is invalid as wait_for_service actually
77 contacts the service, so non-blocking behavior is not
78 possible. For timeout=0 uses cases, just call the service without
79 waiting.
80 @type timeout: double
81 @raise ROSException: if specified timeout is exceeded
82 @raise ROSInterruptException: if shutdown interrupts wait
83 """
84 def contact_service(resolved_name, timeout=10.0):
85 code, _, uri = master.lookupService(rospy.names.get_caller_id(), resolved_name)
86 if False and code == 1:
87 return True
88 elif True and code == 1:
89
90 addr = rospy.core.parse_rosrpc_uri(uri)
91 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
92 try:
93
94
95 s.settimeout(timeout)
96 s.connect(addr)
97 h = { 'probe' : '1', 'md5sum' : '*',
98 'callerid' : rospy.core.get_caller_id(),
99 'service': resolved_name }
100 roslib.network.write_ros_handshake_header(s, h)
101 return True
102 finally:
103 if s is not None:
104 s.close()
105 if timeout == 0.:
106 raise ValueError("timeout must be non-zero")
107 resolved_name = rospy.names.resolve_name(service)
108 master = roslib.scriptutil.get_master()
109 first = False
110 if timeout:
111 timeout_t = time.time() + timeout
112 while not rospy.core.is_shutdown() and time.time() < timeout_t:
113 try:
114 if contact_service(resolved_name, timeout_t-time.time()):
115 return
116 time.sleep(0.3)
117 except KeyboardInterrupt:
118
119 rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising")
120 raise
121 except:
122 if first:
123 first = False
124 rospy.core.logerr("wait_for_service(%s): failed to contact [%s], will keep trying"%(resolved_name, uri))
125 if rospy.core.is_shutdown():
126 raise ROSInterruptException("rospy shutdown")
127 else:
128 raise ROSException("timeout exceeded while waiting for service %s"%resolved_name)
129 else:
130 while not rospy.core.is_shutdown():
131 try:
132 if contact_service(resolved_name):
133 return
134 time.sleep(0.3)
135 except KeyboardInterrupt:
136
137 rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising")
138 raise
139 except:
140 if first:
141 first = False
142 rospy.core.logerr("wait_for_service(%s): failed to contact [%s], will keep trying"%(resolved_name, uri))
143 if rospy.core.is_shutdown():
144 raise ROSInterruptException("rospy shutdown")
145
146
148 """
149 Convert return value of function to response instance. The
150 rules/precedence for this are:
151
152 1. If the return type is the same as the response type, no conversion
153 is done.
154
155 2. If the return type is a dictionary, it is used as a keyword-style
156 initialization for a new response instance.
157
158 3. If the return type is *not* a list type, it is passed in as a single arg
159 to a new response instance.
160
161 4. If the return type is a list/tuple type, it is used as a args-style
162 initialization for a new response instance.
163 """
164
165
166
167
168 if isinstance(response, roslib.message.Message) and response._type == response_class._type:
169
170 return response
171 elif type(response) == dict:
172
173 try:
174 return response_class(**response)
175 except AttributeError, e:
176 raise ServiceException("handler returned invalid value: %s"%str(e))
177 elif response == None:
178 raise ServiceException("service handler returned None")
179 elif type(response) not in [list, tuple]:
180
181 try:
182 return response_class(response)
183 except TypeError, e:
184 raise ServiceException("handler returned invalid value: %s"%str(e))
185 else:
186
187
188 try:
189 return response_class(*response)
190 except TypeError, e:
191 raise ServiceException("handler returned wrong number of values: %s"%str(e))
192
194 """
195 Process incoming service connection. For use with
196 TCPROSServer. Reads in service name from handshake and creates the
197 appropriate service handler for the connection.
198 @param sock: socket connection
199 @type sock: socket
200 @param client_addr: client address
201 @type client_addr: (str, int)
202 @param header: key/value pairs from handshake header
203 @type header: dict
204 @return: error string or None
205 @rtype: str
206 """
207 for required in ['service', 'md5sum', 'callerid']:
208 if not required in header:
209 return "Missing required '%s' field"%required
210 else:
211 logger.debug("connection from %s:%s", client_addr[0], client_addr[1])
212 service_name = header['service']
213
214
215
216
217
218
219 sm = get_service_manager()
220 md5sum = header['md5sum']
221 service = sm.get_service(service_name)
222 if not service:
223 return "[%s] is not a provider of [%s]"%(rospy.names.get_caller_id(), service_name)
224 elif md5sum != rospy.names.SERVICE_ANYTYPE and md5sum != service.service_class._md5sum:
225 return "request from [%s]: md5sums do not match: [%s] vs. [%s]"%(header['callerid'], md5sum, service.service_class._md5sum)
226 else:
227 transport = TCPROSTransport(service.protocol, service_name, header=header)
228 transport.set_socket(sock, header['callerid'])
229 transport.write_header()
230
231
232 thread.start_new_thread(service.handle, (transport, header))
233
234
236 """
237 Protocol implementation for Services over TCPROS
238 """
239
241 """
242 ctor.
243 @param resolved_name: name of service
244 @type resolved_name: str
245 @param service_class: Service data type class
246 @type service_class: Service
247 @param buff_size int: size of buffer (bytes) to use for reading incoming requests.
248 @type buff_size: int
249 """
250 super(TCPService, self).__init__(resolved_name, service_class._request_class, buff_size=buff_size)
251 self.service_class = service_class
252
254 """
255 Protocol API
256 @return: header fields
257 @rtype: dict
258 """
259 return {'service': self.resolved_name, 'type': self.service_class._type,
260 'md5sum': self.service_class._md5sum, 'callerid': rospy.names.get_caller_id() }
261
263 """Protocol Implementation for Service clients over TCPROS"""
264
266 """
267 ctor.
268 @param resolved_name: resolved service name
269 @type resolved_name: str
270 @param service_class: Service data type class
271 @type service_class: Service
272 @param headers: identifier for Service session
273 @type headers: dict
274 @param buff_size: size of buffer (bytes) for reading responses from Service.
275 @type buff_size: int
276 """
277 super(TCPROSServiceClient, self).__init__(resolved_name, service_class._response_class)
278 self.service_class = service_class
279 self.headers = headers or {}
280 self.buff_size = buff_size
281
283 """
284 TCPROSTransportProtocol API
285 """
286 headers = {'service': self.resolved_name, 'md5sum': self.service_class._md5sum,
287 'callerid': rospy.names.get_caller_id()}
288
289
290
291 for k, v in self.headers.iteritems():
292 headers[k] = v
293 return headers
294
296 """
297 Utility for reading the OK-byte/error-message header preceding each message.
298 @param sock: socket connection. Will be read from if OK byte is
299 false and error message needs to be read
300 @type sock: socket.socket
301 @param b: buffer to read from
302 @type b: StringIO
303 """
304 if b.tell() == 0:
305 return
306 pos = b.tell()
307 b.seek(0)
308 ok = struct.unpack('<B', b.read(1))[0]
309 b.seek(pos)
310 if not ok:
311 str = self._read_service_error(sock, b)
312
313
314
315
316
317 b.seek(0)
318 b.truncate(0)
319 raise ServiceException("service [%s] responded with an error: %s"%(self.resolved_name, str))
320 else:
321
322 b.seek(pos)
323
325 """
326 In service implementation, reads in OK byte that preceeds each
327 response. The OK byte allows for the passing of error messages
328 instead of a response message
329 @param b: buffer
330 @type b: StringIO
331 @param msg_queue: Message queue to append to
332 @type msg_queue: [Message]
333 @param sock: socket to read from
334 @type sock: socket.socket
335 """
336 self._read_ok_byte(b, sock)
337 rospy.msg.deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size, max_msgs=1, start=1)
338
339
340
341 if b.tell() == 1:
342 b.seek(0)
343
345 """
346 Read service error from sock
347 @param sock: socket to read from
348 @type sock: socket
349 @param b: currently read data from sock
350 @type b: StringIO
351 """
352 buff_size = 256
353 while b.tell() < 5:
354 recv_buff(sock, b, buff_size)
355 bval = b.getvalue()
356 (length,) = struct.unpack('<I', bval[1:5])
357 while b.tell() < (5 + length):
358 recv_buff(sock, b, buff_size)
359 bval = b.getvalue()
360 return struct.unpack('<%ss'%length, bval[5:5+length])[0]
361
362
364 """
365 Create a handle to a ROS service for invoking calls.
366
367 Usage::
368 add_two_ints = ServiceProxy('add_two_ints', AddTwoInts)
369 resp = add_two_ints(1, 2)
370 """
371
372 - def __init__(self, name, service_class, persistent=False, headers=None):
373 """
374 ctor.
375 @param name: name of service to call
376 @type name: str
377 @param service_class: auto-generated service class
378 @type service_class: Service class
379 @param persistent: (optional) if True, proxy maintains a persistent
380 connection to service. While this results in better call
381 performance, persistent connections are discouraged as they are
382 less resistent to network issues and service restarts.
383 @type persistent: bool
384 @param headers: (optional) arbitrary headers
385 @type headers: dict
386 """
387 super(ServiceProxy, self).__init__(name, service_class)
388 self.uri = None
389 self.seq = 0
390 self.buff_size = DEFAULT_BUFF_SIZE
391 self.persistent = persistent
392 if persistent:
393 if not headers:
394 headers = {}
395 headers['persistent'] = '1'
396 self.protocol = TCPROSServiceClient(self.resolved_name,
397 self.service_class, headers=headers)
398 self.transport = None
399
402
403
405 """
406 Callable-style version of the service API. This accepts either a request message instance,
407 or you can call directly with arguments to create a new request instance. e.g.::
408
409 add_two_ints(AddTwoIntsRequest(1, 2))
410 add_two_ints(1, 2)
411 add_two_ints(a=1, b=2)
412
413 @param args: arguments to remote service
414 @param kwds: message keyword arguments
415 @raise ROSSerializationException: If unable to serialize
416 message. This is usually a type error with one of the fields.
417 """
418 return self.call(*args, **kwds)
419
421 """
422 private routine for getting URI of service to call
423 @param request: request message
424 @type request: L{rospy.Message}
425 """
426 if not isinstance(request, roslib.message.Message):
427 raise TypeError("request object is not a valid request message instance")
428
429
430
431
432 if not self.request_class._type == request._type:
433 raise TypeError("request object type [%s] does not match service type [%s]"%(request.__class__, self.request_class))
434
435
436
437 if 1:
438 try:
439 try:
440 code, msg, self.uri = roslib.scriptutil.get_master().lookupService(rospy.names.get_caller_id(), self.resolved_name)
441 except:
442 raise ServiceException("unable to contact master")
443 if code != 1:
444 logger.error("[%s]: lookup service failed with message [%s]", self.resolved_name, msg)
445 raise ServiceException("service [%s] unavailable"%self.resolved_name)
446
447
448 try:
449 rospy.core.parse_rosrpc_uri(self.uri)
450 except rospy.impl.validators.ParameterInvalid:
451 raise ServiceException("master returned invalid ROSRPC URI: %s"%self.uri)
452 except socket.error, e:
453 logger.error("[%s]: socket error contacting service, master is probably unavailable",self.resolved_name)
454 return self.uri
455
456 - def call(self, *args, **kwds):
457 """
458 Call the service. This accepts either a request message instance,
459 or you can call directly with arguments to create a new request instance. e.g.::
460
461 add_two_ints(AddTwoIntsRequest(1, 2))
462 add_two_ints(1, 2)
463 add_two_ints(a=1, b=2)
464
465 @raise TypeError: if request is not of the valid type (Message)
466 @raise ServiceException: if communication with remote service fails
467 @raise ROSInterruptException: if node shutdown (e.g. ctrl-C) interrupts service call
468 @raise ROSSerializationException: If unable to serialize
469 message. This is usually a type error with one of the fields.
470 """
471
472
473 request = rospy.msg.args_kwds_to_message(self.request_class, args, kwds)
474
475
476 if self.transport is None:
477 service_uri = self._get_service_uri(request)
478 dest_addr, dest_port = rospy.core.parse_rosrpc_uri(service_uri)
479
480
481 transport = TCPROSTransport(self.protocol, self.resolved_name)
482 transport.buff_size = self.buff_size
483 try:
484 transport.connect(dest_addr, dest_port, service_uri)
485 except TransportInitError, e:
486
487 raise ServiceException("unable to connect to service: %s"%e)
488 self.transport = transport
489 else:
490 transport = self.transport
491
492
493 self.seq += 1
494 transport.send_message(request, self.seq)
495
496 try:
497 responses = transport.receive_once()
498 if len(responses) == 0:
499 raise ServiceException("service [%s] returned no response"%self.resolved_name)
500 elif len(responses) > 1:
501 raise ServiceException("service [%s] returned multiple responses: %s"%(self.resolved_name, len(responses)))
502 except rospy.exceptions.TransportException, e:
503
504 if rospy.core.is_shutdown():
505 raise rospy.exceptions.ROSInterruptException("node shutdown interrupted service call")
506 else:
507 raise ServiceException("transport error completing service call: %s"%(str(e)))
508 finally:
509 if not self.persistent:
510 transport.close()
511 self.transport = None
512 return responses[0]
513
514
516 """Close this ServiceProxy. This only has an effect on persistent ServiceProxy instances."""
517 if self.transport is not None:
518 self.transport.close()
519
521 """
522 Implementation of ROS Service. This intermediary class allows for more configuration of behavior than the Service class.
523 """
524
526 super(ServiceImpl, self).__init__(name, service_class)
527
528 if not name or not isinstance(name, basestring):
529 raise ValueError("service name is not a non-empty string")
530
531 if not roslib.names.is_legal_name(name):
532 import warnings
533 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2)
534
535
536 self.handler = handler
537 self.registered = False
538 self.seq = 0
539 self.done = False
540 self.buff_size=buff_size
541
542 start_tcpros_server()
543 host, port = get_tcpros_server_address()
544 self.uri = '%s%s:%s'%(rospy.core.ROSRPC, host, port)
545 logdebug("... service URL is %s"%self.uri)
546
547 self.protocol = TCPService(self.resolved_name, service_class, self.buff_size)
548
549 logdebug("[%s]: new Service instance"%self.resolved_name)
550
551
552
554 """
555 Stop this service
556 @param reason: human-readable shutdown reason
557 @type reason: str
558 """
559 self.done = True
560 logdebug('[%s].shutdown: reason [%s]'%(self.resolved_name, reason))
561 try:
562
563 get_service_manager().unregister(self.resolved_name, self)
564 except Exception, e:
565 logerr("Unable to unregister with master: "+traceback.format_exc())
566 raise ServiceException("Unable to connect to master: %s"%e)
567
569 """
570 Let service run and take over thread until service or node
571 shutdown. Use this method to keep your scripts from exiting
572 execution.
573 """
574 try:
575 while not rospy.core.is_shutdown() and not self.done:
576 time.sleep(0.5)
577 except KeyboardInterrupt:
578 logdebug("keyboard interrupt, shutting down")
579
581 """
582 Send error message to client
583 @param transport: transport connection to client
584 @type transport: Transport
585 @param err_msg: error message to send to client
586 @type err_msg: str
587 """
588 transport.write_data(struct.pack('<BI%ss'%len(err_msg), 0, len(err_msg), err_msg))
589
591 """
592 Process a single incoming request.
593 @param transport: transport instance
594 @type transport: L{TCPROSTransport}
595 @param request: Message
596 @type request: roslib.message.Message
597 """
598 try:
599
600 response = convert_return_to_response(self.handler(request), self.response_class)
601 self.seq += 1
602
603 transport.write_buff.write(struct.pack('<B', 1))
604 transport.send_message(response, self.seq)
605 except ServiceException, e:
606 rospy.core.rospydebug("handler raised ServiceException: %s"%(e))
607 self._write_service_error(transport, "service cannot process request: %s"%e)
608 except Exception, e:
609 logerr("Error processing request: %s\n%s"%(e,traceback.print_exc()))
610 self._write_service_error(transport, "error processing request: %s"%e)
611
612 - def handle(self, transport, header):
613 """
614 Process incoming request. This method should be run in its
615 own thread. If header['persistent'] is set to 1, method will
616 block until connection is broken.
617 @param transport: transport instance
618 @type transport: L{TCPROSTransport}
619 @param header: headers from client
620 @type header: dict
621 """
622 if 'persistent' in header and \
623 header['persistent'].lower() in ['1', 'true']:
624 persistent = True
625 else:
626 persistent = False
627 if header.get('probe', None) == '1':
628
629 transport.close()
630 return
631 handle_done = False
632 while not handle_done:
633 try:
634 requests = transport.receive_once()
635 for request in requests:
636 self._handle_request(transport, request)
637 if not persistent:
638 handle_done = True
639 except rospy.exceptions.TransportTerminated, e:
640 if not persistent:
641 logerr("incoming connection failed: %s"%e)
642 logdebug("service[%s]: transport terminated"%self.resolved_name)
643 handle_done = True
644 transport.close()
645
646
648 """
649 Declare a ROS service. Service requests are passed to the
650 specified handler.
651
652 Service Usage::
653 s = Service('getmapservice', GetMap, get_map_handler)
654 """
655
657 """
658 ctor.
659 @param name: service name
660 @type name: str
661 @param service_class: ServiceDefinition class
662 @type service_class: ServiceDefinition class
663
664 @param handler: callback function for processing service
665 request. Function takes in a ServiceRequest and returns a
666 ServiceResponse of the appropriate type. Function may also
667 return a list, tuple, or dictionary with arguments to initialize
668 a ServiceResponse instance of the correct type.
669
670 If handler cannot process request, it may either return None,
671 to indicate failure, or it may raise a rospy.ServiceException
672 to send a specific error message to the client. Returning None
673 is always considered a failure.
674
675 @type handler: fn(req)->resp
676 @param buff_size: size of buffer for reading incoming requests. Should be at least size of request message
677 @type buff_size: int
678 """
679 super(Service, self).__init__(name, service_class, handler, buff_size)
680
681
682 get_service_manager().register(self.resolved_name, self)
683