Package rospy :: Package impl :: Module tcpros_base

Source Code for Module rospy.impl.tcpros_base

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id: tcpros_base.py 13839 2011-06-01 01:22:34Z kwc $ 
 34   
 35  """Internal use: common TCPROS libraries""" 
 36   
 37  from __future__ import with_statement 
 38   
 39  import cStringIO 
 40  import select 
 41  import socket 
 42  import logging 
 43  import thread 
 44  import threading 
 45  import time 
 46  import traceback 
 47   
 48  import roslib.rosenv 
 49  from roslib.message import DeserializationError, Message 
 50  from roslib.network import read_ros_handshake_header, write_ros_handshake_header 
 51   
 52  # TODO: remove * import from core 
 53  from rospy.core import * 
 54  from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn 
 55  from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError 
 56  from rospy.msg import deserialize_messages, serialize_message 
 57  from rospy.service import ServiceException 
 58   
 59  from rospy.impl.transport import Transport, BIDIRECTIONAL 
 60   
 61  logger = logging.getLogger('rospy.tcpros') 
 62   
 63  # Receive buffer size for topics/services (in bytes) 
 64  DEFAULT_BUFF_SIZE = 65536 
 65   
 66  ## name of our customized TCP protocol for accepting flows over server socket 
 67  TCPROS = "TCPROS"  
 68   
 69  _PARAM_TCP_KEEPALIVE = '/tcp_keepalive' 
 70  _use_tcp_keepalive = None 
 71  _use_tcp_keepalive_lock = threading.Lock() 
