Package rospy :: Package impl :: Module tcpros_base

Source Code for Module rospy.impl.tcpros_base

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id: tcpros_base.py 16621 2012-04-06 00:36:58Z kwc $ 
 34   
 35  """Internal use: common TCPROS libraries""" 
 36   
 37   
 38  try: 
 39      from cStringIO import StringIO #Python 2.x 
 40      python3 = 0 
 41  except ImportError: 
 42      from io import StringIO, BytesIO #Python 3.x 
 43      python3 = 1 
 44  import socket 
 45  import logging 
 46   
 47  try: 
 48      import _thread 
 49  except ImportError: 
 50      import thread as _thread 
 51       
 52  import threading 
 53  import time 
 54  import traceback 
 55  import select 
 56   
 57  import roslib.rosenv 
 58  from roslib.message import DeserializationError, Message 
 59  from roslib.network import read_ros_handshake_header, write_ros_handshake_header 
 60   
 61  # TODO: remove * import from core 
 62  from rospy.core import * 
 63  from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn 
 64  from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError 
 65  from rospy.msg import deserialize_messages, serialize_message 
 66  from rospy.service import ServiceException 
 67   
 68  from rospy.impl.transport import Transport, BIDIRECTIONAL 
 69   
 70  logger = logging.getLogger('rospy.tcpros') 
 71   
 72  # Receive buffer size for topics/services (in bytes) 
 73  DEFAULT_BUFF_SIZE = 65536 
 74   
 75  ## name of our customized TCP protocol for accepting flows over server socket 
 76  TCPROS = "TCPROS"  
 77   
 78  _PARAM_TCP_KEEPALIVE = '/tcp_keepalive' 
 79  _use_tcp_keepalive = None 
 80  _use_tcp_keepalive_lock = threading.Lock() 
