Package rospy :: Package impl :: Module tcpros_service

Source Code for Module rospy.impl.tcpros_service

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33  """Internal use: Service-specific extensions for TCPROS support""" 
 34   
 35  import io 
 36  import socket 
 37  import struct 
 38  import sys 
 39  import logging 
 40   
 41  import time 
 42  import traceback 
 43   
 44  import genpy 
 45   
 46  import rosgraph 
 47  import rosgraph.names 
 48  import rosgraph.network 
 49   
 50  from rospy.exceptions import TransportInitError, TransportTerminated, ROSException, ROSInterruptException 
 51  from rospy.service import _Service, ServiceException 
 52   
 53  from rospy.impl.registration import get_service_manager 
 54  from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \ 
 55      get_tcpros_server_address, start_tcpros_server, recv_buff, \ 
 56      DEFAULT_BUFF_SIZE 
 57   
 58  from rospy.core import logwarn, loginfo, logerr, logdebug 
 59  import rospy.core 
 60  import rospy.msg 
 61  import rospy.names 
 62   
 63  import rospy.impl.validators 
 64   
 65  import threading 
 66   
 67  if sys.hexversion > 0x03000000: #Python3 
68 - def isstring(s):
69 return isinstance(s, str) #Python 3.x
70 else:
71 - def isstring(s):
72 return isinstance(s, basestring) #Python 2.x
73 74 logger = logging.getLogger('rospy.service') 75 76 ######################################################### 77 # Service helpers 78
79 -def wait_for_service(service, timeout=None):
80 """ 81 Blocks until service is available. Use this in 82 initialization code if your program depends on a 83 service already running. 84 @param service: name of service 85 @type service: str 86 @param timeout: timeout time in seconds or Duration, None for no 87 timeout. NOTE: timeout=0 is invalid as wait_for_service actually 88 contacts the service, so non-blocking behavior is not 89 possible. For timeout=0 uses cases, just call the service without 90 waiting. 91 @type timeout: double|rospy.Duration 92 @note roscpp waitForService() has timeout specified in millisecs. 93 @raise ROSException: if specified timeout is exceeded 94 @raise ROSInterruptException: if shutdown interrupts wait 95 """ 96 master = rosgraph.Master(rospy.names.get_caller_id()) 97 def contact_service(resolved_name, timeout=10.0): 98 try: 99 uri = master.lookupService(resolved_name) 100 except rosgraph.MasterException: 101 return False 102 103 addr = rospy.core.parse_rosrpc_uri(uri) 104 if rosgraph.network.use_ipv6(): 105 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 106 else: 107 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 108 try: 109 # we always want to timeout just in case we're connecting 110 # to a down service. 111 s.settimeout(timeout) 112 logdebug('connecting to ' + str(addr)) 113 s.connect(addr) 114 h = { 'probe' : '1', 'md5sum' : '*', 115 'callerid' : rospy.core.get_caller_id(), 116 'service': resolved_name } 117 rosgraph.network.write_ros_handshake_header(s, h) 118 return uri 119 finally: 120 if s is not None: 121 s.close()
122 if timeout is not None and isinstance(timeout, rospy.Duration): 123 timeout = timeout.to_sec() 124 if timeout == 0.: 125 raise ValueError("timeout must be non-zero") 126 resolved_name = rospy.names.resolve_name(service) 127 contact_failed = False 128 if timeout: 129 timeout_t = time.time() + timeout 130 while not rospy.core.is_shutdown() and time.time() < timeout_t: 131 try: 132 uri = contact_service(resolved_name, timeout_t - time.time()) 133 if uri: 134 if contact_failed: 135 rospy.core.loginfo("wait_for_service(%s): finally were able to contact [%s]"%(resolved_name, uri)) 136 return 137 except KeyboardInterrupt: 138 # re-raise 139 rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising") 140 raise 141 except: # service not actually up 142 contact_failed = True 143 rospy.core.logwarn_throttle(10, "wait_for_service(%s): failed to contact, will keep trying"%resolved_name) 144 time.sleep(0.3) 145 if rospy.core.is_shutdown(): 146 raise ROSInterruptException("rospy shutdown") 147 else: 148 raise ROSException("timeout exceeded while waiting for service %s"%resolved_name) 149 else: 150 while not rospy.core.is_shutdown(): 151 try: 152 uri = contact_service(resolved_name) 153 if uri: 154 if contact_failed: 155 rospy.core.loginfo("wait_for_service(%s): finally were able to contact [%s]"%(resolved_name, uri)) 156 return 157 except KeyboardInterrupt: 158 # re-raise 159 rospy.core.logdebug("wait_for_service: received keyboard interrupt, assuming signals disabled and re-raising") 160 raise 161 except: # service not actually up 162 contact_failed = True 163 rospy.core.logwarn_throttle(10, "wait_for_service(%s): failed to contact, will keep trying"%resolved_name) 164 time.sleep(0.3) 165 if rospy.core.is_shutdown(): 166 raise ROSInterruptException("rospy shutdown") 167 168
169 -def convert_return_to_response(response, response_class):
170 """ 171 Convert return value of function to response instance. The 172 rules/precedence for this are: 173 174 1. If the return type is the same as the response type, no conversion 175 is done. 176 177 2. If the return type is a dictionary, it is used as a keyword-style 178 initialization for a new response instance. 179 180 3. If the return type is *not* a list type, it is passed in as a single arg 181 to a new response instance. 182 183 4. If the return type is a list/tuple type, it is used as a args-style 184 initialization for a new response instance. 185 """ 186 187 # use this declared ROS type check instead of a direct instance 188 # check, which allows us to play tricks with serialization and 189 # deserialization 190 if isinstance(response, genpy.Message) and response._type == response_class._type: 191 #if isinstance(response, response_class): 192 return response 193 elif type(response) == dict: 194 # kwds response 195 try: 196 return response_class(**response) 197 except AttributeError as e: 198 raise ServiceException("handler returned invalid value: %s"%str(e)) 199 elif response == None: 200 raise ServiceException("service handler returned None") 201 elif type(response) not in [list, tuple]: 202 # single, non-list arg 203 try: 204 return response_class(response) 205 except TypeError as e: 206 raise ServiceException("handler returned invalid value: %s"%str(e)) 207 else: 208 # user returned a list, which has some ambiguous cases. Our resolution is that 209 # all list/tuples are converted to *args 210 try: 211 return response_class(*response) 212 except TypeError as e: 213 raise ServiceException("handler returned wrong number of values: %s"%str(e))
214
215 -def service_connection_handler(sock, client_addr, header):
216 """ 217 Process incoming service connection. For use with 218 TCPROSServer. Reads in service name from handshake and creates the 219 appropriate service handler for the connection. 220 @param sock: socket connection 221 @type sock: socket 222 @param client_addr: client address 223 @type client_addr: (str, int) 224 @param header: key/value pairs from handshake header 225 @type header: dict 226 @return: error string or None 227 @rtype: str 228 """ 229 for required in ['service', 'md5sum', 'callerid']: 230 if not required in header: 231 return "Missing required '%s' field"%required 232 else: 233 logger.debug("connection from %s:%s", client_addr[0], client_addr[1]) 234 service_name = header['service'] 235 236 #TODO: make service manager configurable. I think the right 237 #thing to do is to make these singletons private members of a 238 #Node instance and enable rospy to have multiple node 239 #instances. 240 241 sm = get_service_manager() 242 md5sum = header['md5sum'] 243 service = sm.get_service(service_name) 244 if not service: 245 return "[%s] is not a provider of [%s]"%(rospy.names.get_caller_id(), service_name) 246 elif md5sum != rospy.names.SERVICE_ANYTYPE and md5sum != service.service_class._md5sum: 247 return "request from [%s]: md5sums do not match: [%s] vs. [%s]"%(header['callerid'], md5sum, service.service_class._md5sum) 248 else: 249 transport = TCPROSTransport(service.protocol, service_name, header=header) 250 transport.set_socket(sock, header['callerid']) 251 transport.write_header() 252 # using threadpool reduced performance by an order of 253 # magnitude, need to investigate better 254 t = threading.Thread(target=service.handle, args=(transport, header)) 255 t.daemon = True 256 t.start()
257 258
259 -class TCPService(TCPROSTransportProtocol):
260 """ 261 Protocol implementation for Services over TCPROS 262 """ 263
264 - def __init__(self, resolved_name, service_class, buff_size=DEFAULT_BUFF_SIZE):
265 """ 266 ctor. 267 @param resolved_name: name of service 268 @type resolved_name: str 269 @param service_class: Service data type class 270 @type service_class: Service 271 @param buff_size int: size of buffer (bytes) to use for reading incoming requests. 272 @type buff_size: int 273 """ 274 super(TCPService, self).__init__(resolved_name, service_class._request_class, buff_size=buff_size) 275 self.service_class = service_class
276
277 - def get_header_fields(self):
278 """ 279 Protocol API 280 @return: header fields 281 @rtype: dict 282 """ 283 return {'service': self.resolved_name, 'type': self.service_class._type, 284 'md5sum': self.service_class._md5sum, 'callerid': rospy.names.get_caller_id() }
285
286 -class TCPROSServiceClient(TCPROSTransportProtocol):
287 """Protocol Implementation for Service clients over TCPROS""" 288
289 - def __init__(self, resolved_name, service_class, headers=None, buff_size=DEFAULT_BUFF_SIZE):
290 """ 291 ctor. 292 @param resolved_name: resolved service name 293 @type resolved_name: str 294 @param service_class: Service data type class 295 @type service_class: Service 296 @param headers: identifier for Service session 297 @type headers: dict 298 @param buff_size: size of buffer (bytes) for reading responses from Service. 299 @type buff_size: int 300 """ 301 super(TCPROSServiceClient, self).__init__(resolved_name, service_class._response_class) 302 self.service_class = service_class 303 self.headers = headers or {} 304 self.buff_size = buff_size
305
306 - def get_header_fields(self):
307 """ 308 TCPROSTransportProtocol API 309 """ 310 headers = {'service': self.resolved_name, 'md5sum': self.service_class._md5sum, 311 'callerid': rospy.names.get_caller_id()} 312 # The current implementation allows user-supplied headers to 313 # override protocol-specific headers. We may want to 314 # eliminate this in the future if it is abused too severely. 315 for k, v in self.headers.items(): 316 headers[k] = v 317 return headers
318
319 - def _read_ok_byte(self, b, sock):
320 """ 321 Utility for reading the OK-byte/error-message header preceding each message. 322 @param sock: socket connection. Will be read from if OK byte is 323 false and error message needs to be read 324 @type sock: socket.socket 325 @param b: buffer to read from 326 @type b: StringIO 327 """ 328 if b.tell() == 0: 329 return 330 pos = b.tell() 331 b.seek(0) 332 ok = struct.unpack('<B', b.read(1))[0] # read in ok byte 333 b.seek(pos) 334 if not ok: 335 str = self._read_service_error(sock, b) 336 337 #_read_ok_byte has to reset state of the buffer to 338 #consumed as this exception will bypass rest of 339 #deserialized_messages logic. we currently can't have 340 #multiple requests in flight, so we can keep this simple 341 b.seek(0) 342 b.truncate(0) 343 raise ServiceException("service [%s] responded with an error: %s"%(self.resolved_name, str)) 344 else: 345 # success, set seek point to start of message 346 b.seek(pos)
347
348 - def read_messages(self, b, msg_queue, sock):
349 """ 350 In service implementation, reads in OK byte that precedes each 351 response. The OK byte allows for the passing of error messages 352 instead of a response message 353 @param b: buffer 354 @type b: StringIO 355 @param msg_queue: Message queue to append to 356 @type msg_queue: [Message] 357 @param sock: socket to read from 358 @type sock: socket.socket 359 """ 360 self._read_ok_byte(b, sock) 361 rospy.msg.deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size, max_msgs=1, start=1) #rospy.msg 362 #deserialize_messages only resets the buffer to the start 363 #point if everything was consumed, so we have to further reset 364 #it. 365 if b.tell() == 1: 366 b.seek(0)
367
368 - def _read_service_error(self, sock, b):
369 """ 370 Read service error from sock 371 @param sock: socket to read from 372 @type sock: socket 373 @param b: currently read data from sock 374 @type b: StringIO 375 """ 376 buff_size = 256 #can be small given that we are just reading an error string 377 while b.tell() < 5: 378 recv_buff(sock, b, buff_size) 379 bval = b.getvalue() 380 (length,) = struct.unpack('<I', bval[1:5]) # ready in len byte 381 while b.tell() < (5 + length): 382 recv_buff(sock, b, buff_size) 383 bval = b.getvalue() 384 return struct.unpack('<%ss'%length, bval[5:5+length])[0] # ready in len byte
385 386
387 -class ServiceProxy(_Service):
388 """ 389 Create a handle to a ROS service for invoking calls. 390 391 Usage:: 392 add_two_ints = ServiceProxy('add_two_ints', AddTwoInts) 393 resp = add_two_ints(1, 2) 394 """ 395
396 - def __init__(self, name, service_class, persistent=False, headers=None):
397 """ 398 ctor. 399 @param name: name of service to call 400 @type name: str 401 @param service_class: auto-generated service class 402 @type service_class: Service class 403 @param persistent: (optional) if True, proxy maintains a persistent 404 connection to service. While this results in better call 405 performance, persistent connections are discouraged as they are 406 less resistant to network issues and service restarts. 407 @type persistent: bool 408 @param headers: (optional) arbitrary headers 409 @type headers: dict 410 """ 411 super(ServiceProxy, self).__init__(name, service_class) 412 self.uri = None 413 self.seq = 0 414 self.buff_size = DEFAULT_BUFF_SIZE 415 self.persistent = persistent 416 if persistent: 417 if not headers: 418 headers = {} 419 headers['persistent'] = '1' 420 self.protocol = TCPROSServiceClient(self.resolved_name, 421 self.service_class, headers=headers) 422 self.transport = None #for saving persistent connections
423
424 - def wait_for_service(self, timeout=None):
425 wait_for_service(self.resolved_name, timeout=timeout)
426 427 # #425
428 - def __call__(self, *args, **kwds):
429 """ 430 Callable-style version of the service API. This accepts either a request message instance, 431 or you can call directly with arguments to create a new request instance. e.g.:: 432 433 add_two_ints(AddTwoIntsRequest(1, 2)) 434 add_two_ints(1, 2) 435 add_two_ints(a=1, b=2) 436 437 @param args: arguments to remote service 438 @param kwds: message keyword arguments 439 @raise ROSSerializationException: If unable to serialize 440 message. This is usually a type error with one of the fields. 441 """ 442 return self.call(*args, **kwds)
443
444 - def _get_service_uri(self, request):
445 """ 446 private routine for getting URI of service to call 447 @param request: request message 448 @type request: L{rospy.Message} 449 """ 450 if not isinstance(request, genpy.Message): 451 raise TypeError("request object is not a valid request message instance") 452 # in order to support more interesting overrides, we only 453 # check that it declares the same ROS type instead of a 454 # stricter class check 455 #if not self.request_class == request.__class__: 456 if not self.request_class._type == request._type: 457 raise TypeError("request object type [%s] does not match service type [%s]"%(request.__class__, self.request_class)) 458 459 #TODO: subscribe to service changes 460 #if self.uri is None: 461 if 1: #always do lookup for now, in the future we need to optimize 462 try: 463 try: 464 master = rosgraph.Master(rospy.names.get_caller_id()) 465 self.uri = master.lookupService(self.resolved_name) 466 except socket.error: 467 raise ServiceException("unable to contact master") 468 except rosgraph.MasterError as e: 469 logger.error("[%s]: lookup service failed with message [%s]", self.resolved_name, str(e)) 470 raise ServiceException("service [%s] unavailable"%self.resolved_name) 471 472 # validate 473 try: 474 rospy.core.parse_rosrpc_uri(self.uri) 475 except rospy.impl.validators.ParameterInvalid: 476 raise ServiceException("master returned invalid ROSRPC URI: %s"%self.uri) 477 except socket.error as e: 478 logger.error("[%s]: socket error contacting service, master is probably unavailable",self.resolved_name) 479 return self.uri
480
481 - def call(self, *args, **kwds):
482 """ 483 Call the service. This accepts either a request message instance, 484 or you can call directly with arguments to create a new request instance. e.g.:: 485 486 add_two_ints(AddTwoIntsRequest(1, 2)) 487 add_two_ints(1, 2) 488 add_two_ints(a=1, b=2) 489 490 @raise TypeError: if request is not of the valid type (Message) 491 @raise ServiceException: if communication with remote service fails 492 @raise ROSInterruptException: if node shutdown (e.g. ctrl-C) interrupts service call 493 @raise ROSSerializationException: If unable to serialize 494 message. This is usually a type error with one of the fields. 495 """ 496 497 # convert args/kwds to request message class 498 request = rospy.msg.args_kwds_to_message(self.request_class, args, kwds) 499 500 # initialize transport 501 if self.transport is None: 502 service_uri = self._get_service_uri(request) 503 dest_addr, dest_port = rospy.core.parse_rosrpc_uri(service_uri) 504 505 # connect to service 506 transport = TCPROSTransport(self.protocol, self.resolved_name) 507 transport.buff_size = self.buff_size 508 try: 509 transport.connect(dest_addr, dest_port, service_uri) 510 except TransportInitError as e: 511 # can be a connection or md5sum mismatch 512 raise ServiceException("unable to connect to service: %s"%e) 513 if self.persistent: 514 self.transport = transport 515 else: 516 transport = self.transport 517 518 # send the actual request message 519 self.seq += 1 520 transport.send_message(request, self.seq) 521 522 try: 523 responses = transport.receive_once() 524 if len(responses) == 0: 525 raise ServiceException("service [%s] returned no response"%self.resolved_name) 526 elif len(responses) > 1: 527 raise ServiceException("service [%s] returned multiple responses: %s"%(self.resolved_name, len(responses))) 528 except rospy.exceptions.TransportException as e: 529 # convert lower-level exception to exposed type 530 if rospy.core.is_shutdown(): 531 raise rospy.exceptions.ROSInterruptException("node shutdown interrupted service call") 532 else: 533 raise ServiceException("transport error completing service call: %s"%(str(e))) 534 finally: 535 if not self.persistent: 536 transport.close() 537 self.transport = None 538 return responses[0]
539 540
541 - def close(self):
542 """Close this ServiceProxy. This only has an effect on persistent ServiceProxy instances.""" 543 if self.transport is not None: 544 self.transport.close()
545
546 -class ServiceImpl(_Service):
547 """ 548 Implementation of ROS Service. This intermediary class allows for more configuration of behavior than the Service class. 549 """ 550
551 - def __init__(self, name, service_class, handler, buff_size=DEFAULT_BUFF_SIZE, error_handler=None):
552 super(ServiceImpl, self).__init__(name, service_class) 553 554 if not name or not isstring(name): 555 raise ValueError("service name is not a non-empty string") 556 # #2202 557 if not rosgraph.names.is_legal_name(name): 558 import warnings 559 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2) 560 561 562 self.handler = handler 563 if error_handler is not None: 564 self.error_handler = error_handler 565 self.registered = False 566 self.seq = 0 567 self.done = False 568 self.buff_size=buff_size 569 570 start_tcpros_server() #lazy-init the tcprosserver 571 host, port = get_tcpros_server_address() 572 self.uri = '%s%s:%s'%(rospy.core.ROSRPC, host, port) 573 logdebug("... service URL is %s"%self.uri) 574 575 self.protocol = TCPService(self.resolved_name, service_class, self.buff_size) 576 577 logdebug("[%s]: new Service instance"%self.resolved_name)
578 579 # TODO: should consider renaming to unregister 580
581 - def error_handler(self, e, exc_type, exc_value, tb):
582 logerr("Error processing request: %s\n%s" % (e, traceback.format_exception(exc_type, exc_value, tb)))
583
584 - def shutdown(self, reason=''):
585 """ 586 Stop this service 587 @param reason: human-readable shutdown reason 588 @type reason: str 589 """ 590 self.done = True 591 logdebug('[%s].shutdown: reason [%s]'%(self.resolved_name, reason)) 592 try: 593 #TODO: make service manager configurable 594 get_service_manager().unregister(self.resolved_name, self) 595 except Exception as e: 596 logerr("Unable to unregister with master: "+traceback.format_exc()) 597 raise ServiceException("Unable to connect to master: %s"%e)
598
599 - def spin(self):
600 """ 601 Let service run and take over thread until service or node 602 shutdown. Use this method to keep your scripts from exiting 603 execution. 604 """ 605 try: 606 while not rospy.core.is_shutdown() and not self.done: 607 time.sleep(0.5) 608 except KeyboardInterrupt: 609 logdebug("keyboard interrupt, shutting down")
610
611 - def _write_service_error(self, transport, err_msg):
612 """ 613 Send error message to client 614 @param transport: transport connection to client 615 @type transport: Transport 616 @param err_msg: error message to send to client 617 @type err_msg: str 618 """ 619 if sys.hexversion > 0x03000000: #Python3 620 err_msg = bytes(err_msg, 'utf-8') 621 transport.write_data(struct.pack('<BI%ss'%len(err_msg), 0, len(err_msg), err_msg))
622
623 - def _handle_request(self, transport, request):
624 """ 625 Process a single incoming request. 626 @param transport: transport instance 627 @type transport: L{TCPROSTransport} 628 @param request: Message 629 @type request: genpy.Message 630 """ 631 try: 632 # convert return type to response Message instance 633 response = convert_return_to_response(self.handler(request), self.response_class) 634 self.seq += 1 635 # ok byte 636 transport.write_buff.write(struct.pack('<B', 1)) 637 transport.send_message(response, self.seq) 638 except ServiceException as e: 639 rospy.core.rospydebug("handler raised ServiceException: %s"%(e)) 640 self._write_service_error(transport, "service cannot process request: %s"%e) 641 except Exception as e: 642 (exc_type, exc_value, tb) = sys.exc_info() 643 self.error_handler(e, exc_type, exc_value, tb) 644 self._write_service_error(transport, "error processing request: %s"%e)
645
646 - def handle(self, transport, header):
647 """ 648 Process incoming request. This method should be run in its 649 own thread. If header['persistent'] is set to 1, method will 650 block until connection is broken. 651 @param transport: transport instance 652 @type transport: L{TCPROSTransport} 653 @param header: headers from client 654 @type header: dict 655 """ 656 if 'persistent' in header and \ 657 header['persistent'].lower() in ['1', 'true']: 658 persistent = True 659 else: 660 persistent = False 661 if header.get('probe', None) == '1': 662 #this will likely do more in the future 663 transport.close() 664 return 665 handle_done = False 666 while not handle_done: 667 try: 668 requests = transport.receive_once() 669 for request in requests: 670 self._handle_request(transport, request) 671 if not persistent: 672 handle_done = True 673 except rospy.exceptions.TransportTerminated as e: 674 if not persistent: 675 logerr("incoming connection failed: %s"%e) 676 logdebug("service[%s]: transport terminated"%self.resolved_name) 677 handle_done = True 678 transport.close()
679 680
681 -class Service(ServiceImpl):
682 """ 683 Declare a ROS service. Service requests are passed to the 684 specified handler. 685 686 Service Usage:: 687 s = Service('getmapservice', GetMap, get_map_handler) 688 """ 689
690 - def __init__(self, name, service_class, handler, 691 buff_size=DEFAULT_BUFF_SIZE, error_handler=None):
692 """ 693 ctor. 694 695 @param name: service name, ``str`` 696 @param service_class: Service definition class 697 698 @param handler: callback function for processing service 699 request. Function takes in a ServiceRequest and returns a 700 ServiceResponse of the appropriate type. Function may also 701 return a list, tuple, or dictionary with arguments to initialize 702 a ServiceResponse instance of the correct type. 703 704 If handler cannot process request, it may either return None, 705 to indicate failure, or it may raise a rospy.ServiceException 706 to send a specific error message to the client. Returning None 707 is always considered a failure. 708 709 @type handler: fn(req)->resp 710 711 @param buff_size: size of buffer for reading incoming requests. Should be at least size of request message 712 @type buff_size: int 713 714 @param error_handler: callback function for handling errors 715 raised in the service code. 716 @type error_handler: fn(exception, exception_type, exception_value, traceback)->None 717 """ 718 super(Service, self).__init__(name, service_class, handler, buff_size, 719 error_handler) 720 721 #TODO: make service manager configurable 722 get_service_manager().register(self.resolved_name, self)
723