72 -def _is_use_tcp_keepalive():
73 global _use_tcp_keepalive 74 if _use_tcp_keepalive is not None: 75 return _use_tcp_keepalive 76 with _use_tcp_keepalive_lock: 77 if _use_tcp_keepalive is not None: 78 return _use_tcp_keepalive 79 # in order to prevent circular dependencies, this does not use the 80 # builtin libraries for interacting with the parameter server 81 m = rospy.core.xmlrpcapi(roslib.rosenv.get_master_uri()) 82 code, msg, val = m.getParam(rospy.names.get_caller_id(), _PARAM_TCP_KEEPALIVE) 83 _use_tcp_keepalive = val if code == 1 else True 84 return _use_tcp_keepalive
85
86 -def recv_buff(sock, b, buff_size):
87 """ 88 Read data from socket into buffer. 89 @param sock: socket to read from 90 @type sock: socket.socket 91 @param b: buffer to receive into 92 @type b: StringIO 93 @param buff_size: recv read size 94 @type buff_size: int 95 @return: number of bytes read 96 @rtype: int 97 """ 98 d = sock.recv(buff_size) 99 if d: 100 b.write(d) 101 return len(d) 102 else: #bomb out 103 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
104
105 -class TCPServer(object):
106 """ 107 Simple server that accepts inbound TCP/IP connections and hands 108 them off to a handler function. TCPServer obeys the 109 ROS_IP/ROS_HOSTNAME environment variables 110 """ 111
112 - def __init__(self, inbound_handler, port=0):
113 """ 114 Setup a server socket listening on the specified port. If the 115 port is omitted, will choose any open port. 116 @param inbound_handler: handler to invoke with 117 new connection 118 @type inbound_handler: fn(sock, addr) 119 @param port: port to bind to, omit/0 to bind to any 120 @type port: int 121 """ 122 self.port = port #will get overwritten if port=0 123 self.addr = None #set at socket bind 124 self.is_shutdown = False 125 self.inbound_handler = inbound_handler 126 try: 127 self.server_sock = self._create_server_sock() 128 except: 129 self.server_sock = None 130 raise
131
132 - def start(self):
133 """Runs the run() loop in a separate thread""" 134 thread.start_new_thread(self.run, ())
135
136 - def run(self):
137 """ 138 Main TCP receive loop. Should be run in a separate thread -- use start() 139 to do this automatically. 140 """ 141 self.is_shutdown = False 142 if not self.server_sock: 143 raise ROSInternalException("%s did not connect"%self.__class__.__name__) 144 while not self.is_shutdown: 145 try: 146 (client_sock, client_addr) = self.server_sock.accept() 147 except socket.timeout: 148 continue 149 except IOError, (errno, msg): 150 if errno == 4: #interrupted system call 151 continue 152 raise 153 if self.is_shutdown: 154 break 155 try: 156 #leave threading decisions up to inbound_handler 157 self.inbound_handler(client_sock, client_addr) 158 except socket.error, e: 159 if not self.is_shutdown: 160 traceback.print_exc() 161 logwarn("Failed to handle inbound connection due to socket error: %s"%e) 162 logdebug("TCPServer[%s] shutting down", self.port)
163 164
165 - def get_full_addr(self):
166 """ 167 @return: (ip address, port) of server socket binding 168 @rtype: (str, int) 169 """ 170 # return roslib.network.get_host_name() instead of address so that it 171 # obeys ROS_IP/ROS_HOSTNAME behavior 172 return (roslib.network.get_host_name(), self.port)
173
174 - def _create_server_sock(self):
175 """ 176 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict 177 binding to loopback interface. 178 """ 179 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 180 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 181 server_sock.bind((roslib.network.get_bind_address(), self.port)) 182 (self.addr, self.port) = server_sock.getsockname() 183 server_sock.listen(5) 184 return server_sock
185
186 - def shutdown(self):
187 """shutdown I/O resources uses by this server""" 188 if not self.is_shutdown: 189 self.is_shutdown = True 190 self.server_sock.close()
191 192 # base maintains a tcpros_server singleton that is shared between 193 # services and topics for inbound connections. This global is set in 194 # the tcprosserver constructor. Constructor is called by init_tcpros() 195 _tcpros_server = None 196
197 -def init_tcpros_server():
198 """starts the TCPROS server socket for inbound connections""" 199 global _tcpros_server 200 if _tcpros_server is None: 201 _tcpros_server = TCPROSServer() 202 rospy.core.add_shutdown_hook(_tcpros_server.shutdown) 203 return _tcpros_server
204
205 -def start_tcpros_server():
206 """ 207 start the TCPROS server if it has not started already 208 """ 209 if _tcpros_server is None: 210 init_tcpros_server() 211 return _tcpros_server.start_server()
212 213 # provide an accessor of this so that the TCPROS Server is entirely hidden from upper layers 214
215 -def get_tcpros_server_address():
216 """ 217 get the address of the tcpros server. 218 @raise Exception: if tcpros server has not been started or created 219 """ 220 return _tcpros_server.get_address()
221
222 -def _error_connection_handler(sock, client_addr, header):
223 """ 224 utility handler that does nothing more than provide a rejection header 225 @param sock: socket connection 226 @type sock: socket.socket 227 @param client_addr: client address 228 @type client_addr: str 229 @param header: request header 230 @type header: dict 231 """ 232 return {'error': "unhandled connection"}
233
234 -class TCPROSServer(object):
235 """ 236 ROS Protocol handler for TCPROS. Accepts both TCPROS topic 237 connections as well as ROS service connections over TCP. TCP server 238 socket is run once start_server() is called -- this is implicitly 239 called during init_publisher(). 240 """ 241
242 - def __init__(self):
243 """ctor.""" 244 self.tcp_ros_server = None #: server for receiving tcp conn 245 self.lock = threading.Lock() 246 # should be set to fn(sock, client_addr, header) for topic connections 247 self.topic_connection_handler = _error_connection_handler 248 # should be set to fn(sock, client_addr, header) for service connections 249 self.service_connection_handler = _error_connection_handler
250
251 - def start_server(self, port=0):
252 """ 253 Starts the TCP socket server if one is not already running 254 @param port: port number to bind to (default 0/any) 255 @type port: int 256 """ 257 if self.tcp_ros_server: 258 return 259 with self.lock: 260 try: 261 if not self.tcp_ros_server: 262 self.tcp_ros_server = TCPServer(self._tcp_server_callback, port) 263 self.tcp_ros_server.start() 264 except Exception, e: 265 self.tcp_ros_server = None 266 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc())) 267 return 0, "unable to establish TCPROS server: %s"%e, []
268
269 - def get_address(self):
270 """ 271 @return: address and port of TCP server socket for accepting 272 inbound connections 273 @rtype: str, int 274 """ 275 if self.tcp_ros_server is not None: 276 return self.tcp_ros_server.get_full_addr() 277 return None, None
278
279 - def shutdown(self, reason=''):
280 """stops the TCP/IP server responsible for receiving inbound connections""" 281 if self.tcp_ros_server: 282 self.tcp_ros_server.shutdown()
283
284 - def _tcp_server_callback(self, sock, client_addr):
285 """ 286 TCPServer callback: detects incoming topic or service connection and passes connection accordingly 287 288 @param sock: socket connection 289 @type sock: socket.socket 290 @param client_addr: client address 291 @type client_addr: (str, int) 292 @raise TransportInitError: If transport cannot be succesfully initialized 293 """ 294 #TODOXXX:rewrite this logic so it is possible to create TCPROSTransport object first, set its protocol, 295 #and then use that to do the writing 296 try: 297 buff_size = 4096 # size of read buffer 298 header = read_ros_handshake_header(sock, cStringIO.StringIO(), buff_size) 299 if 'topic' in header: 300 err_msg = self.topic_connection_handler(sock, client_addr, header) 301 elif 'service' in header: 302 err_msg = self.service_connection_handler(sock, client_addr, header) 303 else: 304 err_msg = 'no topic or service name detected' 305 if err_msg: 306 # shutdown race condition: nodes that come up and down 307 # quickly can receive connections during teardown. 308 309 # We use is_shutdown_requested() because we can get 310 # into bad connection states during client shutdown 311 # hooks. 312 if not rospy.core.is_shutdown_requested(): 313 write_ros_handshake_header(sock, {'error' : err_msg}) 314 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header)) 315 else: 316 write_ros_handshake_header(sock, {'error' : 'node shutting down'}) 317 return 318 except rospy.exceptions.TransportInitError as e: 319 logwarn(str(e)) 320 if sock is not None: 321 sock.close() 322 except Exception as e: 323 # collect stack trace separately in local log file 324 if not rospy.core.is_shutdown_requested(): 325 logwarn("Inbound TCP/IP connection failed: %s", e) 326 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc(e)) 327 if sock is not None: 328 sock.close()
329
330 -class TCPROSTransportProtocol(object):
331 """ 332 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this 333 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as 334 well as which fields to send when creating a new connection (get_header_fields()). 335 """ 336
337 - def __init__(self, resolved_name, recv_data_class, queue_size=None, buff_size=DEFAULT_BUFF_SIZE):
338 """ 339 ctor 340 @param resolved_name: resolved service or topic name 341 @type resolved_name: str 342 @param recv_data_class: message class for deserializing inbound messages 343 @type recv_data_class: Class 344 @param queue_size: maximum number of inbound messages to maintain 345 @type queue_size: int 346 @param buff_size: receive buffer size (in bytes) for reading from the connection. 347 @type buff_size: int 348 """ 349 if recv_data_class and not issubclass(recv_data_class, Message): 350 raise TransportInitError("Unable to initialize transport: data class is not a message data class") 351 self.resolved_name = resolved_name 352 self.recv_data_class = recv_data_class 353 self.queue_size = queue_size 354 self.buff_size = buff_size 355 self.direction = BIDIRECTIONAL
356 357
358 - def read_messages(self, b, msg_queue, sock):
359 """ 360 @param b StringIO: read buffer 361 @param msg_queue [Message]: queue of deserialized messages 362 @type msg_queue: [Message] 363 @param sock socket: protocol can optionally read more data from 364 the socket, but in most cases the required data will already be 365 in b 366 """ 367 # default implementation 368 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
369
370 - def get_header_fields(self):
371 """ 372 Header fields that should be sent over the connection. The header fields 373 are protocol specific (i.e. service vs. topic, publisher vs. subscriber). 374 @return: {str : str}: header fields to send over connection 375 @rtype: dict 376 """ 377 return {}
378 379 # TODO: this still isn't as clean and seamless as I want it to 380 # be. This code came from the merger of publisher, subscriber, and 381 # service code into a common TCPROS transport class. The transport is 382 # customized by a 'protocol' class, which is how the different 383 # pub/sub/service behaviors are achieved. More behavior needs to be 384 # transferred from the transport class into the protocol class, 385 # e.g. deserialization as the state each maintains is somewhat 386 # duplicative. I would also come up with a better name than 387 # protocol. 388
389 -class TCPROSTransport(Transport):
390 """ 391 Generic implementation of TCPROS exchange routines for both topics and services 392 """ 393 transport_type = 'TCPROS' 394
395 - def __init__(self, protocol, name, header=None):
396 """ 397 ctor 398 @param name str: identifier 399 @param protocol TCPROSTransportProtocol protocol implementation 400 @param header dict: (optional) handshake header if transport handshake header was 401 already read off of transport. 402 @raise TransportInitError if transport cannot be initialized according to arguments 403 """ 404 super(TCPROSTransport, self).__init__(protocol.direction, name=name) 405 if not name: 406 raise TransportInitError("Unable to initialize transport: name is not set") 407 408 self.protocol = protocol 409 410 self.socket = None 411 self.endpoint_id = 'unknown' 412 self.dest_address = None # for reconnection 413 self.read_buff = cStringIO.StringIO() 414 self.write_buff = cStringIO.StringIO() 415 416 self.header = header 417 418 # #1852 have to hold onto latched messages on subscriber side 419 self.is_latched = False 420 self.latch = None 421 422 # these fields are actually set by the remote 423 # publisher/service. they are set for tools that connect 424 # without knowing the actual field name 425 self.md5sum = None 426 self.type = None
427
428 - def set_socket(self, sock, endpoint_id):
429 """ 430 Set the socket for this transport 431 @param sock: socket 432 @type sock: socket.socket 433 @param endpoint_id: identifier for connection endpoint 434 @type endpoint_id: str 435 @raise TransportInitError: if socket has already been set 436 """ 437 if self.socket is not None: 438 raise TransportInitError("socket already initialized") 439 self.socket = sock 440 self.endpoint_id = endpoint_id
441
442 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
443 """ 444 Establish TCP connection to the specified 445 address/port. connect() always calls L{write_header()} and 446 L{read_header()} after the connection is made 447 @param dest_addr: destination IP address 448 @type dest_addr: str 449 @param dest_port: destination port 450 @type dest_port: int 451 @param endpoint_id: string identifier for connection (for statistics) 452 @type endpoint_id: str 453 @param timeout: (optional keyword) timeout in seconds 454 @type timeout: float 455 @raise TransportInitError: if unable to create connection 456 """ 457 try: 458 self.endpoint_id = endpoint_id 459 self.dest_address = (dest_addr, dest_port) 460 461 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 462 if _is_use_tcp_keepalive(): 463 # OSX (among others) does not define these options 464 if hasattr(socket, 'TCP_KEEPCNT') and \ 465 hasattr(socket, 'TCP_KEEPIDLE') and \ 466 hasattr(socket, 'TCP_KEEPINTVL'): 467 # turn on KEEPALIVE 468 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 469 # - # keepalive failures before actual connection failure 470 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9) 471 # - timeout before starting KEEPALIVE process 472 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60) 473 # - interval to send KEEPALIVE after IDLE timeout 474 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10) 475 if timeout is not None: 476 s.settimeout(timeout) 477 self.socket = s 478 self.socket.connect((dest_addr, dest_port)) 479 self.write_header() 480 self.read_header() 481 482 except TransportInitError, tie: 483 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 484 raise 485 except Exception, e: 486 # FATAL: no reconnection as error is unknown 487 self.done = True 488 if self.socket: 489 self.socket.close() 490 self.socket = None 491 492 #logerr("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, str(e))) 493 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 494 raise TransportInitError(str(e)) #re-raise i/o error
495
496 - def _validate_header(self, header):
497 """ 498 Validate header and initialize fields accordingly 499 @param header: header fields from publisher 500 @type header: dict 501 @raise TransportInitError: if header fails to validate 502 """ 503 self.header = header 504 if 'error' in header: 505 raise TransportInitError("remote error reported: %s"%header['error']) 506 for required in ['md5sum', 'type']: 507 if not required in header: 508 raise TransportInitError("header missing required field [%s]"%required) 509 self.md5sum = header['md5sum'] 510 self.type = header['type'] 511 if header.get('latching', '0') == '1': 512 self.is_latched = True
513
514 - def write_header(self):
515 """Writes the TCPROS header to the active connection.""" 516 sock = self.socket 517 # socket may still be getting spun up, so wait for it to be writable 518 fileno = sock.fileno() 519 ready = None 520 while not ready: 521 _, ready, _ = select.select([], [fileno], []) 522 logger.debug("[%s]: writing header", self.name) 523 sock.setblocking(1) 524 self.stat_bytes += write_ros_handshake_header(sock, self.protocol.get_header_fields())
525
526 - def read_header(self):
527 """ 528 Read TCPROS header from active socket 529 @raise TransportInitError if header fails to validate 530 """ 531 self.socket.setblocking(1) 532 self._validate_header(read_ros_handshake_header(self.socket, self.read_buff, self.protocol.buff_size))
533
534 - def send_message(self, msg, seq):
535 """ 536 Convenience routine for services to send a message across a 537 particular connection. NOTE: write_data is much more efficient 538 if same message is being sent to multiple connections. Not 539 threadsafe. 540 @param msg: message to send 541 @type msg: Msg 542 @param seq: sequence number for message 543 @type seq: int 544 @raise TransportException: if error occurred sending message 545 """ 546 # this will call write_data(), so no need to keep track of stats 547 serialize_message(self.write_buff, seq, msg) 548 self.write_data(self.write_buff.getvalue()) 549 self.write_buff.truncate(0)
550
551 - def write_data(self, data):
552 """ 553 Write raw data to transport 554 @raise TransportInitialiationError: could not be initialized 555 @raise TransportTerminated: no longer open for publishing 556 """ 557 if not self.socket: 558 raise TransportInitError("TCPROS transport was not successfully initialized") 559 if self.done: 560 raise TransportTerminated("connection closed") 561 try: 562 #TODO: get rid of sendalls and replace with async-style publishing 563 self.socket.sendall(data) 564 self.stat_bytes += len(data) 565 self.stat_num_msg += 1 566 except IOError, (errno, msg): 567 #for now, just document common errno's in code 568 if errno == 32: #broken pipe 569 logdebug("ERROR: Broken Pipe") 570 self.close() 571 raise TransportTerminated(str(errno)+msg) 572 raise #re-raise 573 except socket.error, (errno, msg): 574 #for now, just document common errno's in code 575 if errno == 32: #broken pipe 576 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id) 577 self.close() 578 raise TransportTerminated(msg) 579 elif errno == 104: #connection reset by peer 580 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id) 581 self.close() 582 raise TransportTerminated(msg) 583 else: 584 rospydebug("unknown socket error writing data: %s",traceback.format_exc()) 585 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg) 586 self.close() 587 raise TransportTerminated(str(errno)+' '+msg) 588 except: 589 #TODO: try to figure out common errors here 590 raise 591 return True
592
593 - def receive_once(self):
594 """ 595 block until messages are read off of socket 596 @return: list of newly received messages 597 @rtype: [Msg] 598 @raise TransportException: if unable to receive message due to error 599 """ 600 sock = self.socket 601 if sock is None: 602 raise TransportException("connection not initialized") 603 b = self.read_buff 604 msg_queue = [] 605 p = self.protocol 606 try: 607 sock.setblocking(1) 608 while not msg_queue and not self.done and not is_shutdown(): 609 if b.tell() >= 4: 610 p.read_messages(b, msg_queue, sock) 611 if not msg_queue: 612 recv_buff(sock, b, p.buff_size) 613 self.stat_num_msg += len(msg_queue) #STATS 614 # set the _connection_header field 615 for m in msg_queue: 616 m._connection_header = self.header 617 618 # #1852: keep track of last latched message 619 if self.is_latched and msg_queue: 620 self.latch = msg_queue[-1] 621 622 return msg_queue 623 624 except DeserializationError, e: 625 rospyerr(traceback.format_exc()) 626 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e))) 627 except TransportTerminated, e: 628 raise #reraise 629 except ServiceException, e: 630 raise 631 except Exception, e: 632 rospyerr(traceback.format_exc()) 633 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e))) 634 return retval
635
636 - def _reconnect(self):
637 # This reconnection logic is very hacky right now. I need to 638 # rewrite the I/O core so that this is handled more centrally. 639 640 if self.dest_address is None: 641 raise ROSInitException("internal error with reconnection state: address not stored") 642 643 interval = 0.5 # seconds 644 while self.socket is None and not self.done and not rospy.is_shutdown(): 645 try: 646 # set a timeout so that we can continue polling for 647 # exit. 30. is a bit high, but I'm concerned about 648 # embedded platforms. To do this properly, we'd have 649 # to move to non-blocking routines. 650 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.) 651 except TransportInitError: 652 self.socket = None 653 654 if self.socket is None: 655 # exponential backoff 656 interval = interval * 2 657 658 time.sleep(interval)
659
660 - def receive_loop(self, msgs_callback):
661 """ 662 Receive messages until shutdown 663 @param msgs_callback: callback to invoke for new messages received 664 @type msgs_callback: fn([msg]) 665 """ 666 # - use assert here as this would be an internal error, aka bug 667 logger.debug("receive_loop for [%s]", self.name) 668 try: 669 while not self.done and not is_shutdown(): 670 try: 671 if self.socket is not None: 672 msgs = self.receive_once() 673 if not self.done and not is_shutdown(): 674 msgs_callback(msgs) 675 else: 676 self._reconnect() 677 678 except TransportException, e: 679 # set socket to None so we reconnect 680 try: 681 if self.socket is not None: 682 self.socket.close() 683 except: 684 pass 685 self.socket = None 686 687 except DeserializationError, e: 688 #TODO: how should we handle reconnect in this case? 689 690 logerr("[%s] error deserializing incoming request: %s"%self.name, str(e)) 691 rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc()) 692 693 except: 694 # in many cases this will be a normal hangup, but log internally 695 try: 696 #1467 sometimes we get exceptions due to 697 #interpreter shutdown, so blanket ignore those if 698 #the reporting fails 699 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc()) 700 except: pass 701 702 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name) 703 finally: 704 if not self.done: 705 self.close()
706
707 - def close(self):
708 """close i/o and release resources""" 709 self.done = True 710 try: 711 if self.socket is not None: 712 self.socket.close() 713 finally: 714 self.socket = self.read_buff = self.write_buff = self.protocol = None 715 super(TCPROSTransport, self).close()
716