81 -def _is_use_tcp_keepalive():
82 global _use_tcp_keepalive 83 if _use_tcp_keepalive is not None: 84 return _use_tcp_keepalive 85 with _use_tcp_keepalive_lock: 86 if _use_tcp_keepalive is not None: 87 return _use_tcp_keepalive 88 # in order to prevent circular dependencies, this does not use the 89 # builtin libraries for interacting with the parameter server 90 m = rospy.core.xmlrpcapi(roslib.rosenv.get_master_uri()) 91 code, msg, val = m.getParam(rospy.names.get_caller_id(), _PARAM_TCP_KEEPALIVE) 92 _use_tcp_keepalive = val if code == 1 else True 93 return _use_tcp_keepalive
94
95 -def recv_buff(sock, b, buff_size):
96 """ 97 Read data from socket into buffer. 98 @param sock: socket to read from 99 @type sock: socket.socket 100 @param b: buffer to receive into 101 @type b: StringIO 102 @param buff_size: recv read size 103 @type buff_size: int 104 @return: number of bytes read 105 @rtype: int 106 """ 107 d = sock.recv(buff_size) 108 if d: 109 b.write(d) 110 return len(d) 111 else: #bomb out 112 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
113
114 -class TCPServer(object):
115 """ 116 Simple server that accepts inbound TCP/IP connections and hands 117 them off to a handler function. TCPServer obeys the 118 ROS_IP/ROS_HOSTNAME environment variables 119 """ 120
121 - def __init__(self, inbound_handler, port=0):
122 """ 123 Setup a server socket listening on the specified port. If the 124 port is omitted, will choose any open port. 125 @param inbound_handler: handler to invoke with 126 new connection 127 @type inbound_handler: fn(sock, addr) 128 @param port: port to bind to, omit/0 to bind to any 129 @type port: int 130 """ 131 self.port = port #will get overwritten if port=0 132 self.addr = None #set at socket bind 133 self.is_shutdown = False 134 self.inbound_handler = inbound_handler 135 try: 136 self.server_sock = self._create_server_sock() 137 except: 138 self.server_sock = None 139 raise
140
141 - def start(self):
142 """Runs the run() loop in a separate thread""" 143 _thread.start_new_thread(self.run, ())
144
145 - def run(self):
146 """ 147 Main TCP receive loop. Should be run in a separate thread -- use start() 148 to do this automatically. 149 """ 150 self.is_shutdown = False 151 if not self.server_sock: 152 raise ROSInternalException("%s did not connect"%self.__class__.__name__) 153 while not self.is_shutdown: 154 try: 155 (client_sock, client_addr) = self.server_sock.accept() 156 except socket.timeout: 157 continue 158 except IOError as e: 159 (errno, msg) = e.args 160 if errno == 4: #interrupted system call 161 continue 162 raise 163 if self.is_shutdown: 164 break 165 try: 166 #leave threading decisions up to inbound_handler 167 self.inbound_handler(client_sock, client_addr) 168 except socket.error as e: 169 if not self.is_shutdown: 170 traceback.print_exc() 171 logwarn("Failed to handle inbound connection due to socket error: %s"%e) 172 logdebug("TCPServer[%s] shutting down", self.port)
173 174
175 - def get_full_addr(self):
176 """ 177 @return: (ip address, port) of server socket binding 178 @rtype: (str, int) 179 """ 180 # return roslib.network.get_host_name() instead of address so that it 181 # obeys ROS_IP/ROS_HOSTNAME behavior 182 return (roslib.network.get_host_name(), self.port)
183
184 - def _create_server_sock(self):
185 """ 186 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict 187 binding to loopback interface. 188 """ 189 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 190 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 191 server_sock.bind((roslib.network.get_bind_address(), self.port)) 192 (self.addr, self.port) = server_sock.getsockname() 193 server_sock.listen(5) 194 return server_sock
195
196 - def shutdown(self):
197 """shutdown I/O resources uses by this server""" 198 if not self.is_shutdown: 199 self.is_shutdown = True 200 self.server_sock.close()
201 202 # base maintains a tcpros_server singleton that is shared between 203 # services and topics for inbound connections. This global is set in 204 # the tcprosserver constructor. Constructor is called by init_tcpros() 205 _tcpros_server = None 206
207 -def init_tcpros_server():
208 """starts the TCPROS server socket for inbound connections""" 209 global _tcpros_server 210 if _tcpros_server is None: 211 _tcpros_server = TCPROSServer() 212 rospy.core.add_shutdown_hook(_tcpros_server.shutdown) 213 return _tcpros_server
214
215 -def start_tcpros_server():
216 """ 217 start the TCPROS server if it has not started already 218 """ 219 if _tcpros_server is None: 220 init_tcpros_server() 221 return _tcpros_server.start_server()
222 223 # provide an accessor of this so that the TCPROS Server is entirely hidden from upper layers 224
225 -def get_tcpros_server_address():
226 """ 227 get the address of the tcpros server. 228 @raise Exception: if tcpros server has not been started or created 229 """ 230 return _tcpros_server.get_address()
231
232 -def _error_connection_handler(sock, client_addr, header):
233 """ 234 utility handler that does nothing more than provide a rejection header 235 @param sock: socket connection 236 @type sock: socket.socket 237 @param client_addr: client address 238 @type client_addr: str 239 @param header: request header 240 @type header: dict 241 """ 242 return {'error': "unhandled connection"}
243
244 -class TCPROSServer(object):
245 """ 246 ROS Protocol handler for TCPROS. Accepts both TCPROS topic 247 connections as well as ROS service connections over TCP. TCP server 248 socket is run once start_server() is called -- this is implicitly 249 called during init_publisher(). 250 """ 251
252 - def __init__(self):
253 """ctor.""" 254 self.tcp_ros_server = None #: server for receiving tcp conn 255 self.lock = threading.Lock() 256 # should be set to fn(sock, client_addr, header) for topic connections 257 self.topic_connection_handler = _error_connection_handler 258 # should be set to fn(sock, client_addr, header) for service connections 259 self.service_connection_handler = _error_connection_handler
260
261 - def start_server(self, port=0):
262 """ 263 Starts the TCP socket server if one is not already running 264 @param port: port number to bind to (default 0/any) 265 @type port: int 266 """ 267 if self.tcp_ros_server: 268 return 269 with self.lock: 270 try: 271 if not self.tcp_ros_server: 272 self.tcp_ros_server = TCPServer(self._tcp_server_callback, port) 273 self.tcp_ros_server.start() 274 except Exception as e: 275 self.tcp_ros_server = None 276 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc())) 277 return 0, "unable to establish TCPROS server: %s"%e, []
278
279 - def get_address(self):
280 """ 281 @return: address and port of TCP server socket for accepting 282 inbound connections 283 @rtype: str, int 284 """ 285 if self.tcp_ros_server is not None: 286 return self.tcp_ros_server.get_full_addr() 287 return None, None
288
289 - def shutdown(self, reason=''):
290 """stops the TCP/IP server responsible for receiving inbound connections""" 291 if self.tcp_ros_server: 292 self.tcp_ros_server.shutdown()
293
294 - def _tcp_server_callback(self, sock, client_addr):
295 """ 296 TCPServer callback: detects incoming topic or service connection and passes connection accordingly 297 298 @param sock: socket connection 299 @type sock: socket.socket 300 @param client_addr: client address 301 @type client_addr: (str, int) 302 @raise TransportInitError: If transport cannot be succesfully initialized 303 """ 304 #TODOXXX:rewrite this logic so it is possible to create TCPROSTransport object first, set its protocol, 305 #and then use that to do the writing 306 try: 307 buff_size = 4096 # size of read buffer 308 if python3 == 0: 309 #initialize read_ros_handshake_header with BytesIO for Python 3 (instead of bytesarray()) 310 header = read_ros_handshake_header(sock, StringIO(), buff_size) 311 else: 312 header = read_ros_handshake_header(sock, BytesIO(), buff_size) 313 314 if 'topic' in header: 315 err_msg = self.topic_connection_handler(sock, client_addr, header) 316 elif 'service' in header: 317 err_msg = self.service_connection_handler(sock, client_addr, header) 318 else: 319 err_msg = 'no topic or service name detected' 320 if err_msg: 321 # shutdown race condition: nodes that come up and down 322 # quickly can receive connections during teardown. 323 324 # We use is_shutdown_requested() because we can get 325 # into bad connection states during client shutdown 326 # hooks. 327 if not rospy.core.is_shutdown_requested(): 328 write_ros_handshake_header(sock, {'error' : err_msg}) 329 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header)) 330 else: 331 write_ros_handshake_header(sock, {'error' : 'node shutting down'}) 332 return 333 except rospy.exceptions.TransportInitError as e: 334 logwarn(str(e)) 335 if sock is not None: 336 sock.close() 337 except Exception as e: 338 # collect stack trace separately in local log file 339 if not rospy.core.is_shutdown_requested(): 340 logwarn("Inbound TCP/IP connection failed: %s", e) 341 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc(e)) 342 if sock is not None: 343 sock.close()
344
345 -class TCPROSTransportProtocol(object):
346 """ 347 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this 348 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as 349 well as which fields to send when creating a new connection (get_header_fields()). 350 """ 351
352 - def __init__(self, resolved_name, recv_data_class, queue_size=None, buff_size=DEFAULT_BUFF_SIZE):
353 """ 354 ctor 355 @param resolved_name: resolved service or topic name 356 @type resolved_name: str 357 @param recv_data_class: message class for deserializing inbound messages 358 @type recv_data_class: Class 359 @param queue_size: maximum number of inbound messages to maintain 360 @type queue_size: int 361 @param buff_size: receive buffer size (in bytes) for reading from the connection. 362 @type buff_size: int 363 """ 364 if recv_data_class and not issubclass(recv_data_class, Message): 365 raise TransportInitError("Unable to initialize transport: data class is not a message data class") 366 self.resolved_name = resolved_name 367 self.recv_data_class = recv_data_class 368 self.queue_size = queue_size 369 self.buff_size = buff_size 370 self.direction = BIDIRECTIONAL
371 372
373 - def read_messages(self, b, msg_queue, sock):
374 """ 375 @param b StringIO: read buffer 376 @param msg_queue [Message]: queue of deserialized messages 377 @type msg_queue: [Message] 378 @param sock socket: protocol can optionally read more data from 379 the socket, but in most cases the required data will already be 380 in b 381 """ 382 # default implementation 383 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
384
385 - def get_header_fields(self):
386 """ 387 Header fields that should be sent over the connection. The header fields 388 are protocol specific (i.e. service vs. topic, publisher vs. subscriber). 389 @return: {str : str}: header fields to send over connection 390 @rtype: dict 391 """ 392 return {}
393 394 # TODO: this still isn't as clean and seamless as I want it to 395 # be. This code came from the merger of publisher, subscriber, and 396 # service code into a common TCPROS transport class. The transport is 397 # customized by a 'protocol' class, which is how the different 398 # pub/sub/service behaviors are achieved. More behavior needs to be 399 # transferred from the transport class into the protocol class, 400 # e.g. deserialization as the state each maintains is somewhat 401 # duplicative. I would also come up with a better name than 402 # protocol. 403
404 -class TCPROSTransport(Transport):
405 """ 406 Generic implementation of TCPROS exchange routines for both topics and services 407 """ 408 transport_type = 'TCPROS' 409
410 - def __init__(self, protocol, name, header=None):
411 """ 412 ctor 413 @param name str: identifier 414 @param protocol TCPROSTransportProtocol protocol implementation 415 @param header dict: (optional) handshake header if transport handshake header was 416 already read off of transport. 417 @raise TransportInitError if transport cannot be initialized according to arguments 418 """ 419 super(TCPROSTransport, self).__init__(protocol.direction, name=name) 420 if not name: 421 raise TransportInitError("Unable to initialize transport: name is not set") 422 423 self.protocol = protocol 424 425 self.socket = None 426 self.endpoint_id = 'unknown' 427 self.dest_address = None # for reconnection 428 429 if python3 == 0: # Python 2.x 430 self.read_buff = StringIO() 431 self.write_buff = StringIO() 432 else: # Python 3.x 433 self.read_buff = BytesIO() 434 self.write_buff = BytesIO() 435 436 #self.write_buff = StringIO() 437 self.header = header 438 439 # #1852 have to hold onto latched messages on subscriber side 440 self.is_latched = False 441 self.latch = None 442 443 # these fields are actually set by the remote 444 # publisher/service. they are set for tools that connect 445 # without knowing the actual field name 446 self.md5sum = None 447 self.type = None
448
449 - def set_socket(self, sock, endpoint_id):
450 """ 451 Set the socket for this transport 452 @param sock: socket 453 @type sock: socket.socket 454 @param endpoint_id: identifier for connection endpoint 455 @type endpoint_id: str 456 @raise TransportInitError: if socket has already been set 457 """ 458 if self.socket is not None: 459 raise TransportInitError("socket already initialized") 460 self.socket = sock 461 self.endpoint_id = endpoint_id
462
463 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
464 """ 465 Establish TCP connection to the specified 466 address/port. connect() always calls L{write_header()} and 467 L{read_header()} after the connection is made 468 @param dest_addr: destination IP address 469 @type dest_addr: str 470 @param dest_port: destination port 471 @type dest_port: int 472 @param endpoint_id: string identifier for connection (for statistics) 473 @type endpoint_id: str 474 @param timeout: (optional keyword) timeout in seconds 475 @type timeout: float 476 @raise TransportInitError: if unable to create connection 477 """ 478 try: 479 self.endpoint_id = endpoint_id 480 self.dest_address = (dest_addr, dest_port) 481 482 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 483 if _is_use_tcp_keepalive(): 484 # OSX (among others) does not define these options 485 if hasattr(socket, 'TCP_KEEPCNT') and \ 486 hasattr(socket, 'TCP_KEEPIDLE') and \ 487 hasattr(socket, 'TCP_KEEPINTVL'): 488 # turn on KEEPALIVE 489 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 490 # - # keepalive failures before actual connection failure 491 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9) 492 # - timeout before starting KEEPALIVE process 493 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60) 494 # - interval to send KEEPALIVE after IDLE timeout 495 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10) 496 if timeout is not None: 497 s.settimeout(timeout) 498 self.socket = s 499 self.socket.connect((dest_addr, dest_port)) 500 self.write_header() 501 self.read_header() 502 except TransportInitError as tie: 503 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 504 raise 505 except Exception as e: 506 # FATAL: no reconnection as error is unknown 507 self.done = True 508 if self.socket: 509 try: 510 self.socket.shutdown() 511 except: 512 pass 513 finally: 514 self.socket.close() 515 self.socket = None 516 517 #logerr("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, str(e))) 518 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 519 raise TransportInitError(str(e)) #re-raise i/o error
520
521 - def _validate_header(self, header):
522 """ 523 Validate header and initialize fields accordingly 524 @param header: header fields from publisher 525 @type header: dict 526 @raise TransportInitError: if header fails to validate 527 """ 528 self.header = header 529 if 'error' in header: 530 raise TransportInitError("remote error reported: %s"%header['error']) 531 for required in ['md5sum', 'type']: 532 if not required in header: 533 raise TransportInitError("header missing required field [%s]"%required) 534 self.md5sum = header['md5sum'] 535 self.type = header['type'] 536 if header.get('latching', '0') == '1': 537 self.is_latched = True
538
539 - def write_header(self):
540 """Writes the TCPROS header to the active connection.""" 541 sock = self.socket 542 # socket may still be getting spun up, so wait for it to be writable 543 fileno = sock.fileno() 544 ready = None 545 while not ready: 546 _, ready, _ = select.select([], [fileno], []) 547 logger.debug("[%s]: writing header", self.name) 548 sock.setblocking(1) 549 self.stat_bytes += write_ros_handshake_header(sock, self.protocol.get_header_fields())
550
551 - def read_header(self):
552 """ 553 Read TCPROS header from active socket 554 @raise TransportInitError if header fails to validate 555 """ 556 self.socket.setblocking(1) 557 self._validate_header(read_ros_handshake_header(self.socket, self.read_buff, self.protocol.buff_size))
558
559 - def send_message(self, msg, seq):
560 """ 561 Convenience routine for services to send a message across a 562 particular connection. NOTE: write_data is much more efficient 563 if same message is being sent to multiple connections. Not 564 threadsafe. 565 @param msg: message to send 566 @type msg: Msg 567 @param seq: sequence number for message 568 @type seq: int 569 @raise TransportException: if error occurred sending message 570 """ 571 # this will call write_data(), so no need to keep track of stats 572 serialize_message(self.write_buff, seq, msg) 573 self.write_data(self.write_buff.getvalue()) 574 self.write_buff.truncate(0)
575
576 - def write_data(self, data):
577 """ 578 Write raw data to transport 579 @raise TransportInitialiationError: could not be initialized 580 @raise TransportTerminated: no longer open for publishing 581 """ 582 if not self.socket: 583 raise TransportInitError("TCPROS transport was not successfully initialized") 584 if self.done: 585 raise TransportTerminated("connection closed") 586 try: 587 #TODO: get rid of sendalls and replace with async-style publishing 588 self.socket.sendall(data) 589 self.stat_bytes += len(data) 590 self.stat_num_msg += 1 591 except IOError as ioe: 592 #for now, just document common errno's in code 593 (errno, msg) = ioe.args 594 if errno == 32: #broken pipe 595 logdebug("ERROR: Broken Pipe") 596 self.close() 597 raise TransportTerminated(str(errno)+msg) 598 raise #re-raise 599 except socket.error as se: 600 #for now, just document common errno's in code 601 (errno, msg) = se.args 602 if errno == 32: #broken pipe 603 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id) 604 self.close() 605 raise TransportTerminated(msg) 606 elif errno == 104: #connection reset by peer 607 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id) 608 self.close() 609 raise TransportTerminated(msg) 610 else: 611 rospydebug("unknown socket error writing data: %s",traceback.format_exc()) 612 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg) 613 self.close() 614 raise TransportTerminated(str(errno)+' '+msg) 615 return True
616
617 - def receive_once(self):
618 """ 619 block until messages are read off of socket 620 @return: list of newly received messages 621 @rtype: [Msg] 622 @raise TransportException: if unable to receive message due to error 623 """ 624 sock = self.socket 625 if sock is None: 626 raise TransportException("connection not initialized") 627 b = self.read_buff 628 msg_queue = [] 629 p = self.protocol 630 try: 631 sock.setblocking(1) 632 while not msg_queue and not self.done and not is_shutdown(): 633 if b.tell() >= 4: 634 p.read_messages(b, msg_queue, sock) 635 if not msg_queue: 636 recv_buff(sock, b, p.buff_size) 637 self.stat_num_msg += len(msg_queue) #STATS 638 # set the _connection_header field 639 for m in msg_queue: 640 m._connection_header = self.header 641 642 # #1852: keep track of last latched message 643 if self.is_latched and msg_queue: 644 self.latch = msg_queue[-1] 645 646 return msg_queue 647 648 except DeserializationError as e: 649 rospyerr(traceback.format_exc()) 650 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e))) 651 except TransportTerminated as e: 652 raise #reraise 653 except ServiceException as e: 654 raise 655 except Exception as e: 656 rospyerr(traceback.format_exc()) 657 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e))) 658 return retval
659
660 - def _reconnect(self):
661 # This reconnection logic is very hacky right now. I need to 662 # rewrite the I/O core so that this is handled more centrally. 663 664 if self.dest_address is None: 665 raise ROSInitException("internal error with reconnection state: address not stored") 666 667 interval = 0.5 # seconds 668 while self.socket is None and not self.done and not rospy.is_shutdown(): 669 try: 670 # set a timeout so that we can continue polling for 671 # exit. 30. is a bit high, but I'm concerned about 672 # embedded platforms. To do this properly, we'd have 673 # to move to non-blocking routines. 674 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.) 675 except TransportInitError: 676 self.socket = None 677 678 if self.socket is None: 679 # exponential backoff 680 interval = interval * 2 681 682 time.sleep(interval)
683
684 - def receive_loop(self, msgs_callback):
685 """ 686 Receive messages until shutdown 687 @param msgs_callback: callback to invoke for new messages received 688 @type msgs_callback: fn([msg]) 689 """ 690 # - use assert here as this would be an internal error, aka bug 691 logger.debug("receive_loop for [%s]", self.name) 692 try: 693 while not self.done and not is_shutdown(): 694 try: 695 if self.socket is not None: 696 msgs = self.receive_once() 697 if not self.done and not is_shutdown(): 698 msgs_callback(msgs) 699 else: 700 self._reconnect() 701 702 except TransportException as e: 703 # set socket to None so we reconnect 704 try: 705 if self.socket is not None: 706 try: 707 self.socket.shutdown() 708 except: 709 pass 710 finally: 711 self.socket.close() 712 except: 713 pass 714 self.socket = None 715 716 except DeserializationError as e: 717 #TODO: how should we handle reconnect in this case? 718 719 logerr("[%s] error deserializing incoming request: %s"%self.name, str(e)) 720 rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc()) 721 except: 722 # in many cases this will be a normal hangup, but log internally 723 try: 724 #1467 sometimes we get exceptions due to 725 #interpreter shutdown, so blanket ignore those if 726 #the reporting fails 727 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc()) 728 except: pass 729 730 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name) 731 finally: 732 if not self.done: 733 self.close()
734
735 - def close(self):
736 """close i/o and release resources""" 737 self.done = True 738 try: 739 if self.socket is not None: 740 try: 741 self.socket.shutdown() 742 except: 743 pass 744 finally: 745 self.socket.close() 746 finally: 747 self.socket = self.read_buff = self.write_buff = self.protocol = None 748 super(TCPROSTransport, self).close()
749