Package rospy :: Package impl :: Module tcpros_base

Source Code for Module rospy.impl.tcpros_base

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """Internal use: common TCPROS libraries""" 
 36   
 37   
 38  try: 
 39      from cStringIO import StringIO #Python 2.x 
 40      python3 = 0 
 41  except ImportError: 
 42      from io import StringIO, BytesIO #Python 3.x 
 43      python3 = 1 
 44  import socket 
 45  import logging 
 46   
 47  try: 
 48      import _thread 
 49  except ImportError: 
 50      import thread as _thread 
 51       
 52  import threading 
 53  import time 
 54  import traceback 
 55  import select 
 56   
 57  import rosgraph 
 58  import rosgraph.network 
 59  from genpy import DeserializationError, Message 
 60  from rosgraph.network import read_ros_handshake_header, write_ros_handshake_header 
 61   
 62  # TODO: remove * import from core 
 63  from rospy.core import * 
 64  from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn 
 65  from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError 
 66  from rospy.msg import deserialize_messages, serialize_message 
 67  from rospy.service import ServiceException 
 68   
 69  from rospy.impl.transport import Transport, BIDIRECTIONAL 
 70   
 71  logger = logging.getLogger('rospy.tcpros') 
 72   
 73  # Receive buffer size for topics/services (in bytes) 
 74  DEFAULT_BUFF_SIZE = 65536 
 75   
 76  ## name of our customized TCP protocol for accepting flows over server socket 
 77  TCPROS = "TCPROS"  
 78   
 79  _PARAM_TCP_KEEPALIVE = '/tcp_keepalive' 
 80  _use_tcp_keepalive = None 
 81  _use_tcp_keepalive_lock = threading.Lock() 
82 -def _is_use_tcp_keepalive():
83 global _use_tcp_keepalive 84 if _use_tcp_keepalive is not None: 85 return _use_tcp_keepalive 86 with _use_tcp_keepalive_lock: 87 if _use_tcp_keepalive is not None: 88 return _use_tcp_keepalive 89 # in order to prevent circular dependencies, this does not use the 90 # builtin libraries for interacting with the parameter server 91 m = rospy.core.xmlrpcapi(rosgraph.get_master_uri()) 92 code, msg, val = m.getParam(rospy.names.get_caller_id(), _PARAM_TCP_KEEPALIVE) 93 _use_tcp_keepalive = val if code == 1 else True 94 return _use_tcp_keepalive
95
96 -def recv_buff(sock, b, buff_size):
97 """ 98 Read data from socket into buffer. 99 @param sock: socket to read from 100 @type sock: socket.socket 101 @param b: buffer to receive into 102 @type b: StringIO 103 @param buff_size: recv read size 104 @type buff_size: int 105 @return: number of bytes read 106 @rtype: int 107 """ 108 d = sock.recv(buff_size) 109 if d: 110 b.write(d) 111 return len(d) 112 else: #bomb out 113 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
114
115 -class TCPServer(object):
116 """ 117 Simple server that accepts inbound TCP/IP connections and hands 118 them off to a handler function. TCPServer obeys the 119 ROS_IP/ROS_HOSTNAME environment variables 120 """ 121
122 - def __init__(self, inbound_handler, port=0):
123 """ 124 Setup a server socket listening on the specified port. If the 125 port is omitted, will choose any open port. 126 @param inbound_handler: handler to invoke with 127 new connection 128 @type inbound_handler: fn(sock, addr) 129 @param port: port to bind to, omit/0 to bind to any 130 @type port: int 131 """ 132 self.port = port #will get overwritten if port=0 133 self.addr = None #set at socket bind 134 self.is_shutdown = False 135 self.inbound_handler = inbound_handler 136 try: 137 self.server_sock = self._create_server_sock() 138 except: 139 self.server_sock = None 140 raise
141
142 - def start(self):
143 """Runs the run() loop in a separate thread""" 144 _thread.start_new_thread(self.run, ())
145
146 - def run(self):
147 """ 148 Main TCP receive loop. Should be run in a separate thread -- use start() 149 to do this automatically. 150 """ 151 self.is_shutdown = False 152 if not self.server_sock: 153 raise ROSInternalException("%s did not connect"%self.__class__.__name__) 154 while not self.is_shutdown: 155 try: 156 (client_sock, client_addr) = self.server_sock.accept() 157 except socket.timeout: 158 continue 159 except IOError as e: 160 (errno, msg) = e.args 161 if errno == 4: #interrupted system call 162 continue 163 raise 164 if self.is_shutdown: 165 break 166 try: 167 #leave threading decisions up to inbound_handler 168 self.inbound_handler(client_sock, client_addr) 169 except socket.error as e: 170 if not self.is_shutdown: 171 traceback.print_exc() 172 logwarn("Failed to handle inbound connection due to socket error: %s"%e) 173 logdebug("TCPServer[%s] shutting down", self.port)
174 175
176 - def get_full_addr(self):
177 """ 178 @return: (ip address, port) of server socket binding 179 @rtype: (str, int) 180 """ 181 # return rosgraph.network.get_host_name() instead of address so that it 182 # obeys ROS_IP/ROS_HOSTNAME behavior 183 return (rosgraph.network.get_host_name(), self.port)
184
185 - def _create_server_sock(self):
186 """ 187 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict 188 binding to loopback interface. 189 """ 190 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 191 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 192 server_sock.bind((rosgraph.network.get_bind_address(), self.port)) 193 (self.addr, self.port) = server_sock.getsockname() 194 server_sock.listen(5) 195 return server_sock
196
197 - def shutdown(self):
198 """shutdown I/O resources uses by this server""" 199 if not self.is_shutdown: 200 self.is_shutdown = True 201 self.server_sock.close()
202 203 # base maintains a tcpros_server singleton that is shared between 204 # services and topics for inbound connections. This global is set in 205 # the tcprosserver constructor. Constructor is called by init_tcpros() 206 _tcpros_server = None 207
208 -def init_tcpros_server():
209 """starts the TCPROS server socket for inbound connections""" 210 global _tcpros_server 211 if _tcpros_server is None: 212 _tcpros_server = TCPROSServer() 213 rospy.core.add_shutdown_hook(_tcpros_server.shutdown) 214 return _tcpros_server
215
216 -def start_tcpros_server():
217 """ 218 start the TCPROS server if it has not started already 219 """ 220 if _tcpros_server is None: 221 init_tcpros_server() 222 return _tcpros_server.start_server()
223 224 # provide an accessor of this so that the TCPROS Server is entirely hidden from upper layers 225
226 -def get_tcpros_server_address():
227 """ 228 get the address of the tcpros server. 229 @raise Exception: if tcpros server has not been started or created 230 """ 231 return _tcpros_server.get_address()
232
233 -def _error_connection_handler(sock, client_addr, header):
234 """ 235 utility handler that does nothing more than provide a rejection header 236 @param sock: socket connection 237 @type sock: socket.socket 238 @param client_addr: client address 239 @type client_addr: str 240 @param header: request header 241 @type header: dict 242 """ 243 return {'error': "unhandled connection"}
244
245 -class TCPROSServer(object):
246 """ 247 ROS Protocol handler for TCPROS. Accepts both TCPROS topic 248 connections as well as ROS service connections over TCP. TCP server 249 socket is run once start_server() is called -- this is implicitly 250 called during init_publisher(). 251 """ 252
253 - def __init__(self):
254 """ctor.""" 255 self.tcp_ros_server = None #: server for receiving tcp conn 256 self.lock = threading.Lock() 257 # should be set to fn(sock, client_addr, header) for topic connections 258 self.topic_connection_handler = _error_connection_handler 259 # should be set to fn(sock, client_addr, header) for service connections 260 self.service_connection_handler = _error_connection_handler
261
262 - def start_server(self, port=0):
263 """ 264 Starts the TCP socket server if one is not already running 265 @param port: port number to bind to (default 0/any) 266 @type port: int 267 """ 268 if self.tcp_ros_server: 269 return 270 with self.lock: 271 try: 272 if not self.tcp_ros_server: 273 self.tcp_ros_server = TCPServer(self._tcp_server_callback, port) 274 self.tcp_ros_server.start() 275 except Exception as e: 276 self.tcp_ros_server = None 277 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc())) 278 return 0, "unable to establish TCPROS server: %s"%e, []
279
280 - def get_address(self):
281 """ 282 @return: address and port of TCP server socket for accepting 283 inbound connections 284 @rtype: str, int 285 """ 286 if self.tcp_ros_server is not None: 287 return self.tcp_ros_server.get_full_addr() 288 return None, None
289
290 - def shutdown(self, reason=''):
291 """stops the TCP/IP server responsible for receiving inbound connections""" 292 if self.tcp_ros_server: 293 self.tcp_ros_server.shutdown()
294
295 - def _tcp_server_callback(self, sock, client_addr):
296 """ 297 TCPServer callback: detects incoming topic or service connection and passes connection accordingly 298 299 @param sock: socket connection 300 @type sock: socket.socket 301 @param client_addr: client address 302 @type client_addr: (str, int) 303 @raise TransportInitError: If transport cannot be succesfully initialized 304 """ 305 #TODOXXX:rewrite this logic so it is possible to create TCPROSTransport object first, set its protocol, 306 #and then use that to do the writing 307 try: 308 buff_size = 4096 # size of read buffer 309 if python3 == 0: 310 #initialize read_ros_handshake_header with BytesIO for Python 3 (instead of bytesarray()) 311 header = read_ros_handshake_header(sock, StringIO(), buff_size) 312 else: 313 header = read_ros_handshake_header(sock, BytesIO(), buff_size) 314 315 if 'topic' in header: 316 err_msg = self.topic_connection_handler(sock, client_addr, header) 317 elif 'service' in header: 318 err_msg = self.service_connection_handler(sock, client_addr, header) 319 else: 320 err_msg = 'no topic or service name detected' 321 if err_msg: 322 # shutdown race condition: nodes that come up and down 323 # quickly can receive connections during teardown. 324 325 # We use is_shutdown_requested() because we can get 326 # into bad connection states during client shutdown 327 # hooks. 328 if not rospy.core.is_shutdown_requested(): 329 write_ros_handshake_header(sock, {'error' : err_msg}) 330 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header)) 331 else: 332 write_ros_handshake_header(sock, {'error' : 'node shutting down'}) 333 return 334 except rospy.exceptions.TransportInitError as e: 335 logwarn(str(e)) 336 if sock is not None: 337 sock.close() 338 except Exception as e: 339 # collect stack trace separately in local log file 340 if not rospy.core.is_shutdown_requested(): 341 logwarn("Inbound TCP/IP connection failed: %s", e) 342 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc(e)) 343 if sock is not None: 344 sock.close()
345
346 -class TCPROSTransportProtocol(object):
347 """ 348 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this 349 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as 350 well as which fields to send when creating a new connection (get_header_fields()). 351 """ 352
353 - def __init__(self, resolved_name, recv_data_class, queue_size=None, buff_size=DEFAULT_BUFF_SIZE):
354 """ 355 ctor 356 @param resolved_name: resolved service or topic name 357 @type resolved_name: str 358 @param recv_data_class: message class for deserializing inbound messages 359 @type recv_data_class: Class 360 @param queue_size: maximum number of inbound messages to maintain 361 @type queue_size: int 362 @param buff_size: receive buffer size (in bytes) for reading from the connection. 363 @type buff_size: int 364 """ 365 if recv_data_class and not issubclass(recv_data_class, Message): 366 raise TransportInitError("Unable to initialize transport: data class is not a message data class") 367 self.resolved_name = resolved_name 368 self.recv_data_class = recv_data_class 369 self.queue_size = queue_size 370 self.buff_size = buff_size 371 self.direction = BIDIRECTIONAL
372 373
374 - def read_messages(self, b, msg_queue, sock):
375 """ 376 @param b StringIO: read buffer 377 @param msg_queue [Message]: queue of deserialized messages 378 @type msg_queue: [Message] 379 @param sock socket: protocol can optionally read more data from 380 the socket, but in most cases the required data will already be 381 in b 382 """ 383 # default implementation 384 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
385
386 - def get_header_fields(self):
387 """ 388 Header fields that should be sent over the connection. The header fields 389 are protocol specific (i.e. service vs. topic, publisher vs. subscriber). 390 @return: {str : str}: header fields to send over connection 391 @rtype: dict 392 """ 393 return {}
394 395 # TODO: this still isn't as clean and seamless as I want it to 396 # be. This code came from the merger of publisher, subscriber, and 397 # service code into a common TCPROS transport class. The transport is 398 # customized by a 'protocol' class, which is how the different 399 # pub/sub/service behaviors are achieved. More behavior needs to be 400 # transferred from the transport class into the protocol class, 401 # e.g. deserialization as the state each maintains is somewhat 402 # duplicative. I would also come up with a better name than 403 # protocol. 404
405 -class TCPROSTransport(Transport):
406 """ 407 Generic implementation of TCPROS exchange routines for both topics and services 408 """ 409 transport_type = 'TCPROS' 410
411 - def __init__(self, protocol, name, header=None):
412 """ 413 ctor 414 @param name str: identifier 415 @param protocol TCPROSTransportProtocol protocol implementation 416 @param header dict: (optional) handshake header if transport handshake header was 417 already read off of transport. 418 @raise TransportInitError if transport cannot be initialized according to arguments 419 """ 420 super(TCPROSTransport, self).__init__(protocol.direction, name=name) 421 if not name: 422 raise TransportInitError("Unable to initialize transport: name is not set") 423 424 self.protocol = protocol 425 426 self.socket = None 427 self.endpoint_id = 'unknown' 428 self.dest_address = None # for reconnection 429 430 if python3 == 0: # Python 2.x 431 self.read_buff = StringIO() 432 self.write_buff = StringIO() 433 else: # Python 3.x 434 self.read_buff = BytesIO() 435 self.write_buff = BytesIO() 436 437 #self.write_buff = StringIO() 438 self.header = header 439 440 # #1852 have to hold onto latched messages on subscriber side 441 self.is_latched = False 442 self.latch = None 443 444 # save the fileno separately so we can garbage collect the 445 # socket but still unregister will poll objects 446 self._fileno = None 447 448 # these fields are actually set by the remote 449 # publisher/service. they are set for tools that connect 450 # without knowing the actual field name 451 self.md5sum = None 452 self.type = None
453
454 - def fileno(self):
455 """ 456 Get descriptor for select 457 """ 458 return self._fileno
459
460 - def set_socket(self, sock, endpoint_id):
461 """ 462 Set the socket for this transport 463 @param sock: socket 464 @type sock: socket.socket 465 @param endpoint_id: identifier for connection endpoint 466 @type endpoint_id: str 467 @raise TransportInitError: if socket has already been set 468 """ 469 if self.socket is not None: 470 raise TransportInitError("socket already initialized") 471 self.socket = sock 472 self.endpoint_id = endpoint_id 473 self._fileno = sock.fileno()
474
475 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
476 """ 477 Establish TCP connection to the specified 478 address/port. connect() always calls L{write_header()} and 479 L{read_header()} after the connection is made 480 @param dest_addr: destination IP address 481 @type dest_addr: str 482 @param dest_port: destination port 483 @type dest_port: int 484 @param endpoint_id: string identifier for connection (for statistics) 485 @type endpoint_id: str 486 @param timeout: (optional keyword) timeout in seconds 487 @type timeout: float 488 @raise TransportInitError: if unable to create connection 489 """ 490 try: 491 self.endpoint_id = endpoint_id 492 self.dest_address = (dest_addr, dest_port) 493 494 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 495 if _is_use_tcp_keepalive(): 496 # OSX (among others) does not define these options 497 if hasattr(socket, 'TCP_KEEPCNT') and \ 498 hasattr(socket, 'TCP_KEEPIDLE') and \ 499 hasattr(socket, 'TCP_KEEPINTVL'): 500 # turn on KEEPALIVE 501 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 502 # - # keepalive failures before actual connection failure 503 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9) 504 # - timeout before starting KEEPALIVE process 505 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60) 506 # - interval to send KEEPALIVE after IDLE timeout 507 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10) 508 if timeout is not None: 509 s.settimeout(timeout) 510 self.socket = s 511 self.socket.connect((dest_addr, dest_port)) 512 self.write_header() 513 self.read_header() 514 except TransportInitError as tie: 515 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 516 raise 517 except Exception as e: 518 #logerr("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, str(e))) 519 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 520 521 # FATAL: no reconnection as error is unknown 522 self.done = True 523 if self.socket: 524 try: 525 self.socket.shutdown(socket.SHUT_RDWR) 526 except: 527 pass 528 finally: 529 self.socket.close() 530 self.socket = None 531 532 raise TransportInitError(str(e)) #re-raise i/o error
533
534 - def _validate_header(self, header):
535 """ 536 Validate header and initialize fields accordingly 537 @param header: header fields from publisher 538 @type header: dict 539 @raise TransportInitError: if header fails to validate 540 """ 541 self.header = header 542 if 'error' in header: 543 raise TransportInitError("remote error reported: %s"%header['error']) 544 for required in ['md5sum', 'type']: 545 if not required in header: 546 raise TransportInitError("header missing required field [%s]"%required) 547 self.md5sum = header['md5sum'] 548 self.type = header['type'] 549 if header.get('latching', '0') == '1': 550 self.is_latched = True
551
552 - def write_header(self):
553 """Writes the TCPROS header to the active connection.""" 554 # socket may still be getting spun up, so wait for it to be writable 555 sock = self.socket 556 protocol = self.protocol 557 # race condition on close, better fix is to pass these in, 558 # functional style, but right now trying to cause minimal 559 # perturbance to codebase. 560 if sock is None or protocol is None: 561 return 562 fileno = sock.fileno() 563 ready = None 564 while not ready: 565 _, ready, _ = select.select([], [fileno], []) 566 logger.debug("[%s]: writing header", self.name) 567 sock.setblocking(1) 568 self.stat_bytes += write_ros_handshake_header(sock, protocol.get_header_fields())
569
570 - def read_header(self):
571 """ 572 Read TCPROS header from active socket 573 @raise TransportInitError if header fails to validate 574 """ 575 sock = self.socket 576 if sock is None: 577 return 578 sock.setblocking(1) 579 self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
580
581 - def send_message(self, msg, seq):
582 """ 583 Convenience routine for services to send a message across a 584 particular connection. NOTE: write_data is much more efficient 585 if same message is being sent to multiple connections. Not 586 threadsafe. 587 @param msg: message to send 588 @type msg: Msg 589 @param seq: sequence number for message 590 @type seq: int 591 @raise TransportException: if error occurred sending message 592 """ 593 # this will call write_data(), so no need to keep track of stats 594 serialize_message(self.write_buff, seq, msg) 595 self.write_data(self.write_buff.getvalue()) 596 self.write_buff.truncate(0)
597
598 - def write_data(self, data):
599 """ 600 Write raw data to transport 601 @raise TransportInitialiationError: could not be initialized 602 @raise TransportTerminated: no longer open for publishing 603 """ 604 if not self.socket: 605 raise TransportInitError("TCPROS transport was not successfully initialized") 606 if self.done: 607 raise TransportTerminated("connection closed") 608 try: 609 #TODO: get rid of sendalls and replace with async-style publishing 610 self.socket.sendall(data) 611 self.stat_bytes += len(data) 612 self.stat_num_msg += 1 613 except IOError as ioe: 614 #for now, just document common errno's in code 615 (errno, msg) = ioe.args 616 if errno == 32: #broken pipe 617 logdebug("ERROR: Broken Pipe") 618 self.close() 619 raise TransportTerminated(str(errno)+msg) 620 raise #re-raise 621 except socket.error as se: 622 #for now, just document common errno's in code 623 (errno, msg) = se.args 624 if errno == 32: #broken pipe 625 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id) 626 self.close() 627 raise TransportTerminated(msg) 628 elif errno == 104: #connection reset by peer 629 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id) 630 self.close() 631 raise TransportTerminated(msg) 632 else: 633 rospydebug("unknown socket error writing data: %s",traceback.format_exc()) 634 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg) 635 self.close() 636 raise TransportTerminated(str(errno)+' '+msg) 637 return True
638
639 - def receive_once(self):
640 """ 641 block until messages are read off of socket 642 @return: list of newly received messages 643 @rtype: [Msg] 644 @raise TransportException: if unable to receive message due to error 645 """ 646 sock = self.socket 647 if sock is None: 648 raise TransportException("connection not initialized") 649 b = self.read_buff 650 msg_queue = [] 651 p = self.protocol 652 try: 653 sock.setblocking(1) 654 while not msg_queue and not self.done and not is_shutdown(): 655 if b.tell() >= 4: 656 p.read_messages(b, msg_queue, sock) 657 if not msg_queue: 658 recv_buff(sock, b, p.buff_size) 659 self.stat_num_msg += len(msg_queue) #STATS 660 # set the _connection_header field 661 for m in msg_queue: 662 m._connection_header = self.header 663 664 # #1852: keep track of last latched message 665 if self.is_latched and msg_queue: 666 self.latch = msg_queue[-1] 667 668 return msg_queue 669 670 except DeserializationError as e: 671 rospyerr(traceback.format_exc()) 672 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e))) 673 except TransportTerminated as e: 674 raise #reraise 675 except ServiceException as e: 676 raise 677 except Exception as e: 678 rospyerr(traceback.format_exc()) 679 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e))) 680 return retval
681
682 - def _reconnect(self):
683 # This reconnection logic is very hacky right now. I need to 684 # rewrite the I/O core so that this is handled more centrally. 685 686 if self.dest_address is None: 687 raise ROSInitException("internal error with reconnection state: address not stored") 688 689 interval = 0.5 # seconds 690 while self.socket is None and not self.done and not rospy.is_shutdown(): 691 try: 692 # set a timeout so that we can continue polling for 693 # exit. 30. is a bit high, but I'm concerned about 694 # embedded platforms. To do this properly, we'd have 695 # to move to non-blocking routines. 696 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.) 697 except TransportInitError: 698 self.socket = None 699 700 if self.socket is None: 701 # exponential backoff 702 interval = interval * 2 703 704 time.sleep(interval)
705
706 - def receive_loop(self, msgs_callback):
707 """ 708 Receive messages until shutdown 709 @param msgs_callback: callback to invoke for new messages received 710 @type msgs_callback: fn([msg]) 711 """ 712 # - use assert here as this would be an internal error, aka bug 713 logger.debug("receive_loop for [%s]", self.name) 714 try: 715 while not self.done and not is_shutdown(): 716 try: 717 if self.socket is not None: 718 msgs = self.receive_once() 719 if not self.done and not is_shutdown(): 720 msgs_callback(msgs) 721 else: 722 self._reconnect() 723 724 except TransportException as e: 725 # set socket to None so we reconnect 726 try: 727 if self.socket is not None: 728 try: 729 self.socket.shutdown() 730 except: 731 pass 732 finally: 733 self.socket.close() 734 except: 735 pass 736 self.socket = None 737 738 except DeserializationError as e: 739 #TODO: how should we handle reconnect in this case? 740 741 logerr("[%s] error deserializing incoming request: %s"%self.name, str(e)) 742 rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc()) 743 except: 744 # in many cases this will be a normal hangup, but log internally 745 try: 746 #1467 sometimes we get exceptions due to 747 #interpreter shutdown, so blanket ignore those if 748 #the reporting fails 749 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc()) 750 except: pass 751 752 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name) 753 finally: 754 if not self.done: 755 self.close()
756
757 - def close(self):
758 """close i/o and release resources""" 759 if not self.done: 760 try: 761 if self.socket is not None: 762 try: 763 self.socket.shutdown(socket.SHUT_RDWR) 764 except: 765 pass 766 finally: 767 self.socket.close() 768 finally: 769 self.socket = self.read_buff = self.write_buff = self.protocol = None 770 super(TCPROSTransport, self).close()
771